Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion include/aws/http/private/h1_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,16 @@ struct aws_h1_connection {
/* If non-zero, reason to immediately reject new streams. (ex: closing) */
int new_stream_error_code;

/* If true, user has called connection_close() or stream_cancel(),
* but the cross_thread_work_task hasn't processed it yet */
bool shutdown_requested;
int shutdown_requested_error_code;

/* See `cross_thread_work_task` */
bool is_cross_thread_work_task_scheduled : 1;

/* For checking status from outside the event-loop thread. */
bool is_open : 1;

} synced_data;
};

Expand Down
71 changes: 56 additions & 15 deletions source/h1_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ void aws_h1_connection_unlock_synced_data(struct aws_h1_connection *connection)
* - Channel is shutting down in the read direction.
* - Channel is shutting down in the write direction.
* - An error occurs.
* - User wishes to close the connection (this is the only case where the function may run off-thread).
*/
static void s_stop(
struct aws_h1_connection *connection,
Expand All @@ -139,15 +138,14 @@ static void s_stop(
bool schedule_shutdown,
int error_code) {

AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
AWS_ASSERT(stop_reading || stop_writing || schedule_shutdown); /* You are required to stop at least 1 thing */

if (stop_reading) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
connection->thread_data.is_reading_stopped = true;
}

if (stop_writing) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
connection->thread_data.is_writing_stopped = true;
}
{ /* BEGIN CRITICAL SECTION */
Expand All @@ -169,6 +167,11 @@ static void s_stop(
aws_error_name(error_code));

aws_channel_shutdown(connection->base.channel_slot->channel, error_code);
if (stop_reading) {
/* Increase the window size after shutdown starts, to prevent deadlock when data still pending in the TLS
* handler. */
aws_channel_slot_increment_read_window(connection->base.channel_slot, SIZE_MAX);
}
}
}

Expand All @@ -189,14 +192,45 @@ static void s_shutdown_due_to_error(struct aws_h1_connection *connection, int er
s_stop(connection, true /*stop_reading*/, true /*stop_writing*/, true /*schedule_shutdown*/, error_code);
}

/**
* Helper to shutdown the connection from non-channel thread. (User wishes to close the connection)
**/
static void s_shutdown_from_off_thread(struct aws_h1_connection *connection, int error_code) {
bool should_schedule_task = false;
{ /* BEGIN CRITICAL SECTION */
aws_h1_connection_lock_synced_data(connection);
if (!connection->synced_data.is_cross_thread_work_task_scheduled) {
connection->synced_data.is_cross_thread_work_task_scheduled = true;
should_schedule_task = true;
}
if (!connection->synced_data.shutdown_requested) {
connection->synced_data.shutdown_requested = true;
connection->synced_data.shutdown_requested_error_code = error_code;
}
/* Connection has shutdown, new streams should not be allowed. */
connection->synced_data.is_open = false;
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
aws_h1_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */

if (should_schedule_task) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION, "id=%p: Scheduling connection cross-thread work task.", (void *)&connection->base);
aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
} else {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Connection cross-thread work task was already scheduled",
(void *)&connection->base);
}
}

/**
* Public function for closing connection.
*/
static void s_connection_close(struct aws_http_connection *connection_base) {
struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);

/* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, AWS_ERROR_SUCCESS);
s_shutdown_from_off_thread(connection, AWS_ERROR_SUCCESS);
}

static void s_connection_stop_new_request(struct aws_http_connection *connection_base) {
Expand Down Expand Up @@ -412,8 +446,7 @@ void aws_h1_stream_cancel(struct aws_http_stream *stream, int error_code) {
(void *)stream,
error_code,
aws_error_name(error_code));

s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, error_code);
s_shutdown_from_off_thread(connection, error_code);
}

struct aws_http_stream *s_make_request(
Expand Down Expand Up @@ -495,10 +528,17 @@ static void s_cross_thread_work_task(struct aws_channel_task *channel_task, void
bool has_new_client_streams = !aws_linked_list_empty(&connection->synced_data.new_client_stream_list);
aws_linked_list_move_all_back(
&connection->thread_data.stream_list, &connection->synced_data.new_client_stream_list);
bool shutdown_requested = connection->synced_data.shutdown_requested;
int shutdown_error = connection->synced_data.shutdown_requested_error_code;
connection->synced_data.shutdown_requested = false;
connection->synced_data.shutdown_requested_error_code = 0;

aws_h1_connection_unlock_synced_data(connection);
/* END CRITICAL SECTION */

if (shutdown_requested) {
s_stop(connection, true /*stop_reading*/, true /*stop_writing*/, true /*schedule_shutdown*/, shutdown_error);
}
/* Kick off outgoing-stream task if necessary */
if (has_new_client_streams) {
aws_h1_connection_try_write_outgoing_stream(connection);
Expand Down Expand Up @@ -785,13 +825,8 @@ static void s_http_stream_response_first_byte_timeout_task(
(void *)connection_base,
response_first_byte_timeout_ms);

/* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
s_stop(
connection,
false /*stop_reading*/,
false /*stop_writing*/,
true /*schedule_shutdown*/,
AWS_ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT);
/* Shutdown the connection. */
s_shutdown_due_to_error(connection, AWS_ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT);
}

static void s_set_outgoing_message_done(struct aws_h1_stream *stream) {
Expand Down Expand Up @@ -1804,6 +1839,12 @@ static int s_handler_process_read_message(

AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION, "id=%p: Incoming message of size %zu.", (void *)&connection->base, message_size);
if (connection->thread_data.is_reading_stopped) {
/* Read has stopped, ignore the data, shutdown the channel incase it has not started yet. */
aws_mem_release(message->allocator, message); /* Release the message as we return success. */
s_shutdown_due_to_error(connection, AWS_ERROR_HTTP_CONNECTION_CLOSED);
return AWS_OP_SUCCESS;
}

/* Shrink connection window by amount of data received. See comments at variable's
* declaration site on why we use this instead of the official `aws_channel_slot.window_size`. */
Expand Down
5 changes: 5 additions & 0 deletions source/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -989,8 +989,13 @@ static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, en

s_unlock_synced_data(websocket);
/* END CRITICAL SECTION */
websocket->thread_data.is_reading_stopped = true;
websocket->thread_data.is_writing_stopped = true;

aws_channel_shutdown(websocket->channel_slot->channel, error_code);
/* Increase the window size after shutdown starts, to prevent deadlock when data still pending in the upstream
* handler. */
aws_channel_slot_increment_read_window(websocket->channel_slot, SIZE_MAX);
}

/* Tell the channel to shut down. It is safe to call this multiple times.
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ add_test_case(strutil_is_http_pseudo_header_name)

add_net_test_case(tls_download_medium_file_h1)
add_net_test_case(tls_download_medium_file_h2)
add_net_test_case(test_tls_download_shutdown_with_window_size_0)

add_test_case(websocket_decoder_sanity_check)
add_test_case(websocket_decoder_simplest_frame)
Expand Down
3 changes: 0 additions & 3 deletions tests/test_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,6 @@ AWS_TEST_CASE(connection_setup_shutdown, s_test_connection_setup_shutdown);
static int s_test_connection_setup_shutdown_tls(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

#ifdef __APPLE__ /* Something is wrong with APPLE */
return AWS_OP_SUCCESS;
#endif
struct tester_options options = {
.alloc = allocator,
.tls = true,
Expand Down
Loading