Skip to content

Commit 878b4fa

Browse files
authored
Fix bug where last few bytes on socket go unread (#642)
**Issue:** aws-c-s3 is occasionally seeing errors when the server sends an HTTP response with a `Connection: close` header (meaning it intends to close the connection after the response is sent). The server is sending the full response, then immediately hanging up. But the last few bytes of the response never make it to the HTTP client. **Diagnosis:** If a peer closed their socket immediately after the last few bytes were sent, our socket code wasn't always reading those last few bytes. On Linux, if a socket has unread data AND the peer has closed their side, the event from epoll has 2 flags set: `EPOLLIN|EPOLLRDHUP`. This means "the socket is has data to read AND the other side is closed (or half-closed) and won't be sending any more data". Our [socket handler code](https://github.com/awslabs/aws-c-io/blob/e762fd250589dfa98a9cce9c3ffca3414d99fdda/source/socket_channel_handler.c#L217-L225) *kinda* did the right thing by "attempting" to read from the socket before shutting down the channel. But if the downstream read window reached 0, that "attempt" wouldn't read all the data. **Description of changes:** The socket handler no longer shuts down the channel in response to an error event. Instead, the error event queues more reads to happen. And only when `read()` reports that the socket is finished (due to error or EOF), will the socket handler shut down the channel.
1 parent e762fd2 commit 878b4fa

File tree

7 files changed

+436
-144
lines changed

7 files changed

+436
-144
lines changed

source/posix/socket.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1668,6 +1668,23 @@ static void s_on_socket_io_event(
16681668
* subscribed is set to false. */
16691669
aws_ref_count_acquire(&socket_impl->internal_refcount);
16701670

1671+
/* NOTE: READABLE|WRITABLE|HANG_UP events might arrive simultaneously
1672+
* (e.g. peer sends last few bytes and immediately hangs up).
1673+
* Notify user of READABLE|WRITABLE events first, so they try to read any remaining bytes. */
1674+
1675+
if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
1676+
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
1677+
if (socket->readable_fn) {
1678+
socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
1679+
}
1680+
}
1681+
/* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
1682+
* have been cleaned up, so this next branch is safe. */
1683+
if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
1684+
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
1685+
s_process_socket_write_requests(socket, NULL);
1686+
}
1687+
16711688
if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) {
16721689
aws_raise_error(AWS_IO_SOCKET_CLOSED);
16731690
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd);
@@ -1688,19 +1705,6 @@ static void s_on_socket_io_event(
16881705
goto end_check;
16891706
}
16901707

1691-
if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
1692-
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
1693-
if (socket->readable_fn) {
1694-
socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
1695-
}
1696-
}
1697-
/* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
1698-
* have been cleaned up, so this next branch is safe. */
1699-
if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
1700-
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
1701-
s_process_socket_write_requests(socket, NULL);
1702-
}
1703-
17041708
end_check:
17051709
aws_ref_count_release(&socket_impl->internal_refcount);
17061710
}

source/socket_channel_handler.c

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
122122
*/
123123
static void s_do_read(struct socket_handler *socket_handler) {
124124

125+
if (socket_handler->shutdown_in_progress) {
126+
return;
127+
}
128+
125129
size_t downstream_window = aws_channel_slot_downstream_read_window(socket_handler->slot);
126130
size_t max_to_read =
127131
downstream_window > socket_handler->max_rw_size ? socket_handler->max_rw_size : downstream_window;
@@ -139,17 +143,20 @@ static void s_do_read(struct socket_handler *socket_handler) {
139143

140144
size_t total_read = 0;
141145
size_t read = 0;
142-
while (total_read < max_to_read && !socket_handler->shutdown_in_progress) {
146+
int last_error = 0;
147+
while (total_read < max_to_read) {
143148
size_t iter_max_read = max_to_read - total_read;
144149

145150
struct aws_io_message *message = aws_channel_acquire_message_from_pool(
146151
socket_handler->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, iter_max_read);
147152

148153
if (!message) {
154+
last_error = aws_last_error();
149155
break;
150156
}
151157

152158
if (aws_socket_read(socket_handler->socket, &message->message_data, &read)) {
159+
last_error = aws_last_error();
153160
aws_mem_release(message->allocator, message);
154161
break;
155162
}
@@ -162,6 +169,7 @@ static void s_do_read(struct socket_handler *socket_handler) {
162169
(unsigned long long)read);
163170

164171
if (aws_channel_slot_send_message(socket_handler->slot, message, AWS_CHANNEL_DIR_READ)) {
172+
last_error = aws_last_error();
165173
aws_mem_release(message->allocator, message);
166174
break;
167175
}
@@ -170,30 +178,29 @@ static void s_do_read(struct socket_handler *socket_handler) {
170178
AWS_LOGF_TRACE(
171179
AWS_LS_IO_SOCKET_HANDLER,
172180
"id=%p: total read on this tick %llu",
173-
(void *)&socket_handler->slot->handler,
181+
(void *)socket_handler->slot->handler,
174182
(unsigned long long)total_read);
175183

176184
socket_handler->stats.bytes_read += total_read;
177185

178186
/* resubscribe as long as there's no error, just return if we're in a would block scenario. */
179187
if (total_read < max_to_read) {
180-
int last_error = aws_last_error();
188+
AWS_ASSERT(last_error != 0);
181189

182-
if (last_error != AWS_IO_READ_WOULD_BLOCK && !socket_handler->shutdown_in_progress) {
190+
if (last_error != AWS_IO_READ_WOULD_BLOCK) {
183191
aws_channel_shutdown(socket_handler->slot->channel, last_error);
192+
} else {
193+
AWS_LOGF_TRACE(
194+
AWS_LS_IO_SOCKET_HANDLER,
195+
"id=%p: out of data to read on socket. "
196+
"Waiting on event-loop notification.",
197+
(void *)socket_handler->slot->handler);
184198
}
185-
186-
AWS_LOGF_TRACE(
187-
AWS_LS_IO_SOCKET_HANDLER,
188-
"id=%p: out of data to read on socket. "
189-
"Waiting on event-loop notification.",
190-
(void *)socket_handler->slot->handler);
191199
return;
192200
}
193201
/* in this case, everything was fine, but there's still pending reads. We need to schedule a task to do the read
194202
* again. */
195-
if (!socket_handler->shutdown_in_progress && total_read == socket_handler->max_rw_size &&
196-
!socket_handler->read_task_storage.task_fn) {
203+
if (total_read == socket_handler->max_rw_size && !socket_handler->read_task_storage.task_fn) {
197204

198205
AWS_LOGF_TRACE(
199206
AWS_LS_IO_SOCKET_HANDLER,
@@ -212,17 +219,29 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
212219
(void)socket;
213220

214221
struct socket_handler *socket_handler = user_data;
215-
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET_HANDLER, "id=%p: socket is now readable", (void *)socket_handler->slot->handler);
216-
217-
/* read regardless so we can pick up data that was sent prior to the close. For example, peer sends a TLS ALERT
218-
* then immediately closes the socket. On some platforms, we'll never see the readable flag. So we want to make
222+
AWS_LOGF_TRACE(
223+
AWS_LS_IO_SOCKET_HANDLER,
224+
"id=%p: socket on-readable with error code %d(%s)",
225+
(void *)socket_handler->slot->handler,
226+
error_code,
227+
aws_error_name(error_code));
228+
229+
/* Regardless of error code call read() until it reports error or EOF,
230+
* so we can pick up data that was sent prior to the close.
231+
*
232+
* For example, if peer closes the socket immediately after sending the last
233+
* bytes of data, the READABLE and HANGUP events arrive simultaneously.
234+
*
235+
* Another example, peer sends a TLS ALERT then immediately closes the socket.
236+
* On some platforms, we'll never see the readable flag. So we want to make
219237
* sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket
220-
* closure, when in reality it was a TLS error */
238+
* closure, when in reality it was a TLS error
239+
*
240+
* It may take more than one read() to get all remaining data.
241+
* Also, if the downstream read-window reaches 0, we need to patiently
242+
* wait until the window opens before we can call read() again. */
243+
(void)error_code;
221244
s_do_read(socket_handler);
222-
223-
if (error_code && !socket_handler->shutdown_in_progress) {
224-
aws_channel_shutdown(socket_handler->slot->channel, error_code);
225-
}
226245
}
227246

228247
/* Either the result of a context switch (for fairness in the event loop), or a window update. */

source/windows/iocp/socket.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ static int s_determine_socket_error(int error) {
636636
case IO_STATUS_TIMEOUT:
637637
return AWS_IO_SOCKET_TIMEOUT;
638638
case IO_PIPE_BROKEN:
639+
case ERROR_BROKEN_PIPE:
639640
return AWS_IO_SOCKET_CLOSED;
640641
case STATUS_INVALID_ADDRESS_COMPONENT:
641642
case WSAEADDRNOTAVAIL:
@@ -2970,7 +2971,7 @@ static int s_tcp_read(struct aws_socket *socket, struct aws_byte_buf *buffer, si
29702971

29712972
AWS_LOGF_ERROR(
29722973
AWS_LS_IO_SOCKET,
2973-
"id=%p handle=%p: ReadFile() failed with error %d",
2974+
"id=%p handle=%p: recv() failed with error %d",
29742975
(void *)socket,
29752976
(void *)socket->io_handle.data.handle,
29762977
error);

tests/CMakeLists.txt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,28 @@ add_test_case(pem_sanitize_wrong_format_rejected)
123123

124124
add_test_case(socket_handler_echo_and_backpressure)
125125
add_test_case(socket_handler_close)
126+
# These tests fail on Windows due to some bug in our server code where, if the socket is closed
127+
# immediately after data is written, that data does not flush cleanly to the client.
128+
# I've lost days to this bug, and no one is using our Windows server funcionality,
129+
# so disabling these tests on Windows and moving along for now.
130+
# I tried the following:
131+
# 1) Wrote 2 simple standalone Windows programs, server and client, using simple synchronous socket code.
132+
# WORKED PERFECTLY. So it's not a fundamental issue with Windows.
133+
# 2) Commented out server part of this failing test, and used the simple standalone server instead.
134+
# WORKED PERFECTLY. So it's not a problem with our actual client code.
135+
# 3) Copy/pasted the simple standlone server code into this test, and used that instead of our actual server code.
136+
# WORKED PERFECTLY. So it's not a problem with the server and client sockets being in the same process.
137+
# 4) Commented out the client part of this failing test, and used the simple standalone client instead.
138+
# FAILED. The standalone client got WSAECONNRESET (Connection reset by peer) before receiving all the data.
139+
# So it's something with our complicated non-blocking server code.
140+
# The last interesting thing I noticed before giving up was: we call shutdown() immediately
141+
# before calling closesocket() but shutdown() gets error WSAENOTCONN, even
142+
# though, at that moment, the socket should be connected just fine.
143+
if(NOT WIN32)
144+
add_net_test_case(socket_handler_read_to_eof_after_peer_hangup)
145+
add_net_test_case(socket_handler_ipv4_read_to_eof_after_peer_hangup)
146+
add_net_test_case(socket_handler_ipv6_read_to_eof_after_peer_hangup)
147+
endif()
126148
add_test_case(socket_pinned_event_loop)
127149
add_net_test_case(socket_pinned_event_loop_dns_failure)
128150

tests/read_write_test_handler.c

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -190,42 +190,60 @@ struct rw_handler_write_task_args {
190190
struct aws_channel_slot *slot;
191191
struct aws_byte_buf *buffer;
192192
struct aws_channel_task task;
193+
aws_channel_on_message_write_completed_fn *on_completion;
194+
void *user_data;
193195
};
194196

195-
static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) {
196-
(void)task;
197-
(void)task_status;
198-
struct rw_handler_write_task_args *write_task_args = arg;
197+
static void s_rw_handler_write_now(
198+
struct aws_channel_slot *slot,
199+
struct aws_byte_buf *buffer,
200+
aws_channel_on_message_write_completed_fn *on_completion,
201+
void *user_data) {
199202

200-
struct aws_io_message *msg = aws_channel_acquire_message_from_pool(
201-
write_task_args->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, write_task_args->buffer->len);
203+
struct aws_io_message *msg =
204+
aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len);
205+
206+
msg->on_completion = on_completion;
207+
msg->user_data = user_data;
202208

203-
struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(write_task_args->buffer);
204-
aws_byte_buf_append(&msg->message_data, &write_buffer);
209+
struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer);
210+
AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &write_buffer) == AWS_OP_SUCCESS);
205211

206-
aws_channel_slot_send_message(write_task_args->slot, msg, AWS_CHANNEL_DIR_WRITE);
212+
AWS_FATAL_ASSERT(aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS);
213+
}
207214

215+
static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) {
216+
(void)task;
217+
(void)task_status;
218+
struct rw_handler_write_task_args *write_task_args = arg;
219+
s_rw_handler_write_now(
220+
write_task_args->slot, write_task_args->buffer, write_task_args->on_completion, write_task_args->user_data);
208221
aws_mem_release(write_task_args->handler->alloc, write_task_args);
209222
}
210223

211224
void rw_handler_write(struct aws_channel_handler *handler, struct aws_channel_slot *slot, struct aws_byte_buf *buffer) {
225+
rw_handler_write_with_callback(handler, slot, buffer, NULL /*on_completion*/, NULL /*user_data*/);
226+
}
227+
228+
void rw_handler_write_with_callback(
229+
struct aws_channel_handler *handler,
230+
struct aws_channel_slot *slot,
231+
struct aws_byte_buf *buffer,
232+
aws_channel_on_message_write_completed_fn *on_completion,
233+
void *user_data) {
212234

213235
struct rw_test_handler_impl *handler_impl = handler->impl;
214236

215237
if (!handler_impl->event_loop_driven || aws_channel_thread_is_callers_thread(slot->channel)) {
216-
struct aws_io_message *msg =
217-
aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len);
218-
219-
struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer);
220-
aws_byte_buf_append(&msg->message_data, &write_buffer);
221-
222-
aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE);
238+
s_rw_handler_write_now(slot, buffer, on_completion, user_data);
223239
} else {
224240
struct rw_handler_write_task_args *write_task_args =
225241
aws_mem_acquire(handler->alloc, sizeof(struct rw_handler_write_task_args));
226242
write_task_args->handler = handler;
227243
write_task_args->buffer = buffer;
228244
write_task_args->slot = slot;
245+
write_task_args->on_completion = on_completion;
246+
write_task_args->user_data = user_data;
229247
aws_channel_task_init(&write_task_args->task, s_rw_handler_write_task, write_task_args, "rw_handler_write");
230248

231249
aws_channel_schedule_task_now(slot->channel, &write_task_args->task);

tests/read_write_test_handler.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ void rw_handler_enable_wait_on_destroy(
3636

3737
void rw_handler_write(struct aws_channel_handler *handler, struct aws_channel_slot *slot, struct aws_byte_buf *buffer);
3838

39+
void rw_handler_write_with_callback(
40+
struct aws_channel_handler *handler,
41+
struct aws_channel_slot *slot,
42+
struct aws_byte_buf *buffer,
43+
aws_channel_on_message_write_completed_fn *on_completion,
44+
void *user_data);
45+
3946
void rw_handler_trigger_read(struct aws_channel_handler *handler, struct aws_channel_slot *slot);
4047

4148
bool rw_handler_shutdown_called(struct aws_channel_handler *handler);

0 commit comments

Comments
 (0)