Skip to content

Commit 18352c8

Browse files
TingDaoKgraebm
andauthored
Add support for cancel stream (#458)
Co-authored-by: Michael Graeb <graebm@amazon.com>
1 parent 0c96502 commit 18352c8

File tree

10 files changed

+179
-9
lines changed

10 files changed

+179
-9
lines changed

include/aws/http/private/h1_stream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ struct aws_h1_stream *aws_h1_stream_new_request(
117117
struct aws_h1_stream *aws_h1_stream_new_request_handler(const struct aws_http_request_handler_options *options);
118118

119119
int aws_h1_stream_activate(struct aws_http_stream *stream);
120+
void aws_h1_stream_cancel(struct aws_http_stream *stream, int error_code);
120121

121122
int aws_h1_stream_send_response(struct aws_h1_stream *stream, struct aws_http_message *response);
122123

include/aws/http/private/request_response_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ struct aws_http_stream_vtable {
1717
void (*destroy)(struct aws_http_stream *stream);
1818
void (*update_window)(struct aws_http_stream *stream, size_t increment_size);
1919
int (*activate)(struct aws_http_stream *stream);
20+
void (*cancel)(struct aws_http_stream *stream, int error_code);
2021

2122
int (*http1_write_chunk)(struct aws_http_stream *http1_stream, const struct aws_http1_chunk_options *options);
2223
int (*http1_add_trailer)(struct aws_http_stream *http1_stream, const struct aws_http_headers *trailing_headers);

include/aws/http/request_response.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,18 @@ void aws_http_stream_update_window(struct aws_http_stream *stream, size_t increm
11091109
AWS_HTTP_API
11101110
uint32_t aws_http_stream_get_id(const struct aws_http_stream *stream);
11111111

1112+
/**
1113+
* Cancel the stream in flight.
1114+
* For HTTP/1.1 streams, it's equivalent to closing the connection.
1115+
* For HTTP/2 streams, it's equivalent to calling reset on the stream with `AWS_HTTP2_ERR_CANCEL`.
1116+
*
1117+
* the stream will complete with the error code provided, unless the stream is
1118+
* already completing for other reasons, or the stream is not activated,
1119+
* in which case this call will have no impact.
1120+
*/
1121+
AWS_HTTP_API
1122+
void aws_http_stream_cancel(struct aws_http_stream *stream, int error_code);
1123+
11121124
/**
11131125
* Reset the HTTP/2 stream (HTTP/2 only).
11141126
* Note that if the stream closes before this async call is fully processed, the RST_STREAM frame will not be sent.

source/h1_connection.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,34 @@ int aws_h1_stream_activate(struct aws_http_stream *stream) {
388388
return AWS_OP_SUCCESS;
389389
}
390390

391+
void aws_h1_stream_cancel(struct aws_http_stream *stream, int error_code) {
392+
struct aws_h1_stream *h1_stream = AWS_CONTAINER_OF(stream, struct aws_h1_stream, base);
393+
struct aws_http_connection *base_connection = stream->owning_connection;
394+
struct aws_h1_connection *connection = AWS_CONTAINER_OF(base_connection, struct aws_h1_connection, base);
395+
396+
{ /* BEGIN CRITICAL SECTION */
397+
aws_h1_connection_lock_synced_data(connection);
398+
if (h1_stream->synced_data.api_state != AWS_H1_STREAM_API_STATE_ACTIVE ||
399+
connection->synced_data.is_open == false) {
400+
/* Not active, nothing to cancel. */
401+
aws_h1_connection_unlock_synced_data(connection);
402+
AWS_LOGF_DEBUG(AWS_LS_HTTP_STREAM, "id=%p: Stream not active, nothing to cancel.", (void *)stream);
403+
return;
404+
}
405+
406+
aws_h1_connection_unlock_synced_data(connection);
407+
} /* END CRITICAL SECTION */
408+
AWS_LOGF_INFO(
409+
AWS_LS_HTTP_CONNECTION,
410+
"id=%p: Connection shutting down due to stream=%p cancelled with error code %d (%s).",
411+
(void *)&connection->base,
412+
(void *)stream,
413+
error_code,
414+
aws_error_name(error_code));
415+
416+
s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, error_code);
417+
}
418+
391419
struct aws_http_stream *s_make_request(
392420
struct aws_http_connection *client_connection,
393421
const struct aws_http_make_request_options *options) {

source/h1_stream.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ static const struct aws_http_stream_vtable s_stream_vtable = {
329329
.destroy = s_stream_destroy,
330330
.update_window = s_stream_update_window,
331331
.activate = aws_h1_stream_activate,
332+
.cancel = aws_h1_stream_cancel,
332333
.http1_write_chunk = s_stream_write_chunk,
333334
.http1_add_trailer = s_stream_add_trailer,
334335
.http2_reset_stream = NULL,

source/h2_stream.c

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,17 @@ static int s_stream_write_data(
2727

2828
static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
2929
static struct aws_h2err s_send_rst_and_close_stream(struct aws_h2_stream *stream, struct aws_h2err stream_error);
30-
static int s_stream_reset_stream_internal(struct aws_http_stream *stream_base, struct aws_h2err stream_error);
30+
static int s_stream_reset_stream_internal(
31+
struct aws_http_stream *stream_base,
32+
struct aws_h2err stream_error,
33+
bool cancelling);
34+
static void s_stream_cancel(struct aws_http_stream *stream, int error_code);
3135

3236
struct aws_http_stream_vtable s_h2_stream_vtable = {
3337
.destroy = s_stream_destroy,
3438
.update_window = s_stream_update_window,
3539
.activate = aws_h2_stream_activate,
40+
.cancel = s_stream_cancel,
3641
.http1_write_chunk = NULL,
3742
.http2_reset_stream = s_stream_reset_stream,
3843
.http2_get_received_error_code = s_stream_get_received_error_code,
@@ -526,12 +531,16 @@ static void s_stream_update_window(struct aws_http_stream *stream_base, size_t i
526531
.h2_code = AWS_HTTP2_ERR_INTERNAL_ERROR,
527532
};
528533
/* Only when stream is not initialized reset will fail. So, we can assert it to be succeed. */
529-
AWS_FATAL_ASSERT(s_stream_reset_stream_internal(stream_base, stream_error) == AWS_OP_SUCCESS);
534+
AWS_FATAL_ASSERT(
535+
s_stream_reset_stream_internal(stream_base, stream_error, false /*cancelling*/) == AWS_OP_SUCCESS);
530536
}
531537
return;
532538
}
533539

534-
static int s_stream_reset_stream_internal(struct aws_http_stream *stream_base, struct aws_h2err stream_error) {
540+
static int s_stream_reset_stream_internal(
541+
struct aws_http_stream *stream_base,
542+
struct aws_h2err stream_error,
543+
bool cancelling) {
535544

536545
struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base);
537546
struct aws_h2_connection *connection = s_get_h2_connection(stream);
@@ -553,21 +562,25 @@ static int s_stream_reset_stream_internal(struct aws_http_stream *stream_base, s
553562
} /* END CRITICAL SECTION */
554563

555564
if (stream_is_init) {
565+
if (cancelling) {
566+
/* Not an error if we are just cancelling. */
567+
AWS_LOGF_DEBUG(AWS_LS_HTTP_STREAM, "id=%p: Stream not in process, nothing to cancel.", (void *)stream);
568+
return AWS_OP_SUCCESS;
569+
}
556570
AWS_H2_STREAM_LOG(
557571
ERROR, stream, "Reset stream failed. Stream is in initialized state, please activate the stream first.");
558572
return aws_raise_error(AWS_ERROR_INVALID_STATE);
559573
}
574+
if (reset_called) {
575+
AWS_H2_STREAM_LOG(DEBUG, stream, "Reset stream ignored. Reset stream has been called already.");
576+
}
577+
560578
if (cross_thread_work_should_schedule) {
561579
AWS_H2_STREAM_LOG(TRACE, stream, "Scheduling stream cross-thread work task");
562580
/* increment the refcount of stream to keep it alive until the task runs */
563581
aws_atomic_fetch_add(&stream->base.refcount, 1);
564582
aws_channel_schedule_task_now(connection->base.channel_slot->channel, &stream->cross_thread_work_task);
565-
return AWS_OP_SUCCESS;
566583
}
567-
if (reset_called) {
568-
AWS_H2_STREAM_LOG(DEBUG, stream, "Reset stream ignored. Reset stream has been called already.");
569-
}
570-
571584
return AWS_OP_SUCCESS;
572585
}
573586

@@ -583,7 +596,16 @@ static int s_stream_reset_stream(struct aws_http_stream *stream_base, uint32_t h
583596
(void *)stream_base,
584597
aws_http2_error_code_to_str(http2_error),
585598
http2_error);
586-
return s_stream_reset_stream_internal(stream_base, stream_error);
599+
return s_stream_reset_stream_internal(stream_base, stream_error, false /*cancelling*/);
600+
}
601+
602+
void s_stream_cancel(struct aws_http_stream *stream_base, int error_code) {
603+
struct aws_h2err stream_error = {
604+
.aws_code = error_code,
605+
.h2_code = AWS_HTTP2_ERR_CANCEL,
606+
};
607+
s_stream_reset_stream_internal(stream_base, stream_error, true /*cancelling*/);
608+
return;
587609
}
588610

589611
static int s_stream_get_received_error_code(struct aws_http_stream *stream_base, uint32_t *out_http2_error) {

source/request_response.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,10 @@ uint32_t aws_http_stream_get_id(const struct aws_http_stream *stream) {
12011201
return stream->id;
12021202
}
12031203

1204+
void aws_http_stream_cancel(struct aws_http_stream *stream, int error_code) {
1205+
stream->vtable->cancel(stream, error_code);
1206+
}
1207+
12041208
int aws_http2_stream_reset(struct aws_http_stream *http2_stream, uint32_t http2_error) {
12051209
AWS_PRECONDITION(http2_stream);
12061210
AWS_PRECONDITION(http2_stream->vtable);

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ add_test_case(h1_client_switching_protocols_fails_pending_requests)
143143
add_test_case(h1_client_switching_protocols_fails_subsequent_requests)
144144
add_test_case(h1_client_switching_protocols_requires_downstream_handler)
145145
add_test_case(h1_client_connection_close_before_request_finishes)
146+
add_test_case(h1_client_stream_cancel)
146147
add_test_case(h1_client_response_close_connection_before_request_finishes)
147148
add_test_case(h1_client_response_first_byte_timeout_connection)
148149
add_test_case(h1_client_response_first_byte_timeout_request_override)
@@ -475,6 +476,7 @@ add_test_case(h2_client_conn_failed_initial_settings_completed_not_invoked)
475476
add_test_case(h2_client_stream_reset_stream)
476477
add_test_case(h2_client_stream_reset_ignored_stream_closed)
477478
add_test_case(h2_client_stream_reset_failed_before_activate_called)
479+
add_test_case(h2_client_stream_cancel_stream)
478480
add_test_case(h2_client_stream_keeps_alive_for_cross_thread_task)
479481
add_test_case(h2_client_stream_get_received_reset_error_code)
480482
add_test_case(h2_client_stream_get_sent_reset_error_code)

tests/test_h1_client.c

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4236,6 +4236,62 @@ H1_CLIENT_TEST_CASE(h1_client_connection_close_before_request_finishes) {
42364236
return AWS_OP_SUCCESS;
42374237
}
42384238

4239+
H1_CLIENT_TEST_CASE(h1_client_stream_cancel) {
4240+
(void)ctx;
4241+
struct tester tester;
4242+
ASSERT_SUCCESS(s_tester_init(&tester, allocator));
4243+
4244+
/* set up request whose body won't send immediately */
4245+
struct slow_body_sender body_sender;
4246+
AWS_ZERO_STRUCT(body_sender);
4247+
s_slow_body_sender_init(&body_sender);
4248+
struct aws_input_stream *body_stream = &body_sender.base;
4249+
4250+
struct aws_http_header headers[] = {
4251+
{
4252+
.name = aws_byte_cursor_from_c_str("Content-Length"),
4253+
.value = aws_byte_cursor_from_c_str("16"),
4254+
},
4255+
};
4256+
4257+
struct aws_http_message *request = aws_http_message_new_request(allocator);
4258+
ASSERT_NOT_NULL(request);
4259+
ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT")));
4260+
ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt")));
4261+
ASSERT_SUCCESS(aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers)));
4262+
aws_http_message_set_body_stream(request, body_stream);
4263+
4264+
struct client_stream_tester stream_tester;
4265+
ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, &tester, request));
4266+
4267+
/* send head of request */
4268+
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
4269+
4270+
/* Ensure the request can be destroyed after request is sent */
4271+
aws_http_message_destroy(request);
4272+
aws_input_stream_release(body_stream);
4273+
4274+
/* Something absurd */
4275+
aws_http_stream_cancel(stream_tester.stream, AWS_ERROR_COND_VARIABLE_ERROR_UNKNOWN);
4276+
/* The second call will take not action */
4277+
aws_http_stream_cancel(stream_tester.stream, AWS_ERROR_SUCCESS);
4278+
/* Wait for channel to finish shutdown */
4279+
testing_channel_drain_queued_tasks(&tester.testing_channel);
4280+
/* check result, should not receive any body */
4281+
const char *expected = "PUT /plan.txt HTTP/1.1\r\n"
4282+
"Content-Length: 16\r\n"
4283+
"\r\n";
4284+
ASSERT_SUCCESS(testing_channel_check_written_messages_str(&tester.testing_channel, allocator, expected));
4285+
4286+
ASSERT_TRUE(stream_tester.complete);
4287+
ASSERT_INT_EQUALS(AWS_ERROR_COND_VARIABLE_ERROR_UNKNOWN, stream_tester.on_complete_error_code);
4288+
4289+
/* clean up */
4290+
client_stream_tester_clean_up(&stream_tester);
4291+
ASSERT_SUCCESS(s_tester_clean_up(&tester));
4292+
return AWS_OP_SUCCESS;
4293+
}
4294+
42394295
/* When response has `connection: close` any further request body should not be sent. */
42404296
H1_CLIENT_TEST_CASE(h1_client_response_close_connection_before_request_finishes) {
42414297
(void)ctx;

tests/test_h2_client.c

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4508,6 +4508,49 @@ TEST_CASE(h2_client_stream_reset_failed_before_activate_called) {
45084508
return s_tester_clean_up();
45094509
}
45104510

4511+
TEST_CASE(h2_client_stream_cancel_stream) {
4512+
ASSERT_SUCCESS(s_tester_init(allocator, ctx));
4513+
/* get connection preface and acks out of the way */
4514+
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
4515+
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
4516+
struct aws_http_message *request = aws_http2_message_new_request(allocator);
4517+
ASSERT_NOT_NULL(request);
4518+
4519+
struct aws_http_header request_headers_src[] = {
4520+
DEFINE_HEADER(":method", "GET"),
4521+
DEFINE_HEADER(":scheme", "https"),
4522+
DEFINE_HEADER(":path", "/"),
4523+
};
4524+
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
4525+
struct aws_http_make_request_options request_options = {
4526+
.self_size = sizeof(request_options),
4527+
.request = request,
4528+
};
4529+
4530+
struct client_stream_tester stream_tester;
4531+
ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, request));
4532+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
4533+
4534+
/* Cancel the request */
4535+
aws_http_stream_cancel(stream_tester.stream, AWS_ERROR_COND_VARIABLE_ERROR_UNKNOWN);
4536+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
4537+
4538+
ASSERT_TRUE(aws_http_connection_is_open(s_tester.connection));
4539+
ASSERT_TRUE(stream_tester.complete);
4540+
ASSERT_INT_EQUALS(AWS_ERROR_COND_VARIABLE_ERROR_UNKNOWN, stream_tester.on_complete_error_code);
4541+
/* validate that stream sent RST_STREAM */
4542+
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
4543+
struct h2_decoded_frame *rst_stream_frame =
4544+
h2_decode_tester_find_frame(&s_tester.peer.decode, AWS_H2_FRAME_T_RST_STREAM, 0, NULL);
4545+
/* But the error code is not the same as user was trying to send */
4546+
ASSERT_UINT_EQUALS(AWS_HTTP2_ERR_CANCEL, rst_stream_frame->error_code);
4547+
4548+
/* clean up */
4549+
aws_http_message_release(request);
4550+
client_stream_tester_clean_up(&stream_tester);
4551+
return s_tester_clean_up();
4552+
}
4553+
45114554
TEST_CASE(h2_client_stream_keeps_alive_for_cross_thread_task) {
45124555

45134556
ASSERT_SUCCESS(s_tester_init(allocator, ctx));

0 commit comments

Comments
 (0)