Skip to content

Commit

Permalink
pull: Add queuing into the higher level logic
Browse files Browse the repository at this point in the history
Working on the libcurl backend, I didn't want to reimplement another queue. I
think the queue logic is really better done at the high level, since the fetcher
knows how we want to prioritize metadata over content, etc.

Adding another queue here is duplication, but things will look nicer when we can
actually delete the libsoup one in the next commit.

Closes: #654
Approved by: jlebon
  • Loading branch information
cgwalters authored and rh-atomic-bot committed Feb 7, 2017
1 parent 3d38f03 commit c18628e
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 35 deletions.
3 changes: 3 additions & 0 deletions src/libostree/ostree-repo-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ G_BEGIN_DECLS
#define _OSTREE_SUMMARY_CACHE_DIR "summaries"
#define _OSTREE_CACHE_DIR "cache"

#define _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS 8
#define _OSTREE_MAX_OUTSTANDING_DELTAPART_REQUESTS 2

typedef enum {
OSTREE_REPO_TEST_ERROR_PRE_COMMIT = (1 << 0)
} OstreeRepoTestErrorFlags;
Expand Down
223 changes: 188 additions & 35 deletions src/libostree/ostree-repo-pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ typedef struct {
GHashTable *scanned_metadata; /* Maps object name to itself */
GHashTable *requested_metadata; /* Maps object name to itself */
GHashTable *requested_content; /* Maps checksum to itself */
GHashTable *pending_fetch_metadata; /* Map<ObjectName,FetchObjectData> */
GHashTable *pending_fetch_content; /* Map<checksum,FetchObjectData> */
GHashTable *pending_fetch_deltaparts; /* Set<FetchStaticDeltaData> */
guint n_outstanding_metadata_fetches;
guint n_outstanding_metadata_write_requests;
guint n_outstanding_content_fetches;
Expand Down Expand Up @@ -133,6 +136,10 @@ typedef struct {
OtPullData *pull_data;
GVariant *objects;
char *expected_checksum;
char *from_revision;
char *to_revision;
guint i;
guint64 size;
} FetchStaticDeltaData;

typedef struct {
Expand All @@ -142,6 +149,10 @@ typedef struct {
guint recursion_depth;
} ScanObjectQueueData;

static void start_fetch (OtPullData *pull_data, FetchObjectData *fetch);
static void start_fetch_deltapart (OtPullData *pull_data,
FetchStaticDeltaData *fetch);
static gboolean fetcher_queue_is_full (OtPullData *pull_data);
static void queue_scan_one_metadata_object (OtPullData *pull_data,
const char *csum,
OstreeObjectType objtype,
Expand Down Expand Up @@ -271,6 +282,77 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
g_error_free (error);
}
}
else
{
GHashTableIter hiter;
gpointer key, value;

/* We may have just completed an async fetch operation. Now we look at
* possibly enqueuing more requests. The goal of queuing is to both avoid
* overloading the fetcher backend with HTTP requests, but also to
* prioritize metadata fetches over content, so we have accurate
* reporting. Hence here, we process metadata fetches first.
*/

/* Try filling the queue with metadata we need to fetch */
g_hash_table_iter_init (&hiter, pull_data->pending_fetch_metadata);
while (!fetcher_queue_is_full (pull_data) &&
g_hash_table_iter_next (&hiter, &key, &value))
{
GVariant *objname = key;
FetchObjectData *fetch = value;

/* Steal both key and value */
g_hash_table_iter_steal (&hiter);

/* This takes ownership of the value */
start_fetch (pull_data, fetch);
/* And unref the key */
g_variant_unref (objname);
}

/* Now, process deltapart requests */
g_hash_table_iter_init (&hiter, pull_data->pending_fetch_deltaparts);
while (!fetcher_queue_is_full (pull_data) &&
g_hash_table_iter_next (&hiter, &key, &value))
{
FetchStaticDeltaData *fetch = key;
g_hash_table_iter_steal (&hiter);
/* Takes ownership */
start_fetch_deltapart (pull_data, fetch);
}

/* Next, fill the queue with content */
g_hash_table_iter_init (&hiter, pull_data->pending_fetch_content);
while (!fetcher_queue_is_full (pull_data) &&
g_hash_table_iter_next (&hiter, &key, &value))
{
char *checksum = key;
FetchObjectData *fetch = value;

/* Steal both key and value */
g_hash_table_iter_steal (&hiter);

/* This takes ownership of the value */
start_fetch (pull_data, fetch);
/* And unref the key */
g_free (checksum);
}

}
}

/* We have a total-request limit, as well has a hardcoded max of 2 for delta
* parts. The logic for the delta one is that processing them is expensive, and
* doing multiple simultaneously could risk space/memory on smaller devices.
*/
static gboolean
fetcher_queue_is_full (OtPullData *pull_data)
{
return (pull_data->n_outstanding_metadata_fetches +
pull_data->n_outstanding_content_fetches +
pull_data->n_outstanding_deltapart_fetches) == _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS ||
pull_data->n_outstanding_deltapart_fetches == _OSTREE_MAX_OUTSTANDING_DELTAPART_REQUESTS;
}

static gboolean
Expand Down Expand Up @@ -942,6 +1024,8 @@ fetch_static_delta_data_free (gpointer data)
FetchStaticDeltaData *fetch_data = data;
g_free (fetch_data->expected_checksum);
g_variant_unref (fetch_data->objects);
g_free (fetch_data->from_revision);
g_free (fetch_data->to_revision);
g_free (fetch_data);
}

Expand Down Expand Up @@ -1343,52 +1427,92 @@ enqueue_one_object_request (OtPullData *pull_data,
gboolean is_detached_meta,
gboolean object_is_stored)
{
g_autofree char *obj_subpath = NULL;
gboolean is_meta;
FetchObjectData *fetch_data;
guint64 *expected_max_size_p;
guint64 expected_max_size;
GPtrArray *mirrorlist = NULL;

g_debug ("queuing fetch of %s.%s%s", checksum,
ostree_object_type_to_string (objtype),
is_detached_meta ? " (detached)" : "");
is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);

fetch_data = g_new0 (FetchObjectData, 1);
fetch_data->pull_data = pull_data;
fetch_data->object = ostree_object_name_serialize (checksum, objtype);
fetch_data->path = g_strdup (path);
fetch_data->is_detached_meta = is_detached_meta;
fetch_data->object_is_stored = object_is_stored;

if (is_detached_meta)
if (is_meta)
pull_data->n_requested_metadata++;
else
pull_data->n_requested_content++;

/* Are too many requests are in flight? */
if (fetcher_queue_is_full (pull_data))
{
char buf[_OSTREE_LOOSE_PATH_MAX];
_ostree_loose_path (buf, checksum, OSTREE_OBJECT_TYPE_COMMIT_META, pull_data->remote_mode);
obj_subpath = g_build_filename ("objects", buf, NULL);
mirrorlist = pull_data->meta_mirrorlist;
g_debug ("queuing fetch of %s.%s%s", checksum,
ostree_object_type_to_string (objtype),
is_detached_meta ? " (detached)" : "");

if (is_meta)
{
GVariant *objname = ostree_object_name_serialize (checksum, objtype);
g_hash_table_insert (pull_data->pending_fetch_metadata, objname, fetch_data);
}
else
{
g_hash_table_insert (pull_data->pending_fetch_content, g_strdup (checksum), fetch_data);
}
}
else
{
obj_subpath = _ostree_get_relative_object_path (checksum, objtype, TRUE);
mirrorlist = pull_data->content_mirrorlist;
start_fetch (pull_data, fetch_data);
}
}

static void
start_fetch (OtPullData *pull_data,
FetchObjectData *fetch)
{
gboolean is_meta;
g_autofree char *obj_subpath = NULL;
guint64 *expected_max_size_p;
guint64 expected_max_size;
const char *expected_checksum;
OstreeObjectType objtype;
GPtrArray *mirrorlist = NULL;

ostree_object_name_deserialize (fetch->object, &expected_checksum, &objtype);
is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);

g_debug ("starting fetch of %s.%s%s", expected_checksum,
ostree_object_type_to_string (objtype),
fetch->is_detached_meta ? " (detached)" : "");

is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);
if (is_meta)
pull_data->n_outstanding_metadata_fetches++;
else
pull_data->n_outstanding_content_fetches++;

/* Override the path if we're trying to fetch the .commitmeta file first */
if (fetch->is_detached_meta)
{
pull_data->n_outstanding_metadata_fetches++;
pull_data->n_requested_metadata++;
char buf[_OSTREE_LOOSE_PATH_MAX];
_ostree_loose_path (buf, expected_checksum, OSTREE_OBJECT_TYPE_COMMIT_META, pull_data->remote_mode);
obj_subpath = g_build_filename ("objects", buf, NULL);
mirrorlist = pull_data->meta_mirrorlist;
}
else
{
pull_data->n_outstanding_content_fetches++;
pull_data->n_requested_content++;
obj_subpath = _ostree_get_relative_object_path (expected_checksum, objtype, TRUE);
mirrorlist = pull_data->content_mirrorlist;
}
fetch_data = g_new0 (FetchObjectData, 1);
fetch_data->pull_data = pull_data;
fetch_data->object = ostree_object_name_serialize (checksum, objtype);
fetch_data->path = g_strdup (path);
fetch_data->is_detached_meta = is_detached_meta;
fetch_data->object_is_stored = object_is_stored;

expected_max_size_p = is_detached_meta ? NULL : g_hash_table_lookup (pull_data->expected_commit_sizes, checksum);
/* We may have determined maximum sizes from the summary file content; if so,
* honor it. Otherwise, metadata has a baseline max size.
*/
expected_max_size_p = fetch->is_detached_meta ? NULL : g_hash_table_lookup (pull_data->expected_commit_sizes, expected_checksum);
if (expected_max_size_p)
expected_max_size = *expected_max_size_p;
else if (is_meta)
else if (OSTREE_OBJECT_TYPE_IS_META (objtype))
expected_max_size = OSTREE_MAX_METADATA_SIZE;
else
expected_max_size = 0;
Expand All @@ -1398,7 +1522,7 @@ enqueue_one_object_request (OtPullData *pull_data,
is_meta ? OSTREE_REPO_PULL_METADATA_PRIORITY
: OSTREE_REPO_PULL_CONTENT_PRIORITY,
pull_data->cancellable,
is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch);
}

static gboolean
Expand Down Expand Up @@ -1502,6 +1626,22 @@ process_one_static_delta_fallback (OtPullData *pull_data,
return ret;
}

static void
start_fetch_deltapart (OtPullData *pull_data,
FetchStaticDeltaData *fetch)
{
g_autofree char *deltapart_path = _ostree_get_relative_static_delta_part_path (fetch->from_revision, fetch->to_revision, fetch->i);
pull_data->n_outstanding_deltapart_fetches++;
g_assert_cmpint (pull_data->n_outstanding_deltapart_fetches, <=, _OSTREE_MAX_OUTSTANDING_DELTAPART_REQUESTS);
_ostree_fetcher_request_to_tmpfile (pull_data->fetcher,
pull_data->content_mirrorlist,
deltapart_path, fetch->size,
OSTREE_FETCHER_DEFAULT_PRIORITY,
pull_data->cancellable,
static_deltapart_fetch_on_complete,
fetch);
}

static gboolean
process_one_static_delta (OtPullData *pull_data,
const char *from_revision,
Expand Down Expand Up @@ -1652,9 +1792,13 @@ process_one_static_delta (OtPullData *pull_data,
continue;

fetch_data = g_new0 (FetchStaticDeltaData, 1);
fetch_data->from_revision = g_strdup (from_revision);
fetch_data->to_revision = g_strdup (to_revision);
fetch_data->pull_data = pull_data;
fetch_data->objects = g_variant_ref (objects);
fetch_data->expected_checksum = ostree_checksum_from_bytes_v (csum_v);
fetch_data->size = size;
fetch_data->i = i;

if (inline_part_bytes != NULL)
{
Expand All @@ -1678,14 +1822,12 @@ process_one_static_delta (OtPullData *pull_data,
}
else
{
_ostree_fetcher_request_to_tmpfile (pull_data->fetcher,
pull_data->content_mirrorlist,
deltapart_path, size,
OSTREE_FETCHER_DEFAULT_PRIORITY,
pull_data->cancellable,
static_deltapart_fetch_on_complete,
fetch_data);
pull_data->n_outstanding_deltapart_fetches++;
if (!fetcher_queue_is_full (pull_data))
start_fetch_deltapart (pull_data, fetch_data);
else
{
g_hash_table_add (pull_data->pending_fetch_deltaparts, fetch_data);
}
}
}

Expand Down Expand Up @@ -2446,6 +2588,14 @@ ostree_repo_pull_with_options (OstreeRepo *self,
(GDestroyNotify)g_free, NULL);
pull_data->requested_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
(GDestroyNotify)g_variant_unref, NULL);
pull_data->pending_fetch_content = g_hash_table_new_full (g_str_hash, g_str_equal,
(GDestroyNotify)g_free,
(GDestroyNotify)fetch_object_data_free);
pull_data->pending_fetch_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
(GDestroyNotify)g_variant_unref,
(GDestroyNotify)fetch_object_data_free);
pull_data->pending_fetch_deltaparts = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)fetch_static_delta_data_free, NULL);

if (dir_to_pull != NULL || dirs_to_pull != NULL)
{
pull_data->dirs = g_ptr_array_new_with_free_func (g_free);
Expand Down Expand Up @@ -3157,6 +3307,9 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_clear_pointer (&pull_data->summary_deltas_checksums, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_content, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_deltaparts, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->idle_src, (GDestroyNotify) g_source_destroy);
g_clear_pointer (&pull_data->dirs, (GDestroyNotify) g_ptr_array_unref);
g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref);
Expand Down

0 comments on commit c18628e

Please sign in to comment.