Chris Pomeroy
2026-01-17 ee9deff6fecf1354ea3e529dda3b7b850a81050f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#!/usr/bin/env -S python -u
import argparse
import os
import subprocess
import json
import re
import httpx
import sys
 
# arguments
# activation_key, file name, codec(default to mp3)
# Globals
 
_act_byte = ""
_metadata = ""
mode = ""
stats = ""
path = ""
 
 
def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--single", help="Use this option to create a\
                        single file. This is false by default",
                        action="store_true")
    parser.add_argument("-d", "--dpath", help="Use this to set the destination\
                        path. Otherwise I will use the current directory")
    parser.add_argument("-v", "--verbose", help="Send output to stdout",
                        action="store_true")
    parser.add_argument("filename", help="Filename to convert, or directory to\
                        look in")
    parser.add_argument("-a", "--actvation_bytes", help='The activate bytes\
                        provided by audible')
 
    args = parser.parse_args()
 
    if args.dpath:
        path = args.dpath
    else:
        path = os.getcwd()
 
    if args.single:
        mode = 'single'
    else:
        mode = 'chapter'
 
    if args.verbose:
        stats = "-stats"
    else:
        stats = "-nostats"
 
    return args
 
 
def sendtodiscord(message, audiofile):
    '''
    Send a message to the discord group
 
    Arg:
        message (str):  Message to send.
        audiofile (str): File name that was processed
 
    Ret:
        Boolean
    '''
    webhookurl = "https://discord.com/api/webhooks/764667082272145418/vorf2JdFG47WAmQP3yZhgHH12wW_qUXG0bS0SG8INLYVwU0HcDFajq9doaDgi_hnI00-"  # noqa E501
    data = {
        "content": f"{message}, {audiofile}",
        "username": "Captain Audio",
        }
 
    with httpx.Client() as r:
        resp = r.post(webhookurl, json=data)
        if resp.status_code == 204:
            return True
        else:
            return False
 
 
def get_metadata(aaxfile):
    # Returns the _metadata from an aax file
    try:
        ret = subprocess.run(["ffprobe", "-v", "info", "-hide_banner",
                              "-show_format", "-show_chapters",
                              "-print_format", "json",
                              os.path.abspath(aaxfile)], capture_output=True)
        if ret.returncode == 0:
            mdata = json.loads(ret.stdout)
            aret = ret.stderr.decode().split('\n')[0]
            print(aret)
            mdata["checksum"] = aret.split()[-1]
            return mdata
        else:
            sys.stderr.write(ret.stderr)
            return "None"
    except Exception as err:
        sys.stderr.write("Error processing _metadata: {}\n".format(err))
        return "None"
 
 
def get_bitrate():
    # Return the bitrate of the media
    bit_rate = _metadata['format']['bit_rate']
    return bit_rate[:2]
 
 
def get_copyright():
    # Return normalized copyright data
    copyright = normalize_data(_metadata['format']['tags']['copyright'])
    return copyright
 
 
def get_metadata_tags(key):
    # get specific data
    tag = normalize_data(_metadata['format']['tags'][key])
    return " ".join(tag.split())
 
 
def normalize_data(data):
    # Return a normalized title
    data = data.replace("'", "").replace('(', '').replace(')', '').replace(':', '').replace('/', '-').replace('\\', '-').replace('"', '').replace('?', '').replace('!', '').strip()
    # pattern = re.compile('[^\p{Latin}]', data)
    return data
 
 
def reencode(aaxfile: str,
             outpath: str,
             act_byte: str = None,
             key_iv: dict = None
             ):
    # decrypt and reencode to mp3
    if get_metadata_tags('major_brand') == 'aaxc':
        decrypt = f"-audible_key {key_iv['key']} -audible_iv {key_iv['iv']}"
    else:
        decrypt = f"-activation_bytes = {act_byte}"
    command = (
        "ffmpeg -loglevel error {} {} -i"
        " {} -vn -codec:a libmp3lame -qscale:a 2 -map_metadata -1"
        " -metadata \"title={}\" -metadata 'artist={}' -metadata"
        " 'album_artist={}' -metadata \"album={}\" -metadata 'date={}' "
        " -metadata track=1/1 -metadata 'genre={}' -metadata "
        "'copyright={}' \"{}\" ").format(
            stats, decrypt, aaxfile,
            get_metadata_tags('title'), get_metadata_tags('artist'),
            get_metadata_tags('album_artist'), get_metadata_tags('album'),
            get_metadata_tags('date'), get_metadata_tags('genre'),
            get_copyright(), outpath)
    process = subprocess.run(command, shell=True)
 
 
def getchaptercount():
    # Get the number of chapters
    ccount = _metadata['chapters']
    return len(ccount)
 
 
def get_chapter_metadata(cid, key):
    # get the Chapter _metadata
    for i in _metadata['chapters']:
        if i['id'] == cid:
            return i[key]
 
 
def movetochapters(path, outpath, chapter, title, start, end):
    # Creating individual chapters
 
    outfile = "{}/Ch-{} {}.mp3".format(
        outpath, chapter, title)
    command = "ffmpeg -loglevel error {} -i \"{}\" -ss {} -to {} -codec:a copy -metadata 'track={}' \"{}\"".format(stats, path,
                                                                                                                   start, end,
                                                                                                                   chapter, outfile)
    subprocess.run(command, shell=True)
    return
 
 
def getcoverart(path, outpath):
    # Pull the coverart from the file
    command = "ffmpeg -loglevel error -activation_bytes {} -i \"{}\" -an -codec:v copy \"{}/cover.jpg\"".format(act_byte,
                                                                                                                path, outpath)
    subprocess.run(command, shell=True)
 
 
def convert_aax(
                rfile: str,
                activation_bytes: str = None,
                key_iv: dict = None
                ) -> bool:
    '''
    Do the work of convering the file to mp3.
 
    Args:
        rfile (str): The file to convert
        activation_bytes Optional(str): The unique activation bytes from audible. # noqa E501
 
    Ret:
        Boolean if successful
    '''
    chapter = 0
    global _metadata
    _metadata = get_metadata(rfile)
    global act_byte
    act_byte = activation_bytes
    if _metadata == "None":
        sendtodiscord("There was a problem processing ", rfile)
    else:
        # artist = normalize_data(_metadata['format']['tags']['artist'])
        # title = normalize_data(_metadata['format']['tags']['title'])
        artist = normalize_data(_metadata['format']['tags']['artist'])
        title = normalize_data(_metadata['format']['tags']['title'])
        if activation_bytes is None or activation_bytes == '':
            sendtodiscord("We couldn't get the activation bytes", rfile)
            return False
        else:
            dest_dir = f"/workspaces/audiobooks/{artist}/{title}"
            processing_file = f"/tmp/{title}.mp3"
            if not os.path.exists(dest_dir):
                os.makedirs(dest_dir)
            reencode(rfile, processing_file, activation_bytes, key_iv)
            print(f"Splitting {title} into chapters")
            numchapters = len(_metadata['chapters'])
            while (numchapters > 0):
                cstart = get_chapter_metadata(chapter, 'start_time')
                cend = get_chapter_metadata(chapter, 'end_time')
                chapter += 1
                numchapters -= 1
                schap = str(chapter).zfill(2)
                movetochapters(processing_file, dest_dir, schap, title,
                               cstart, cend)
            os.remove(processing_file)
            getcoverart(rfile, dest_dir)
            sendtodiscord(f"We have processed the book {title} ", rfile)
            return True
 
 
def main():
    '''
    Main excution
    '''
    args = get_args()
    rfile = args.filename
    _metadata = get_metadata(rfile)
    if _metadata == "None":
        sendtodiscord("There was a problem processing ", rfile)
    else:
        artist = normalize_data(get_metadata_tags('artist'))
        title = normalize_data(get_metadata_tags('title'))
        act_byte = args.activation_bytes
        if act_byte is None or act_byte == '':
            sendtodiscord("We couldn't get the activation bytes", rfile)
            sys.exit(f"We couldn't get the activation bytes {rfile}")
        else:
            ddir = "%s/%s/%s" % (path, artist, title)
            single_file_path = "/processing/%s.mp3" % (title)
            if not os.path.exists(ddir):
                os.makedirs(ddir)
            print(ddir)
            reencode(rfile, single_file_path)
            if mode == 'chapter':
                chapter = 0
                numchapters = getchaptercount()
                while (numchapters > 0):
                    cstart = get_chapter_metadata(chapter, 'start_time')
                    cend = get_chapter_metadata(chapter, 'end_time')
                    chapter += 1
                    numchapters -= 1
                    schap = str(chapter).zfill(2)
                    movetochapters(single_file_path, ddir, schap, title,
                                   cstart, cend)
                os.remove(single_file_path)
            getcoverart(rfile, ddir)
            sendtodiscord(f"We have added the book {title} ", rfile)
 
 
if __name__ == "__main__":
    main()