Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions include/aws/http/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ typedef void(aws_http2_on_remote_settings_change_fn)(
size_t num_settings,
void *user_data);

/**
* Callback invoked on each statistics sample.
*
* connection_nonce is unique to each connection for disambiguation of each callback per connection.
*/
typedef void(
aws_http_statistics_observer_fn)(size_t connection_nonce, const struct aws_array_list *stats_list, void *user_data);

/**
* Configuration options for connection monitoring
*/
Expand All @@ -121,6 +129,17 @@ struct aws_http_connection_monitoring_options {
* as unhealthy.
*/
uint32_t allowable_throughput_failure_interval_seconds;

/**
* invoked on each statistics publish by the underlying IO channel. Install this callback to receive the statistics
* for observation. This field is optional.
*/
aws_http_statistics_observer_fn *statistics_observer_fn;

/**
* user_data to be passed to statistics_observer_fn.
*/
void *statistics_observer_user_data;
};

/**
Expand Down
5 changes: 5 additions & 0 deletions source/connection_monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ static void s_process_statistics(
}
}

if (impl->options.statistics_observer_fn) {
impl->options.statistics_observer_fn(
(size_t)(uintptr_t)(context), stats_list, impl->options.statistics_observer_user_data);
}

struct aws_channel *channel = context;

uint64_t bytes_per_second = 0;
Expand Down
40 changes: 40 additions & 0 deletions tests/test_connection_monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,52 @@ static struct http_monitor_test_stats_event s_test_rw_above_events[] = {
.expected_throughput = 1000,
},
};

struct observer_cb_data {
bool invoked;
size_t nonce;
size_t number_of_stats;
struct aws_crt_statistics_socket socket_stats;
struct aws_crt_statistics_http1_channel http_stats;
};
static void s_observer_cb(size_t connection_nonce, const struct aws_array_list *stats, void *user_data) {
struct observer_cb_data *cb_data = user_data;
cb_data->invoked = true;
cb_data->nonce = connection_nonce;
cb_data->number_of_stats = aws_array_list_length(stats);

for (size_t i = 0; i < cb_data->number_of_stats; ++i) {
struct aws_crt_statistics_base *base_ptr = NULL;
aws_array_list_get_at(stats, (void **)&base_ptr, i);

if (base_ptr->category == AWSCRT_STAT_CAT_SOCKET) {
cb_data->socket_stats = *(struct aws_crt_statistics_socket *)base_ptr;
}

if (base_ptr->category == AWSCRT_STAT_CAT_HTTP1_CHANNEL) {
cb_data->http_stats = *(struct aws_crt_statistics_http1_channel *)base_ptr;
}
}
}
static int s_test_http_connection_monitor_rw_above(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

struct observer_cb_data cb_data;
AWS_ZERO_STRUCT(cb_data);
s_test_options.statistics_observer_fn = s_observer_cb;
s_test_options.statistics_observer_user_data = &cb_data;
int result = s_do_http_monitoring_test(
allocator, &s_test_options, s_test_rw_above_events, AWS_ARRAY_SIZE(s_test_rw_above_events));
ASSERT_TRUE(result == AWS_OP_SUCCESS);
ASSERT_TRUE(cb_data.invoked);
ASSERT_TRUE(cb_data.nonce > 0);
ASSERT_UINT_EQUALS(2U, cb_data.number_of_stats);
ASSERT_UINT_EQUALS(s_test_rw_above_events[0].socket_stats.bytes_written, cb_data.socket_stats.bytes_written);
ASSERT_UINT_EQUALS(s_test_rw_above_events[0].socket_stats.bytes_read, cb_data.socket_stats.bytes_read);
ASSERT_UINT_EQUALS(
s_test_rw_above_events[0].http_stats.current_outgoing_stream_id, cb_data.http_stats.current_outgoing_stream_id);
ASSERT_UINT_EQUALS(
s_test_rw_above_events[0].http_stats.current_incoming_stream_id, cb_data.http_stats.current_incoming_stream_id);

return AWS_OP_SUCCESS;
}
Expand Down