Skip to content

CDRIVER-4487 server selection logging #1821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
21 commits merged into master from CDRIVER-4487
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1a49fc6
sync server-selection spec tests
Nov 20, 2024
fc6658b
Add server_selection/logging to unified test runner
Nov 20, 2024
f5240eb
unified runner: add topologyDescriptionChangedEvent
Nov 20, 2024
41a35a7
unified tests: URI parameters need to be case insensitive
Nov 20, 2024
73659a3
more serialization for topology descriptions
Nov 26, 2024
111aee5
test_check_event: previousDescription and newDescription fields
Dec 5, 2024
524923b
Structured logs for server selection started/ended
Dec 5, 2024
85d3c7c
log message: waiting for suitable server to become available
Dec 5, 2024
253c624
fix duplicated comment line
Jan 9, 2025
dce2f83
comment, mention MONGOC_SERVER_DESCRIPTION_CONTENT_FLAG_ADDRESS
Jan 9, 2025
37dd5b1
Comment typo
Jan 9, 2025
bacb1f9
review feedback, inline "operation = cmd_name"
Jan 9, 2025
a021a3c
read prefs hedge can't be null
Jan 9, 2025
24175b9
Feedback, runtime checks on command type in _mongoc_write_command_get_name
Jan 10, 2025
49c0709
mongoc_read_prefs_get_tags also shouldn't return null
Jan 10, 2025
52bac8e
simplify conditional in mongoc_topology_select_server_id exit
Jan 10, 2025
08ee4f4
TEST_SS_LOG_CONTEXT in test-conveniences as an arbitrary server selec…
Jan 10, 2025
babe503
Unused opts in _mongoc_structured_log_append_read_prefs
Jan 10, 2025
8b3fee6
Note and temporary INT_MAX clamp for truncation in topology_as_descri…
Jan 10, 2025
0225f4c
Update src/libmongoc/tests/test-conveniences.h
Jan 10, 2025
aa88778
Merge branch 'master' into CDRIVER-4487
Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build/generate-future-functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@
typedef("const_mongoc_read_prefs_ptr", "const mongoc_read_prefs_t *"),
typedef("const_mongoc_write_concern_ptr",
"const mongoc_write_concern_t *"),
typedef("const_mongoc_ss_log_context_ptr",
"const mongoc_ss_log_context_t *"),
]

type_list = [T.name for T in typedef_list]
Expand Down Expand Up @@ -453,6 +455,7 @@
"mongoc_topology_select",
[param("mongoc_topology_ptr", "topology"),
param("mongoc_ss_optype_t", "optype"),
param("const_mongoc_ss_log_context_ptr", "log_context"),
param("const_mongoc_read_prefs_ptr", "read_prefs"),
param("bool_ptr", "must_use_primary"),
param("bson_error_ptr", "error")]),
Expand Down
3 changes: 2 additions & 1 deletion src/libmongoc/src/mongoc/mongoc-aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ _mongoc_aggregate (mongoc_client_t *client,
cursor->is_aggr_with_write_stage = has_write_key;

/* server id isn't enough. ensure we're connected & know wire version */
server_stream = _mongoc_cursor_fetch_stream (cursor);
const mongoc_ss_log_context_t ss_log_context = {.operation = "aggregate"};
server_stream = _mongoc_cursor_fetch_stream (cursor, &ss_log_context);
if (!server_stream) {
GOTO (done);
}
Expand Down
9 changes: 6 additions & 3 deletions src/libmongoc/src/mongoc/mongoc-bulk-operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -777,20 +777,23 @@ mongoc_bulk_operation_execute (mongoc_bulk_operation_t *bulk, /* IN */
}

for (size_t i = 0u; i < bulk->commands.len; i++) {
command = &_mongoc_array_index (&bulk->commands, mongoc_write_command_t, i);

if (bulk->server_id) {
server_stream = mongoc_cluster_stream_for_server (
cluster, bulk->server_id, true /* reconnect_ok */, bulk->session, reply, error);
} else {
server_stream = mongoc_cluster_stream_for_writes (cluster, bulk->session, NULL, reply, error);
const mongoc_ss_log_context_t ss_log_context = {.operation = _mongoc_write_command_get_name (command),
.has_operation_id = true,
.operation_id = command->operation_id};
server_stream = mongoc_cluster_stream_for_writes (cluster, &ss_log_context, bulk->session, NULL, reply, error);
}

if (!server_stream) {
/* stream_for_server and stream_for_writes initialize reply on error */
RETURN (false);
}

command = &_mongoc_array_index (&bulk->commands, mongoc_write_command_t, i);

_mongoc_write_command_execute (command,
bulk->client,
server_stream,
Expand Down
13 changes: 10 additions & 3 deletions src/libmongoc/src/mongoc/mongoc-bulkwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,9 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
goto fail;
}

const mongoc_ss_log_context_t ss_log_context = {
.operation = "bulkWrite", .has_operation_id = true, .operation_id = self->operation_id};

// Select a stream.
{
bson_t reply;
Expand All @@ -1566,7 +1569,7 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
&self->client->cluster, opts->serverid, true /* reconnect_ok */, self->session, &reply, &error);
} else {
ss = mongoc_cluster_stream_for_writes (
&self->client->cluster, self->session, NULL /* deprioritized servers */, &reply, &error);
&self->client->cluster, &ss_log_context, self->session, NULL /* deprioritized servers */, &reply, &error);
}

if (!ss) {
Expand Down Expand Up @@ -1814,8 +1817,12 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
bson_t reply;
// Select a server and create a stream again.
mongoc_server_stream_cleanup (ss);
ss = mongoc_cluster_stream_for_writes (
&self->client->cluster, NULL /* session */, NULL /* deprioritized servers */, &reply, &error);
ss = mongoc_cluster_stream_for_writes (&self->client->cluster,
&ss_log_context,
NULL /* session */,
NULL /* deprioritized servers */,
&reply,
&error);

if (ss) {
parts.assembled.server_stream = ss;
Expand Down
5 changes: 3 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-change-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,9 @@ _make_cursor (mongoc_change_stream_t *stream)
goto cleanup;
}

server_stream =
mongoc_cluster_stream_for_reads (&stream->client->cluster, stream->read_prefs, cs, NULL, &reply, &stream->err);
const mongoc_ss_log_context_t ss_log_context = {.operation = "aggregate"};
server_stream = mongoc_cluster_stream_for_reads (
&stream->client->cluster, &ss_log_context, stream->read_prefs, cs, NULL, &reply, &stream->err);
if (!server_stream) {
bson_destroy (&stream->err_doc);
bson_copy_to (&reply, &stream->err_doc);
Expand Down
4 changes: 3 additions & 1 deletion src/libmongoc/src/mongoc/mongoc-client-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
bson_error_t *error);

mongoc_server_session_t *
_mongoc_client_pop_server_session (mongoc_client_t *client, bson_error_t *error);
_mongoc_client_pop_server_session (mongoc_client_t *client,
const mongoc_ss_log_context_t *log_context,
bson_error_t *error);

bool
_mongoc_client_lookup_session (const mongoc_client_t *client,
Expand Down
3 changes: 2 additions & 1 deletion src/libmongoc/src/mongoc/mongoc-client-session.c
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,9 @@ mongoc_client_session_start_transaction (mongoc_client_session_t *session,
BSON_ASSERT (session);

ret = true;
const mongoc_ss_log_context_t ss_log_context = {.operation = "mongoc_client_session_start_transaction"};
server_stream = mongoc_cluster_stream_for_writes (
&session->client->cluster, session, NULL /* deprioritized servers */, NULL /* reply */, error);
&session->client->cluster, &ss_log_context, session, NULL /* deprioritized servers */, NULL /* reply */, error);
if (!server_stream) {
ret = false;
GOTO (done);
Expand Down
40 changes: 29 additions & 11 deletions src/libmongoc/src/mongoc/mongoc-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,8 @@ mongoc_client_start_session (mongoc_client_t *client, const mongoc_session_opt_t

ENTRY;

ss = _mongoc_client_pop_server_session (client, error);
const mongoc_ss_log_context_t ss_log_context = {.operation = "startSession"};
ss = _mongoc_client_pop_server_session (client, &ss_log_context, error);
if (!ss) {
RETURN (NULL);
}
Expand Down Expand Up @@ -1661,8 +1662,13 @@ _mongoc_client_retryable_read_command_with_stream (mongoc_client_t *client,
mongoc_deprioritized_servers_add_if_sharded (ds, server_stream->topology_type, server_stream->sd);
}

const mongoc_ss_log_context_t ss_log_context = {
.operation = parts->assembled.command_name,
.has_operation_id = true,
.operation_id = parts->assembled.operation_id,
};
retry_server_stream = mongoc_cluster_stream_for_reads (
&client->cluster, parts->read_prefs, parts->assembled.session, ds, NULL, &ignored_error);
&client->cluster, &ss_log_context, parts->read_prefs, parts->assembled.session, ds, NULL, &ignored_error);

mongoc_deprioritized_servers_destroy (ds);
}
Expand Down Expand Up @@ -1761,7 +1767,8 @@ mongoc_client_command_simple (mongoc_client_t *client,
* configuration. The generic command method SHOULD allow an optional read
* preference argument."
*/
server_stream = mongoc_cluster_stream_for_reads (cluster, read_prefs, NULL, NULL, reply, error);
const mongoc_ss_log_context_t ss_log_context = {.operation = _mongoc_get_command_name (command)};
server_stream = mongoc_cluster_stream_for_reads (cluster, &ss_log_context, read_prefs, NULL, NULL, reply, error);

if (server_stream) {
ret = _mongoc_client_command_with_stream (client, &parts, read_prefs, server_stream, reply, error);
Expand Down Expand Up @@ -1895,6 +1902,7 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
prefs = NULL;
}

const mongoc_ss_log_context_t ss_log_context = {.operation = command_name};
if (read_write_opts.serverId) {
/* "serverId" passed in opts */
server_stream = mongoc_cluster_stream_for_server (
Expand All @@ -1904,9 +1912,9 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
parts.user_query_flags |= MONGOC_QUERY_SECONDARY_OK;
}
} else if (parts.is_write_command) {
server_stream = mongoc_cluster_stream_for_writes (cluster, cs, NULL, reply_ptr, error);
server_stream = mongoc_cluster_stream_for_writes (cluster, &ss_log_context, cs, NULL, reply_ptr, error);
} else {
server_stream = mongoc_cluster_stream_for_reads (cluster, prefs, cs, NULL, reply_ptr, error);
server_stream = mongoc_cluster_stream_for_reads (cluster, &ss_log_context, prefs, cs, NULL, reply_ptr, error);
}

if (!server_stream) {
Expand Down Expand Up @@ -2691,7 +2699,8 @@ mongoc_client_select_server (mongoc_client_t *client,
return NULL;
}

sd = mongoc_topology_select (client->topology, optype, prefs, NULL /* chosen read mode */, error);
const mongoc_ss_log_context_t ss_log_context = {.operation = "mongoc_client_select_server"};
sd = mongoc_topology_select (client->topology, optype, &ss_log_context, prefs, NULL /* chosen read mode */, error);
if (!sd) {
return NULL;
}
Expand All @@ -2703,7 +2712,7 @@ mongoc_client_select_server (mongoc_client_t *client,

/* check failed, retry once */
mongoc_server_description_destroy (sd);
sd = mongoc_topology_select (client->topology, optype, prefs, NULL /* chosen read mode */, error);
sd = mongoc_topology_select (client->topology, optype, &ss_log_context, prefs, NULL /* chosen read mode */, error);
if (sd) {
return sd;
}
Expand Down Expand Up @@ -2752,11 +2761,13 @@ mongoc_client_set_appname (mongoc_client_t *client, const char *appname)
}

mongoc_server_session_t *
_mongoc_client_pop_server_session (mongoc_client_t *client, bson_error_t *error)
_mongoc_client_pop_server_session (mongoc_client_t *client,
const mongoc_ss_log_context_t *log_context,
bson_error_t *error)
{
BSON_ASSERT_PARAM (client);

return _mongoc_topology_pop_server_session (client->topology, error);
return _mongoc_topology_pop_server_session (client->topology, log_context, error);
}

/*
Expand Down Expand Up @@ -2845,8 +2856,15 @@ _mongoc_client_end_sessions (mongoc_client_t *client)

while (!mongoc_server_session_pool_is_empty (t->session_pool)) {
prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY_PREFERRED);
server_id = mongoc_topology_select_server_id (
t, MONGOC_SS_READ, prefs, NULL /* chosen read mode */, NULL /* deprioritized servers */, &error);
const mongoc_ss_log_context_t ss_log_context = {
.operation = "endSessions", .has_operation_id = true, .operation_id = 1 + cluster->operation_id};
server_id = mongoc_topology_select_server_id (t,
MONGOC_SS_READ,
&ss_log_context,
prefs,
NULL /* chosen read mode */,
NULL /* deprioritized servers */,
&error);

mongoc_read_prefs_destroy (prefs);
if (!server_id) {
Expand Down
3 changes: 3 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-cluster-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ mongoc_cluster_try_recv (mongoc_cluster_t *cluster,
*/
mongoc_server_stream_t *
mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const mongoc_ss_log_context_t *log_context,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
Expand All @@ -139,6 +140,7 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
*/
mongoc_server_stream_t *
mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
const mongoc_ss_log_context_t *log_context,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
Expand All @@ -158,6 +160,7 @@ mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
*/
mongoc_server_stream_t *
mongoc_cluster_stream_for_aggr_with_write (mongoc_cluster_t *cluster,
const mongoc_ss_log_context_t *log_context,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
bson_t *reply,
Expand Down
30 changes: 21 additions & 9 deletions src/libmongoc/src/mongoc/mongoc-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2449,6 +2449,7 @@ static uint32_t
_mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
mongoc_topology_t *topology,
mongoc_ss_optype_t optype,
const mongoc_ss_log_context_t *log_context,
const mongoc_read_prefs_t *read_prefs,
bool *must_use_primary,
const mongoc_deprioritized_servers_t *ds,
Expand All @@ -2465,13 +2466,15 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
if (_in_sharded_txn (cs)) {
server_id = cs->server_id;
if (!server_id) {
server_id = mongoc_topology_select_server_id (topology, optype, read_prefs, must_use_primary, ds, error);
server_id =
mongoc_topology_select_server_id (topology, optype, log_context, read_prefs, must_use_primary, ds, error);
if (server_id) {
_mongoc_client_session_pin (cs, server_id);
}
}
} else {
server_id = mongoc_topology_select_server_id (topology, optype, read_prefs, must_use_primary, ds, error);
server_id =
mongoc_topology_select_server_id (topology, optype, log_context, read_prefs, must_use_primary, ds, error);
/* Transactions Spec: Additionally, any non-transaction operation using a
* pinned ClientSession MUST unpin the session and the operation MUST
* perform normal server selection. */
Expand Down Expand Up @@ -2504,6 +2507,7 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
static mongoc_server_stream_t *
_mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
mongoc_ss_optype_t optype,
const mongoc_ss_log_context_t *log_context,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
bool is_retryable,
Expand All @@ -2526,7 +2530,8 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,

BSON_ASSERT (cluster);

server_id = _mongoc_cluster_select_server_id (cs, topology, optype, read_prefs, &must_use_primary, ds, error);
server_id =
_mongoc_cluster_select_server_id (cs, topology, optype, log_context, read_prefs, &must_use_primary, ds, error);

if (!server_id) {
if (reply) {
Expand All @@ -2538,7 +2543,8 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,

if (!mongoc_cluster_check_interval (cluster, server_id)) {
/* Server Selection Spec: try once more */
server_id = _mongoc_cluster_select_server_id (cs, topology, optype, read_prefs, &must_use_primary, ds, error);
server_id =
_mongoc_cluster_select_server_id (cs, topology, optype, log_context, read_prefs, &must_use_primary, ds, error);

if (!server_id) {
if (reply) {
Expand Down Expand Up @@ -2610,6 +2616,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,

mongoc_server_stream_t *
mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const mongoc_ss_log_context_t *log_context,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
Expand All @@ -2626,11 +2633,12 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
mongoc_uri_get_option_as_bool (cluster->uri, MONGOC_URI_RETRYREADS, MONGOC_DEFAULT_RETRYREADS);

return _mongoc_cluster_stream_for_optype (
cluster, MONGOC_SS_READ, prefs_override, cs, is_retryable, ds, reply, error);
cluster, MONGOC_SS_READ, log_context, prefs_override, cs, is_retryable, ds, reply, error);
}

mongoc_server_stream_t *
mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
const mongoc_ss_log_context_t *log_context,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
Expand All @@ -2639,11 +2647,13 @@ mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
const bool is_retryable =
mongoc_uri_get_option_as_bool (cluster->uri, MONGOC_URI_RETRYWRITES, MONGOC_DEFAULT_RETRYWRITES);

return _mongoc_cluster_stream_for_optype (cluster, MONGOC_SS_WRITE, NULL, cs, is_retryable, ds, reply, error);
return _mongoc_cluster_stream_for_optype (
cluster, MONGOC_SS_WRITE, log_context, NULL, cs, is_retryable, ds, reply, error);
}

mongoc_server_stream_t *
mongoc_cluster_stream_for_aggr_with_write (mongoc_cluster_t *cluster,
const mongoc_ss_log_context_t *log_context,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
bson_t *reply,
Expand All @@ -2656,7 +2666,7 @@ mongoc_cluster_stream_for_aggr_with_write (mongoc_cluster_t *cluster,
mongoc_uri_get_option_as_bool (cluster->uri, MONGOC_URI_RETRYWRITES, MONGOC_DEFAULT_RETRYWRITES);

return _mongoc_cluster_stream_for_optype (
cluster, MONGOC_SS_AGGREGATE_WITH_WRITE, prefs_override, cs, is_retryable, NULL, reply, error);
cluster, MONGOC_SS_AGGREGATE_WITH_WRITE, log_context, prefs_override, cs, is_retryable, NULL, reply, error);
}

static bool
Expand Down Expand Up @@ -3615,8 +3625,10 @@ mongoc_cluster_run_retryable_write (mongoc_cluster_t *cluster,
// If talking to a sharded cluster, deprioritize the just-used mongos to prefer a new mongos for the retry.
mongoc_deprioritized_servers_add_if_sharded (ds, cmd->server_stream->topology_type, cmd->server_stream->sd);

*retry_server_stream =
mongoc_cluster_stream_for_writes (cluster, cmd->session, ds, NULL /* reply */, &ignored_error);
const mongoc_ss_log_context_t ss_log_context = {
.operation = cmd->command_name, .has_operation_id = true, .operation_id = cmd->operation_id};
*retry_server_stream = mongoc_cluster_stream_for_writes (
cluster, &ss_log_context, cmd->session, ds, NULL /* reply */, &ignored_error);

mongoc_deprioritized_servers_destroy (ds);
}
Expand Down
Loading