Skip to content

Commit

Permalink
files: use tmp folder to process videos
Browse files Browse the repository at this point in the history
* copy the video file to be processed into a temporary folder and use it
  to extract metadata and frames, to avoid copying it multiple times
* fix a layout issue of thumbnails for portrait videos
* fix an exception happening when an old video does not have the
  celery_task_id field in the metadata
* closes #1891
  • Loading branch information
ntarocco committed Jun 14, 2022
1 parent aba1d8f commit fcaab22
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 57 deletions.
8 changes: 6 additions & 2 deletions cds/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ def _parse_env_bool(var_name, default=None):
# Every 12 minutes, not to be at the same time as the others
"schedule": timedelta(minutes=12),
},
"clean-tmp-videos": {
"task": "cds.modules.maintenance.tasks.clean_tmp_videos",
"schedule": crontab(minute=0, hour=3), # at 3 am
},
# 'file-integrity-report': {
# 'task': 'cds.modules.records.tasks.file_integrity_report',
# 'schedule': crontab(minute=0, hour=7), # Every day at 07:00 UTC
Expand Down Expand Up @@ -183,8 +187,8 @@ def _parse_env_bool(var_name, default=None):
# IIIF
###############################################################################

IIIF_CACHE_REDIS_URL = "redis://localhost:6379/0"
IIIF_CACHE_TIME = "604800" # 7 days
IIIF_CACHE_REDIS_URL = "redis://localhost:16379/0"
IIIF_CACHE_TIME = "36000" # 10 hours
IIIF_CACHE_HANDLER = "flask_iiif.cache.redis:ImageRedisCache"

###############################################################################
Expand Down
22 changes: 6 additions & 16 deletions cds/modules/fixtures/data/videos/ATLAS-VIDEO-2017-013.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
{
"$schema": "https://cds.cern.ch/schemas/records/videos/project/project-v1.0.0.json",
"$schema": "https://localhost:5000/schemas/records/videos/project/project-v1.0.0.json",
"_access": {
"update": []
},
"category": "ATLAS",
"contributors": [
{
"affiliations": [
"Universita degli Studi di Udine (IT)"
],
"affiliations": ["Universita degli Studi di Udine (IT)"],
"email": "emma.ward@cern.ch",
"ids": [
{
Expand All @@ -27,7 +25,6 @@
"name": "ATLAS Collaboration",
"role": "Producer"
}

],
"date": "2017-05-19",
"description": "Tales of the tile calorimeter.",
Expand Down Expand Up @@ -63,23 +60,19 @@
],
"publication_date": "2017-06-02",
"recid": 1,
"report_number": [
"ATLAS-VIDEO-2017-013"
],
"report_number": ["ATLAS-VIDEO-2017-013"],
"title": {
"title": "Tales of the tile calorimeter"
},
"translations": [],
"type": "VIDEO",
"videos": [
{
"$schema": "https://cds.cern.ch/schemas/records/videos/video/video-v1.0.0.json",
"$schema": "https://localhost:5000/schemas/records/videos/video/video-v1.0.0.json",
"category": "ATLAS",
"contributors": [
{
"affiliations": [
"Universita degli Studi di Udine (IT)"
],
"affiliations": ["Universita degli Studi di Udine (IT)"],
"email": "emma.ward@cern.ch",
"ids": [
{
Expand Down Expand Up @@ -143,9 +136,7 @@
"publication_date": "2017-06-02",
"recid": 2,
"related_links": [],
"report_number": [
"ATLAS-VIDEO-2017-013-001"
],
"report_number": ["ATLAS-VIDEO-2017-013-001"],
"title": {
"title": "Tales of the Tile Calorimeter"
},
Expand Down Expand Up @@ -345,7 +336,6 @@
}
]
}

}
]
}
9 changes: 5 additions & 4 deletions cds/modules/flows/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from collections import defaultdict

from celery import chain as celery_chain
from celery.result import AsyncResult
from invenio_db import db
from sqlalchemy.orm.attributes import flag_modified as db_flag_modified

Expand Down Expand Up @@ -111,7 +110,8 @@ def _build_chain(cls, payload):
file_download_task = cls.create_task(DownloadTask, payload)
celery_tasks.append(file_download_task)

metadata_extract_task = cls.create_task(ExtractMetadataTask, payload)
metadata_extract_task = cls.create_task(ExtractMetadataTask, payload,
delete_copied=False)
celery_tasks.append(metadata_extract_task)

frames_extract_task = cls.create_task(ExtractFramesTask, payload)
Expand Down Expand Up @@ -267,8 +267,9 @@ def stop(self):
"""Stop the flow."""
for task in self.flow_metadata.tasks:
if task.status in [FlowTaskStatus.STARTED, FlowTaskStatus.PENDING]:
celery_task_id = task.payload["celery_task_id"]
CeleryTask.stop_task(celery_task_id)
celery_task_id = task.payload.get("celery_task_id")
if celery_task_id:
CeleryTask.stop_task(celery_task_id)
task.status = FlowTaskStatus.CANCELLED

deposit_id = self.flow_metadata.deposit_id
Expand Down
33 changes: 14 additions & 19 deletions cds/modules/flows/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

from __future__ import absolute_import

import os
import shutil
import tempfile
Expand All @@ -39,6 +37,9 @@
from ..xrootd.utils import file_opener_xrootd


VIDEOS_FILES_TMP_FOLDER = os.path.join(tempfile.gettempdir(), "videos")


def _rename_key(object_version):
"""Renames the object_version key to avoid issues with subformats
objectVersions key.
Expand Down Expand Up @@ -104,27 +105,21 @@ def move_file_into_local(obj, delete=True):
if os.path.exists(obj.file.uri):
yield obj.file.uri
else:
temp_location = obj.get_tags().get("temp_location", None)
if not temp_location:
temp_folder = tempfile.mkdtemp()
temp_location = os.path.join(temp_folder, "data")

with open(temp_location, "wb") as dst:
tmp_path = os.path.join(VIDEOS_FILES_TMP_FOLDER, str(obj.file_id))
if not os.path.exists(tmp_path):
os.makedirs(tmp_path)

filepath = os.path.join(tmp_path, "data")
if not os.path.exists(filepath):
# copy the file locally
with open(filepath, "wb") as dst:
shutil.copyfileobj(file_opener_xrootd(obj.file.uri, "rb"), dst)

ObjectVersionTag.create(obj, "temp_location", temp_location)
db.session.commit()
else:
temp_folder = os.path.dirname(temp_location)
try:
yield temp_location
yield filepath
except:
shutil.rmtree(temp_folder)
ObjectVersionTag.delete(obj, "temp_location")
db.session.commit()
shutil.rmtree(tmp_path)
raise
else:
if delete:
shutil.rmtree(temp_folder)
ObjectVersionTag.delete(obj, "temp_location")
db.session.commit()
shutil.rmtree(tmp_path)
25 changes: 14 additions & 11 deletions cds/modules/flows/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
from __future__ import absolute_import

import json
import logging
import os
import shutil
import signal
import tempfile
import time

import jsonpatch
import requests
Expand Down Expand Up @@ -390,20 +388,19 @@ def clean(self, deposit_id, version_id, *args, **kwargs):

# Delete tmp file if any
obj = as_object_version(version_id)
temp_location = obj.get_tags().get("temp_location", None)
if temp_location:
shutil.rmtree(temp_location)
ObjectVersionTag.delete(obj, "temp_location")
db.session.commit()
tmp_path = os.path.join(tempfile.gettempdir(), obj.file_id)
if os.path.exists(tmp_path):
shutil.rmtree(tmp_path)

@classmethod
def get_metadata_from_video_file(cls, object_=None, uri=None):
def get_metadata_from_video_file(cls, object_=None, uri=None,
delete_copied=True):
"""Get metadata from video file."""
# Extract video's metadata using `ff_probe`
if uri:
metadata = ff_probe_all(uri)
else:
with move_file_into_local(object_) as url:
with move_file_into_local(object_, delete=delete_copied) as url:
metadata = ff_probe_all(url)
return dict(metadata["format"], **metadata["streams"][0])

Expand All @@ -428,6 +425,11 @@ def run(self, uri=None, *args, **kwargs):
:param self: reference to instance of task base class
"""
# delete copied video file after metadata extraction.
# when chained with the extract frames, it should be False to speed
# up the process
delete_copied = kwargs.pop("delete_copied", True)

pid = PersistentIdentifier.get("depid", self.deposit_id)
recid = str(pid.object_uuid)

Expand All @@ -444,7 +446,8 @@ def run(self, uri=None, *args, **kwargs):
self.log("Started task {0}".format(kwargs["task_id"]))

metadata = self.get_metadata_from_video_file(
object_=self.object_version, uri=uri
object_=self.object_version, uri=uri,
delete_copied=delete_copied
)
try:
extracted_dict = self.create_metadata_tags(
Expand Down Expand Up @@ -639,7 +642,7 @@ def _create_tmp_frames(
):
"""Create frames in temporary files."""
# Generate frames
with move_file_into_local(object_) as url:
with move_file_into_local(object_, delete=True) as url:
ff_frames(
input_file=url,
start=start_time,
Expand Down
48 changes: 48 additions & 0 deletions cds/modules/maintenance/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2022 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""Maintenance tasks."""

import os
import shutil
import time

from celery import shared_task

from cds.modules.flows.files import VIDEOS_FILES_TMP_FOLDER


@shared_task(ignore_result=True)
def clean_tmp_videos(max_age_days=7):
    """Delete old processed videos in the tmp folder.

    Walks the direct children of ``VIDEOS_FILES_TMP_FOLDER`` and removes
    every sub-folder whose last modification time is older than
    ``max_age_days``. Entries that are not directories are left untouched.

    :param max_age_days: age, in days, after which a sub-folder is deleted.
        Defaults to 7.
    """
    if not os.path.exists(VIDEOS_FILES_TMP_FOLDER):
        # nothing has been copied locally yet
        return

    cutoff = time.time() - max_age_days * 24 * 60 * 60

    for folder in os.listdir(VIDEOS_FILES_TMP_FOLDER):
        path = os.path.join(VIDEOS_FILES_TMP_FOLDER, folder)
        if not os.path.isdir(path):
            continue

        try:
            last_modification_time = os.stat(path).st_mtime
        except FileNotFoundError:
            # the folder was removed in the meantime (e.g. by the task that
            # was processing the video): nothing left to clean here
            continue

        if last_modification_time < cutoff:
            try:
                shutil.rmtree(path)
            except FileNotFoundError:
                # removed concurrently while deleting: the goal is achieved,
                # keep cleaning the remaining folders
                continue
15 changes: 12 additions & 3 deletions cds/modules/theme/static/scss/cds.scss
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,18 @@ a.cds-anchor:hover{
}
}

// ensure that portrait images are sized correctly
div[cds-search-results] {
.cds-video-title {
img {
width: 100%;
height: 13em;
object-fit: contain;
background-color: black;
}
}
}

.cds-video-title-with-play-button {
position: relative;
width: 100%;
Expand Down Expand Up @@ -858,9 +870,6 @@ a.cds-anchor:hover{
}

// Brief
.cds-video-title {
position: relative;
}

.cds-video-duration {
position: absolute;
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ services:
extends:
file: docker-services.yml
service: cache
cache-iiif:
extends:
file: docker-services.yml
service: cache-iiif
db:
extends:
file: docker-services.yml
Expand Down
9 changes: 8 additions & 1 deletion docker-services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,18 @@ services:
- "80"
- "443"
cache:
image: redis
image: redis:6
restart: "always"
read_only: true
ports:
- "6379:6379"
cache-iiif:
image: redis:6
restart: "always"
read_only: true
ports:
- "16379:6379"
command: redis-server --save ""
db:
image: postgres:12.5
restart: "always"
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@
'cds_deposit_tasks = cds.modules.deposit',
'cds_opencast_tasks = cds.modules.opencast.tasks',
'cds_records_tasks = cds.modules.records',
'cds_maintenance = cds.modules.maintenance.tasks',
],
'invenio_previewer.previewers': [
'cds_video = cds.modules.previewer.extensions.video:video',
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_celery_beat(app):
beats = [task['schedule']
for task in app.config['CELERY_BEAT_SCHEDULE'].values()
if isinstance(task['schedule'], crontab)]
assert len(beats) == 2
assert len(beats) == 3
for beat in beats:
[hour] = beat.hour
assert hour <= 7

0 comments on commit fcaab22

Please sign in to comment.