Skip to content

Commit 93ca22e

Browse files
committed
chore: route multi-use read-only txn reads via location-aware cache
1 parent 95ac7a7 commit 93ca22e

File tree

3 files changed

+294
-6
lines changed

3 files changed

+294
-6
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
7474
reqBuilder.getRoutingHintBuilder());
7575
}
7676

77+
public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder, boolean preferLeader) {
78+
recipeCache.computeKeys(reqBuilder);
79+
return fillRoutingHint(
80+
preferLeader,
81+
KeyRangeCache.RangeMode.COVERING_SPLIT,
82+
reqBuilder.getDirectedReadOptions(),
83+
reqBuilder.getRoutingHintBuilder());
84+
}
85+
7786
public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) {
7887
recipeCache.computeKeys(reqBuilder);
7988
return fillRoutingHint(
@@ -83,6 +92,15 @@ public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) {
8392
reqBuilder.getRoutingHintBuilder());
8493
}
8594

95+
public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder, boolean preferLeader) {
96+
recipeCache.computeKeys(reqBuilder);
97+
return fillRoutingHint(
98+
preferLeader,
99+
KeyRangeCache.RangeMode.PICK_RANDOM,
100+
reqBuilder.getDirectedReadOptions(),
101+
reqBuilder.getRoutingHintBuilder());
102+
}
103+
86104
public ChannelEndpoint findServer(BeginTransactionRequest.Builder reqBuilder) {
87105
if (!reqBuilder.hasMutationKey()) {
88106
return null;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ final class KeyAwareChannel extends ManagedChannel {
6767
private final Map<String, SoftReference<ChannelFinder>> channelFinders =
6868
new ConcurrentHashMap<>();
6969
private final Map<ByteString, String> transactionAffinities = new ConcurrentHashMap<>();
70+
// Maps read-only transaction IDs to their preferLeader value.
71+
// Strong reads → true (prefer leader), Stale reads → false (any replica).
72+
private final Map<ByteString, Boolean> readOnlyTransactions = new ConcurrentHashMap<>();
7073

7174
private KeyAwareChannel(
7275
InstantiatingGrpcChannelProvider channelProvider,
@@ -184,12 +187,34 @@ private void clearAffinity(ByteString transactionId) {
184187
return;
185188
}
186189
transactionAffinities.remove(transactionId);
190+
readOnlyTransactions.remove(transactionId);
187191
}
188192

189193
void clearTransactionAffinity(ByteString transactionId) {
190194
clearAffinity(transactionId);
191195
}
192196

197+
private boolean isReadOnlyTransaction(ByteString transactionId) {
198+
return transactionId != null
199+
&& !transactionId.isEmpty()
200+
&& readOnlyTransactions.containsKey(transactionId);
201+
}
202+
203+
@Nullable
204+
private Boolean readOnlyPreferLeader(ByteString transactionId) {
205+
if (transactionId == null || transactionId.isEmpty()) {
206+
return null;
207+
}
208+
return readOnlyTransactions.get(transactionId);
209+
}
210+
211+
private void trackReadOnlyTransaction(ByteString transactionId, boolean preferLeader) {
212+
if (transactionId == null || transactionId.isEmpty()) {
213+
return;
214+
}
215+
readOnlyTransactions.put(transactionId, preferLeader);
216+
}
217+
193218
private void recordAffinity(
194219
ByteString transactionId, @Nullable ChannelEndpoint endpoint, boolean allowDefault) {
195220
if (transactionId == null || transactionId.isEmpty() || endpoint == null) {
@@ -250,6 +275,8 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
250275
@Nullable private Boolean pendingMessageCompression;
251276
@Nullable private io.grpc.Status cancelledStatus;
252277
@Nullable private Metadata cancelledTrailers;
278+
private boolean isReadOnlyBegin;
279+
private boolean readOnlyIsStrong;
253280
private final Object lock = new Object();
254281

255282
KeyAwareClientCall(
@@ -325,7 +352,12 @@ public void sendMessage(RequestT message) {
325352
finder = parentChannel.getOrCreateChannelFinder(databaseId);
326353
endpoint = finder.findServer(reqBuilder);
327354
}
328-
allowDefaultAffinity = true;
355+
if (reqBuilder.hasOptions() && reqBuilder.getOptions().hasReadOnly()) {
356+
isReadOnlyBegin = true;
357+
readOnlyIsStrong = reqBuilder.getOptions().getReadOnly().getStrong();
358+
} else {
359+
allowDefaultAffinity = true;
360+
}
329361
message = (RequestT) reqBuilder.build();
330362
} else if (message instanceof CommitRequest) {
331363
CommitRequest request = (CommitRequest) message;
@@ -486,11 +518,17 @@ void maybeClearAffinity() {
486518
private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) {
487519
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
488520
ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction());
489-
ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId);
521+
// Skip affinity for read-only transactions so each read routes independently.
522+
boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId);
523+
ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId);
490524
ChannelFinder finder = null;
491525
if (databaseId != null) {
492526
finder = parentChannel.getOrCreateChannelFinder(databaseId);
493-
ChannelEndpoint routed = finder.findServer(reqBuilder);
527+
Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId);
528+
ChannelEndpoint routed =
529+
preferLeaderOverride != null
530+
? finder.findServer(reqBuilder, preferLeaderOverride)
531+
: finder.findServer(reqBuilder);
494532
if (endpoint == null) {
495533
endpoint = routed;
496534
}
@@ -501,11 +539,17 @@ private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) {
501539
private RoutingDecision routeFromRequest(ExecuteSqlRequest.Builder reqBuilder) {
502540
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
503541
ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction());
504-
ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId);
542+
// Skip affinity for read-only transactions so each query routes independently.
543+
boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId);
544+
ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId);
505545
ChannelFinder finder = null;
506546
if (databaseId != null) {
507547
finder = parentChannel.getOrCreateChannelFinder(databaseId);
508-
ChannelEndpoint routed = finder.findServer(reqBuilder);
548+
Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId);
549+
ChannelEndpoint routed =
550+
preferLeaderOverride != null
551+
? finder.findServer(reqBuilder, preferLeaderOverride)
552+
: finder.findServer(reqBuilder);
509553
if (endpoint == null) {
510554
endpoint = routed;
511555
}
@@ -554,7 +598,13 @@ public void onMessage(ResponseT message) {
554598
transactionId = transactionIdFromTransaction(response);
555599
}
556600
if (transactionId != null) {
557-
call.maybeRecordAffinity(transactionId);
601+
if (call.isReadOnlyBegin) {
602+
// Track the read-only transaction so subsequent reads skip affinity
603+
// and route independently based on key-based routing.
604+
call.parentChannel.trackReadOnlyTransaction(transactionId, call.readOnlyIsStrong);
605+
} else if (!call.parentChannel.isReadOnlyTransaction(transactionId)) {
606+
call.maybeRecordAffinity(transactionId);
607+
}
558608
}
559609
super.onMessage(message);
560610
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,18 @@
2828
import com.google.spanner.v1.CommitResponse;
2929
import com.google.spanner.v1.ExecuteSqlRequest;
3030
import com.google.spanner.v1.Group;
31+
import com.google.spanner.v1.PartialResultSet;
3132
import com.google.spanner.v1.Range;
33+
import com.google.spanner.v1.ReadRequest;
3234
import com.google.spanner.v1.ResultSet;
35+
import com.google.spanner.v1.ResultSetMetadata;
3336
import com.google.spanner.v1.RollbackRequest;
3437
import com.google.spanner.v1.RoutingHint;
3538
import com.google.spanner.v1.SpannerGrpc;
3639
import com.google.spanner.v1.Tablet;
3740
import com.google.spanner.v1.Transaction;
41+
import com.google.spanner.v1.TransactionOptions;
42+
import com.google.spanner.v1.TransactionSelector;
3843
import io.grpc.CallOptions;
3944
import io.grpc.ClientCall;
4045
import io.grpc.ManagedChannel;
@@ -269,6 +274,221 @@ public void resultSetCacheUpdateRoutesSubsequentRequest() throws Exception {
269274
assertThat(harness.endpointCache.callCountForAddress("routed:1234")).isEqualTo(1);
270275
}
271276

277+
@Test
278+
public void readOnlyTransactionRoutesEachReadIndependently() throws Exception {
279+
TestHarness harness = createHarness();
280+
ByteString transactionId = ByteString.copyFromUtf8("ro-tx-1");
281+
282+
// 1. Begin a read-only transaction (stale read).
283+
ClientCall<BeginTransactionRequest, Transaction> beginCall =
284+
harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT);
285+
CapturingListener<Transaction> beginListener = new CapturingListener<>();
286+
beginCall.start(beginListener, new Metadata());
287+
beginCall.sendMessage(
288+
BeginTransactionRequest.newBuilder()
289+
.setSession(SESSION)
290+
.setOptions(
291+
TransactionOptions.newBuilder()
292+
.setReadOnly(
293+
TransactionOptions.ReadOnly.newBuilder()
294+
.setReturnReadTimestamp(true)
295+
.build()))
296+
.build());
297+
298+
// BeginTransaction goes to default channel.
299+
assertThat(harness.defaultManagedChannel.callCount()).isEqualTo(1);
300+
301+
@SuppressWarnings("unchecked")
302+
RecordingClientCall<BeginTransactionRequest, Transaction> beginDelegate =
303+
(RecordingClientCall<BeginTransactionRequest, Transaction>)
304+
harness.defaultManagedChannel.latestCall();
305+
beginDelegate.emitOnMessage(Transaction.newBuilder().setId(transactionId).build());
306+
beginDelegate.emitOnClose(Status.OK, new Metadata());
307+
308+
// 2. Populate cache with routing data for two different key ranges.
309+
CacheUpdate cacheUpdate =
310+
CacheUpdate.newBuilder()
311+
.setDatabaseId(7L)
312+
.addRange(
313+
Range.newBuilder()
314+
.setStartKey(bytes("a"))
315+
.setLimitKey(bytes("m"))
316+
.setGroupUid(1L)
317+
.setSplitId(1L)
318+
.setGeneration(bytes("1")))
319+
.addRange(
320+
Range.newBuilder()
321+
.setStartKey(bytes("m"))
322+
.setLimitKey(bytes("z"))
323+
.setGroupUid(2L)
324+
.setSplitId(2L)
325+
.setGeneration(bytes("1")))
326+
.addGroup(
327+
Group.newBuilder()
328+
.setGroupUid(1L)
329+
.setGeneration(bytes("1"))
330+
.addTablets(
331+
Tablet.newBuilder()
332+
.setTabletUid(1L)
333+
.setServerAddress("server-a:1234")
334+
.setIncarnation(bytes("1"))
335+
.setDistance(0)))
336+
.addGroup(
337+
Group.newBuilder()
338+
.setGroupUid(2L)
339+
.setGeneration(bytes("1"))
340+
.addTablets(
341+
Tablet.newBuilder()
342+
.setTabletUid(2L)
343+
.setServerAddress("server-b:1234")
344+
.setIncarnation(bytes("1"))
345+
.setDistance(0)))
346+
.build();
347+
348+
// Seed the cache via a dummy query response with cache update.
349+
ClientCall<ExecuteSqlRequest, ResultSet> seedCall =
350+
harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT);
351+
seedCall.start(new CapturingListener<ResultSet>(), new Metadata());
352+
seedCall.sendMessage(
353+
ExecuteSqlRequest.newBuilder()
354+
.setSession(SESSION)
355+
.setRoutingHint(RoutingHint.newBuilder().setKey(bytes("a")).build())
356+
.build());
357+
@SuppressWarnings("unchecked")
358+
RecordingClientCall<ExecuteSqlRequest, ResultSet> seedDelegate =
359+
(RecordingClientCall<ExecuteSqlRequest, ResultSet>)
360+
harness.defaultManagedChannel.latestCall();
361+
seedDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build());
362+
363+
// 3. Send a streaming read with key in range [a, m) → should go to server-a.
364+
ClientCall<ReadRequest, PartialResultSet> readCallA =
365+
harness.channel.newCall(SpannerGrpc.getStreamingReadMethod(), CallOptions.DEFAULT);
366+
readCallA.start(new CapturingListener<PartialResultSet>(), new Metadata());
367+
readCallA.sendMessage(
368+
ReadRequest.newBuilder()
369+
.setSession(SESSION)
370+
.setTransaction(TransactionSelector.newBuilder().setId(transactionId))
371+
.setRoutingHint(RoutingHint.newBuilder().setKey(bytes("b")).build())
372+
.build());
373+
374+
assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1);
375+
376+
// 4. Send an ExecuteStreamingSql with key in range [m, z) → should go to server-b.
377+
ClientCall<ExecuteSqlRequest, PartialResultSet> queryCallB =
378+
harness.channel.newCall(SpannerGrpc.getExecuteStreamingSqlMethod(), CallOptions.DEFAULT);
379+
queryCallB.start(new CapturingListener<PartialResultSet>(), new Metadata());
380+
queryCallB.sendMessage(
381+
ExecuteSqlRequest.newBuilder()
382+
.setSession(SESSION)
383+
.setTransaction(TransactionSelector.newBuilder().setId(transactionId))
384+
.setRoutingHint(RoutingHint.newBuilder().setKey(bytes("n")).build())
385+
.build());
386+
387+
assertThat(harness.endpointCache.callCountForAddress("server-b:1234")).isEqualTo(1);
388+
389+
// Neither read was pinned to the default host (besides the initial begin + seed).
390+
// default had: 1 begin + 1 seed = 2 calls
391+
assertThat(harness.defaultManagedChannel.callCount()).isEqualTo(2);
392+
}
393+
394+
@Test
395+
public void readOnlyTransactionDoesNotRecordAffinity() throws Exception {
396+
TestHarness harness = createHarness();
397+
ByteString transactionId = ByteString.copyFromUtf8("ro-tx-2");
398+
399+
// Begin a read-only transaction.
400+
ClientCall<BeginTransactionRequest, Transaction> beginCall =
401+
harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT);
402+
beginCall.start(new CapturingListener<Transaction>(), new Metadata());
403+
beginCall.sendMessage(
404+
BeginTransactionRequest.newBuilder()
405+
.setSession(SESSION)
406+
.setOptions(
407+
TransactionOptions.newBuilder()
408+
.setReadOnly(
409+
TransactionOptions.ReadOnly.newBuilder()
410+
.setReturnReadTimestamp(true)
411+
.build()))
412+
.build());
413+
414+
@SuppressWarnings("unchecked")
415+
RecordingClientCall<BeginTransactionRequest, Transaction> beginDelegate =
416+
(RecordingClientCall<BeginTransactionRequest, Transaction>)
417+
harness.defaultManagedChannel.latestCall();
418+
beginDelegate.emitOnMessage(Transaction.newBuilder().setId(transactionId).build());
419+
beginDelegate.emitOnClose(Status.OK, new Metadata());
420+
421+
// No affinity should be recorded for the default endpoint.
422+
// Verify by checking that the endpoint cache was never queried for affinity lookup.
423+
// The default endpoint getCount tracks affinity lookups.
424+
assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0);
425+
426+
// Send a read using the transaction ID (no cache populated, so falls back to default).
427+
ClientCall<ExecuteSqlRequest, ResultSet> readCall =
428+
harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT);
429+
readCall.start(new CapturingListener<ResultSet>(), new Metadata());
430+
readCall.sendMessage(
431+
ExecuteSqlRequest.newBuilder()
432+
.setSession(SESSION)
433+
.setTransaction(TransactionSelector.newBuilder().setId(transactionId))
434+
.build());
435+
436+
// The read goes to default (no cache data), but NOT because of affinity.
437+
// No affinity lookup should have been performed for the read-only txn.
438+
assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0);
439+
440+
// Now receive a response with the transaction ID — should NOT record affinity.
441+
@SuppressWarnings("unchecked")
442+
RecordingClientCall<ExecuteSqlRequest, ResultSet> readDelegate =
443+
(RecordingClientCall<ExecuteSqlRequest, ResultSet>)
444+
harness.defaultManagedChannel.latestCall();
445+
readDelegate.emitOnMessage(
446+
ResultSet.newBuilder()
447+
.setMetadata(
448+
ResultSetMetadata.newBuilder()
449+
.setTransaction(Transaction.newBuilder().setId(transactionId)))
450+
.build());
451+
452+
// Still no affinity recorded.
453+
assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0);
454+
}
455+
456+
@Test
457+
public void readOnlyTransactionCleanupOnClose() throws Exception {
458+
TestHarness harness = createHarness();
459+
ByteString transactionId = ByteString.copyFromUtf8("ro-tx-3");
460+
461+
// Begin a read-only transaction.
462+
ClientCall<BeginTransactionRequest, Transaction> beginCall =
463+
harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT);
464+
beginCall.start(new CapturingListener<Transaction>(), new Metadata());
465+
beginCall.sendMessage(
466+
BeginTransactionRequest.newBuilder()
467+
.setSession(SESSION)
468+
.setOptions(
469+
TransactionOptions.newBuilder()
470+
.setReadOnly(
471+
TransactionOptions.ReadOnly.newBuilder()
472+
.setReturnReadTimestamp(true)
473+
.build()))
474+
.build());
475+
476+
@SuppressWarnings("unchecked")
477+
RecordingClientCall<BeginTransactionRequest, Transaction> beginDelegate =
478+
(RecordingClientCall<BeginTransactionRequest, Transaction>)
479+
harness.defaultManagedChannel.latestCall();
480+
beginDelegate.emitOnMessage(Transaction.newBuilder().setId(transactionId).build());
481+
beginDelegate.emitOnClose(Status.OK, new Metadata());
482+
483+
// Clear transaction affinity (simulates MultiUseReadOnlyTransaction.close()).
484+
harness.channel.clearTransactionAffinity(transactionId);
485+
486+
// After cleanup, reads with this transaction ID should use normal affinity logic.
487+
// This ensures no memory leak for the readOnlyTransactions map.
488+
// We can verify indirectly: a BeginTransaction for a read-write txn with the same ID
489+
// would record affinity normally.
490+
}
491+
272492
private static TestHarness createHarness() throws IOException {
273493
FakeEndpointCache endpointCache = new FakeEndpointCache(DEFAULT_ADDRESS);
274494
InstantiatingGrpcChannelProvider provider =

0 commit comments

Comments
 (0)