1111#include <aws/http/private/h1_stream.h>
1212#include <aws/http/private/request_response_impl.h>
1313#include <aws/http/status_code.h>
14+ #include <aws/io/event_loop.h>
1415#include <aws/io/logging.h>
1516
1617#include <inttypes.h>
@@ -535,6 +536,7 @@ static int s_aws_http1_switch_protocols(struct aws_h1_connection *connection) {
535536static void s_stream_complete (struct aws_h1_stream * stream , int error_code ) {
536537 struct aws_h1_connection * connection =
537538 AWS_CONTAINER_OF (stream -> base .owning_connection , struct aws_h1_connection , base );
539+ AWS_ASSERT (aws_channel_thread_is_callers_thread (connection -> base .channel_slot -> channel ));
538540
539541 /*
540542 * If this is the end of a successful CONNECT request, mark ourselves as pass-through since the proxy layer
@@ -547,6 +549,14 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
547549 }
548550 }
549551
552+ if (stream -> base .client_data && stream -> base .client_data -> response_first_byte_timeout_task .fn != NULL ) {
553+ /* There is an outstanding response timeout task, but stream completed, we can cancel it now. We are
554+ * safe to do it as we always on connection thread to schedule the task or cancel it */
555+ struct aws_event_loop * connection_loop = aws_channel_get_event_loop (connection -> base .channel_slot -> channel );
556+ /* The task will be zeroed out within the call */
557+ aws_event_loop_cancel_task (connection_loop , & stream -> base .client_data -> response_first_byte_timeout_task );
558+ }
559+
550560 if (error_code != AWS_ERROR_SUCCESS ) {
551561 if (stream -> base .client_data && stream -> is_incoming_message_done ) {
552562 /* As a request that finished receiving the response, we ignore error and
@@ -721,6 +731,87 @@ static void s_client_update_incoming_stream_ptr(struct aws_h1_connection *connec
721731 s_set_incoming_stream_ptr (connection , desired );
722732}
723733
734+ static void s_http_stream_response_first_byte_timeout_task (
735+ struct aws_task * task ,
736+ void * arg ,
737+ enum aws_task_status status ) {
738+ (void )task ;
739+ struct aws_h1_stream * stream = arg ;
740+ struct aws_http_connection * connection_base = stream -> base .owning_connection ;
741+ /* zero-out task to indicate that it's no longer scheduled */
742+ AWS_ZERO_STRUCT (stream -> base .client_data -> response_first_byte_timeout_task );
743+
744+ if (status == AWS_TASK_STATUS_CANCELED ) {
745+ return ;
746+ }
747+
748+ struct aws_h1_connection * connection = AWS_CONTAINER_OF (connection_base , struct aws_h1_connection , base );
749+ /* Timeout happened, close the connection */
750+ uint64_t response_first_byte_timeout_ms = stream -> base .client_data -> response_first_byte_timeout_ms == 0
751+ ? connection_base -> client_data -> response_first_byte_timeout_ms
752+ : stream -> base .client_data -> response_first_byte_timeout_ms ;
753+ AWS_LOGF_INFO (
754+ AWS_LS_HTTP_CONNECTION ,
755+ "id=%p: Closing connection as timeout after request sent to the first byte received happened. "
756+ "response_first_byte_timeout_ms is %" PRIu64 "." ,
757+ (void * )connection_base ,
758+ response_first_byte_timeout_ms );
759+
760+ /* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
761+ s_stop (
762+ connection ,
763+ false /*stop_reading*/ ,
764+ false /*stop_writing*/ ,
765+ true /*schedule_shutdown*/ ,
766+ AWS_ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT );
767+ }
768+
769+ static void s_set_outgoing_message_done (struct aws_h1_stream * stream ) {
770+ struct aws_http_connection * connection = stream -> base .owning_connection ;
771+ struct aws_channel * channel = aws_http_connection_get_channel (connection );
772+ AWS_ASSERT (aws_channel_thread_is_callers_thread (channel ));
773+
774+ if (stream -> is_outgoing_message_done ) {
775+ /* Already did the job */
776+ return ;
777+ }
778+
779+ stream -> is_outgoing_message_done = true;
780+ AWS_ASSERT (stream -> base .metrics .send_end_timestamp_ns == -1 );
781+ aws_high_res_clock_get_ticks ((uint64_t * )& stream -> base .metrics .send_end_timestamp_ns );
782+ AWS_ASSERT (stream -> base .metrics .send_start_timestamp_ns != -1 );
783+ AWS_ASSERT (stream -> base .metrics .send_end_timestamp_ns >= stream -> base .metrics .send_start_timestamp_ns );
784+ stream -> base .metrics .sending_duration_ns =
785+ stream -> base .metrics .send_end_timestamp_ns - stream -> base .metrics .send_start_timestamp_ns ;
786+ if (stream -> base .metrics .receive_start_timestamp_ns == -1 ) {
787+ /* We haven't receive any message, schedule the response timeout task */
788+
789+ uint64_t response_first_byte_timeout_ms = 0 ;
790+ if (stream -> base .client_data != NULL && connection -> client_data != NULL ) {
791+ response_first_byte_timeout_ms = stream -> base .client_data -> response_first_byte_timeout_ms == 0
792+ ? connection -> client_data -> response_first_byte_timeout_ms
793+ : stream -> base .client_data -> response_first_byte_timeout_ms ;
794+ }
795+ if (response_first_byte_timeout_ms != 0 ) {
796+ /* The task should not be initialized before. */
797+ AWS_ASSERT (stream -> base .client_data -> response_first_byte_timeout_task .fn == NULL );
798+ aws_task_init (
799+ & stream -> base .client_data -> response_first_byte_timeout_task ,
800+ s_http_stream_response_first_byte_timeout_task ,
801+ stream ,
802+ "http_stream_response_first_byte_timeout_task" );
803+ uint64_t now_ns = 0 ;
804+ aws_channel_current_clock_time (channel , & now_ns );
805+ struct aws_event_loop * connection_loop = aws_channel_get_event_loop (channel );
806+ aws_event_loop_schedule_task_future (
807+ connection_loop ,
808+ & stream -> base .client_data -> response_first_byte_timeout_task ,
809+ now_ns + aws_timestamp_convert (
810+ response_first_byte_timeout_ms , AWS_TIMESTAMP_MILLIS , AWS_TIMESTAMP_NANOS , NULL ));
811+ }
812+ }
813+ }
814+
724815/**
725816 * If necessary, update `outgoing_stream` so it is pointing at a stream
726817 * with data to send, or NULL if all streams are done sending data.
@@ -735,13 +826,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
735826
736827 /* If current stream is done sending data... */
737828 if (current && !aws_h1_encoder_is_message_in_progress (& connection -> thread_data .encoder )) {
738- current -> is_outgoing_message_done = true;
739- AWS_ASSERT (current -> base .metrics .send_end_timestamp_ns == -1 );
740- aws_high_res_clock_get_ticks ((uint64_t * )& current -> base .metrics .send_end_timestamp_ns );
741- AWS_ASSERT (current -> base .metrics .send_start_timestamp_ns != -1 );
742- AWS_ASSERT (current -> base .metrics .send_end_timestamp_ns >= current -> base .metrics .send_start_timestamp_ns );
743- current -> base .metrics .sending_duration_ns =
744- current -> base .metrics .send_end_timestamp_ns - current -> base .metrics .send_start_timestamp_ns ;
829+ s_set_outgoing_message_done (current );
745830
746831 /* RFC-7230 section 6.6: Tear-down.
747832 * If this was the final stream, don't allows any further streams to be sent */
@@ -1124,16 +1209,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
11241209 AWS_LS_HTTP_STREAM ,
11251210 "id=%p: Received 'Connection: close' header, no more request data will be sent." ,
11261211 (void * )& incoming_stream -> base );
1127- incoming_stream -> is_outgoing_message_done = true;
1128- AWS_ASSERT (incoming_stream -> base .metrics .send_end_timestamp_ns == -1 );
1129- aws_high_res_clock_get_ticks ((uint64_t * )& incoming_stream -> base .metrics .send_end_timestamp_ns );
1130- AWS_ASSERT (incoming_stream -> base .metrics .send_start_timestamp_ns != -1 );
1131- AWS_ASSERT (
1132- incoming_stream -> base .metrics .send_end_timestamp_ns >=
1133- incoming_stream -> base .metrics .send_start_timestamp_ns );
1134- incoming_stream -> base .metrics .sending_duration_ns =
1135- incoming_stream -> base .metrics .send_end_timestamp_ns -
1136- incoming_stream -> base .metrics .send_start_timestamp_ns ;
1212+ s_set_outgoing_message_done (incoming_stream );
11371213 }
11381214 /* Stop writing right now.
11391215 * Shutdown will be scheduled after we finishing parsing the response */
@@ -1856,6 +1932,15 @@ static int s_try_process_next_stream_read_message(struct aws_h1_connection *conn
18561932 if (incoming_stream -> base .metrics .receive_start_timestamp_ns == -1 ) {
18571933 /* That's the first time for the stream receives any message */
18581934 aws_high_res_clock_get_ticks ((uint64_t * )& incoming_stream -> base .metrics .receive_start_timestamp_ns );
1935+ if (incoming_stream -> base .client_data &&
1936+ incoming_stream -> base .client_data -> response_first_byte_timeout_task .fn != NULL ) {
1937+ /* There is an outstanding response timeout task, as we already received the data, we can cancel it now. We
1938+ * are safe to do it as we always on connection thread to schedule the task or cancel it */
1939+ struct aws_event_loop * connection_loop = aws_channel_get_event_loop (connection -> base .channel_slot -> channel );
1940+ /* The task will be zeroed out within the call */
1941+ aws_event_loop_cancel_task (
1942+ connection_loop , & incoming_stream -> base .client_data -> response_first_byte_timeout_task );
1943+ }
18591944 }
18601945
18611946 /* As decoder runs, it invokes the internal s_decoder_X callbacks, which in turn invoke user callbacks.
0 commit comments