Chris Pomeroy
2026-01-17 ee9deff6fecf1354ea3e529dda3b7b850a81050f
aaxConvert.py
@@ -1,33 +1,275 @@
#!/usr/bin/python
import argparse
import os
import glob
import subprocess
import json
# arguments
# activation_key, file name, codec(default to mp3)
parser = argparse.ArgumentParser()
parser.add_argument("--code", help="Activation Code from audible to decrypt files")
parser.add_argument("filename", help="Filename to convert")
args = parser.parse_args()
act_byte = ""
if args.code:
    act_byte = args.code
elif os.path.isfile('./.authcode'):
    act_byte = open('./.authcode').readline()
if act_byte == "":
    print "Please provide an activation code from audible. Either at the command line or the .authcode file"
else:
    print act_byte
for rfile in glob.glob(args.filename):
    if rfile.find("aax") != -1 and os.path.isfile(rfile):
    #print os.path.abspath(rfile)
        ret = subprocess.check_output(["ffprobe", "-v", "quiet", "-hide_banner", "-show_format", "-show_chapters", "-print_format", "json", os.path.abspath(rfile)])
        metadata = json.load(ret)
        print ret
#!/usr/bin/env -S python -u
import argparse
import os
import subprocess
import json
import re
import httpx
import sys
# arguments
# activation_key, file name, codec(default to mp3)
# Globals
_act_byte = ""
_metadata = ""
mode = ""
stats = ""
path = ""
def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--single", help="Use this option to create a\
                        single file. This is false by default",
                        action="store_true")
    parser.add_argument("-d", "--dpath", help="Use this to set the destination\
                        path. Otherwise I will use the current directory")
    parser.add_argument("-v", "--verbose", help="Send output to stdout",
                        action="store_true")
    parser.add_argument("filename", help="Filename to convert, or directory to\
                        look in")
    parser.add_argument("-a", "--actvation_bytes", help='The activate bytes\
                        provided by audible')
    args = parser.parse_args()
    if args.dpath:
        path = args.dpath
    else:
        path = os.getcwd()
    if args.single:
        mode = 'single'
    else:
        mode = 'chapter'
    if args.verbose:
        stats = "-stats"
    else:
        stats = "-nostats"
    return args
def sendtodiscord(message, audiofile):
    '''
    Send a message to the discord group
    Arg:
        message (str):  Message to send.
        audiofile (str): File name that was processed
    Ret:
        Boolean
    '''
    webhookurl = "https://discord.com/api/webhooks/764667082272145418/vorf2JdFG47WAmQP3yZhgHH12wW_qUXG0bS0SG8INLYVwU0HcDFajq9doaDgi_hnI00-"  # noqa E501
    data = {
        "content": f"{message}, {audiofile}",
        "username": "Captain Audio",
        }
    with httpx.Client() as r:
        resp = r.post(webhookurl, json=data)
        if resp.status_code == 204:
            return True
        else:
            return False
def get_metadata(aaxfile):
    # Returns the _metadata from an aax file
    try:
        ret = subprocess.run(["ffprobe", "-v", "info", "-hide_banner",
                              "-show_format", "-show_chapters",
                              "-print_format", "json",
                              os.path.abspath(aaxfile)], capture_output=True)
        if ret.returncode == 0:
            mdata = json.loads(ret.stdout)
            aret = ret.stderr.decode().split('\n')[0]
            print(aret)
            mdata["checksum"] = aret.split()[-1]
            return mdata
        else:
            sys.stderr.write(ret.stderr)
            return "None"
    except Exception as err:
        sys.stderr.write("Error processing _metadata: {}\n".format(err))
        return "None"
def get_bitrate():
    # Return the bitrate of the media
    bit_rate = _metadata['format']['bit_rate']
    return bit_rate[:2]
def get_copyright():
    # Return normalized copyright data
    copyright = normalize_data(_metadata['format']['tags']['copyright'])
    return copyright
def get_metadata_tags(key):
    # get specific data
    tag = normalize_data(_metadata['format']['tags'][key])
    return " ".join(tag.split())
def normalize_data(data):
    # Return a normalized title
    data = data.replace("'", "").replace('(', '').replace(')', '').replace(':', '').replace('/', '-').replace('\\', '-').replace('"', '').replace('?', '').replace('!', '').strip()
    # pattern = re.compile('[^\p{Latin}]', data)
    return data
def reencode(aaxfile: str,
             outpath: str,
             act_byte: str = None,
             key_iv: dict = None
             ):
    # decrypt and reencode to mp3
    if get_metadata_tags('major_brand') == 'aaxc':
        decrypt = f"-audible_key {key_iv['key']} -audible_iv {key_iv['iv']}"
    else:
        decrypt = f"-activation_bytes = {act_byte}"
    command = (
        "ffmpeg -loglevel error {} {} -i"
        " {} -vn -codec:a libmp3lame -qscale:a 2 -map_metadata -1"
        " -metadata \"title={}\" -metadata 'artist={}' -metadata"
        " 'album_artist={}' -metadata \"album={}\" -metadata 'date={}' "
        " -metadata track=1/1 -metadata 'genre={}' -metadata "
        "'copyright={}' \"{}\" ").format(
            stats, decrypt, aaxfile,
            get_metadata_tags('title'), get_metadata_tags('artist'),
            get_metadata_tags('album_artist'), get_metadata_tags('album'),
            get_metadata_tags('date'), get_metadata_tags('genre'),
            get_copyright(), outpath)
    process = subprocess.run(command, shell=True)
def getchaptercount():
    # Get the number of chapters
    ccount = _metadata['chapters']
    return len(ccount)
def get_chapter_metadata(cid, key):
    # get the Chapter _metadata
    for i in _metadata['chapters']:
        if i['id'] == cid:
            return i[key]
def movetochapters(path, outpath, chapter, title, start, end):
    # Creating individual chapters
    outfile = "{}/Ch-{} {}.mp3".format(
        outpath, chapter, title)
    command = "ffmpeg -loglevel error {} -i \"{}\" -ss {} -to {} -codec:a copy -metadata 'track={}' \"{}\"".format(stats, path,
                                                                                                                   start, end,
                                                                                                                   chapter, outfile)
    subprocess.run(command, shell=True)
    return
def getcoverart(path, outpath):
    # Pull the coverart from the file
    command = "ffmpeg -loglevel error -activation_bytes {} -i \"{}\" -an -codec:v copy \"{}/cover.jpg\"".format(act_byte,
                                                                                                                path, outpath)
    subprocess.run(command, shell=True)
def convert_aax(
                rfile: str,
                activation_bytes: str = None,
                key_iv: dict = None
                ) -> bool:
    '''
    Do the work of convering the file to mp3.
    Args:
        rfile (str): The file to convert
        activation_bytes Optional(str): The unique activation bytes from audible. # noqa E501
    Ret:
        Boolean if successful
    '''
    chapter = 0
    global _metadata
    _metadata = get_metadata(rfile)
    global act_byte
    act_byte = activation_bytes
    if _metadata == "None":
        sendtodiscord("There was a problem processing ", rfile)
    else:
        # artist = normalize_data(_metadata['format']['tags']['artist'])
        # title = normalize_data(_metadata['format']['tags']['title'])
        artist = normalize_data(_metadata['format']['tags']['artist'])
        title = normalize_data(_metadata['format']['tags']['title'])
        if activation_bytes is None or activation_bytes == '':
            sendtodiscord("We couldn't get the activation bytes", rfile)
            return False
        else:
            dest_dir = f"/workspaces/audiobooks/{artist}/{title}"
            processing_file = f"/tmp/{title}.mp3"
            if not os.path.exists(dest_dir):
                os.makedirs(dest_dir)
            reencode(rfile, processing_file, activation_bytes, key_iv)
            print(f"Splitting {title} into chapters")
            numchapters = len(_metadata['chapters'])
            while (numchapters > 0):
                cstart = get_chapter_metadata(chapter, 'start_time')
                cend = get_chapter_metadata(chapter, 'end_time')
                chapter += 1
                numchapters -= 1
                schap = str(chapter).zfill(2)
                movetochapters(processing_file, dest_dir, schap, title,
                               cstart, cend)
            os.remove(processing_file)
            getcoverart(rfile, dest_dir)
            sendtodiscord(f"We have processed the book {title} ", rfile)
            return True
def main():
    '''
    Main excution
    '''
    args = get_args()
    rfile = args.filename
    _metadata = get_metadata(rfile)
    if _metadata == "None":
        sendtodiscord("There was a problem processing ", rfile)
    else:
        artist = normalize_data(get_metadata_tags('artist'))
        title = normalize_data(get_metadata_tags('title'))
        act_byte = args.activation_bytes
        if act_byte is None or act_byte == '':
            sendtodiscord("We couldn't get the activation bytes", rfile)
            sys.exit(f"We couldn't get the activation bytes {rfile}")
        else:
            ddir = "%s/%s/%s" % (path, artist, title)
            single_file_path = "/processing/%s.mp3" % (title)
            if not os.path.exists(ddir):
                os.makedirs(ddir)
            print(ddir)
            reencode(rfile, single_file_path)
            if mode == 'chapter':
                chapter = 0
                numchapters = getchaptercount()
                while (numchapters > 0):
                    cstart = get_chapter_metadata(chapter, 'start_time')
                    cend = get_chapter_metadata(chapter, 'end_time')
                    chapter += 1
                    numchapters -= 1
                    schap = str(chapter).zfill(2)
                    movetochapters(single_file_path, ddir, schap, title,
                                   cstart, cend)
                os.remove(single_file_path)
            getcoverart(rfile, ddir)
            sendtodiscord(f"We have added the book {title} ", rfile)
if __name__ == "__main__":
    main()