#!/usr/bin/env -S python -u import argparse import os import subprocess import json import re import httpx import sys # arguments # activation_key, file name, codec(default to mp3) # Globals _act_byte = "" _metadata = "" mode = "" stats = "" path = "" def get_args(): parser = argparse.ArgumentParser() parser.add_argument("-s", "--single", help="Use this option to create a\ single file. This is false by default", action="store_true") parser.add_argument("-d", "--dpath", help="Use this to set the destination\ path. Otherwise I will use the current directory") parser.add_argument("-v", "--verbose", help="Send output to stdout", action="store_true") parser.add_argument("filename", help="Filename to convert, or directory to\ look in") parser.add_argument("-a", "--actvation_bytes", help='The activate bytes\ provided by audible') args = parser.parse_args() if args.dpath: path = args.dpath else: path = os.getcwd() if args.single: mode = 'single' else: mode = 'chapter' if args.verbose: stats = "-stats" else: stats = "-nostats" return args def sendtodiscord(message, audiofile): ''' Send a message to the discord group Arg: message (str): Message to send. audiofile (str): File name that was processed Ret: Boolean ''' webhookurl = "https://discord.com/api/webhooks/764667082272145418/vorf2JdFG47WAmQP3yZhgHH12wW_qUXG0bS0SG8INLYVwU0HcDFajq9doaDgi_hnI00-" # noqa E501 data = { "content": f"{message}, {audiofile}", "username": "Captain Audio", } with httpx.Client() as r: resp = r.post(webhookurl, json=data) if resp.status_code == 204: return True else: return False def get_metadata(aaxfile): # Returns the _metadata from an aax file try: ret = subprocess.run(["ffprobe", "-v", "info", "-hide_banner", "-show_format", "-show_chapters", "-print_format", "json", os.path.abspath(aaxfile)], capture_output=True) if ret.returncode == 0: mdata = json.loads(ret.stdout) aret = ret.stderr.decode().split('\n')[0] print(aret) mdata["checksum"] = aret.split()[-1] return mdata else: sys.stderr.write(ret.stderr) return "None" except Exception as err: sys.stderr.write("Error processing _metadata: {}\n".format(err)) return "None" def get_bitrate(): # Return the bitrate of the media bit_rate = _metadata['format']['bit_rate'] return bit_rate[:2] def get_copyright(): # Return normalized copyright data copyright = normalize_data(_metadata['format']['tags']['copyright']) return copyright def get_metadata_tags(key): # get specific data tag = normalize_data(_metadata['format']['tags'][key]) return " ".join(tag.split()) def normalize_data(data): # Return a normalized title data = data.replace("'", "").replace('(', '').replace(')', '').replace(':', '').replace('/', '-').replace('\\', '-').replace('"', '').replace('?', '').replace('!', '').strip() # pattern = re.compile('[^\p{Latin}]', data) return data def reencode(aaxfile: str, outpath: str, act_byte: str = None, key_iv: dict = None ): # decrypt and reencode to mp3 if get_metadata_tags('major_brand') == 'aaxc': decrypt = f"-audible_key {key_iv['key']} -audible_iv {key_iv['iv']}" else: decrypt = f"-activation_bytes = {act_byte}" command = ( "ffmpeg -loglevel error {} {} -i" " {} -vn -codec:a libmp3lame -qscale:a 2 -map_metadata -1" " -metadata \"title={}\" -metadata 'artist={}' -metadata" " 'album_artist={}' -metadata \"album={}\" -metadata 'date={}' " " -metadata track=1/1 -metadata 'genre={}' -metadata " "'copyright={}' \"{}\" ").format( stats, decrypt, aaxfile, get_metadata_tags('title'), get_metadata_tags('artist'), get_metadata_tags('album_artist'), get_metadata_tags('album'), get_metadata_tags('date'), get_metadata_tags('genre'), get_copyright(), outpath) process = subprocess.run(command, shell=True) def getchaptercount(): # Get the number of chapters ccount = _metadata['chapters'] return len(ccount) def get_chapter_metadata(cid, key): # get the Chapter _metadata for i in _metadata['chapters']: if i['id'] == cid: return i[key] def movetochapters(path, outpath, chapter, title, start, end): # Creating individual chapters outfile = "{}/Ch-{} {}.mp3".format( outpath, chapter, title) command = "ffmpeg -loglevel error {} -i \"{}\" -ss {} -to {} -codec:a copy -metadata 'track={}' \"{}\"".format(stats, path, start, end, chapter, outfile) subprocess.run(command, shell=True) return def getcoverart(path, outpath): # Pull the coverart from the file command = "ffmpeg -loglevel error -activation_bytes {} -i \"{}\" -an -codec:v copy \"{}/cover.jpg\"".format(act_byte, path, outpath) subprocess.run(command, shell=True) def convert_aax( rfile: str, activation_bytes: str = None, key_iv: dict = None ) -> bool: ''' Do the work of convering the file to mp3. Args: rfile (str): The file to convert activation_bytes Optional(str): The unique activation bytes from audible. # noqa E501 Ret: Boolean if successful ''' chapter = 0 global _metadata _metadata = get_metadata(rfile) global act_byte act_byte = activation_bytes if _metadata == "None": sendtodiscord("There was a problem processing ", rfile) else: # artist = normalize_data(_metadata['format']['tags']['artist']) # title = normalize_data(_metadata['format']['tags']['title']) artist = normalize_data(_metadata['format']['tags']['artist']) title = normalize_data(_metadata['format']['tags']['title']) if activation_bytes is None or activation_bytes == '': sendtodiscord("We couldn't get the activation bytes", rfile) return False else: dest_dir = f"/workspaces/audiobooks/{artist}/{title}" processing_file = f"/tmp/{title}.mp3" if not os.path.exists(dest_dir): os.makedirs(dest_dir) reencode(rfile, processing_file, activation_bytes, key_iv) print(f"Splitting {title} into chapters") numchapters = len(_metadata['chapters']) while (numchapters > 0): cstart = get_chapter_metadata(chapter, 'start_time') cend = get_chapter_metadata(chapter, 'end_time') chapter += 1 numchapters -= 1 schap = str(chapter).zfill(2) movetochapters(processing_file, dest_dir, schap, title, cstart, cend) os.remove(processing_file) getcoverart(rfile, dest_dir) sendtodiscord(f"We have processed the book {title} ", rfile) return True def main(): ''' Main excution ''' args = get_args() rfile = args.filename _metadata = get_metadata(rfile) if _metadata == "None": sendtodiscord("There was a problem processing ", rfile) else: artist = normalize_data(get_metadata_tags('artist')) title = normalize_data(get_metadata_tags('title')) act_byte = args.activation_bytes if act_byte is None or act_byte == '': sendtodiscord("We couldn't get the activation bytes", rfile) sys.exit(f"We couldn't get the activation bytes {rfile}") else: ddir = "%s/%s/%s" % (path, artist, title) single_file_path = "/processing/%s.mp3" % (title) if not os.path.exists(ddir): os.makedirs(ddir) print(ddir) reencode(rfile, single_file_path) if mode == 'chapter': chapter = 0 numchapters = getchaptercount() while (numchapters > 0): cstart = get_chapter_metadata(chapter, 'start_time') cend = get_chapter_metadata(chapter, 'end_time') chapter += 1 numchapters -= 1 schap = str(chapter).zfill(2) movetochapters(single_file_path, ddir, schap, title, cstart, cend) os.remove(single_file_path) getcoverart(rfile, ddir) sendtodiscord(f"We have added the book {title} ", rfile) if __name__ == "__main__": main()