Generate content manifest files in exportcontent
dylanmccall committed May 27, 2022
1 parent 7136d74 commit d459fc5
Showing 2 changed files with 173 additions and 50 deletions.
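As an orientation sketch (not part of the commit, channel id hypothetical, node selection left at its defaults): after this change, exportcontent builds its selection in three steps instead of one call to get_import_export_data, roughly as follows.

from kolibri.core.content.utils.import_export_content import (
    get_content_nodes_data,
    get_content_nodes_selectors,
    get_import_export_nodes,
)

channel_id = "95a52b386f2c485cb97dd60901674a98"  # hypothetical channel id

# Step 1: build a list of ContentNode querysets, one per MPTT chunk,
# covering the nodes selected for export.
nodes_queries_list = get_import_export_nodes(
    channel_id, node_ids=None, exclude_node_ids=None, available=True
)

# Step 2: resolve those querysets into a resource count, a file list and
# the total number of bytes to transfer.
total_resource_count, files, total_bytes_to_transfer = get_content_nodes_data(
    channel_id, nodes_queries_list, available=True
)

# Step 3: summarise the same selection as include/exclude node ids, which
# exportcontent appends to manifest.json alongside the channel version.
nodes_selectors = get_content_nodes_selectors(channel_id, nodes_queries_list)
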
39 changes: 33 additions & 6 deletions kolibri/core/content/management/commands/exportcontent.py
@@ -1,3 +1,4 @@
import json
import logging
import os

@@ -6,7 +7,10 @@
from ...utils import paths
from ...utils import transfer
from kolibri.core.content.errors import InvalidStorageFilenameError
from kolibri.core.content.utils.import_export_content import get_import_export_data
from kolibri.core.content.serializers import ContentManifestSerializer
from kolibri.core.content.utils.import_export_content import get_content_nodes_data
from kolibri.core.content.utils.import_export_content import get_content_nodes_selectors
from kolibri.core.content.utils.import_export_content import get_import_export_nodes
from kolibri.core.content.utils.paths import get_content_file_name
from kolibri.core.tasks.management.commands.base import AsyncCommand
from kolibri.core.tasks.utils import get_current_job
@@ -76,11 +80,13 @@ def handle_async(self, *args, **options):
"Exporting content for channel id {} to {}".format(channel_id, data_dir)
)

(
total_resource_count,
files,
total_bytes_to_transfer,
) = get_import_export_data(channel_id, node_ids, exclude_node_ids, True)
nodes_queries_list = get_import_export_nodes(
channel_id, node_ids, exclude_node_ids, available=True
)

(total_resource_count, files, total_bytes_to_transfer) = get_content_nodes_data(
channel_id, nodes_queries_list, available=True
)

self.update_job_metadata(total_bytes_to_transfer, total_resource_count)

@@ -101,6 +107,13 @@
if self.is_cancelled():
self.cancel()

logger.info(
"Exporting manifest for channel id {} to {}".format(channel_id, data_dir)
)

nodes_selectors = get_content_nodes_selectors(channel_id, nodes_queries_list)
self.update_content_manifest(data_dir, nodes_selectors)

def export_file(self, f, data_dir, overall_progress_update):
filename = get_content_file_name(f)
try:
@@ -133,3 +146,17 @@ def export_file(self, f, data_dir, overall_progress_update):
job.save_meta()
return
return dest

def update_content_manifest(self, data_dir, nodes_selectors):
manifest_file = os.path.join(data_dir, "manifest.json")

with open(manifest_file, "r") as fp:
manifest_data = json.load(fp)

if not isinstance(manifest_data, list):
manifest_data = []

manifest_data.append(nodes_selectors)

with open(manifest_file, "w") as fp:
manifest_data = json.dump(manifest_data, fp)
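
For illustration (a hedged sketch, not part of the commit): after an export run, manifest.json holds a JSON list with one selector entry appended per run, and it can be read back as below; the path and the values shown in the comments are hypothetical.

import json

# Hypothetical destination directory used for the export.
manifest_file = "/media/usb/KOLIBRI_DATA/manifest.json"

with open(manifest_file, "r") as fp:
    manifest_data = json.load(fp)

# Each entry mirrors the dictionary returned by get_content_nodes_selectors,
# for example (hypothetical values):
# {
#     "id": "95a52b386f2c485cb97dd60901674a98",
#     "version": 7,
#     "include_node_ids": ["2b6926ed22025518a8b9da91745b51d3"],
#     "exclude_node_ids": [],
# }
for entry in manifest_data:
    print(entry["id"], entry["version"], len(entry["include_node_ids"]))
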
184 changes: 140 additions & 44 deletions kolibri/core/content/utils/import_export_content.py
@@ -1,4 +1,5 @@
import hashlib
import itertools
from math import ceil

from django.db.models import Max
@@ -10,6 +11,7 @@
from requests.exceptions import HTTPError
from requests.exceptions import Timeout

from kolibri.core.content.models import ChannelMetadata
from kolibri.core.content.models import ContentNode
from kolibri.core.content.models import LocalFile
from kolibri.core.content.utils.content_types_tools import (
@@ -112,15 +114,42 @@ def filter_by_file_availability(nodes_to_include, channel_id, drive_id, peer_id)

def get_import_export_data( # noqa: C901
channel_id,
node_ids,
exclude_node_ids,
available,
node_ids=None,
exclude_node_ids=None,
available=None,
drive_id=None,
peer_id=None,
renderable_only=True,
topic_thumbnails=True,
):
# Helper function that calls get_import_export_nodes followed by get_content_nodes_data.
nodes_queries_list = get_import_export_nodes(
channel_id,
node_ids,
exclude_node_ids,
available=available,
drive_id=drive_id,
peer_id=peer_id,
renderable_only=renderable_only,
)
return get_content_nodes_data(
channel_id,
nodes_queries_list,
available=available,
topic_thumbnails=topic_thumbnails,
)


def get_import_export_nodes( # noqa: C901
channel_id,
node_ids=None,
exclude_node_ids=None,
available=None,
drive_id=None,
peer_id=None,
renderable_only=True,
):
nodes_queries_list = []
min_boundary = 1

max_rght, dynamic_chunksize = _calculate_batch_params(
@@ -138,30 +167,25 @@
nodes_to_include, channel_id, drive_id, peer_id
)

queried_file_objects = {}
number_of_resources = 0

while min_boundary < max_rght:

max_boundary = min_boundary + dynamic_chunksize

nodes_segment = nodes_to_include
nodes_query = nodes_to_include

# if requested, filter down to only include particular topics/nodes
if node_ids:
nodes_segment = nodes_segment.filter_by_uuids(
nodes_query = nodes_query.filter_by_uuids(
_mptt_descendant_ids(
channel_id, node_ids, min_boundary, min_boundary + dynamic_chunksize
)
)

# if requested, filter out nodes we're not able to render
if renderable_only:
nodes_segment = nodes_segment.filter(renderable_contentnodes_q_filter)
nodes_query = nodes_query.filter(renderable_contentnodes_q_filter)

# filter down the query to remove files associated with nodes we've specifically been asked to exclude
if exclude_node_ids:
nodes_segment = nodes_segment.order_by().exclude_by_uuids(
nodes_query = nodes_query.order_by().exclude_by_uuids(
_mptt_descendant_ids(
channel_id,
exclude_node_ids,
@@ -170,52 +194,124 @@
)
)

count_content_ids = nodes_segment.count()
min_boundary += dynamic_chunksize

# Only bother with this query if there were any resources returned above.
if count_content_ids:
number_of_resources = number_of_resources + count_content_ids

if nodes_query.count() > 0:
nodes_queries_list.append(nodes_query)

return nodes_queries_list


def get_content_nodes_data(
channel_id, nodes_queries_list, available=None, topic_thumbnails=True
):
queried_file_objects = {}
number_of_resources = 0

for nodes_query in nodes_queries_list:
number_of_resources = number_of_resources + nodes_query.count()

file_objects = LocalFile.objects.filter(
files__contentnode__in=nodes_query
).values("id", "file_size", "extension")
if available is not None:
file_objects = file_objects.filter(available=available)
for f in file_objects:
queried_file_objects[f["id"]] = f

if topic_thumbnails:
# Do a query to get all the descendant and ancestor topics for this segment
segment_boundaries = nodes_query.aggregate(
min_boundary=Min("lft"), max_boundary=Max("rght")
)
segment_topics = ContentNode.objects.filter(
channel_id=channel_id, kind=content_kinds.TOPIC
).filter(
Q(
lft__lte=segment_boundaries["min_boundary"],
rght__gte=segment_boundaries["max_boundary"],
)
| Q(
lft__lte=segment_boundaries["max_boundary"],
rght__gte=segment_boundaries["min_boundary"],
)
)

file_objects = LocalFile.objects.filter(
files__contentnode__in=nodes_segment
files__contentnode__in=segment_topics,
).values("id", "file_size", "extension")
if available is not None:
file_objects = file_objects.filter(available=available)
for f in file_objects:
queried_file_objects[f["id"]] = f

if topic_thumbnails:
# Do a query to get all the descendant and ancestor topics for this segment
segment_boundaries = nodes_segment.aggregate(
min_boundary=Min("lft"), max_boundary=Max("rght")
)
segment_topics = ContentNode.objects.filter(
channel_id=channel_id, kind=content_kinds.TOPIC
).filter(
Q(
lft__lte=segment_boundaries["min_boundary"],
rght__gte=segment_boundaries["max_boundary"],
)
| Q(
lft__lte=segment_boundaries["max_boundary"],
rght__gte=segment_boundaries["min_boundary"],
)
)

file_objects = LocalFile.objects.filter(
files__contentnode__in=segment_topics,
).values("id", "file_size", "extension")
if available is not None:
file_objects = file_objects.filter(available=available)
for f in file_objects:
queried_file_objects[f["id"]] = f

min_boundary += dynamic_chunksize

files_to_download = list(queried_file_objects.values())

total_bytes_to_transfer = sum(map(lambda x: x["file_size"] or 0, files_to_download))
return number_of_resources, files_to_download, total_bytes_to_transfer


def get_content_nodes_selectors(channel_id, nodes_queries_list):
"""
Returns a dictionary with a set of include_node_ids and exclude_node_ids
that can be used with the given channel_id to import the nodes contained
in nodes_queries_list.
"""

include_node_ids = list()
exclude_node_ids = list()

channel_metadata = ChannelMetadata.objects.get(id=channel_id)

available_node_ids = set(
itertools.chain.from_iterable(
nodes_query.values_list("id", flat=True)
for nodes_query in nodes_queries_list
)
)

available_nodes_queue = [channel_metadata.root]

while len(available_nodes_queue) > 0:
node = available_nodes_queue.pop(0)

# We could add nodes to exclude_node_ids when less than half of the
# sibling nodes are missing. However, it is unclear if this would
# be useful.

if node.kind == "topic":
leaf_node_ids = _get_leaf_node_ids(node)
matching_leaf_nodes = leaf_node_ids.intersection(available_node_ids)
missing_leaf_nodes = leaf_node_ids.difference(available_node_ids)
if len(missing_leaf_nodes) == 0:
assert node.id not in include_node_ids
include_node_ids.append(node.id)
elif len(matching_leaf_nodes) > 0:
available_nodes_queue.extend(node.children.all())
elif node.id in available_node_ids:
assert node.id not in include_node_ids
include_node_ids.append(node.id)

return {
"id": channel_id,
"version": channel_metadata.version,
"include_node_ids": include_node_ids,
"exclude_node_ids": exclude_node_ids,
}


def _get_leaf_node_ids(node):
return set(
ContentNode.objects.filter(
lft__gte=node.lft, lft__lte=node.rght, channel_id=node.channel_id
)
.exclude(kind="topic")
.values_list("id", flat=True)
)


def retry_import(e):
"""
When an exception occurs during channel/content import, if

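Background sketch for the lft/rght interval filter used by _get_leaf_node_ids and get_content_nodes_selectors above: in an MPTT tree, every descendant's lft lies inside its ancestor's lft..rght range, so the non-topic rows in that range are exactly the leaves under a topic. The values below are made up and use plain dicts in place of ContentNode rows.

# A minimal, self-contained illustration of the interval check; the values
# only mimic MPTT lft/rght numbering and are not real Kolibri data.
topic = {"id": "topic1", "lft": 2, "rght": 7, "kind": "topic"}
nodes = [
    {"id": "ex1", "lft": 3, "rght": 4, "kind": "exercise"},
    {"id": "vid1", "lft": 5, "rght": 6, "kind": "video"},
    {"id": "vid2", "lft": 8, "rght": 9, "kind": "video"},  # outside the topic
]

leaf_node_ids = {
    node["id"]
    for node in nodes
    if topic["lft"] <= node["lft"] <= topic["rght"] and node["kind"] != "topic"
}
assert leaf_node_ids == {"ex1", "vid1"}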