Skip to content

Commit

Permalink
Simplify processing by removing future callback
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Feb 2, 2024
1 parent 78f29ef commit 97f2f32
Showing 1 changed file with 53 additions and 35 deletions.
88 changes: 53 additions & 35 deletions src/kolibri2zim/scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import tempfile
import threading
import zipfile
from functools import partial
from pathlib import Path

import jinja2
Expand Down Expand Up @@ -378,7 +377,7 @@ def add_video_node(self, node_id):
dst = src.with_suffix(".webm")

# request conversion
self.convert_and_add_video_aside(vfid, src, vchk, dst, path, preset)
self.create_video_processing_job(vfid, src, vchk, dst, path, preset)

# we want low-q but no webm yet don't have low_res file, let's reencode
elif self.low_quality and alt_video_file is None:
Expand All @@ -401,7 +400,7 @@ def add_video_node(self, node_id):
src = src_

# request conversion
self.convert_and_add_video_aside(vfid, src, vchk, dst, path, preset)
self.create_video_processing_job(vfid, src, vchk, dst, path, preset)

# we want mp4, either in high-q or we have a low_res file to use
else:
Expand Down Expand Up @@ -448,24 +447,10 @@ def add_video_node(self, node_id):
)
logger.debug(f"Added video #{node_id}")

def add_video_upon_completion(self, s3_key, s3_meta, future):
def add_video_upon_completion(self, src_fname, dst_fpath, path, s3_key, s3_meta):
"""adds the converted video inside this future to the zim
logs error in case of failure"""
if future.cancelled():
return

try:
src_fname, dst_fpath, path = self.videos_futures[future]
except KeyError:
return

try:
future.result()
except Exception as exc:
logger.error(f"Error re-encoding {src_fname}: {exc}")
logger.exception(exc)
return

logger.debug(f"Re-encoded {src_fname} successfuly")

Expand All @@ -484,28 +469,61 @@ def add_video_upon_completion(self, s3_key, s3_meta, future):
)
logger.debug(f"Added {path} from re-encoded file")

def convert_and_add_video_aside(
def create_video_processing_job(
self, file_id, src_fpath, src_checksum, dest_fpath, path, preset
):
"""add video to the process-based conversion queue"""
"""add asynchronous job to process video in conversion queue"""

future = self.videos_executor.submit(
safer_reencode,
src_path=src_fpath,
dst_path=dest_fpath,
ffmpeg_args=preset.to_ffmpeg_args(),
delete_src=True,
with_process=False,
failsafe=False,
self.convert_and_add_video,
file_id=file_id,
src_fpath=src_fpath,
src_checksum=src_checksum,
dest_fpath=dest_fpath,
path=path,
preset=preset,
)
future.add_done_callback(
partial(
self.add_video_upon_completion,
self.s3_key_for(file_id, preset),
{"checksum": src_checksum, "encoder_version": str(preset.VERSION)},
# Keep future to track status
self.videos_futures.append(future)

def convert_and_add_video(
self, file_id, src_fpath, src_checksum, dest_fpath, path, preset
):
"""process video: reencode, upload to S3 (if needed) and add to ZIM"""
try:
safer_reencode(
src_path=src_fpath,
dst_path=dest_fpath,
ffmpeg_args=preset.to_ffmpeg_args(),
delete_src=True,
with_process=False,
failsafe=False,
)
except Exception as exc:
logger.error(f"Error re-encoding {src_fpath.name}: {exc}")
logger.exception(exc)
return

logger.debug(f"Re-encoded {src_fpath.name} successfuly")

self.upload_to_s3(
fpath=dest_fpath,
key=self.s3_key_for(file_id, preset),
meta={"checksum": src_checksum, "encoder_version": str(preset.VERSION)},
)
self.videos_futures.update({future: (src_fpath.name, dest_fpath, path)})

with self.creator_lock:
self.creator.add_item(
StaticItem(
path=path,
filepath=dest_fpath,
mimetype=get_file_mimetype(
dest_fpath
), # pyright: ignore [reportGeneralTypeIssues]
remove=True, # pyright: ignore [reportGeneralTypeIssues]
)
)
logger.debug(f"Added {path} from re-encoded file")

def add_audio_node(self, node_id):
"""Add content from this `audio` node to zim
Expand Down Expand Up @@ -831,15 +849,15 @@ def run(self):
self.nodes_executor = cf.ThreadPoolExecutor(max_workers=self.nb_threads)

# setup a dedicated queue for videos to convert
self.videos_futures = {} # future: src_fname, dst_fpath, path
self.videos_futures = [] # list of futures
self.videos_executor = cf.ProcessPoolExecutor(max_workers=self.nb_processes)

logger.info("Starting nodes processing")
self.populate_nodes_executor()

# await completion of all futures (nodes and videos)
result = cf.wait(
self.videos_futures.keys() | self.nodes_futures.keys(),
self.videos_futures | self.nodes_futures.keys(),
return_when=cf.FIRST_EXCEPTION,
)
self.nodes_executor.shutdown()
Expand Down

0 comments on commit 97f2f32

Please sign in to comment.