Skip to content

gvfs-helper: auto-retry after network errors, resource throttling, split GET and POST semantics #208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 149 additions & 76 deletions gvfs-helper-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

static struct oidset gh_client__oidset_queued = OIDSET_INIT;
static unsigned long gh_client__oidset_count;
static int gh_client__includes_immediate;

struct gh_server__process {
struct subprocess_entry subprocess; /* must be first */
Expand All @@ -24,13 +23,20 @@ static int gh_server__subprocess_map_initialized;
static struct hashmap gh_server__subprocess_map;
static struct object_directory *gh_client__chosen_odb;

#define CAP_GET (1u<<1)
/*
* The "objects" capability has 2 verbs: "get" and "post".
*/
#define CAP_OBJECTS (1u<<1)
#define CAP_OBJECTS_NAME "objects"

#define CAP_OBJECTS__VERB_GET1_NAME "get"
#define CAP_OBJECTS__VERB_POST_NAME "post"

static int gh_client__start_fn(struct subprocess_entry *subprocess)
{
static int versions[] = {1, 0};
static struct subprocess_capability capabilities[] = {
{ "get", CAP_GET },
{ CAP_OBJECTS_NAME, CAP_OBJECTS },
{ NULL, 0 }
};

Expand All @@ -42,14 +48,16 @@ static int gh_client__start_fn(struct subprocess_entry *subprocess)
}

/*
* Send:
* Send the queued OIDs in the OIDSET to gvfs-helper for it to
* fetch from the cache-server or main Git server using "/gvfs/objects"
* POST semantics.
*
* get LF
* objects.post LF
* (<hex-oid> LF)*
* <flush>
*
*/
static int gh_client__get__send_command(struct child_process *process)
static int gh_client__send__objects_post(struct child_process *process)
{
struct oidset_iter iter;
struct object_id *oid;
Expand All @@ -60,7 +68,9 @@ static int gh_client__get__send_command(struct child_process *process)
* so that we don't have to.
*/

err = packet_write_fmt_gently(process->in, "get\n");
err = packet_write_fmt_gently(
process->in,
(CAP_OBJECTS_NAME "." CAP_OBJECTS__VERB_POST_NAME "\n"));
if (err)
return err;

Expand All @@ -79,6 +89,46 @@ static int gh_client__get__send_command(struct child_process *process)
return 0;
}

/*
* Send the given OID to gvfs-helper for it to fetch from the
* cache-server or main Git server using "/gvfs/objects" GET
* semantics.
*
* This ignores any queued OIDs.
*
* objects.get LF
* <hex-oid> LF
* <flush>
*
*/
static int gh_client__send__objects_get(struct child_process *process,
					const struct object_id *oid)
{
	int ret;

	/*
	 * Write the "objects.get" command, the single hex OID, and a
	 * flush packet to the long-running process.  The packet_
	 * routines already call error() on failure, so we only need to
	 * propagate the non-zero result to the caller.
	 */
	ret = packet_write_fmt_gently(
		process->in,
		(CAP_OBJECTS_NAME "." CAP_OBJECTS__VERB_GET1_NAME "\n"));
	if (!ret)
		ret = packet_write_fmt_gently(process->in, "%s\n",
					      oid_to_hex(oid));
	if (!ret)
		ret = packet_flush_gently(process->in);

	return ret;
}

/*
* Verify that the pathname found in the "odb" response line matches
* what we requested.
Expand Down Expand Up @@ -148,7 +198,7 @@ static void gh_client__update_packed_git(const char *line)
}

/*
* We expect:
* Both CAP_OBJECTS verbs return the same format response:
*
* <odb>
* <data>*
Expand Down Expand Up @@ -179,7 +229,7 @@ static void gh_client__update_packed_git(const char *line)
* grouped with a queued request for a blob. The tree-walk *might* be
* able to continue and let the 404 blob be handled later.
*/
static int gh_client__get__receive_response(
static int gh_client__objects__receive_response(
struct child_process *process,
enum gh_client__created *p_ghc,
int *p_nr_loose, int *p_nr_packfile)
Expand Down Expand Up @@ -259,17 +309,12 @@ static void gh_client__choose_odb(void)
}
}

static int gh_client__get(enum gh_client__created *p_ghc)
static struct gh_server__process *gh_client__find_long_running_process(
unsigned int cap_needed)
{
struct gh_server__process *entry;
struct child_process *process;
struct argv_array argv = ARGV_ARRAY_INIT;
struct strbuf quoted = STRBUF_INIT;
int nr_loose = 0;
int nr_packfile = 0;
int err = 0;

trace2_region_enter("gh-client", "get", the_repository);

gh_client__choose_odb();

Expand All @@ -285,6 +330,11 @@ static int gh_client__get(enum gh_client__created *p_ghc)

sq_quote_argv_pretty(&quoted, argv.argv);

/*
* Find an existing long-running process with the above command
* line -or- create a new long-running process for this and
* subsequent 'get' requests.
*/
if (!gh_server__subprocess_map_initialized) {
gh_server__subprocess_map_initialized = 1;
hashmap_init(&gh_server__subprocess_map,
Expand All @@ -298,70 +348,24 @@ static int gh_client__get(enum gh_client__created *p_ghc)
entry = xmalloc(sizeof(*entry));
entry->supported_capabilities = 0;

err = subprocess_start_argv(
&gh_server__subprocess_map, &entry->subprocess, 1,
&argv, gh_client__start_fn);
if (err) {
free(entry);
goto leave_region;
}
if (subprocess_start_argv(&gh_server__subprocess_map,
&entry->subprocess, 1,
&argv, gh_client__start_fn))
FREE_AND_NULL(entry);
}

process = &entry->subprocess.process;

if (!(CAP_GET & entry->supported_capabilities)) {
error("gvfs-helper: does not support GET");
if (entry &&
(entry->supported_capabilities & cap_needed) != cap_needed) {
error("gvfs-helper: does not support needed capabilities");
subprocess_stop(&gh_server__subprocess_map,
(struct subprocess_entry *)entry);
free(entry);
err = -1;
goto leave_region;
FREE_AND_NULL(entry);
}

sigchain_push(SIGPIPE, SIG_IGN);

err = gh_client__get__send_command(process);
if (!err)
err = gh_client__get__receive_response(process, p_ghc,
&nr_loose, &nr_packfile);

sigchain_pop(SIGPIPE);

if (err) {
subprocess_stop(&gh_server__subprocess_map,
(struct subprocess_entry *)entry);
free(entry);
}

leave_region:
argv_array_clear(&argv);
strbuf_release(&quoted);

trace2_data_intmax("gh-client", the_repository,
"get/immediate", gh_client__includes_immediate);

trace2_data_intmax("gh-client", the_repository,
"get/nr_objects", gh_client__oidset_count);

if (nr_loose)
trace2_data_intmax("gh-client", the_repository,
"get/nr_loose", nr_loose);

if (nr_packfile)
trace2_data_intmax("gh-client", the_repository,
"get/nr_packfile", nr_packfile);

if (err)
trace2_data_intmax("gh-client", the_repository,
"get/error", err);

trace2_region_leave("gh-client", "get", the_repository);

oidset_clear(&gh_client__oidset_queued);
gh_client__oidset_count = 0;
gh_client__includes_immediate = 0;

return err;
return entry;
}

void gh_client__queue_oid(const struct object_id *oid)
Expand All @@ -388,28 +392,97 @@ void gh_client__queue_oid_array(const struct object_id *oids, int oid_nr)
gh_client__queue_oid(&oids[k]);
}

/*
* Bulk fetch all of the queued OIDs in the OIDSET.
*/
/*
 * Bulk-fetch every OID currently in the queued oidset via a single
 * "objects.post" round trip, then empty the queue.  Returns 0 on
 * success (or when the queue was already empty), non-zero on error.
 */
int gh_client__drain_queue(enum gh_client__created *p_ghc)
{
	struct gh_server__process *entry;
	struct child_process *proc;
	int nr_loose = 0;
	int nr_packfile = 0;
	int ret = 0;

	*p_ghc = GHC__CREATED__NOTHING;

	/* Nothing queued, nothing to do. */
	if (!gh_client__oidset_count)
		return 0;

	entry = gh_client__find_long_running_process(CAP_OBJECTS);
	if (!entry)
		return -1;

	trace2_region_enter("gh-client", "objects/post", the_repository);

	proc = &entry->subprocess.process;

	/* A dying helper must not take us down with a SIGPIPE. */
	sigchain_push(SIGPIPE, SIG_IGN);

	ret = gh_client__send__objects_post(proc);
	if (!ret)
		ret = gh_client__objects__receive_response(
			proc, p_ghc, &nr_loose, &nr_packfile);

	sigchain_pop(SIGPIPE);

	if (ret) {
		/* The process is unusable; stop it and forget it. */
		subprocess_stop(&gh_server__subprocess_map,
				(struct subprocess_entry *)entry);
		FREE_AND_NULL(entry);
	}

	trace2_data_intmax("gh-client", the_repository,
			   "objects/post/nr_objects", gh_client__oidset_count);
	trace2_region_leave("gh-client", "objects/post", the_repository);

	/* The queue has been handed off (or abandoned); reset it. */
	oidset_clear(&gh_client__oidset_queued);
	gh_client__oidset_count = 0;

	return ret;
}

/*
* Get exactly 1 object immediately.
* Ignore any queued objects.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we are assuming that any queued objects will get a flush request eventually? That sounds reasonable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I've split the queued and immediate usage now. The dry-run/pre-scan loops already handle the queue and drain (for missing blobs usually). The main difference now is that any missing trees (or commits) during those loops will be immediately fetched in isolation, but the queue will remain.

*/
/*
 * Immediately fetch exactly one object using "objects.get" semantics
 * and wait for the response.  The queued (batch) oidset is deliberately
 * left untouched; those OIDs will be fetched by a later
 * gh_client__drain_queue().
 *
 * Returns 0 on success, non-zero on error.  On error the long-running
 * helper process is stopped so a fresh one is started next time.
 *
 * NOTE(review): the scraped diff interleaved lines of the removed
 * implementation (the old "queue + drain" path) into this function;
 * this body is the coherent new-version implementation with those
 * stale statements removed.
 */
int gh_client__get_immediate(const struct object_id *oid,
			     enum gh_client__created *p_ghc)
{
	struct gh_server__process *entry;
	struct child_process *process;
	int nr_loose = 0;
	int nr_packfile = 0;
	int err = 0;

	// TODO consider removing this trace2. it is useful for interactive
	// TODO debugging, but may generate way too much noise for a data
	// TODO event.
	trace2_printf("gh_client__get_immediate: %s", oid_to_hex(oid));

	entry = gh_client__find_long_running_process(CAP_OBJECTS);
	if (!entry)
		return -1;

	trace2_region_enter("gh-client", "objects/get", the_repository);

	process = &entry->subprocess.process;

	/* Ignore SIGPIPE in case the helper process dies mid-write. */
	sigchain_push(SIGPIPE, SIG_IGN);

	err = gh_client__send__objects_get(process, oid);
	if (!err)
		err = gh_client__objects__receive_response(
			process, p_ghc, &nr_loose, &nr_packfile);

	sigchain_pop(SIGPIPE);

	if (err) {
		/* The process is unusable; stop it and forget it. */
		subprocess_stop(&gh_server__subprocess_map,
				(struct subprocess_entry *)entry);
		FREE_AND_NULL(entry);
	}

	trace2_region_leave("gh-client", "objects/get", the_repository);

	return err;
}
26 changes: 16 additions & 10 deletions gvfs-helper-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ enum gh_client__created {
};

/*
* Ask `gvfs-helper server` to immediately fetch an object.
* Wait for the response.
* Ask `gvfs-helper server` to immediately fetch a single object
* using "/gvfs/objects" GET semantics.
*
* This may also fetch any queued (non-immediate) objects and
* so may create one or more loose objects and/or packfiles.
* It is undefined whether the requested OID will be loose or
* in a packfile.
* A long-running background process is used to make subsequent
* requests more efficient.
*
* A loose object will be created in the shared-cache ODB and
* in-memory cache updated.
*/
int gh_client__get_immediate(const struct object_id *oid,
enum gh_client__created *p_ghc);
Expand All @@ -47,16 +48,21 @@ int gh_client__get_immediate(const struct object_id *oid,
* Queue this OID for a future fetch using `gvfs-helper service`.
* It does not wait.
*
* The GHC layer is free to process this queue in any way it wants,
* including individual fetches, bulk fetches, and batching. And
* it may add queued objects to immediate requests.
*
* Callers should not rely on the queued object being on disk until
* the queue has been drained.
*/
void gh_client__queue_oid(const struct object_id *oid);
void gh_client__queue_oid_array(const struct object_id *oids, int oid_nr);

/*
* Ask `gvfs-helper server` to fetch the set of queued OIDs using
* "/gvfs/objects" POST semantics.
*
* A long-running background process is used to make subsequent
* requests more efficient.
*
* One or more packfiles will be created in the shared-cache ODB.
*/
int gh_client__drain_queue(enum gh_client__created *p_ghc);

#endif /* GVFS_HELPER_CLIENT_H */
Loading