Skip to content

Commit 23f62ee

Browse files
authored
Move h1_stream variables, to make thread usage more explicit (#504)
Some cleanup before embarking on major plumbing... While developing HTTP/1, we stumbled into the pattern of grouping variables under `thread_data` or `synced_data`, so it was more explicit how they should be accessed. But we never moved ALL the variables into the proper groups. We had a TODO about this. Now it is TODONE. Also: Fix some flaky integration tests by accepting any "2xx" status-code, not just literal "200. I observed these tests failing when they sometimes got 202, instead of 200: h2_sm_acquire_stream & h2_sm_acquire_stream_multiple_connections. But I applied the same fix to any other test I found that strictly checked for 200.
1 parent 84e8b41 commit 23f62ee

File tree

6 files changed

+65
-61
lines changed

6 files changed

+65
-61
lines changed

include/aws/http/private/h1_stream.h

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,21 @@ struct aws_h1_stream {
4040
*/
4141
struct aws_channel_task cross_thread_work_task;
4242

43-
/* Message (derived from outgoing request or response) to be submitted to encoder */
44-
struct aws_h1_encoder_message encoder_message;
45-
46-
bool is_outgoing_message_done;
43+
struct {
44+
/* Message (derived from outgoing request or response) to be submitted to encoder */
45+
struct aws_h1_encoder_message encoder_message;
4746

48-
bool is_incoming_message_done;
49-
bool is_incoming_head_done;
47+
bool is_outgoing_message_done;
5048

51-
/* If true, this is the last stream the connection should process.
52-
* See RFC-7230 Section 6: Connection Management. */
53-
bool is_final_stream;
49+
bool is_incoming_message_done;
50+
bool is_incoming_head_done;
5451

55-
/* Buffer for incoming data that needs to stick around. */
56-
struct aws_byte_buf incoming_storage_buf;
52+
/* If true, this is the last stream the connection should process.
53+
* See RFC-7230 Section 6: Connection Management. */
54+
bool is_final_stream;
5755

58-
struct {
59-
/* TODO: move most other members in here */
56+
/* Buffer for incoming data that needs to stick around. */
57+
struct aws_byte_buf incoming_storage_buf;
6058

6159
/* List of `struct aws_h1_chunk`, used for chunked encoding.
6260
* Encoder completes/frees/pops front chunk when it's done sending. */
@@ -77,12 +75,16 @@ struct aws_h1_stream {
7775
* Sharing a lock is fine because it's rare for an HTTP/1 connection
7876
* to have more than one stream at a time. */
7977
struct {
78+
/* Outgoing response on "request handler" stream which has been submitted by user,
79+
* but hasn't yet moved to thread_data.encoder_message. */
80+
struct aws_h1_encoder_message pending_outgoing_response;
81+
8082
/* List of `struct aws_h1_chunk` which have been submitted by user,
81-
* but haven't yet moved to encoder_message.pending_chunk_list where the encoder will find them. */
83+
* but haven't yet moved to thread_data.encoder_message.pending_chunk_list where the encoder will find them. */
8284
struct aws_linked_list pending_chunk_list;
8385

8486
/* trailing headers which have been submitted by user,
85-
* but haven't yet moved to encoder_message where the encoder will find them. */
87+
* but haven't yet moved to thread_data.encoder_message where the encoder will find them. */
8688
struct aws_h1_trailer *pending_trailer;
8789

8890
enum aws_h1_stream_api_state api_state;

source/h1_connection.c

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
637637
}
638638

639639
if (error_code != AWS_ERROR_SUCCESS) {
640-
if (stream->base.client_data && stream->is_incoming_message_done) {
640+
if (stream->base.client_data && stream->thread_data.is_incoming_message_done) {
641641
/* As a request that finished receiving the response, we ignore error and
642642
* consider it finished successfully */
643643
AWS_LOGF_DEBUG(
@@ -649,7 +649,7 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
649649
aws_error_name(error_code));
650650
error_code = AWS_ERROR_SUCCESS;
651651
}
652-
if (stream->base.server_data && stream->is_outgoing_message_done) {
652+
if (stream->base.server_data && stream->thread_data.is_outgoing_message_done) {
653653
/* As a server finished sending the response, but still failed with the request was not finished receiving.
654654
* We ignore error and consider it finished successfully */
655655
AWS_LOGF_DEBUG(
@@ -693,7 +693,7 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
693693

694694
/* If connection must shut down, do it BEFORE invoking stream-complete callback.
695695
* That way, if aws_http_connection_is_open() is called from stream-complete callback, it returns false. */
696-
if (stream->is_final_stream) {
696+
if (stream->thread_data.is_final_stream) {
697697
AWS_LOGF_TRACE(
698698
AWS_LS_HTTP_CONNECTION,
699699
"id=%p: Closing connection due to completion of final stream.",
@@ -845,12 +845,12 @@ static void s_set_outgoing_message_done(struct aws_h1_stream *stream) {
845845
struct aws_channel *channel = aws_http_connection_get_channel(connection);
846846
AWS_ASSERT(aws_channel_thread_is_callers_thread(channel));
847847

848-
if (stream->is_outgoing_message_done) {
848+
if (stream->thread_data.is_outgoing_message_done) {
849849
/* Already did the job */
850850
return;
851851
}
852852

853-
stream->is_outgoing_message_done = true;
853+
stream->thread_data.is_outgoing_message_done = true;
854854
AWS_ASSERT(stream->base.metrics.send_end_timestamp_ns == -1);
855855
aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.send_end_timestamp_ns);
856856
AWS_ASSERT(stream->base.metrics.send_start_timestamp_ns != -1);
@@ -904,7 +904,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
904904

905905
/* RFC-7230 section 6.6: Tear-down.
906906
* If this was the final stream, don't allows any further streams to be sent */
907-
if (current->is_final_stream) {
907+
if (current->thread_data.is_final_stream) {
908908
AWS_LOGF_TRACE(
909909
AWS_LS_HTTP_CONNECTION,
910910
"id=%p: Done sending final stream, no further streams will be sent.",
@@ -919,7 +919,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
919919
}
920920

921921
/* If it's also done receiving data, then it's complete! */
922-
if (current->is_incoming_message_done) {
922+
if (current->thread_data.is_incoming_message_done) {
923923
/* Only 1st stream in list could finish receiving before it finished sending */
924924
AWS_ASSERT(&current->node == aws_linked_list_begin(&connection->thread_data.stream_list));
925925

@@ -942,7 +942,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
942942
struct aws_h1_stream *stream = AWS_CONTAINER_OF(node, struct aws_h1_stream, node);
943943

944944
/* If we already sent this stream's data, keep looking... */
945-
if (stream->is_outgoing_message_done) {
945+
if (stream->thread_data.is_outgoing_message_done) {
946946
continue;
947947
}
948948

@@ -975,7 +975,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
975975
aws_high_res_clock_get_ticks((uint64_t *)&current->base.metrics.send_start_timestamp_ns);
976976

977977
err = aws_h1_encoder_start_message(
978-
&connection->thread_data.encoder, &current->encoder_message, &current->base);
978+
&connection->thread_data.encoder, &current->thread_data.encoder_message, &current->base);
979979
(void)err;
980980
AWS_ASSERT(connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_INIT);
981981
AWS_ASSERT(!err);
@@ -1177,7 +1177,7 @@ static int s_decoder_on_request(
11771177
AWS_BYTE_CURSOR_PRI(*uri));
11781178

11791179
/* Copy strings to internal buffer */
1180-
struct aws_byte_buf *storage_buf = &incoming_stream->incoming_storage_buf;
1180+
struct aws_byte_buf *storage_buf = &incoming_stream->thread_data.incoming_storage_buf;
11811181
AWS_ASSERT(storage_buf->capacity == 0);
11821182

11831183
size_t storage_size = 0;
@@ -1261,7 +1261,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
12611261
"id=%p: Received 'Connection: close' header. This will be the final stream on this connection.",
12621262
(void *)&incoming_stream->base);
12631263

1264-
incoming_stream->is_final_stream = true;
1264+
incoming_stream->thread_data.is_final_stream = true;
12651265
{ /* BEGIN CRITICAL SECTION */
12661266
aws_h1_connection_lock_synced_data(connection);
12671267
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
@@ -1278,7 +1278,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
12781278
* Mark the stream's outgoing message as complete,
12791279
* so that we stop sending, and stop waiting for it to finish sending.
12801280
**/
1281-
if (!incoming_stream->is_outgoing_message_done) {
1281+
if (!incoming_stream->thread_data.is_outgoing_message_done) {
12821282
AWS_LOGF_DEBUG(
12831283
AWS_LS_HTTP_STREAM,
12841284
"id=%p: Received 'Connection: close' header, no more request data will be sent.",
@@ -1323,7 +1323,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
13231323

13241324
static int s_mark_head_done(struct aws_h1_stream *incoming_stream) {
13251325
/* Bail out if we've already done this */
1326-
if (incoming_stream->is_incoming_head_done) {
1326+
if (incoming_stream->thread_data.is_incoming_head_done) {
13271327
return AWS_OP_SUCCESS;
13281328
}
13291329

@@ -1335,7 +1335,7 @@ static int s_mark_head_done(struct aws_h1_stream *incoming_stream) {
13351335

13361336
if (header_block == AWS_HTTP_HEADER_BLOCK_MAIN) {
13371337
AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Main header block done.", (void *)&incoming_stream->base);
1338-
incoming_stream->is_incoming_head_done = true;
1338+
incoming_stream->thread_data.is_incoming_head_done = true;
13391339

13401340
} else if (header_block == AWS_HTTP_HEADER_BLOCK_INFORMATIONAL) {
13411341
AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Informational header block done.", (void *)&incoming_stream->base);
@@ -1443,7 +1443,7 @@ static int s_decoder_on_done(void *user_data) {
14431443
}
14441444

14451445
/* Otherwise the incoming stream is finished decoding and we will update it if needed */
1446-
incoming_stream->is_incoming_message_done = true;
1446+
incoming_stream->thread_data.is_incoming_message_done = true;
14471447
aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.receive_end_timestamp_ns);
14481448
AWS_ASSERT(incoming_stream->base.metrics.receive_start_timestamp_ns != -1);
14491449
AWS_ASSERT(
@@ -1454,7 +1454,7 @@ static int s_decoder_on_done(void *user_data) {
14541454

14551455
/* RFC-7230 section 6.6
14561456
* After reading the final message, the connection must not read any more */
1457-
if (incoming_stream->is_final_stream) {
1457+
if (incoming_stream->thread_data.is_final_stream) {
14581458
AWS_LOGF_TRACE(
14591459
AWS_LS_HTTP_CONNECTION,
14601460
"id=%p: Done reading final stream, no further streams will be read.",
@@ -1479,13 +1479,13 @@ static int s_decoder_on_done(void *user_data) {
14791479
return AWS_OP_ERR;
14801480
}
14811481
}
1482-
if (incoming_stream->is_outgoing_message_done) {
1482+
if (incoming_stream->thread_data.is_outgoing_message_done) {
14831483
AWS_ASSERT(&incoming_stream->node == aws_linked_list_begin(&connection->thread_data.stream_list));
14841484
s_stream_complete(incoming_stream, AWS_ERROR_SUCCESS);
14851485
}
14861486
s_set_incoming_stream_ptr(connection, NULL);
14871487

1488-
} else if (incoming_stream->is_outgoing_message_done) {
1488+
} else if (incoming_stream->thread_data.is_outgoing_message_done) {
14891489
/* Client side */
14901490
AWS_ASSERT(&incoming_stream->node == aws_linked_list_begin(&connection->thread_data.stream_list));
14911491

source/h1_stream.c

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ static void s_stream_destroy(struct aws_http_stream *stream_base) {
2323
aws_linked_list_empty(&stream->synced_data.pending_chunk_list) &&
2424
"Chunks should be marked complete before stream destroyed");
2525

26-
aws_h1_encoder_message_clean_up(&stream->encoder_message);
27-
aws_byte_buf_clean_up(&stream->incoming_storage_buf);
26+
aws_h1_encoder_message_clean_up(&stream->thread_data.encoder_message);
27+
aws_h1_encoder_message_clean_up(&stream->synced_data.pending_outgoing_response);
28+
aws_byte_buf_clean_up(&stream->thread_data.incoming_storage_buf);
2829
aws_mem_release(stream->base.alloc, stream);
2930
}
3031

@@ -58,29 +59,33 @@ static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void
5859

5960
int api_state = stream->synced_data.api_state;
6061

61-
bool found_chunks = !aws_linked_list_empty(&stream->synced_data.pending_chunk_list);
62+
/* If we have any new outgoing data, prompt the connection to try and send it. */
63+
bool new_outgoing_data = !aws_linked_list_empty(&stream->synced_data.pending_chunk_list);
6264
aws_linked_list_move_all_back(&stream->thread_data.pending_chunk_list, &stream->synced_data.pending_chunk_list);
6365

64-
stream->encoder_message.trailer = stream->synced_data.pending_trailer;
65-
stream->synced_data.pending_trailer = NULL;
66+
/* If we JUST learned about having an outgoing response, that's a reason to try sending data */
67+
if (stream->synced_data.has_outgoing_response && !stream->thread_data.has_outgoing_response) {
68+
stream->thread_data.has_outgoing_response = true;
69+
new_outgoing_data = true;
6670

67-
bool has_outgoing_response = stream->synced_data.has_outgoing_response;
71+
stream->thread_data.encoder_message = stream->synced_data.pending_outgoing_response;
72+
AWS_ZERO_STRUCT(stream->synced_data.pending_outgoing_response);
73+
74+
if (stream->thread_data.encoder_message.has_connection_close_header) {
75+
/* This will be the last stream connection will process */
76+
stream->thread_data.is_final_stream = true;
77+
}
78+
}
79+
80+
stream->thread_data.encoder_message.trailer = stream->synced_data.pending_trailer;
81+
stream->synced_data.pending_trailer = NULL;
6882

6983
uint64_t pending_window_update = stream->synced_data.pending_window_update;
7084
stream->synced_data.pending_window_update = 0;
7185

7286
s_stream_unlock_synced_data(stream);
7387
/* END CRITICAL SECTION */
7488

75-
/* If we have any new outgoing data, prompt the connection to try and send it. */
76-
bool new_outgoing_data = found_chunks;
77-
78-
/* If we JUST learned about having an outgoing response, that's a reason to try sending data */
79-
if (has_outgoing_response && !stream->thread_data.has_outgoing_response) {
80-
stream->thread_data.has_outgoing_response = true;
81-
new_outgoing_data = true;
82-
}
83-
8489
if (new_outgoing_data && (api_state == AWS_H1_STREAM_API_STATE_ACTIVE)) {
8590
aws_h1_connection_try_write_outgoing_stream(connection);
8691
}
@@ -413,7 +418,7 @@ struct aws_h1_stream *aws_h1_stream_new_request(
413418

414419
/* Validate request and cache info that the encoder will eventually need */
415420
if (aws_h1_encoder_message_init_from_request(
416-
&stream->encoder_message,
421+
&stream->thread_data.encoder_message,
417422
client_connection->alloc,
418423
options->request,
419424
&stream->thread_data.pending_chunk_list)) {
@@ -422,11 +427,11 @@ struct aws_h1_stream *aws_h1_stream_new_request(
422427

423428
/* RFC-7230 Section 6.3: The "close" connection option is used to signal
424429
* that a connection will not persist after the current request/response*/
425-
if (stream->encoder_message.has_connection_close_header) {
426-
stream->is_final_stream = true;
430+
if (stream->thread_data.encoder_message.has_connection_close_header) {
431+
stream->thread_data.is_final_stream = true;
427432
}
428433

429-
stream->synced_data.using_chunked_encoding = stream->encoder_message.has_chunked_encoding_header;
434+
stream->synced_data.using_chunked_encoding = stream->thread_data.encoder_message.has_chunked_encoding_header;
430435

431436
return stream;
432437

@@ -493,16 +498,15 @@ int aws_h1_stream_send_response(struct aws_h1_stream *stream, struct aws_http_me
493498
error_code = AWS_ERROR_INVALID_STATE;
494499
} else {
495500
stream->synced_data.has_outgoing_response = true;
496-
stream->encoder_message = encoder_message;
501+
stream->synced_data.pending_outgoing_response = encoder_message;
497502
if (encoder_message.has_connection_close_header) {
498503
/* This will be the last stream connection will process, new streams will be rejected */
499-
stream->is_final_stream = true;
500504

501505
/* Note: We're touching the connection's synced_data, which is OK
502506
* because an h1_connection and all its h1_streams share a single lock. */
503507
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
504508
}
505-
stream->synced_data.using_chunked_encoding = stream->encoder_message.has_chunked_encoding_header;
509+
stream->synced_data.using_chunked_encoding = encoder_message.has_chunked_encoding_header;
506510

507511
should_schedule_task = !stream->synced_data.is_cross_thread_work_task_scheduled;
508512
stream->synced_data.is_cross_thread_work_task_scheduled = true;

tests/test_connection_manager.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1564,7 +1564,7 @@ static int s_aws_http_on_incoming_header_block_done_proxy_test(
15641564
struct cm_tester *tester = &s_tester;
15651565
if (aws_http_stream_get_incoming_response_status(stream, &s_response_status_code) == AWS_OP_SUCCESS) {
15661566
aws_mutex_lock(&tester->lock);
1567-
tester->proxy_request_successful = s_response_status_code == 200;
1567+
tester->proxy_request_successful = s_response_status_code / 100 == 2;
15681568
aws_mutex_unlock(&tester->lock);
15691569
}
15701570

@@ -1771,7 +1771,7 @@ static int s_proxy_integration_test_helper_general(
17711771
aws_http_stream_activate(stream);
17721772

17731773
ASSERT_SUCCESS(s_wait_on_proxy_request_complete());
1774-
ASSERT_TRUE(s_response_status_code == 200);
1774+
ASSERT_TRUE(s_response_status_code / 100 == 2);
17751775

17761776
aws_http_stream_release(stream);
17771777
aws_http_message_destroy(request);

tests/test_localhost_integ.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ struct tester {
7878
size_t stream_completed_count;
7979
size_t stream_complete_errors;
8080
size_t stream_200_count;
81-
size_t stream_4xx_count;
8281
size_t stream_status_not_200_count;
8382

8483
uint64_t num_sen_received;
@@ -187,10 +186,9 @@ static void s_tester_on_stream_completed(struct aws_http_stream *stream, int err
187186
++s_tester.stream_complete_errors;
188187
s_tester.stream_completed_error_code = aws_last_error();
189188
} else {
190-
if (status == 200) {
189+
if (status / 100 == 2) {
191190
s_tester.stream_completed_with_200 = true;
192191
++s_tester.stream_200_count;
193-
} else if (status / 100 == 4) {
194192
} else {
195193
++s_tester.stream_status_not_200_count;
196194
}

tests/test_stream_manager.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ static void s_sm_tester_on_stream_complete(struct aws_http_stream *stream, int e
500500
++s_tester.stream_complete_errors;
501501
s_tester.stream_completed_error_code = aws_last_error();
502502
} else {
503-
if (status == 200) {
503+
if (status / 100 == 2) {
504504
++s_tester.stream_200_count;
505505
} else {
506506
++s_tester.stream_status_not_200_count;

0 commit comments

Comments
 (0)