Skip to content

Commit

Permalink
daemon/session2: add half-closed TCP connection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček committed Nov 14, 2024
1 parent 1ea3f10 commit 471bb7a
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 21 deletions.
12 changes: 12 additions & 0 deletions daemon/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,18 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
return;
}

// allow deferring EOF for incoming connections to send answer even if half-closed
if (!s->outgoing && (nread == UV_EOF)) {
if (kr_log_is_debug(IO, NULL)) {
struct sockaddr *peer = session2_get_peer(s);
char *peer_str = kr_straddr(peer);
kr_log_debug(IO, "=> connection to '%s' half-closed by peer (EOF)\n",
peer_str ? peer_str : "");
}
session2_event(s, PROTOLAYER_EVENT_EOF, NULL);
return;
}

if (nread < 0 || !buf->base) {
if (kr_log_is_debug(IO, NULL)) {
struct sockaddr *peer = session2_get_peer(s);
Expand Down
39 changes: 26 additions & 13 deletions daemon/session2.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,18 @@ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue)
return false;
}

static inline ssize_t session2_get_protocol(
struct session2 *s, enum protolayer_type protocol)
{
const struct protolayer_grp *grp = &protolayer_grps[s->proto];
for (ssize_t i = 0; i < grp->num_layers; i++) {
enum protolayer_type found = grp->layers[i];
if (protocol == found)
return i;
}

return -1;
}

/** Gets layer-specific session data for the layer with the specified index
* from the manager. */
Expand All @@ -333,6 +345,14 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx)
return protolayer_sess_data_get(ctx->session, ctx->layer_ix);
}

void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol) {
ssize_t layer_ix = session2_get_protocol(s, protocol);
if (layer_ix < 0)
return NULL;

return protolayer_sess_data_get(s, layer_ix);
}

/** Gets layer-specific iteration data for the layer with the specified index
* from the context. */
static inline struct protolayer_data *protolayer_iter_data_get(
Expand All @@ -358,19 +378,6 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx)
return protolayer_iter_data_get(ctx, ctx->layer_ix);
}

static inline ssize_t session2_get_protocol(
struct session2 *s, enum protolayer_type protocol)
{
const struct protolayer_grp *grp = &protolayer_grps[s->proto];
for (ssize_t i = 0; i < grp->num_layers; i++) {
enum protolayer_type found = grp->layers[i];
if (protocol == found)
return i;
}

return -1;
}

static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx)
{
unsigned int last_ix = (ctx->direction == PROTOLAYER_UNWRAP)
Expand Down Expand Up @@ -1684,6 +1691,12 @@ static int session2_transport_event(struct session2 *s,
if (s->closing)
return kr_ok();

if (event == PROTOLAYER_EVENT_EOF) {
// no layer wanted to retain TCP half-closed state
session2_force_close(s);
return kr_ok();
}

bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE ||
event == PROTOLAYER_EVENT_FORCE_CLOSE);
if (is_close_event) {
Expand Down
6 changes: 6 additions & 0 deletions daemon/session2.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
XX(MALFORMED) \
/** Signal that a connection has ended. */\
XX(DISCONNECT) \
/** Signal EOF from peer (e.g. half-closed TCP connection). */\
XX(EOF) \
/** Failed task send - update stats. */\
XX(STATS_SEND_ERR) \
/** Outgoing query submission - update stats. */\
Expand Down Expand Up @@ -535,6 +537,10 @@ size_t protolayer_queue_count_payload(const protolayer_iter_ctx_queue_t *queue);
* queue iterators, as it does not need to iterate through the whole queue. */
bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue);

/** Gets layer-specific session data for the specified protocol layer.
* Returns NULL if the layer is not present in the session. */
void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol);

/** Gets layer-specific session data for the last processed layer.
* To be used after returning from its callback for async continuation but before calling protolayer_continue. */
void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx);
Expand Down
6 changes: 6 additions & 0 deletions daemon/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,12 @@ static enum protolayer_event_cb_result pl_tls_event_unwrap(
return PROTOLAYER_EVENT_PROPAGATE;
}

if (event == PROTOLAYER_EVENT_EOF) {
// TCP half-closed state not allowed
session2_force_close(s);
return PROTOLAYER_EVENT_CONSUME;
}

if (tls->client_side) {
if (event == PROTOLAYER_EVENT_CONNECT)
return pl_tls_client_connect_start(tls, s);
Expand Down
41 changes: 33 additions & 8 deletions daemon/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ struct qr_task
qr_task_free((task)); \
} while (0)

struct pl_dns_stream_sess_data {
struct protolayer_data h;
bool single : 1; /**< True: Stream only allows a single packet */
bool produced : 1; /**< True: At least one packet has been produced */
bool connected : 1; /**< True: The stream is connected */
bool half_closed : 1; /**< True: EOF was received, the stream is half-closed */
};

/* Forward decls */
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
Expand All @@ -122,7 +130,6 @@ static struct session2* worker_find_tcp_waiting(const struct sockaddr* addr);

static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt);


struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
struct worker_ctx *the_worker = NULL;

Expand Down Expand Up @@ -995,6 +1002,18 @@ static int qr_task_finalize(struct qr_task *task, int state)
session2_close(source_session);
}

if (source_session->stream && !source_session->closing) {
struct pl_dns_stream_sess_data *stream =
protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_MULTI_STREAM);
if (!stream)
stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_UNSIZED_STREAM);
if (!stream)
stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_SINGLE_STREAM);
if (stream && stream->half_closed) {
session2_force_close(source_session);
}
}

qr_task_unref(task);

if (ret != kr_ok() || state != KR_STATE_DONE)
Expand Down Expand Up @@ -1804,13 +1823,6 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
}
}

struct pl_dns_stream_sess_data {
struct protolayer_data h;
bool single : 1; /**< True: Stream only allows a single packet */
bool produced : 1; /**< True: At least one packet has been produced */
bool connected : 1; /**< True: The stream is connected */
};

static int pl_dns_stream_sess_init(struct session2 *session,
void *sess_data, void *param)
{
Expand Down Expand Up @@ -2015,6 +2027,16 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected(
return PROTOLAYER_EVENT_PROPAGATE;
}

static enum protolayer_event_cb_result pl_dns_stream_eof(
struct session2 *session, struct pl_dns_stream_sess_data *stream)
{
if (!session2_is_empty(session)) {
stream->half_closed = true;
return PROTOLAYER_EVENT_CONSUME;
}
return PROTOLAYER_EVENT_PROPAGATE;
}

static enum protolayer_event_cb_result pl_dns_stream_event_unwrap(
enum protolayer_event_type event, void **baton,
struct session2 *session, void *sess_data)
Expand Down Expand Up @@ -2046,6 +2068,9 @@ static enum protolayer_event_cb_result pl_dns_stream_event_unwrap(
case PROTOLAYER_EVENT_FORCE_CLOSE:
return pl_dns_stream_disconnected(session, stream);

case PROTOLAYER_EVENT_EOF:
return pl_dns_stream_eof(session, stream);

default:
return PROTOLAYER_EVENT_PROPAGATE;
}
Expand Down

0 comments on commit 471bb7a

Please sign in to comment.