Skip to content

Commit e2c7ad4

Browse files
SiyaoIsHidingabsurdfarce
authored andcommitted
CASSJAVA-97: Let users inject an ID for each request and write to the custom payload
patch by Jane He; reviewed by Abe Ratnofsky and Bret McGuire for CASSJAVA-97
1 parent ff2d7f2 commit e2c7ad4

File tree

22 files changed

+637
-59
lines changed

22 files changed

+637
-59
lines changed

core/revapi.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7407,6 +7407,11 @@
74077407
{
74087408
"code": "java.method.varargOverloadsOnlyDifferInVarargParameter",
74097409
"justification": "CASSJAVA-102: Migrate revapi config into dedicated config files, ported from pom.xml"
7410+
},
7411+
{
7412+
"code": "java.method.addedToInterface",
7413+
"new": "method java.util.Optional<com.datastax.oss.driver.api.core.tracker.RequestIdGenerator> com.datastax.oss.driver.api.core.context.DriverContext::getRequestIdGenerator()",
7414+
"justification": "CASSJAVA-97: Let users inject an ID for each request and write to the custom payload"
74107415
}
74117416
]
74127417
}

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,12 @@ public enum DefaultDriverOption implements DriverOption {
995995
* <p>Value-type: boolean
996996
*/
997997
SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"),
998+
/**
999+
* The class of session-wide component that generates request IDs.
1000+
*
1001+
* <p>Value-type: {@link String}
1002+
*/
1003+
REQUEST_ID_GENERATOR_CLASS("advanced.request-id.generator.class"),
9981004
/**
9991005
* An address to always translate all node addresses to that same proxy hostname no matter what IP
10001006
* address a node has, but still using its native transport port.

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,10 @@ public String toString() {
281281
new TypedDriverOption<>(
282282
DefaultDriverOption.REQUEST_TRACKER_CLASSES, GenericType.listOf(String.class));
283283

284+
/** The class of a session-wide component that generates request IDs. */
285+
public static final TypedDriverOption<String> REQUEST_ID_GENERATOR_CLASS =
286+
new TypedDriverOption<>(DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS, GenericType.STRING);
287+
284288
/** Whether to log successful requests. */
285289
public static final TypedDriverOption<Boolean> REQUEST_LOGGER_SUCCESS_ENABLED =
286290
new TypedDriverOption<>(

core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
3434
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
3535
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
36+
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
3637
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
3738
import edu.umd.cs.findbugs.annotations.NonNull;
3839
import java.util.Map;
@@ -139,6 +140,10 @@ default SpeculativeExecutionPolicy getSpeculativeExecutionPolicy(@NonNull String
139140
@NonNull
140141
RequestTracker getRequestTracker();
141142

143+
/** @return The driver's request ID generator; never {@code null}. */
144+
@NonNull
145+
Optional<RequestIdGenerator> getRequestIdGenerator();
146+
142147
/** @return The driver's request throttler; never {@code null}. */
143148
@NonNull
144149
RequestThrottler getRequestThrottler();

core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
2424
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
2525
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
26+
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
2627
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
2728
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
2829
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
@@ -59,6 +60,7 @@ public static Builder builder() {
5960
private final NodeStateListener nodeStateListener;
6061
private final SchemaChangeListener schemaChangeListener;
6162
private final RequestTracker requestTracker;
63+
private final RequestIdGenerator requestIdGenerator;
6264
private final Map<String, String> localDatacenters;
6365
private final Map<String, Predicate<Node>> nodeFilters;
6466
private final Map<String, NodeDistanceEvaluator> nodeDistanceEvaluators;
@@ -77,6 +79,7 @@ private ProgrammaticArguments(
7779
@Nullable NodeStateListener nodeStateListener,
7880
@Nullable SchemaChangeListener schemaChangeListener,
7981
@Nullable RequestTracker requestTracker,
82+
@Nullable RequestIdGenerator requestIdGenerator,
8083
@NonNull Map<String, String> localDatacenters,
8184
@NonNull Map<String, Predicate<Node>> nodeFilters,
8285
@NonNull Map<String, NodeDistanceEvaluator> nodeDistanceEvaluators,
@@ -94,6 +97,7 @@ private ProgrammaticArguments(
9497
this.nodeStateListener = nodeStateListener;
9598
this.schemaChangeListener = schemaChangeListener;
9699
this.requestTracker = requestTracker;
100+
this.requestIdGenerator = requestIdGenerator;
97101
this.localDatacenters = localDatacenters;
98102
this.nodeFilters = nodeFilters;
99103
this.nodeDistanceEvaluators = nodeDistanceEvaluators;
@@ -128,6 +132,11 @@ public RequestTracker getRequestTracker() {
128132
return requestTracker;
129133
}
130134

135+
@Nullable
136+
public RequestIdGenerator getRequestIdGenerator() {
137+
return requestIdGenerator;
138+
}
139+
131140
@NonNull
132141
public Map<String, String> getLocalDatacenters() {
133142
return localDatacenters;
@@ -196,6 +205,7 @@ public static class Builder {
196205
private NodeStateListener nodeStateListener;
197206
private SchemaChangeListener schemaChangeListener;
198207
private RequestTracker requestTracker;
208+
private RequestIdGenerator requestIdGenerator;
199209
private ImmutableMap.Builder<String, String> localDatacentersBuilder = ImmutableMap.builder();
200210
private final ImmutableMap.Builder<String, Predicate<Node>> nodeFiltersBuilder =
201211
ImmutableMap.builder();
@@ -294,6 +304,12 @@ public Builder addRequestTracker(@NonNull RequestTracker requestTracker) {
294304
return this;
295305
}
296306

307+
@NonNull
308+
public Builder withRequestIdGenerator(@Nullable RequestIdGenerator requestIdGenerator) {
309+
this.requestIdGenerator = requestIdGenerator;
310+
return this;
311+
}
312+
297313
@NonNull
298314
public Builder withLocalDatacenter(
299315
@NonNull String profileName, @NonNull String localDatacenter) {
@@ -417,6 +433,7 @@ public ProgrammaticArguments build() {
417433
nodeStateListener,
418434
schemaChangeListener,
419435
requestTracker,
436+
requestIdGenerator,
420437
localDatacentersBuilder.build(),
421438
nodeFiltersBuilder.build(),
422439
nodeDistanceEvaluatorsBuilder.build(),

core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
3636
import com.datastax.oss.driver.api.core.ssl.ProgrammaticSslEngineFactory;
3737
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
38+
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
3839
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
3940
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
4041
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
@@ -47,6 +48,7 @@
4748
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
4849
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
4950
import com.datastax.oss.driver.internal.core.session.DefaultSession;
51+
import com.datastax.oss.driver.internal.core.tracker.W3CContextRequestIdGenerator;
5052
import com.datastax.oss.driver.internal.core.util.concurrent.BlockingOperation;
5153
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
5254
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -83,6 +85,8 @@
8385
@NotThreadSafe
8486
public abstract class SessionBuilder<SelfT extends SessionBuilder, SessionT> {
8587

88+
public static final String ASTRA_PAYLOAD_KEY = "traceparent";
89+
8690
private static final Logger LOG = LoggerFactory.getLogger(SessionBuilder.class);
8791

8892
@SuppressWarnings("unchecked")
@@ -318,6 +322,17 @@ public SelfT addRequestTracker(@NonNull RequestTracker requestTracker) {
318322
return self;
319323
}
320324

325+
/**
326+
* Registers a request ID generator. The driver will use the generated ID in the logs and
327+
* optionally add to the custom payload so that users can correlate logs about the same request
328+
* from the Cassandra side.
329+
*/
330+
@NonNull
331+
public SelfT withRequestIdGenerator(@NonNull RequestIdGenerator requestIdGenerator) {
332+
this.programmaticArgumentsBuilder.withRequestIdGenerator(requestIdGenerator);
333+
return self;
334+
}
335+
321336
/**
322337
* Registers an authentication provider to use with the session.
323338
*
@@ -861,6 +876,13 @@ protected final CompletionStage<CqlSession> buildDefaultSessionAsync() {
861876
List<String> configContactPoints =
862877
defaultConfig.getStringList(DefaultDriverOption.CONTACT_POINTS, Collections.emptyList());
863878
if (cloudConfigInputStream != null) {
879+
// override request id generator, unless user has already set it
880+
if (programmaticArguments.getRequestIdGenerator() == null) {
881+
programmaticArgumentsBuilder.withRequestIdGenerator(
882+
new W3CContextRequestIdGenerator(ASTRA_PAYLOAD_KEY));
883+
LOG.debug(
884+
"A secure connect bundle is provided, using W3CContextRequestIdGenerator as request ID generator.");
885+
}
864886
if (!programmaticContactPoints.isEmpty() || !configContactPoints.isEmpty()) {
865887
LOG.info(
866888
"Both a secure connect bundle and contact points were provided. These are mutually exclusive. The contact points from the secure bundle will have priority.");
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.api.core.tracker;
19+
20+
import com.datastax.oss.driver.api.core.cql.Statement;
21+
import com.datastax.oss.driver.api.core.session.Request;
22+
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
23+
import edu.umd.cs.findbugs.annotations.NonNull;
24+
import java.nio.ByteBuffer;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.Map;
27+
28+
/**
29+
* Interface responsible for generating request IDs.
30+
*
31+
* <p>Note that all request IDs have a parent/child relationship. A "parent ID" can loosely be
32+
* thought of as encompassing a sequence of a request + any attendant retries, speculative
33+
* executions etc. It's scope is identical to that of a {@link
34+
* com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "request ID" represents a single
35+
* request within this larger scope. Note that a request corresponding to a request ID may be
36+
* retried; in that case the retry count will be appended to the corresponding identifier in the
37+
* logs.
38+
*/
39+
public interface RequestIdGenerator {
40+
41+
String DEFAULT_PAYLOAD_KEY = "request-id";
42+
43+
/**
44+
* Generates a unique identifier for the session request. This will be the identifier for the
45+
* entire `session.execute()` call. This identifier will be added to logs, and propagated to
46+
* request trackers.
47+
*
48+
* @return a unique identifier for the session request
49+
*/
50+
String getSessionRequestId();
51+
52+
/**
53+
* Generates a unique identifier for the node request. This will be the identifier for the CQL
54+
* request against a particular node. There can be one or more node requests for a single session
55+
* request, due to retries or speculative executions. This identifier will be added to logs, and
56+
* propagated to request trackers.
57+
*
58+
* @param statement the statement to be executed
59+
* @param parentId the session request identifier
60+
* @return a unique identifier for the node request
61+
*/
62+
String getNodeRequestId(@NonNull Request statement, @NonNull String parentId);
63+
64+
default String getCustomPayloadKey() {
65+
return DEFAULT_PAYLOAD_KEY;
66+
}
67+
68+
default Statement<?> getDecoratedStatement(
69+
@NonNull Statement<?> statement, @NonNull String requestId) {
70+
Map<String, ByteBuffer> customPayload =
71+
NullAllowingImmutableMap.<String, ByteBuffer>builder()
72+
.putAll(statement.getCustomPayload())
73+
.put(getCustomPayloadKey(), ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)))
74+
.build();
75+
return statement.setCustomPayload(customPayload);
76+
}
77+
}

core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,22 @@ default void onSuccess(
4747
@NonNull Node node) {}
4848

4949
/**
50-
* Invoked each time a request succeeds.
50+
* Invoked each time a session request succeeds. A session request is a `session.execute()` call
5151
*
5252
* @param latencyNanos the overall execution time (from the {@link Session#execute(Request,
5353
* GenericType) session.execute} call until the result is made available to the client).
5454
* @param executionProfile the execution profile of this request.
5555
* @param node the node that returned the successful response.
56-
* @param requestLogPrefix the dedicated log prefix for this request
56+
* @param sessionRequestLogPrefix the dedicated log prefix for this request
5757
*/
5858
default void onSuccess(
5959
@NonNull Request request,
6060
long latencyNanos,
6161
@NonNull DriverExecutionProfile executionProfile,
6262
@NonNull Node node,
63-
@NonNull String requestLogPrefix) {
64-
// If client doesn't override onSuccess with requestLogPrefix delegate call to the old method
63+
@NonNull String sessionRequestLogPrefix) {
64+
// If client doesn't override onSuccess with sessionRequestLogPrefix delegate call to the old
65+
// method
6566
onSuccess(request, latencyNanos, executionProfile, node);
6667
}
6768

@@ -78,22 +79,23 @@ default void onError(
7879
@Nullable Node node) {}
7980

8081
/**
81-
* Invoked each time a request fails.
82+
* Invoked each time a session request fails. A session request is a `session.execute()` call
8283
*
8384
* @param latencyNanos the overall execution time (from the {@link Session#execute(Request,
8485
* GenericType) session.execute} call until the error is propagated to the client).
8586
* @param executionProfile the execution profile of this request.
8687
* @param node the node that returned the error response, or {@code null} if the error occurred
87-
* @param requestLogPrefix the dedicated log prefix for this request
88+
* @param sessionRequestLogPrefix the dedicated log prefix for this request
8889
*/
8990
default void onError(
9091
@NonNull Request request,
9192
@NonNull Throwable error,
9293
long latencyNanos,
9394
@NonNull DriverExecutionProfile executionProfile,
9495
@Nullable Node node,
95-
@NonNull String requestLogPrefix) {
96-
// If client doesn't override onError with requestLogPrefix delegate call to the old method
96+
@NonNull String sessionRequestLogPrefix) {
97+
// If client doesn't override onError with sessionRequestLogPrefix delegate call to the old
98+
// method
9799
onError(request, error, latencyNanos, executionProfile, node);
98100
}
99101

@@ -110,23 +112,25 @@ default void onNodeError(
110112
@NonNull Node node) {}
111113

112114
/**
113-
* Invoked each time a request fails at the node level. Similar to {@link #onError(Request,
114-
* Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level.
115+
* Invoked each time a node request fails. A node request is a CQL request sent to a particular
116+
* node. There can be one or more node requests for a single session request, due to retries or
117+
* speculative executions.
115118
*
116119
* @param latencyNanos the overall execution time (from the {@link Session#execute(Request,
117120
* GenericType) session.execute} call until the error is propagated to the client).
118121
* @param executionProfile the execution profile of this request.
119122
* @param node the node that returned the error response.
120-
* @param requestLogPrefix the dedicated log prefix for this request
123+
* @param nodeRequestLogPrefix the dedicated log prefix for this request
121124
*/
122125
default void onNodeError(
123126
@NonNull Request request,
124127
@NonNull Throwable error,
125128
long latencyNanos,
126129
@NonNull DriverExecutionProfile executionProfile,
127130
@NonNull Node node,
128-
@NonNull String requestLogPrefix) {
129-
// If client doesn't override onNodeError with requestLogPrefix delegate call to the old method
131+
@NonNull String nodeRequestLogPrefix) {
132+
// If client doesn't override onNodeError with nodeRequestLogPrefix delegate call to the old
133+
// method
130134
onNodeError(request, error, latencyNanos, executionProfile, node);
131135
}
132136

@@ -142,22 +146,23 @@ default void onNodeSuccess(
142146
@NonNull Node node) {}
143147

144148
/**
145-
* Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request,
146-
* long, DriverExecutionProfile, Node, String)} but at per node level.
149+
* Invoked each time a node request succeeds. A node request is a CQL request sent to a particular
150+
* node. There can be one or more node requests for a single session request, due to retries or
151+
* speculative executions.
147152
*
148153
* @param latencyNanos the overall execution time (from the {@link Session#execute(Request,
149154
* GenericType) session.execute} call until the result is made available to the client).
150155
* @param executionProfile the execution profile of this request.
151156
* @param node the node that returned the successful response.
152-
* @param requestLogPrefix the dedicated log prefix for this request
157+
* @param nodeRequestLogPrefix the dedicated log prefix for this request
153158
*/
154159
default void onNodeSuccess(
155160
@NonNull Request request,
156161
long latencyNanos,
157162
@NonNull DriverExecutionProfile executionProfile,
158163
@NonNull Node node,
159-
@NonNull String requestLogPrefix) {
160-
// If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old
164+
@NonNull String nodeRequestLogPrefix) {
165+
// If client doesn't override onNodeSuccess with nodeRequestLogPrefix delegate call to the old
161166
// method
162167
onNodeSuccess(request, latencyNanos, executionProfile, node);
163168
}

0 commit comments

Comments
 (0)