Skip to content

Commit

Permalink
daemon/defer: defer stream EOF if data are deferred
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček committed Nov 14, 2024
1 parent 471bb7a commit 8658217
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
54 changes: 48 additions & 6 deletions daemon/defer.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream)
}

VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
kr_straddr(&addr->ip), nsec / 1000000.0, max_load, prefix);
}

/// Determine priority of the request in [-1, QUEUES_CNT - 1].
Expand Down Expand Up @@ -299,13 +299,14 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
static inline void process_single_deferred(void)
{
struct protolayer_iter_ctx *ctx = pop_query();
if (ctx == NULL) return;
if (kr_fails_assert(ctx)) return;

defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
phase_accounting = true;

struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
struct session2 *session = ctx->session;
uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;

VERBOSE_LOG(" %s POP from %d after %4.3f ms\n",
Expand All @@ -332,17 +333,34 @@ static inline void process_single_deferred(void)
return;
}

bool eof = false;
if (ctx->session->stream) {
kr_assert(queue_head(sdata->queue) == ctx);
queue_pop(sdata->queue);
while ((queue_len(sdata->queue) > 0) && (queue_head(sdata->queue) == NULL)) { // EOF event
eof = true;
queue_pop(sdata->queue);
}
if (queue_len(sdata->queue) > 0) {
VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority);
push_query(queue_head(sdata->queue), priority, true);
}
}

if (eof) {
// Keep session alive even if it is somehow force-closed during continuation.
// TODO Is it needed?
session->ref_count++;
}

VERBOSE_LOG(" CONTINUE\n");
protolayer_continue(ctx);

if (eof) {
VERBOSE_LOG(" CONTINUE EOF event\n");
session2_event_after(session, PROTOLAYER_TYPE_DEFER, PROTOLAYER_EVENT_EOF, NULL);
session2_unhandle(session); // decrease ref_count
}
}

/// Break expired requests at the beginning of queues, uses current stamp.
Expand Down Expand Up @@ -371,10 +389,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
if (!defer)
return protolayer_continue(ctx);

if (ctx->session->outgoing)
if (!defer || ctx->session->outgoing)
return protolayer_continue(ctx);

defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
Expand Down Expand Up @@ -413,6 +428,32 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
return protolayer_async();
}

/// Unwrap event: EOF event may be deferred here, other events pass synchronously.
static enum protolayer_event_cb_result pl_defer_event_unwrap(
enum protolayer_event_type event, void **baton,
struct session2 *session, void *sess_data)
{
if (!defer || session->outgoing)
return PROTOLAYER_EVENT_PROPAGATE;

struct pl_defer_sess_data *sdata = sess_data;
if ((event == PROTOLAYER_EVENT_EOF) && (queue_len(sdata->queue) > 0) && kr_fails_assert(*baton == NULL)) {
// defer EOF event if unprocessed data remain, baton has to be NULL
queue_push(sdata->queue, NULL);
VERBOSE_LOG(" %s event %s deferred\n",
session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
protolayer_event_name(event));
return PROTOLAYER_EVENT_CONSUME;
}

VERBOSE_LOG(" %s event %s passes through synchronously%s%s\n",
session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
protolayer_event_name(event),
queue_len(sdata->queue) > 0 ? " ahead of deferred data" : "",
*baton ? " (with baton)" : "");
return PROTOLAYER_EVENT_PROPAGATE;
}

/// Idle: continue processing deferred requests.
static void defer_queues_idle(uv_idle_t *handle)
{
Expand Down Expand Up @@ -535,5 +576,6 @@ static void defer_protolayers_init(void)
.sess_size = sizeof(struct pl_defer_sess_data),
.sess_init = pl_defer_sess_init,
.unwrap = pl_defer_unwrap,
.event_unwrap = pl_defer_event_unwrap,
};
}
2 changes: 1 addition & 1 deletion daemon/session2.c
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ static int session2_submit(
{
if (session->closing)
return kr_error(ECANCELED);
if (session->ref_count >= INT_MAX)
if (session->ref_count >= INT_MAX - 1)
return kr_error(ETOOMANYREFS);
if (kr_fails_assert(session->proto < KR_PROTO_COUNT))
return kr_error(EFAULT);
Expand Down

0 comments on commit 8658217

Please sign in to comment.