Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 52 additions & 62 deletions source/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,83 +303,73 @@ struct channel_shutdown_task_args {

static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately);

static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status);

static void s_shutdown_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {

(void)task;

(void)status;
struct shutdown_task *shutdown_task = arg;
struct aws_channel *channel = shutdown_task->channel;
int error_code = shutdown_task->error_code;
bool shutdown_immediately = shutdown_task->shutdown_immediately;
if (channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: beginning shutdown process", (void *)channel);

if (status == AWS_TASK_STATUS_RUN_READY) {
s_channel_shutdown(shutdown_task->channel, shutdown_task->error_code, shutdown_task->shutdown_immediately);
}
}

static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status);

static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately) {
if (aws_channel_thread_is_callers_thread(channel)) {
if (channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: beginning shutdown process", (void *)channel);

struct aws_channel_slot *slot = channel->first;
channel->channel_state = AWS_CHANNEL_SHUTTING_DOWN;
struct aws_channel_slot *slot = channel->first;
channel->channel_state = AWS_CHANNEL_SHUTTING_DOWN;

if (slot) {
AWS_LOGF_TRACE(
AWS_LS_IO_CHANNEL,
"id=%p: shutting down slot %p (the first one) in the read direction",
(void *)channel,
(void *)slot);
if (slot) {
AWS_LOGF_TRACE(
AWS_LS_IO_CHANNEL,
"id=%p: shutting down slot %p (the first one) in the read direction",
(void *)channel,
(void *)slot);

return aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, shutdown_immediately);
}
aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, shutdown_immediately);
return;
}

channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: shutdown completed", (void *)channel);
channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: shutdown completed", (void *)channel);

aws_mutex_lock(&channel->cross_thread_tasks.lock);
channel->cross_thread_tasks.is_channel_shut_down = true;
aws_mutex_unlock(&channel->cross_thread_tasks.lock);
aws_mutex_lock(&channel->cross_thread_tasks.lock);
channel->cross_thread_tasks.is_channel_shut_down = true;
aws_mutex_unlock(&channel->cross_thread_tasks.lock);

if (channel->on_shutdown_completed) {
channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
channel->shutdown_notify_task.task.arg = channel;
channel->shutdown_notify_task.error_code = error_code;
aws_event_loop_schedule_task_now(channel->loop, &channel->shutdown_notify_task.task);
}
if (channel->on_shutdown_completed) {
channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
channel->shutdown_notify_task.task.arg = channel;
channel->shutdown_notify_task.error_code = error_code;
aws_event_loop_schedule_task_now(channel->loop, &channel->shutdown_notify_task.task);
}
} else {
AWS_LOGF_TRACE(
AWS_LS_IO_CHANNEL,
"id=%p: channel shutdown called from outside the "
"event-loop thread, scheduling task.",
(void *)channel);
}
}

bool need_to_schedule = true;
aws_mutex_lock(&channel->cross_thread_tasks.lock);
if (channel->cross_thread_tasks.shutdown_task.task.task_fn) {
need_to_schedule = false;
AWS_LOGF_DEBUG(
AWS_LS_IO_CHANNEL,
"id=%p: Channel shutdown is already pending, not scheduling another.",
(void *)channel);
static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately) {
bool need_to_schedule = true;
aws_mutex_lock(&channel->cross_thread_tasks.lock);
if (channel->cross_thread_tasks.shutdown_task.task.task_fn) {
need_to_schedule = false;
AWS_LOGF_DEBUG(
AWS_LS_IO_CHANNEL, "id=%p: Channel shutdown is already pending, not scheduling another.", (void *)channel);

} else {
aws_channel_task_init(
&channel->cross_thread_tasks.shutdown_task.task,
s_shutdown_task,
&channel->cross_thread_tasks.shutdown_task,
"channel_cross_thread_shutdown");
channel->cross_thread_tasks.shutdown_task.shutdown_immediately = shutdown_immediately;
channel->cross_thread_tasks.shutdown_task.channel = channel;
channel->cross_thread_tasks.shutdown_task.error_code = error_code;
}
} else {
aws_channel_task_init(
&channel->cross_thread_tasks.shutdown_task.task,
s_shutdown_task,
&channel->cross_thread_tasks.shutdown_task,
"channel_shutdown");
channel->cross_thread_tasks.shutdown_task.shutdown_immediately = shutdown_immediately;
channel->cross_thread_tasks.shutdown_task.channel = channel;
channel->cross_thread_tasks.shutdown_task.error_code = error_code;
}

aws_mutex_unlock(&channel->cross_thread_tasks.lock);
aws_mutex_unlock(&channel->cross_thread_tasks.lock);

if (need_to_schedule) {
aws_channel_schedule_task_now(channel, &channel->cross_thread_tasks.shutdown_task.task);
}
if (need_to_schedule) {
AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: channel shutdown task is scheduled", (void *)channel);
aws_channel_schedule_task_now(channel, &channel->cross_thread_tasks.shutdown_task.task);
}

return AWS_OP_SUCCESS;
Expand Down