@@ -142,7 +142,17 @@ static void s_stop(
142142 AWS_ASSERT (stop_reading || stop_writing || schedule_shutdown ); /* You are required to stop at least 1 thing */
143143
144144 if (stop_reading ) {
145- connection -> thread_data .is_reading_stopped = true;
145+ if (connection -> thread_data .read_state == AWS_CONNECTION_READ_OPEN ) {
146+ connection -> thread_data .read_state = AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE ;
147+ } else if (connection -> thread_data .read_state == AWS_CONNECTION_READ_SHUTTING_DOWN ) {
148+ /* Shutdown after pending */
149+ if (connection -> thread_data .pending_shutdown_error_code != 0 ) {
150+ error_code = connection -> thread_data .pending_shutdown_error_code ;
151+ }
152+ connection -> thread_data .read_state = AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE ;
153+ aws_channel_slot_on_handler_shutdown_complete (
154+ connection -> base .channel_slot , AWS_CHANNEL_DIR_READ , error_code , false);
155+ }
146156 }
147157
148158 if (stop_writing ) {
@@ -167,6 +177,7 @@ static void s_stop(
167177 aws_error_name (error_code ));
168178
169179 aws_channel_shutdown (connection -> base .channel_slot -> channel , error_code );
180+
170181 if (stop_reading ) {
171182 /* Increase the window size after shutdown starts, to prevent deadlock when data still pending in the TLS
172183 * handler. */
@@ -324,7 +335,7 @@ static size_t s_calculate_stream_mode_desired_connection_window(struct aws_h1_co
324335static int s_update_connection_window (struct aws_h1_connection * connection ) {
325336 AWS_ASSERT (aws_channel_thread_is_callers_thread (connection -> base .channel_slot -> channel ));
326337
327- if (connection -> thread_data .is_reading_stopped ) {
338+ if (connection -> thread_data .read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE ) {
328339 return AWS_OP_SUCCESS ;
329340 }
330341
@@ -778,7 +789,7 @@ static void s_set_incoming_stream_ptr(
778789static void s_client_update_incoming_stream_ptr (struct aws_h1_connection * connection ) {
779790 struct aws_linked_list * list = & connection -> thread_data .stream_list ;
780791 struct aws_h1_stream * desired ;
781- if (connection -> thread_data .is_reading_stopped ) {
792+ if (connection -> thread_data .read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE ) {
782793 desired = NULL ;
783794 } else if (aws_linked_list_empty (list )) {
784795 desired = NULL ;
@@ -1663,7 +1674,7 @@ static void s_handler_installed(struct aws_channel_handler *handler, struct aws_
16631674static int s_try_process_next_midchannel_read_message (struct aws_h1_connection * connection , bool * out_stop_processing ) {
16641675 AWS_ASSERT (aws_channel_thread_is_callers_thread (connection -> base .channel_slot -> channel ));
16651676 AWS_ASSERT (connection -> thread_data .has_switched_protocols );
1666- AWS_ASSERT (! connection -> thread_data .is_reading_stopped );
1677+ AWS_ASSERT (connection -> thread_data .read_state != AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE );
16671678 AWS_ASSERT (!aws_linked_list_empty (& connection -> thread_data .read_buffer .messages ));
16681679
16691680 * out_stop_processing = false;
@@ -1839,7 +1850,7 @@ static int s_handler_process_read_message(
18391850
18401851 AWS_LOGF_TRACE (
18411852 AWS_LS_HTTP_CONNECTION , "id=%p: Incoming message of size %zu." , (void * )& connection -> base , message_size );
1842- if (connection -> thread_data .is_reading_stopped ) {
1853+ if (connection -> thread_data .read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE ) {
18431854 /* Read has stopped, ignore the data, shutdown the channel incase it has not started yet. */
18441855 aws_mem_release (message -> allocator , message ); /* Release the message as we return success. */
18451856 s_shutdown_due_to_error (connection , AWS_ERROR_HTTP_CONNECTION_CLOSED );
@@ -1868,7 +1879,7 @@ static int s_handler_process_read_message(
18681879}
18691880
18701881void aws_h1_connection_try_process_read_messages (struct aws_h1_connection * connection ) {
1871-
1882+ int error_code = 0 ;
18721883 /* Protect against this function being called recursively. */
18731884 if (connection -> thread_data .is_processing_read_messages ) {
18741885 return ;
@@ -1877,7 +1888,7 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne
18771888
18781889 /* Process queued messages */
18791890 while (!aws_linked_list_empty (& connection -> thread_data .read_buffer .messages )) {
1880- if (connection -> thread_data .is_reading_stopped ) {
1891+ if (connection -> thread_data .read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE ) {
18811892 AWS_LOGF_ERROR (
18821893 AWS_LS_HTTP_CONNECTION ,
18831894 "id=%p: Cannot process message because connection is shutting down." ,
@@ -1908,6 +1919,13 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne
19081919 }
19091920 }
19101921
1922+ if (connection -> thread_data .read_state == AWS_CONNECTION_READ_SHUTTING_DOWN &&
1923+ connection -> thread_data .read_buffer .pending_bytes == 0 ) {
1924+ /* Done processing the pending buffer. */
1925+ aws_raise_error (connection -> thread_data .pending_shutdown_error_code );
1926+ goto shutdown ;
1927+ }
1928+
19111929 /* Increment connection window, if necessary */
19121930 if (s_update_connection_window (connection )) {
19131931 goto shutdown ;
@@ -1917,15 +1935,25 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne
19171935 return ;
19181936
19191937shutdown :
1920- s_shutdown_due_to_error (connection , aws_last_error ());
1938+ error_code = aws_last_error ();
1939+ if (connection -> thread_data .read_state == AWS_CONNECTION_READ_SHUTTING_DOWN &&
1940+ connection -> thread_data .pending_shutdown_error_code != 0 ) {
1941+ error_code = connection -> thread_data .pending_shutdown_error_code ;
1942+ }
1943+ if (error_code == 0 ) {
1944+ /* Graceful shutdown, don't stop writing yet. */
1945+ s_stop (connection , true /*stop_reading*/ , false /*stop_writing*/ , true /*schedule_shutdown*/ , error_code );
1946+ } else {
1947+ s_shutdown_due_to_error (connection , aws_last_error ());
1948+ }
19211949}
19221950
19231951/* Try to process the next queued aws_io_message as normal HTTP data for an aws_http_stream.
19241952 * This MUST NOT be called if the connection has switched protocols and become a midchannel handler. */
19251953static int s_try_process_next_stream_read_message (struct aws_h1_connection * connection , bool * out_stop_processing ) {
19261954 AWS_ASSERT (aws_channel_thread_is_callers_thread (connection -> base .channel_slot -> channel ));
19271955 AWS_ASSERT (!connection -> thread_data .has_switched_protocols );
1928- AWS_ASSERT (! connection -> thread_data .is_reading_stopped );
1956+ AWS_ASSERT (connection -> thread_data .read_state != AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE );
19291957 AWS_ASSERT (!aws_linked_list_empty (& connection -> thread_data .read_buffer .messages ));
19301958
19311959 * out_stop_processing = false;
@@ -2122,6 +2150,31 @@ static int s_handler_increment_read_window(
21222150 return AWS_OP_SUCCESS ;
21232151}
21242152
2153+ static void s_initialize_read_delay_shutdown (struct aws_h1_connection * connection , int error_code ) {
2154+
2155+ AWS_LOGF_DEBUG (
2156+ AWS_LS_HTTP_CONNECTION ,
2157+ "id=%p: Connection still have pending data to be delivered during shutdown. Wait until downstream "
2158+ "reads the data." ,
2159+ (void * )& connection -> base );
2160+
2161+ AWS_LOGF_TRACE (
2162+ AWS_LS_HTTP_CONNECTION ,
2163+ "id=%p: Current window stats: connection=%zu, stream=%" PRIu64 " buffer=%zu/%zu" ,
2164+ (void * )& connection -> base ,
2165+ connection -> thread_data .connection_window ,
2166+ connection -> thread_data .incoming_stream ? connection -> thread_data .incoming_stream -> thread_data .stream_window
2167+ : 0 ,
2168+ connection -> thread_data .read_buffer .pending_bytes ,
2169+ connection -> thread_data .read_buffer .capacity );
2170+
2171+ /* Still have data buffered in connection, wait for it to be processed */
2172+ connection -> thread_data .read_state = AWS_CONNECTION_READ_SHUTTING_DOWN ;
2173+ connection -> thread_data .pending_shutdown_error_code = error_code ;
2174+ /* Try to process messages in queue */
2175+ aws_h1_connection_try_process_read_messages (connection );
2176+ }
2177+
21252178static int s_handler_shutdown (
21262179 struct aws_channel_handler * handler ,
21272180 struct aws_channel_slot * slot ,
@@ -2142,6 +2195,12 @@ static int s_handler_shutdown(
21422195
21432196 if (dir == AWS_CHANNEL_DIR_READ ) {
21442197 /* This call ensures that no further streams will be created or worked on. */
2198+ if (!free_scarce_resources_immediately && connection -> thread_data .read_state == AWS_CONNECTION_READ_OPEN &&
2199+ connection -> thread_data .read_buffer .pending_bytes > 0 ) {
2200+ s_initialize_read_delay_shutdown (connection , error_code );
2201+ /* Return success, and wait for the buffered data to be processed to propagate the shutdown. */
2202+ return AWS_OP_SUCCESS ;
2203+ }
21452204 s_stop (connection , true /*stop_reading*/ , false /*stop_writing*/ , false /*schedule_shutdown*/ , error_code );
21462205 } else /* dir == AWS_CHANNEL_DIR_WRITE */ {
21472206
0 commit comments