Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DatabaseClientImpl implements DatabaseClient {
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private final TraceWrapper tracer;
private Attributes commonAttributes;
private final Attributes databaseAttributes;
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
Expand Down Expand Up @@ -88,15 +88,15 @@ class DatabaseClientImpl implements DatabaseClient {
boolean useMultiplexedSessionPartitionedOps,
TraceWrapper tracer,
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
Attributes databaseAttributes) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps;
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;
this.databaseAttributes = databaseAttributes;

this.clientIdToOrdinalMap = new HashMap<String, Integer>();
this.dbId = this.dbIdFromClientId(this.clientId);
Expand Down Expand Up @@ -203,7 +203,7 @@ public Timestamp write(final Iterable<Mutation> mutations) throws SpannerExcepti
public CommitResponse writeWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
Expand All @@ -230,7 +230,7 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
Expand Down Expand Up @@ -260,7 +260,7 @@ int getNthRequest() {
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
Expand All @@ -278,7 +278,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(

@Override
public ReadContext singleUse() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUse();
} catch (RuntimeException e) {
Expand All @@ -290,7 +290,7 @@ public ReadContext singleUse() {

@Override
public ReadContext singleUse(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUse(bound);
} catch (RuntimeException e) {
Expand All @@ -302,7 +302,7 @@ public ReadContext singleUse(TimestampBound bound) {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
Expand All @@ -314,7 +314,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
Expand All @@ -326,7 +326,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {

@Override
public ReadOnlyTransaction readOnlyTransaction() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().readOnlyTransaction();
} catch (RuntimeException e) {
Expand All @@ -338,7 +338,7 @@ public ReadOnlyTransaction readOnlyTransaction() {

@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
Expand All @@ -350,7 +350,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().readWriteTransaction(options);
} catch (RuntimeException e) {
Expand All @@ -362,7 +362,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {

@Override
public TransactionManager transactionManager(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().transactionManager(options);
} catch (RuntimeException e) {
Expand All @@ -374,7 +374,7 @@ public TransactionManager transactionManager(TransactionOption... options) {

@Override
public AsyncRunner runAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().runAsync(options);
} catch (RuntimeException e) {
Expand All @@ -386,7 +386,7 @@ public AsyncRunner runAsync(TransactionOption... options) {

@Override
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().transactionManagerAsync(options);
} catch (RuntimeException e) {
Expand Down Expand Up @@ -449,7 +449,7 @@ private TransactionOption[] withReqId(

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(
(session, reqId) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void run() {
List<SessionImpl> sessions;
int remainingSessionsToCreate = sessionCount;
ISpan span =
spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, commonAttributes);
spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, databaseAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
spanner
.getTracer()
Expand Down Expand Up @@ -185,7 +185,7 @@ interface SessionConsumer {
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
private final ScheduledExecutorService executor;
private final DatabaseId db;
private final Attributes commonAttributes;
private final Attributes databaseAttributes;

// SessionClient is created long before a DatabaseClientImpl is created,
// as batch sessions are firstly created then later attached to each Client.
Expand All @@ -204,7 +204,7 @@ interface SessionConsumer {
this.db = db;
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
this.databaseAttributes = spanner.getTracer().createDatabaseAttributes(db);
}

@Override
Expand Down Expand Up @@ -236,7 +236,8 @@ SessionImpl createSession() {
sessionChannelCounter++;
}
XGoogSpannerRequestId reqId = nextRequestId(channelId, 1);
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
ISpan span =
spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.databaseAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
Expand Down Expand Up @@ -289,7 +290,7 @@ SessionImpl createMultiplexedSession() {
ISpan span =
spanner
.getTracer()
.spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.commonAttributes);
.spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.databaseAttributes);
// MultiplexedSession doesn't use a channelId hence this hard-coded value.
int channelId = 0;
XGoogSpannerRequestId reqId = nextRequestId(channelId, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
multiplexedSessionDatabaseClient,
getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(),
useMultiplexedSessionForRW,
this.tracer.createCommonAttributes(db));
this.tracer.createDatabaseAttributes(db));
dbClients.put(db, dbClient);
return dbClient;
}
Expand All @@ -375,7 +375,7 @@ DatabaseClientImpl createDatabaseClient(
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient,
boolean useMultiplexedSessionPartitionedOps,
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
Attributes databaseAttributes) {
if (multiplexedSessionClient != null) {
// Set the session pool in the multiplexed session client.
// This is required to handle fallback to regular sessions for in-progress transactions that
Expand All @@ -390,7 +390,7 @@ DatabaseClientImpl createDatabaseClient(
useMultiplexedSessionPartitionedOps,
tracer,
useMultiplexedSessionForRW,
commonAttributes);
databaseAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class TraceWrapper {
private final Tracer openCensusTracer;
private final io.opentelemetry.api.trace.Tracer openTelemetryTracer;
private final boolean enableExtendedTracing;
private final Attributes commonAttributes;

TraceWrapper(
Tracer openCensusTracer,
Expand All @@ -68,20 +69,25 @@ class TraceWrapper {
this.openTelemetryTracer = openTelemetryTracer;
this.openCensusTracer = openCensusTracer;
this.enableExtendedTracing = enableExtendedTracing;
this.commonAttributes = createCommonAttributes();
}

ISpan spanBuilder(String spanName) {
return spanBuilder(spanName, Attributes.empty());
}

ISpan spanBuilder(String spanName, Attributes commonAttributes, TransactionOption... options) {
return spanBuilder(spanName, createTransactionAttributes(commonAttributes, options));
ISpan spanBuilder(String spanName, Attributes attributes, TransactionOption... options) {
return spanBuilder(spanName, createTransactionAttributes(attributes, options));
}

ISpan spanBuilder(String spanName, Attributes attributes) {
if (SpannerOptions.getActiveTracingFramework().equals(TracingFramework.OPEN_TELEMETRY)) {
return new OpenTelemetrySpan(
openTelemetryTracer.spanBuilder(spanName).setAllAttributes(attributes).startSpan());
openTelemetryTracer
.spanBuilder(spanName)
.setAllAttributes(attributes)
.setAllAttributes(commonAttributes)
.startSpan());
} else {
return new OpenCensusSpan(openCensusTracer.spanBuilder(spanName).startSpan());
}
Expand Down Expand Up @@ -209,10 +215,15 @@ Attributes createTableAttributes(String tableName, Options options) {
return builder.build();
}

Attributes createCommonAttributes(DatabaseId db) {
Attributes createDatabaseAttributes(DatabaseId db) {
AttributesBuilder builder = Attributes.builder();
builder.put(DB_NAME_KEY, db.getDatabase());
builder.put(INSTANCE_NAME_KEY, db.getInstanceId().getInstance());
return builder.build();
}

private Attributes createCommonAttributes() {
AttributesBuilder builder = Attributes.builder();
builder.put(GCP_CLIENT_SERVICE_KEY, "spanner");
builder.put(GCP_CLIENT_REPO_KEY, "googleapis/java-spanner");
builder.put(GCP_CLIENT_VERSION_KEY, GaxProperties.getLibraryVersion(TraceWrapper.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class XGoogSpannerRequestId {
@VisibleForTesting
static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId();

static String REQUEST_ID = "x-goog-spanner-request-id";
public static String REQUEST_ID = "x-goog-spanner-request-id";
public static final Metadata.Key<String> REQUEST_HEADER_KEY =
Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,17 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
Span span = Span.current();
DatabaseName databaseName = extractDatabaseName(headers);
String key = extractKey(databaseName, method.getFullMethodName());
String requestId = extractRequestId(headers);
TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName);
Attributes attributes =
getMetricAttributes(key, method.getFullMethodName(), databaseName);
Map<String, String> builtInMetricsAttributes =
getBuiltInMetricAttributes(key, databaseName);
builtInMetricsAttributes.put(
BuiltInMetricsConstant.REQUEST_ID_KEY.getKey(), extractRequestId(headers));
builtInMetricsAttributes.put(BuiltInMetricsConstant.REQUEST_ID_KEY.getKey(), requestId);
addBuiltInMetricAttributes(compositeTracer, builtInMetricsAttributes);
if (span != null) {
span.setAttribute(XGoogSpannerRequestId.REQUEST_ID, requestId);
}
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
Expand Down