Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/aws/http/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
enum aws_http_errors {
AWS_ERROR_HTTP_UNKNOWN = 0x0800,
AWS_ERROR_HTTP_PARSE,
AWS_ERROR_HTTP_USER_CALLBACK_EXIT,
AWS_ERROR_HTTP_INVALID_PARSE_STATE,
AWS_ERROR_HTTP_CONNECTION_CLOSED,
AWS_ERROR_HTTP_UNSUPPORTED_PROTOCOL,
Expand Down
1 change: 1 addition & 0 deletions include/aws/http/private/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct aws_http_connection_vtable {
struct aws_channel_handler_vtable channel_handler_vtable;

struct aws_http_stream *(*new_client_request_stream)(const struct aws_http_request_options *options);
void (*close)(struct aws_http_connection *connection);
};

/**
Expand Down
14 changes: 5 additions & 9 deletions include/aws/http/private/decode.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,25 @@ struct aws_http_decoded_header {
/**
* Called from `aws_http_decode` when an http header has been received.
* All pointers are strictly *read only*; any data that needs to persist must be copied out into user-owned memory.
* Return true to keep decoding, or false to immediately stop decoding and place the decoder in an invalid state, where
* the only valid operation is to destroy the decoder with `aws_http_decoder_destroy`.
*/
typedef bool(aws_http_decoder_on_header_fn)(const struct aws_http_decoded_header *header, void *user_data);
typedef int(aws_http_decoder_on_header_fn)(const struct aws_http_decoded_header *header, void *user_data);

/**
* Called from `aws_http_decode` when a portion of the http body has been received.
* `finished` is true if this is the last section of the http body, and false if more body data is yet to be received.
* All pointers are strictly *read only*; any data that needs to persist must be copied out into user-owned memory.
* Return true to keep decoding, or false to immediately stop decoding and place the decoder in an invalid state, where
* the only valid operation is to destroy the decoder with `aws_http_decoder_destroy`.
*/
typedef bool(aws_http_decoder_on_body_fn)(const struct aws_byte_cursor *data, bool finished, void *user_data);
typedef int(aws_http_decoder_on_body_fn)(const struct aws_byte_cursor *data, bool finished, void *user_data);

typedef void(aws_http_decoder_on_request_fn)(
typedef int(aws_http_decoder_on_request_fn)(
enum aws_http_method method_enum,
const struct aws_byte_cursor *method_str,
const struct aws_byte_cursor *uri,
void *user_data);

typedef void(aws_http_decoder_on_response_fn)(int status_code, void *user_data);
typedef int(aws_http_decoder_on_response_fn)(int status_code, void *user_data);

typedef void(aws_http_decoder_done_fn)(void *user_data);
typedef int(aws_http_decoder_done_fn)(void *user_data);

struct aws_http_decoder_vtable {
aws_http_decoder_on_header_fn *on_header;
Expand Down
3 changes: 2 additions & 1 deletion source/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ static struct aws_http_connection *s_connection_new(
}

void aws_http_connection_close(struct aws_http_connection *connection) {
aws_channel_shutdown(connection->channel_slot->channel, AWS_ERROR_SUCCESS);
assert(connection);
connection->vtable->close(connection);
}

void aws_http_connection_release(struct aws_http_connection *connection) {
Expand Down
118 changes: 91 additions & 27 deletions source/connection_h1.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,18 @@ static size_t s_handler_initial_window_size(struct aws_channel_handler *handler)
static size_t s_handler_message_overhead(struct aws_channel_handler *handler);
static void s_handler_destroy(struct aws_channel_handler *handler);
static struct aws_http_stream *s_new_client_request_stream(const struct aws_http_request_options *options);
static void s_connection_close(struct aws_http_connection *connection_base);
static void s_stream_destroy(struct aws_http_stream *stream_base);
static void s_stream_update_window(struct aws_http_stream *stream, size_t increment_size);
static void s_decoder_on_request(
static int s_decoder_on_request(
enum aws_http_method method_enum,
const struct aws_byte_cursor *method_str,
const struct aws_byte_cursor *uri,
void *user_data);
static void s_decoder_on_response(int status_code, void *user_data);
static bool s_decoder_on_header(const struct aws_http_decoded_header *header, void *user_data);
static bool s_decoder_on_body(const struct aws_byte_cursor *data, bool finished, void *user_data);
static void s_decoder_on_done(void *user_data);
static int s_decoder_on_response(int status_code, void *user_data);
static int s_decoder_on_header(const struct aws_http_decoded_header *header, void *user_data);
static int s_decoder_on_body(const struct aws_byte_cursor *data, bool finished, void *user_data);
static int s_decoder_on_done(void *user_data);

static struct aws_http_connection_vtable s_connection_vtable = {
.channel_handler_vtable =
Expand All @@ -84,6 +85,7 @@ static struct aws_http_connection_vtable s_connection_vtable = {
},

.new_client_request_stream = s_new_client_request_stream,
.close = s_connection_close,
};

static const struct aws_http_stream_vtable s_stream_vtable = {
Expand All @@ -108,6 +110,9 @@ struct h1_connection {
/* Single task used for issuing window updates from off-thread */
struct aws_channel_task window_update_task;

/* Task used once during shutdown. */
struct aws_channel_task shutdown_delay_task;

/* Only the event-loop thread may touch this data */
struct {
/* List of streams being worked on. */
Expand Down Expand Up @@ -183,7 +188,8 @@ struct h1_stream {
};

/**
* Called when something goes wrong internally which should result in the channel shutting down.
* Internal function for shutting down the connection.
* If connection is already shutting down, this call has no effect.
*/
static void s_shutdown_connection(struct h1_connection *connection, int error_code) {
assert(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
Expand All @@ -209,8 +215,36 @@ static void s_shutdown_connection(struct h1_connection *connection, int error_co
connection->thread_data.is_shutting_down = true;
connection->thread_data.shutdown_error_code = error_code;

/* Delay the call to aws_channel_shutdown().
* This ensures that a user calling aws_http_connection_close() won't have completion callbacks
* firing before aws_http_connection_close() has even returned. */
aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->shutdown_delay_task);
}
}

static void s_shutdown_delay_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
(void)task;
struct h1_connection *connection = arg;

if (status == AWS_TASK_STATUS_RUN_READY) {
/* If channel is already shutting down, this call has no effect */
aws_channel_shutdown(connection->base.channel_slot->channel, error_code);
aws_channel_shutdown(connection->base.channel_slot->channel, connection->thread_data.shutdown_error_code);
}
}

/**
* Public function for closing connection.
* If connection is already shutting down, this call has no effect.
*/
static void s_connection_close(struct aws_http_connection *connection_base) {
struct h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct h1_connection, base);

if (aws_channel_thread_is_callers_thread(connection_base->channel_slot->channel)) {
/* Invoke internal function so connection ceases work immediately */
s_shutdown_connection(connection, AWS_ERROR_SUCCESS);
} else {
/* Not on thread, so tell channel to shut down, which will result in connection shutting down. */
aws_channel_shutdown(connection_base->channel_slot->channel, AWS_ERROR_SUCCESS);
}
}

Expand Down Expand Up @@ -841,7 +875,7 @@ static void s_outgoing_stream_task(struct aws_channel_task *task, void *arg, enu
s_shutdown_connection(connection, aws_last_error());
}

static void s_decoder_on_request(
static int s_decoder_on_request(
enum aws_http_method method_enum,
const struct aws_byte_cursor *method_str,
const struct aws_byte_cursor *uri,
Expand Down Expand Up @@ -886,17 +920,16 @@ static void s_decoder_on_request(

incoming_stream->base.incoming_request_method = method_enum;

return;
error:

/* TODO: all decoder callbacks should be able to stop decoder, so we don't keep churning in the case of errors.
* There's some fishy stuff where callbacks assume current_incoming_stream is a valid ptr, but that's only the case
* while things are working */
/* No user callbacks, so we're not checking for shutdown */
return AWS_OP_SUCCESS;

s_shutdown_connection(connection, aws_last_error());
error:
err = aws_last_error();
s_shutdown_connection(connection, err);
return aws_raise_error(err);
}

static void s_decoder_on_response(int status_code, void *user_data) {
static int s_decoder_on_response(int status_code, void *user_data) {
struct h1_connection *connection = user_data;

AWS_LOGF_TRACE(
Expand All @@ -907,9 +940,12 @@ static void s_decoder_on_response(int status_code, void *user_data) {
aws_http_status_text(status_code));

connection->thread_data.incoming_stream->base.incoming_response_status = status_code;

/* No user callbacks, so we're not checking for shutdown */
return AWS_OP_SUCCESS;
}

static bool s_decoder_on_header(const struct aws_http_decoded_header *header, void *user_data) {
static int s_decoder_on_header(const struct aws_http_decoded_header *header, void *user_data) {
struct h1_connection *connection = user_data;
struct h1_stream *incoming_stream = connection->thread_data.incoming_stream;

Expand All @@ -920,8 +956,6 @@ static bool s_decoder_on_header(const struct aws_http_decoded_header *header, vo
AWS_BYTE_CURSOR_PRI(header->name_data),
AWS_BYTE_CURSOR_PRI(header->value_data));

/* TODO: worth buffering up headers and delivering all at once? In clumps? */

/* TODO? how to support trailing headers? distinct cb? invoke same cb again? */

if (incoming_stream->base.on_incoming_headers) {
Expand All @@ -933,13 +967,18 @@ static bool s_decoder_on_header(const struct aws_http_decoded_header *header, vo
incoming_stream->base.on_incoming_headers(&incoming_stream->base, &deliver, 1, incoming_stream->base.user_data);
}

return true;
/* Stop decoding if user callback shut down the connection. */
if (connection->thread_data.is_shutting_down) {
return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
}

return AWS_OP_SUCCESS;
}

static void s_mark_head_done(struct h1_stream *incoming_stream) {
static int s_mark_head_done(struct h1_stream *incoming_stream) {
/* Bail out if we've already done this */
if (incoming_stream->is_incoming_head_done) {
return;
return AWS_OP_SUCCESS;
}

incoming_stream->is_incoming_head_done = true;
Expand All @@ -964,16 +1003,26 @@ static void s_mark_head_done(struct h1_stream *incoming_stream) {
incoming_stream->base.on_incoming_header_block_done(
&incoming_stream->base, has_incoming_body, incoming_stream->base.user_data);
}

/* Stop decoding if user callback shut down the connection. */
if (connection->thread_data.is_shutting_down) {
return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
}

return AWS_OP_SUCCESS;
}

static bool s_decoder_on_body(const struct aws_byte_cursor *data, bool finished, void *user_data) {
static int s_decoder_on_body(const struct aws_byte_cursor *data, bool finished, void *user_data) {
(void)finished;

struct h1_connection *connection = user_data;
struct h1_stream *incoming_stream = connection->thread_data.incoming_stream;
assert(incoming_stream);

s_mark_head_done(incoming_stream);
int err = s_mark_head_done(incoming_stream);
if (err) {
return AWS_OP_ERR;
}

AWS_LOGF_TRACE(
AWS_LS_HTTP_STREAM, "id=%p: Incoming body: %zu bytes received.", (void *)&incoming_stream->base, data->len);
Expand All @@ -998,16 +1047,24 @@ static bool s_decoder_on_body(const struct aws_byte_cursor *data, bool finished,
}
}

return true;
/* Stop decoding if user callback shut down the connection. */
if (connection->thread_data.is_shutting_down) {
return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
}

return AWS_OP_SUCCESS;
}

static void s_decoder_on_done(void *user_data) {
static int s_decoder_on_done(void *user_data) {
struct h1_connection *connection = user_data;
struct h1_stream *incoming_stream = connection->thread_data.incoming_stream;
assert(incoming_stream);

/* Ensure head was marked done */
s_mark_head_done(incoming_stream);
int err = s_mark_head_done(incoming_stream);
if (err) {
return AWS_OP_ERR;
}

incoming_stream->is_incoming_message_done = true;

Expand All @@ -1018,6 +1075,12 @@ static void s_decoder_on_done(void *user_data) {

s_update_incoming_stream_ptr(connection);
}

/* Report success even if user's on_complete() callback shuts down on the connection.
* We don't want it to look like something went wrong while decoding.
* The decode() function returns after each message completes,
* and we won't call decode() again if the connection has been shut down */
return AWS_OP_SUCCESS;
}

/* Common new() logic for server & client */
Expand All @@ -1040,6 +1103,7 @@ static struct h1_connection *s_connection_new(struct aws_allocator *alloc) {

aws_channel_task_init(&connection->outgoing_stream_task, s_outgoing_stream_task, connection);
aws_channel_task_init(&connection->window_update_task, s_update_window_task, connection);
aws_channel_task_init(&connection->shutdown_delay_task, s_shutdown_delay_task, connection);
aws_linked_list_init(&connection->thread_data.stream_list);

int err = aws_mutex_init(&connection->synced_data.lock);
Expand Down
Loading