Problems decrypting some files with escape characters #149
Comments
Maybe this is OS related? Do you use Windows?
No, this is running on a Debian server. Could it be related to this issue, where because
The issue seems to be the
If you use at least Python 3.8, you can try the following script as a replacement for the
"""Removes encryption of aax and aaxc files.

This is a proof-of-concept and for testing purposes only.
No error handling.
Need further work. Some options do not work or options are missing.
Needs at least ffmpeg 4.4
"""
import json
import operator
import pathlib
import re
import subprocess # noqa: S404
import tempfile
import typing as t
from enum import Enum
from functools import reduce
from glob import glob
from shlex import quote
from shutil import which
import click
from click import echo, secho
from audible_cli.decorators import pass_session
from audible_cli.exceptions import AudibleCliException
class ChapterError(AudibleCliException):
    """Base class for all chapter errors."""


class SupportedFiles(Enum):
    AAX = ".aax"
    AAXC = ".aaxc"

    @classmethod
    def get_supported_list(cls):
        return list(set(item.value for item in cls))

    @classmethod
    def is_supported_suffix(cls, value):
        return value in cls.get_supported_list()

    @classmethod
    def is_supported_file(cls, value):
        return pathlib.PurePath(value).suffix in cls.get_supported_list()
def _get_input_files(
    files: t.Union[t.Tuple[str], t.List[str]],
    recursive: bool = True
) -> t.List[pathlib.Path]:
    filenames = []
    for filename in files:
        # if the shell does not do filename globbing
        expanded = list(glob(filename, recursive=recursive))

        if (
            len(expanded) == 0
            and '*' not in filename
            and not SupportedFiles.is_supported_file(filename)
        ):
            # f-string, so the offending file name shows up in the message
            raise click.BadParameter(
                f"{filename}: file not found or not supported."
            )

        expanded_filter = filter(
            lambda x: SupportedFiles.is_supported_file(x), expanded
        )
        expanded = list(map(lambda x: pathlib.Path(x).resolve(), expanded_filter))
        filenames.extend(expanded)

    return filenames
def recursive_lookup_dict(key: str, dictionary: t.Dict[str, t.Any]) -> t.Any:
    if key in dictionary:
        return dictionary[key]
    for value in dictionary.values():
        if isinstance(value, dict):
            try:
                item = recursive_lookup_dict(key, value)
            except KeyError:
                continue
            else:
                return item

    raise KeyError


def get_aaxc_credentials(voucher_file: pathlib.Path):
    if not voucher_file.exists() or not voucher_file.is_file():
        raise AudibleCliException(f"Voucher file {voucher_file} not found.")

    voucher_dict = json.loads(voucher_file.read_text())
    try:
        key = recursive_lookup_dict("key", voucher_dict)
        iv = recursive_lookup_dict("iv", voucher_dict)
    except KeyError:
        raise AudibleCliException(f"No key/iv found in file {voucher_file}.") from None

    return key, iv
class ApiChapterInfo:
    def __init__(self, content_metadata: t.Dict[str, t.Any]) -> None:
        chapter_info = self._parse(content_metadata)
        self._chapter_info = chapter_info

    @classmethod
    def from_file(cls, file: t.Union[pathlib.Path, str]) -> "ApiChapterInfo":
        file = pathlib.Path(file)
        if not file.exists() or not file.is_file():
            raise ChapterError(f"Chapter file {file} not found.")
        content_string = pathlib.Path(file).read_text("utf-8")
        content_json = json.loads(content_string)
        return cls(content_json)

    @staticmethod
    def _parse(content_metadata: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        if "chapters" in content_metadata:
            return content_metadata

        try:
            return recursive_lookup_dict("chapter_info", content_metadata)
        except KeyError:
            raise ChapterError("No chapter info found.") from None

    def count_chapters(self):
        return len(self.get_chapters())

    def get_chapters(self, separate_intro_outro=False):
        def extract_chapters(initial, current):
            if "chapters" in current:
                return initial + [current] + current["chapters"]
            else:
                return initial + [current]

        chapters = list(
            reduce(
                extract_chapters,
                self._chapter_info["chapters"],
                [],
            )
        )

        if separate_intro_outro:
            return self._separate_intro_outro(chapters)

        return chapters

    def get_intro_duration_ms(self):
        return self._chapter_info["brandIntroDurationMs"]

    def get_outro_duration_ms(self):
        return self._chapter_info["brandOutroDurationMs"]

    def get_runtime_length_ms(self):
        return self._chapter_info["runtime_length_ms"]

    def is_accurate(self):
        return self._chapter_info["is_accurate"]

    def _separate_intro_outro(self, chapters):
        echo("Separate Audible Brand Intro and Outro to own Chapter.")
        chapters.sort(key=operator.itemgetter("start_offset_ms"))

        first = chapters[0]
        intro_dur_ms = self.get_intro_duration_ms()
        first["start_offset_ms"] = intro_dur_ms
        first["start_offset_sec"] = round(first["start_offset_ms"] / 1000)
        first["length_ms"] -= intro_dur_ms

        last = chapters[-1]
        outro_dur_ms = self.get_outro_duration_ms()
        last["length_ms"] -= outro_dur_ms

        chapters.append(
            {
                "length_ms": intro_dur_ms,
                "start_offset_ms": 0,
                "start_offset_sec": 0,
                "title": "Intro",
            }
        )
        chapters.append(
            {
                "length_ms": outro_dur_ms,
                "start_offset_ms": self.get_runtime_length_ms() - outro_dur_ms,
                "start_offset_sec": round(
                    (self.get_runtime_length_ms() - outro_dur_ms) / 1000
                ),
                "title": "Outro",
            }
        )
        chapters.sort(key=operator.itemgetter("start_offset_ms"))

        return chapters
class FFMeta:
    SECTION = re.compile(r"\[(?P<header>[^]]+)\]")
    OPTION = re.compile(r"(?P<option>.*?)\s*(?:(?P<vi>=)\s*(?P<value>.*))?$")

    def __init__(self, ffmeta_file: t.Union[str, pathlib.Path]) -> None:
        self._ffmeta_raw = pathlib.Path(ffmeta_file).read_text("utf-8")
        self._ffmeta_parsed = self._parse_ffmeta()

    def _parse_ffmeta(self):
        parsed_dict = {}
        start_section = "_"
        cursec = parsed_dict[start_section] = {}
        num_chap = 0

        for line in iter(self._ffmeta_raw.splitlines()):
            mo = self.SECTION.match(line)
            if mo:
                sec_name = mo.group("header")
                if sec_name == "CHAPTER":
                    num_chap += 1
                    if sec_name not in parsed_dict:
                        parsed_dict[sec_name] = {}
                    cursec = parsed_dict[sec_name][num_chap] = {}
                else:
                    cursec = parsed_dict[sec_name] = {}
            else:
                match = self.OPTION.match(line)
                cursec.update({match.group("option"): match.group("value")})

        return parsed_dict

    def count_chapters(self):
        return len(self._ffmeta_parsed["CHAPTER"])

    def set_chapter_option(self, num, option, value):
        chapter = self._ffmeta_parsed["CHAPTER"][num]
        for chapter_option in chapter:
            if chapter_option == option:
                chapter[chapter_option] = value

    def write(self, filename):
        d = "="
        # Context manager, so the file is flushed and closed before ffmpeg reads it
        with pathlib.Path(filename).open("w", encoding="utf-8") as fp:
            for section in self._ffmeta_parsed:
                if section == "_":
                    self._write_section(fp, None, self._ffmeta_parsed[section], d)
                elif section == "CHAPTER":
                    # TODO: do something
                    for chapter in self._ffmeta_parsed[section]:
                        self._write_section(
                            fp, section, self._ffmeta_parsed[section][chapter], d
                        )
                else:
                    self._write_section(fp, section, self._ffmeta_parsed[section], d)

    @staticmethod
    def _write_section(fp, section_name, section_items, delimiter):
        """Write a single section to the specified `fp`."""
        if section_name is not None:
            fp.write(f"[{section_name}]\n")

        for key, value in section_items.items():
            if value is None:
                fp.write(f"{key}\n")
            else:
                fp.write(f"{key}{delimiter}{value}\n")

    def update_chapters_from_chapter_info(
        self,
        chapter_info: ApiChapterInfo,
        separate_intro_outro: bool = False
    ) -> None:
        if not chapter_info.is_accurate():
            echo("Metadata from API is not accurate. Skip.")
            return

        if chapter_info.count_chapters() != self.count_chapters():
            raise ChapterError("Chapter mismatch")

        echo(f"Found {self.count_chapters()} chapters to prepare.")

        api_chapters = chapter_info.get_chapters(separate_intro_outro)

        num_chap = 0
        new_chapters = {}
        for chapter in api_chapters:
            chap_start = chapter["start_offset_ms"]
            chap_end = chap_start + chapter["length_ms"]
            num_chap += 1
            new_chapters[num_chap] = {
                "TIMEBASE": "1/1000",
                "START": chap_start,
                "END": chap_end,
                "title": chapter["title"],
            }
        self._ffmeta_parsed["CHAPTER"] = new_chapters
def _get_voucher_filename(file: pathlib.Path) -> pathlib.Path:
    return file.with_suffix(".voucher")


def _get_chapter_filename(file: pathlib.Path) -> pathlib.Path:
    base_filename = file.stem.rsplit("-", 1)[0]
    return file.with_name(base_filename + "-chapters.json")


def _get_ffmeta_file(file: pathlib.Path, tempdir: pathlib.Path) -> pathlib.Path:
    metaname = file.with_suffix(".meta").name
    metafile = tempdir / metaname
    return metafile
class FfmpegFileDecrypter:
    def __init__(
        self,
        file: pathlib.Path,
        target_dir: pathlib.Path,
        tempdir: pathlib.Path,
        activation_bytes: t.Optional[str],
        rebuild_chapters: bool,
        ignore_missing_chapters: bool,
        separate_intro_outro: bool
    ) -> None:
        file_type = SupportedFiles(file.suffix)

        credentials = None
        if file_type == SupportedFiles.AAX:
            if activation_bytes is None:
                raise AudibleCliException(
                    "No activation bytes found. Do you ever run "
                    "`audible activation-bytes`?"
                )
            credentials = activation_bytes
        elif file_type == SupportedFiles.AAXC:
            voucher_filename = _get_voucher_filename(file)
            credentials = get_aaxc_credentials(voucher_filename)

        self._source = file
        self._credentials: t.Optional[t.Union[str, t.Tuple[str]]] = credentials
        self._target_dir = target_dir
        self._tempdir = tempdir
        self._rebuild_chapters = rebuild_chapters
        self._ignore_missing_chapters = ignore_missing_chapters
        self._separate_intro_outro = separate_intro_outro
        self._api_chapter: t.Optional[ApiChapterInfo] = None
        self._ffmeta: t.Optional[FFMeta] = None
        self._is_rebuilded: bool = False

    @property
    def api_chapter(self) -> ApiChapterInfo:
        if self._api_chapter is None:
            try:
                voucher_filename = _get_voucher_filename(self._source)
                self._api_chapter = ApiChapterInfo.from_file(voucher_filename)
            except ChapterError:
                voucher_filename = _get_chapter_filename(self._source)
                self._api_chapter = ApiChapterInfo.from_file(voucher_filename)
            echo(f"Using chapters from {voucher_filename}")
        return self._api_chapter

    @property
    def ffmeta(self) -> FFMeta:
        if self._ffmeta is None:
            metafile = _get_ffmeta_file(self._source, self._tempdir)

            base_cmd = [
                "ffmpeg",
                "-v",
                "quiet",
                "-stats",
            ]
            if isinstance(self._credentials, tuple):
                key, iv = self._credentials
                credentials_cmd = [
                    "-audible_key",
                    quote(key),
                    "-audible_iv",
                    quote(iv),
                ]
            else:
                credentials_cmd = [
                    "-activation_bytes",
                    quote(self._credentials),
                ]
            base_cmd.extend(credentials_cmd)

            extract_cmd = [
                "-i",
                self._source,
                "-f",
                "ffmetadata",
                metafile,
            ]
            base_cmd.extend(extract_cmd)

            subprocess.check_output(base_cmd, text=True)  # noqa: S603
            self._ffmeta = FFMeta(metafile)

        return self._ffmeta

    def rebuild_chapters(self) -> None:
        if not self._is_rebuilded:
            self.ffmeta.update_chapters_from_chapter_info(
                self.api_chapter, self._separate_intro_outro
            )
            self._is_rebuilded = True

    def run(self):
        oname = self._source.with_suffix(".m4b").name
        outfile = self._target_dir / oname

        if outfile.exists():
            secho(f"Skip {outfile}: already exists", fg="blue")
            return

        base_cmd = [
            "ffmpeg",
            "-v",
            "quiet",
            "-stats",
        ]
        if isinstance(self._credentials, tuple):
            key, iv = self._credentials
            credentials_cmd = [
                "-audible_key",
                quote(key),
                "-audible_iv",
                quote(iv),
            ]
        else:
            credentials_cmd = [
                "-activation_bytes",
                quote(self._credentials),
            ]
        base_cmd.extend(credentials_cmd)
        base_cmd.extend(
            [
                "-i",
                self._source,
            ]
        )

        if self._rebuild_chapters:
            metafile = _get_ffmeta_file(self._source, self._tempdir)
            try:
                self.rebuild_chapters()
                self.ffmeta.write(metafile)
            except ChapterError:
                if not self._ignore_missing_chapters:
                    raise
            else:
                base_cmd.extend(
                    [
                        "-i",
                        metafile,
                        "-map_metadata",
                        "0",
                        "-map_chapters",
                        "1",
                    ]
                )

        base_cmd.extend(
            [
                "-c",
                "copy",
                outfile,
            ]
        )

        subprocess.check_output(base_cmd, text=True)  # noqa: S603

        echo(f"File decryption successful: {outfile}")
@click.command("decrypt")
@click.argument("files", nargs=-1)
@click.option(
    "--dir",
    "-d",
    "directory",
    type=click.Path(exists=True, dir_okay=True),
    default=pathlib.Path.cwd(),
    help="Folder where the decrypted files should be saved.",
    show_default=True
)
@click.option(
    "--all",
    "-a",
    "all_",
    is_flag=True,
    help="Decrypt all aax and aaxc files in current folder."
)
@click.option("--overwrite", is_flag=True, help="Overwrite existing files.")
@click.option(
    "--rebuild-chapters",
    "-r",
    is_flag=True,
    help="Rebuild chapters with chapters from voucher or chapter file."
)
@click.option(
    "--separate-intro-outro",
    "-s",
    is_flag=True,
    help=(
        "Separate Audible Brand Intro and Outro to own Chapter. "
        "Only use with `--rebuild-chapters`."
    ),
)
@click.option(
    "--ignore-missing-chapters",
    "-t",
    is_flag=True,
    help=(
        "Decrypt without rebuilding chapters when chapters are not present. "
        "Otherwise an item is skipped when this option is not provided. "
        "Only use with `--rebuild-chapters`."
    ),
)
@pass_session
def cli(
    session,
    files: str,
    directory: t.Union[pathlib.Path, str],
    all_: bool,
    overwrite: bool,
    rebuild_chapters: bool,
    separate_intro_outro: bool,
    ignore_missing_chapters: bool
):
    """Decrypt audiobooks downloaded with audible-cli.

    FILES are the names of the file to decrypt.
    Wildcards `*` and recursive lookup with `**` are supported.

    Only FILES with `aax` or `aaxc` suffix are processed.
    Other files are skipped silently.
    """
    if not which("ffmpeg"):
        ctx = click.get_current_context()
        ctx.fail("ffmpeg not found")

    if (separate_intro_outro or ignore_missing_chapters) and not rebuild_chapters:
        # BadOptionUsage takes the option name as its first argument
        raise click.BadOptionUsage(
            "rebuild_chapters",
            "`--separate-intro-outro` and `--ignore-missing-chapters` can "
            "only be used together with `--rebuild-chapters`"
        )

    if all_:
        if files:
            raise click.BadOptionUsage(
                "all",
                "If using `--all`, no FILES arguments can be used."
            )
        files = [f"*{suffix}" for suffix in SupportedFiles.get_supported_list()]

    files = _get_input_files(files, recursive=True)
    with tempfile.TemporaryDirectory() as tempdir:
        for file in files:
            decrypter = FfmpegFileDecrypter(
                file=file,
                target_dir=pathlib.Path(directory).resolve(),
                tempdir=pathlib.Path(tempdir).resolve(),
                activation_bytes=session.auth.activation_bytes,
                rebuild_chapters=rebuild_chapters,
                ignore_missing_chapters=ignore_missing_chapters,
                separate_intro_outro=separate_intro_outro
            )
            decrypter.run()
Ok, it seems to be related to the post I linked above. This call doesn't work:
subprocess.call(['ffmpeg', '-v', 'quiet', '-stats', '-activation_bytes', '*******', '-i', "'/audiobooks/...(....aax'", '-c', 'copy', "'/audiobooks/...(....m4b'"])
while this one works:
subprocess.call(['ffmpeg', '-v', 'quiet', '-stats', '-activation_bytes', '*******', '-i', '/audiobooks/...(....aax', '-c', 'copy', '/audiobooks/...(....m4b'])
I think that somewhere in the code, Python automatically adds the additional apostrophes to escape characters. This is a bit strange, but seems to be an issue with how
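For anyone landing here later, the difference between the two calls can be reproduced without ffmpeg. This is only a minimal sketch: the file name is made up (the real paths are elided above) and `argument_seen_by_child` is a hypothetical helper. It shows that an argument list is handed to the child process verbatim, so the apostrophes added by `shlex.quote` become part of the file name the child tries to open.

```python
import shlex
import subprocess
import sys

# Hypothetical file name; the real paths are elided in this thread.
path = "/audiobooks/Some Book (Unabridged).aax"


def argument_seen_by_child(arg: str) -> str:
    """Run a child Python process and return argv[1] exactly as it received it."""
    child_code = "import sys; sys.stdout.write(sys.argv[1])"
    return subprocess.check_output(
        [sys.executable, "-c", child_code, arg], text=True
    )


# No shell is involved with a list, so nothing strips or interprets quotes.
plain = argument_seen_by_child(path)
quoted = argument_seen_by_child(shlex.quote(path))

print(plain)   # the path, unchanged
print(quoted)  # the same path wrapped in literal apostrophes
```

With a list and no `shell=True`, the parentheses need no escaping at all, which would explain why the unquoted variant works.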
We replied at the same time ;)!
Hahaha yeah, I was just testing that command when I saw your reply pop up! I'm testing out the new code you posted above, and so far it seems to work :D I'll update here if something new goes wrong.
Quick update, your new code works!
Good to hear that. The downside is the reduced security due to the missing
FYI:
Right, is this an inherent problem with the
I can absolutely recommend it! The book is written by an American journalist who lived in Germany during large parts of the rise of the Nazis and the initial few years of the war, and he had access to a lot of interesting information during and after the war.
I tested my idea, and it did not work as straightforwardly as I thought. Could a safer solution be to use
Otherwise, having a flag to disable the
Since Python 3.8 the
The security risk in using unquoted strings in subprocess is the possibility of code injection via a manipulated file name. But as far as I have read, and as I wrote above, they have done this in Python 3.8.
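To make the injection risk concrete, here is a rough sketch with a made-up hostile file name. It assumes the command is built as a single string for a shell, which is the case where quoting actually matters. Nothing is executed: `shlex.split` is used only to approximate how a POSIX shell would split the string into words, and a real shell would additionally treat `;` as a command separator.

```python
import shlex

# Hypothetical hostile file name, invented for illustration.
name = "book (1); echo INJECTED.aax"

# Command strings as they would be handed to a shell (shell=True).
unsafe = f"ffmpeg -i {name} -c copy out.m4b"
safe = f"ffmpeg -i {shlex.quote(name)} -c copy out.m4b"

# Approximate the shell's word splitting without running anything.
unsafe_tokens = shlex.split(unsafe)
safe_tokens = shlex.split(safe)

print(unsafe_tokens)  # the file name falls apart into several words
print(safe_tokens)    # the file name stays a single argument
```

When the command is passed as a list without `shell=True`, as in the script above, no shell parses the file name, so this particular risk should not apply to the path arguments.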
For security purposes you should use
Edit:
Ah I see, that makes sense. (also we are soooo in sync hahaha) I'm on Python 3.11, so I guess this is still a bug? I'll try passing directly in a
I can confirm that using
I'm using
I think we can live with that security risk. If you don't use this script with files from external sources, it should be safe enough.
Oh wow! And what a jump in version number haha.
Yeah, this is just a part of a script for a crontab I'm running, so it should be pretty safe. I'll just use the modified version you posted, and this might be part of a rewrite in the future 👍 Thank you for all the help!
Yeah, this should be absolutely safe.
No problem at all!!!
Fixes the issue with decrypting files with special characters, such as parentheses. This fix was posted in the issue discussion by @mkb79
I've had problems using the decrypt plugin on certain files. It seems to happen specifically for files with characters which might be escaped, such as '('. However, when I try the exact same command that is executed by subprocess in the terminal, I get the expected output.
Here is the output from the plugin:
Trying the same command in Python without the quiet flag, I get this error:
And finally, trying this command in the terminal, ffmpeg converts the file fine. This also means that I have the same single apostrophes around the path, and no escape characters. Also, changing the name of the file seems to let the decrypt plugin work again.
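A note on why the copied command behaves differently in the terminal: the shell removes the apostrophes before ffmpeg ever sees the path, whereas a subprocess argument list is passed through untouched. If a copy-pasteable command is wanted for debugging, one option is to keep the raw list for execution and quote only the printed form. This is a sketch under assumptions: the file names and both helper functions are hypothetical, and `shlex.join` needs Python 3.8 or newer.

```python
import shlex

# Hypothetical argument list; the real paths are elided in this thread.
cmd = [
    "ffmpeg", "-v", "quiet", "-stats",
    "-i", "/audiobooks/Some Book (Unabridged).aax",
    "-c", "copy", "/audiobooks/Some Book (Unabridged).m4b",
]


def as_shell_command(argv):
    """Return a shell-quoted command line, meant for logging only."""
    return shlex.join(argv)


def as_argv(command_line):
    """Split a command line roughly the way a POSIX shell would."""
    return shlex.split(command_line)


printable = as_shell_command(cmd)
print(printable)           # paths appear wrapped in apostrophes
print(as_argv(printable))  # splitting removes them again
```

The list itself is what would go to `subprocess`; the quoted string is only for pasting into a terminal.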