#!/usr/bin/env -S python -u
|
import argparse
|
import os
|
import subprocess
|
import json
|
import re
|
import httpx
|
import sys
|
|
# arguments
|
# activation_key, file name, codec(default to mp3)
|
# Globals
#
# Module-level state shared by the helper functions below.

# Kept for backward compatibility; the helpers read ``act_byte`` (no
# underscore), which previously only existed after convert_aax() had run.
_act_byte = ""

# Activation bytes read by getcoverart(); convert_aax() rebinds it.
act_byte = ""

# Parsed ffprobe output for the file being converted; convert_aax() rebinds it
# and the get_* helpers index into it.
_metadata = ""

# Compared against 'chapter' in main() to decide whether to split the mp3.
mode = ""

# Extra flag interpolated into the ffmpeg commands ("-stats" / "-nostats").
stats = ""

# Destination root that main() prefixes to "<artist>/<title>".
path = ""
|
|
|
def get_args():
    '''
    Parse the command line and publish the derived settings as globals.

    Besides returning the parsed namespace this sets the module-level
    ``path`` (destination root, defaults to the current directory), ``mode``
    ('single' or 'chapter') and ``stats`` ('-stats' or '-nostats').

    Ret:
        argparse.Namespace with ``single``, ``dpath``, ``verbose``,
        ``filename`` and ``activation_bytes``.
    '''
    # Without this declaration the assignments below only create locals and
    # the module-level settings read by the other functions never change.
    global path, mode, stats

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-s", "--single",
        help="Use this option to create a single file. "
             "This is false by default",
        action="store_true")
    parser.add_argument(
        "-d", "--dpath",
        help="Use this to set the destination path. "
             "Otherwise I will use the current directory")
    parser.add_argument(
        "-v", "--verbose",
        help="Send output to stdout",
        action="store_true")
    parser.add_argument(
        "filename",
        help="Filename to convert, or directory to look in")
    # The option used to be spelled "--actvation_bytes" while main() reads
    # ``args.activation_bytes``; accept both spellings and store the value
    # under the correctly spelled attribute.
    parser.add_argument(
        "-a", "--activation_bytes", "--actvation_bytes",
        dest="activation_bytes",
        help="The activate bytes provided by audible")

    args = parser.parse_args()
    # Keep the old attribute name alive for any caller that still reads it.
    args.actvation_bytes = args.activation_bytes

    path = args.dpath if args.dpath else os.getcwd()
    mode = 'single' if args.single else 'chapter'
    stats = "-stats" if args.verbose else "-nostats"

    return args
|
|
|
def sendtodiscord(message, audiofile, webhookurl=None):
    '''
    Send a message to the discord group

    The notification is best effort: a transport failure is reported on
    stderr and turned into a False return instead of aborting the caller.

    Arg:
        message (str): Message to send.
        audiofile (str): File name that was processed
        webhookurl Optional(str): Webhook to post to. Defaults to the
            project's built-in webhook.

    Ret:
        Boolean, True only when the webhook answered with HTTP 204.
    '''
    if webhookurl is None:
        # NOTE(review): this URL embeds the webhook token, i.e. a credential
        # committed to source control. Consider rotating it and passing the
        # URL in via ``webhookurl`` instead.
        webhookurl = "https://discord.com/api/webhooks/764667082272145418/vorf2JdFG47WAmQP3yZhgHH12wW_qUXG0bS0SG8INLYVwU0HcDFajq9doaDgi_hnI00-"  # noqa E501
    data = {
        "content": f"{message}, {audiofile}",
        "username": "Captain Audio",
    }

    try:
        with httpx.Client() as client:
            resp = client.post(webhookurl, json=data)
    except httpx.HTTPError as err:
        sys.stderr.write(f"Could not notify discord: {err}\n")
        return False
    return resp.status_code == 204
|
|
|
def get_metadata(aaxfile):
    '''
    Return the ffprobe metadata of an aax file.

    Runs ``ffprobe -show_format -show_chapters -print_format json`` on the
    file and parses its stdout as JSON. The last token of the first stderr
    line is stored under the extra key ``"checksum"``.

    Args:
        aaxfile (str): Path of the file to inspect.

    Ret:
        dict with the parsed ffprobe output, or the string "None" when
        ffprobe fails or its output cannot be processed (callers compare
        against that literal).
    '''
    try:
        ret = subprocess.run(
            ["ffprobe", "-v", "info", "-hide_banner",
             "-show_format", "-show_chapters",
             "-print_format", "json",
             os.path.abspath(aaxfile)],
            capture_output=True)
        # capture_output without text=True yields bytes; decode once so the
        # text can be both parsed and written to the text-mode sys.stderr.
        err_text = ret.stderr.decode(errors="replace")
        if ret.returncode != 0:
            sys.stderr.write(err_text)
            return "None"
        mdata = json.loads(ret.stdout)
        # presumably the first stderr line is the "file checksum" line that
        # ffprobe logs for aax input - verify against real ffprobe output.
        aret = err_text.split('\n')[0]
        print(aret)
        mdata["checksum"] = aret.split()[-1]
        return mdata
    except Exception as err:
        # Deliberately broad: any failure (ffprobe missing, bad JSON, empty
        # stderr) maps to the "None" sentinel the callers check for.
        sys.stderr.write("Error processing _metadata: {}\n".format(err))
        return "None"
|
|
|
def get_bitrate():
    '''
    Return the bitrate of the media in whole kbit/s.

    Reads ``_metadata['format']['bit_rate']`` (bits per second) and converts
    it arithmetically; slicing the first two characters reported "12" for a
    128000 bit/s file.

    Ret:
        str, e.g. "64" for a bit_rate of "64004".
    '''
    bit_rate = _metadata['format']['bit_rate']
    return str(int(bit_rate) // 1000)
|
|
|
def get_copyright():
    '''
    Return the 'copyright' format tag after normalize_data() cleaning.
    '''
    tags = _metadata['format']['tags']
    return normalize_data(tags['copyright'])
|
|
|
def get_metadata_tags(key):
    '''
    Return one format tag, normalized and with whitespace runs collapsed.

    Args:
        key (str): Name of the entry in ``_metadata['format']['tags']``.

    Ret:
        str with single spaces between words.
    '''
    raw_value = _metadata['format']['tags'][key]
    words = normalize_data(raw_value).split()
    return " ".join(words)
|
|
|
def normalize_data(data):
    '''
    Return the text with awkward punctuation removed.

    Quotes, parentheses, colons, question marks and exclamation marks are
    dropped, forward and back slashes become '-', and surrounding
    whitespace is stripped.
    '''
    # One translation pass: map both slashes to '-', delete the rest.
    table = str.maketrans("/\\", "--", "'():\"?!")
    return data.translate(table).strip()
|
|
|
def reencode(aaxfile: str,
             outpath: str,
             act_byte: str = None,
             key_iv: dict = None
             ):
    '''
    Decrypt the audible file and reencode it to a single mp3.

    Files whose 'major_brand' tag is 'aaxc' are decrypted with the key/iv
    pair, everything else with the activation bytes. The original metadata
    is dropped (``-map_metadata -1``) and title, artist, album_artist,
    album, date, genre and copyright are written back from the normalized
    tags, with the track fixed to 1/1.

    Args:
        aaxfile (str): The file to convert.
        outpath (str): Path of the mp3 to write.
        act_byte Optional(str): Activation bytes for aax files.
        key_iv Optional(dict): Mapping with 'key' and 'iv' for aaxc files.

    Raises:
        ValueError: the file is aaxc and no key_iv was supplied.
    '''
    if get_metadata_tags('major_brand') == 'aaxc':
        if not key_iv:
            raise ValueError("aaxc files need key_iv with 'key' and 'iv'")
        decrypt = ["-audible_key", str(key_iv['key']),
                   "-audible_iv", str(key_iv['iv'])]
    else:
        # ffmpeg options take "-name value"; the previous "-name = value"
        # form made "=" the activation value.
        decrypt = ["-activation_bytes", str(act_byte)]

    # An argument list (no shell) keeps paths with spaces and tags containing
    # shell metacharacters intact.
    command = ["ffmpeg", "-loglevel", "error"]
    if stats:
        # stats is "" until get_args() has run; an empty argument would be
        # taken by ffmpeg as a file name.
        command.append(stats)
    command += decrypt
    command += ["-i", aaxfile,
                "-vn", "-codec:a", "libmp3lame", "-qscale:a", "2",
                "-map_metadata", "-1"]
    for tag in ('title', 'artist', 'album_artist', 'album', 'date'):
        command += ["-metadata", "{}={}".format(tag, get_metadata_tags(tag))]
    command += ["-metadata", "track=1/1",
                "-metadata", "genre={}".format(get_metadata_tags('genre')),
                "-metadata", "copyright={}".format(get_copyright()),
                outpath]
    subprocess.run(command)
|
|
|
def getchaptercount():
    '''
    Return how many entries the 'chapters' list of the metadata holds.
    '''
    return len(_metadata['chapters'])
|
|
|
def get_chapter_metadata(cid, key):
    '''
    Return one field of the chapter whose 'id' equals cid.

    Args:
        cid: Value to match against each chapter's 'id'.
        key (str): Field to read from the matching chapter.

    Ret:
        The field of the first matching chapter, or None when no chapter
        has that id.
    '''
    matching_values = (
        entry[key] for entry in _metadata['chapters'] if entry['id'] == cid
    )
    return next(matching_values, None)
|
|
|
def movetochapters(path, outpath, chapter, title, start, end):
    '''
    Cut one chapter out of the reencoded mp3 without reencoding again.

    Writes "<outpath>/Ch-<chapter> <title>.mp3" containing the audio between
    start and end, tagged with the chapter as its track.

    Args:
        path (str): The mp3 to cut from.
        outpath (str): Directory that receives the chapter file.
        chapter (str): Chapter label used in the file name and track tag.
        title (str): Book title used in the file name.
        start: Value passed to ffmpeg's -ss.
        end: Value passed to ffmpeg's -to.
    '''
    outfile = "{}/Ch-{} {}.mp3".format(outpath, chapter, title)
    # Argument list instead of a shell string, so titles and paths with
    # quotes, '$' or backticks reach ffmpeg unchanged.
    command = ["ffmpeg", "-loglevel", "error"]
    if stats:
        # stats is "" until get_args() has run; skip the empty argument.
        command.append(stats)
    command += ["-i", path,
                "-ss", str(start), "-to", str(end),
                "-codec:a", "copy",
                "-metadata", "track={}".format(chapter),
                outfile]
    subprocess.run(command)
    return
|
|
|
def getcoverart(path, outpath, activation_bytes=None):
    '''
    Pull the coverart from the file into "<outpath>/cover.jpg".

    Args:
        path (str): The audible file to read.
        outpath (str): Directory that receives cover.jpg.
        activation_bytes Optional(str): Activation bytes to decrypt with.
            When omitted, the module-level ``act_byte`` is used if set.
    '''
    if activation_bytes is None:
        # The global only exists once something has assigned it; look it up
        # defensively instead of failing with NameError.
        activation_bytes = globals().get("act_byte")

    command = ["ffmpeg", "-loglevel", "error"]
    if activation_bytes:
        command += ["-activation_bytes", str(activation_bytes)]
    command += ["-i", path,
                "-an", "-codec:v", "copy",
                "{}/cover.jpg".format(outpath)]
    # Argument list (no shell) so paths with spaces or quotes are safe.
    subprocess.run(command)
|
|
|
def convert_aax(
    rfile: str,
    activation_bytes: str = None,
    key_iv: dict = None
) -> bool:
    '''
    Do the work of converting the file to mp3.

    The file is reencoded to "/tmp/<title>.mp3", split into one mp3 per
    chapter under "/workspaces/audiobooks/<artist>/<title>", the cover art
    is extracted next to the chapters and the temporary mp3 is removed.
    Progress and failures are reported through sendtodiscord().

    Args:
        rfile (str): The file to convert
        activation_bytes Optional(str): The unique activation bytes from audible. # noqa E501
        key_iv Optional(dict): 'key' and 'iv' passed on to reencode() for
            aaxc files.

    Ret:
        Boolean if successful
    '''
    global _metadata, act_byte
    _metadata = get_metadata(rfile)
    act_byte = activation_bytes

    if _metadata == "None":
        sendtodiscord("There was a problem processing ", rfile)
        # Previously fell through and returned None despite the bool contract.
        return False

    tags = _metadata['format']['tags']
    artist = normalize_data(tags['artist'])
    title = normalize_data(tags['title'])

    if activation_bytes is None or activation_bytes == '':
        sendtodiscord("We couldn't get the activation bytes", rfile)
        return False

    dest_dir = f"/workspaces/audiobooks/{artist}/{title}"
    processing_file = f"/tmp/{title}.mp3"
    os.makedirs(dest_dir, exist_ok=True)

    reencode(rfile, processing_file, activation_bytes, key_iv)
    if not os.path.exists(processing_file):
        # reencode() does not report ffmpeg's result; a missing output file
        # is the visible sign that it failed.
        sendtodiscord("There was a problem processing ", rfile)
        return False

    print(f"Splitting {title} into chapters")
    # Walk the chapter list itself rather than looking chapters up by a
    # counter, so files whose chapter ids do not start at 0 still split.
    for number, entry in enumerate(_metadata['chapters'], start=1):
        movetochapters(processing_file, dest_dir, str(number).zfill(2),
                       title, entry['start_time'], entry['end_time'])

    os.remove(processing_file)
    getcoverart(rfile, dest_dir)
    sendtodiscord(f"We have processed the book {title} ", rfile)
    return True
|
|
|
def main():
    '''
    Main execution

    Parses the command line, reencodes the given file to
    "/processing/<title>.mp3", optionally splits it into chapters under
    "<destination>/<artist>/<title>", extracts the cover art and reports the
    outcome through sendtodiscord().
    '''
    # The helpers read these module-level names; binding them as locals here
    # left the helpers with the empty defaults.
    global _metadata, act_byte, path, mode, stats

    args = get_args()
    rfile = args.filename

    # Derive the run settings from the parsed arguments directly so this
    # function does not depend on get_args() having published them.
    path = args.dpath if args.dpath else os.getcwd()
    mode = 'single' if args.single else 'chapter'
    stats = "-stats" if args.verbose else "-nostats"

    _metadata = get_metadata(rfile)
    if _metadata == "None":
        sendtodiscord("There was a problem processing ", rfile)
        return

    artist = normalize_data(get_metadata_tags('artist'))
    title = normalize_data(get_metadata_tags('title'))

    # Accept the historical misspelling of the option's attribute as well.
    act_byte = (getattr(args, "activation_bytes", None)
                or getattr(args, "actvation_bytes", None))
    if act_byte is None or act_byte == '':
        sendtodiscord("We couldn't get the activation bytes", rfile)
        sys.exit(f"We couldn't get the activation bytes {rfile}")

    ddir = "%s/%s/%s" % (path, artist, title)
    single_file_path = "/processing/%s.mp3" % (title)
    os.makedirs(ddir, exist_ok=True)
    print(ddir)

    # The activation bytes were previously not passed on, so ffmpeg received
    # the literal string "None".
    reencode(rfile, single_file_path, act_byte)

    if mode == 'chapter':
        for number, entry in enumerate(_metadata['chapters'], start=1):
            movetochapters(single_file_path, ddir, str(number).zfill(2),
                           title, entry['start_time'], entry['end_time'])
        # NOTE(review): the intermediate file is removed only after a chapter
        # split, so single mode keeps its one output file - confirm this
        # matches the intended nesting.
        os.remove(single_file_path)

    getcoverart(rfile, ddir)
    sendtodiscord(f"We have added the book {title} ", rfile)
|
|
|
if __name__ == "__main__":
|
main()
|