-
Notifications
You must be signed in to change notification settings - Fork 137
fix: rollback transactions that are waiting for tx-id to be returned #4342
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -614,6 +614,25 @@ ApiFuture<Empty> rollbackAsync() { | |
| getTransactionChannelHint()); | ||
| session.markUsed(clock.instant()); | ||
| return apiFuture; | ||
| } else if (transactionIdFuture != null) { | ||
| ApiFuture<ByteString> transactionIdOrEmptyFuture = | ||
| ApiFutures.catching( | ||
| transactionIdFuture, | ||
| Throwable.class, | ||
| input -> ByteString.empty(), | ||
| MoreExecutors.directExecutor()); | ||
| return ApiFutures.transformAsync( | ||
| transactionIdOrEmptyFuture, | ||
| transactionId -> | ||
| transactionId.isEmpty() | ||
| ? ApiFutures.immediateFuture(Empty.getDefaultInstance()) | ||
| : rpc.rollbackAsync( | ||
| RollbackRequest.newBuilder() | ||
| .setSession(session.getName()) | ||
| .setTransactionId(transactionId) | ||
| .build(), | ||
| getTransactionChannelHint()), | ||
|
Comment on lines
+626
to
+634
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The session should be marked as used when a rollback is initiated. This is consistent with other RPC calls in this class and is important for the session pool to correctly manage session liveness, preventing premature eviction of active sessions. transactionId -> {
if (transactionId.isEmpty()) {
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
session.markUsed(clock.instant());
return rpc.rollbackAsync(
RollbackRequest.newBuilder()
.setSession(session.getName())
.setTransactionId(transactionId)
.build(),
getTransactionChannelHint());
},
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| MoreExecutors.directExecutor()); | ||
| } else { | ||
| return ApiFutures.immediateFuture(Empty.getDefaultInstance()); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| /* | ||
| * Copyright 2026 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.google.cloud.spanner; | ||
|
|
||
| import static org.junit.Assert.assertNull; | ||
|
|
||
| import com.google.api.core.ApiFuture; | ||
| import com.google.cloud.NoCredentials; | ||
| import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; | ||
| import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; | ||
| import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; | ||
| import com.google.cloud.spanner.connection.AbstractMockServerTest; | ||
| import com.google.cloud.spanner.connection.RandomResultSetGenerator; | ||
| import com.google.common.base.Function; | ||
| import com.google.spanner.v1.ExecuteSqlRequest; | ||
| import com.google.spanner.v1.RollbackRequest; | ||
| import io.grpc.ManagedChannelBuilder; | ||
| import io.grpc.Status; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import org.junit.BeforeClass; | ||
| import org.junit.Test; | ||
| import org.junit.runner.RunWith; | ||
| import org.junit.runners.JUnit4; | ||
| import org.threeten.bp.Duration; | ||
|
|
||
| @RunWith(JUnit4.class) | ||
| public class OrphanedTransactionTest extends AbstractMockServerTest { | ||
| private static final Statement STATEMENT = Statement.of("SELECT * FROM random"); | ||
|
|
||
| @BeforeClass | ||
| public static void setupReadResult() { | ||
| com.google.cloud.spanner.connection.RandomResultSetGenerator generator = | ||
| new RandomResultSetGenerator(10); | ||
| mockSpanner.putStatementResult(StatementResult.query(STATEMENT, generator.generate())); | ||
| } | ||
|
|
||
| private Spanner createSpanner() { | ||
| return SpannerOptions.newBuilder() | ||
| .setProjectId("fake-project") | ||
| .setHost("http://localhost:" + getPort()) | ||
| .setCredentials(NoCredentials.getInstance()) | ||
| .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) | ||
| .setSessionPoolOption( | ||
| SessionPoolOptions.newBuilder().setWaitForMinSessions(Duration.ofSeconds(5L)).build()) | ||
| .build() | ||
| .getService(); | ||
| } | ||
|
|
||
| @Test | ||
| public void testOrphanedTransaction() throws Exception { | ||
| ExecutorService executor = Executors.newCachedThreadPool(); | ||
| try (Spanner spanner = createSpanner()) { | ||
| DatabaseClient client = | ||
| spanner.getDatabaseClient( | ||
| DatabaseId.of("fake-project", "fake-instance", "fake-database")); | ||
| // Freeze the mock server to ensure that the request lands on the mock server before we | ||
| // proceed. | ||
| mockSpanner.freeze(); | ||
| AsyncTransactionManager manager = client.transactionManagerAsync(); | ||
| TransactionContextFuture context = manager.beginAsync(); | ||
| context.then( | ||
| (txn, input) -> { | ||
| try (AsyncResultSet resultSet = txn.executeQueryAsync(STATEMENT)) { | ||
| resultSet.toListAsync( | ||
| (Function<StructReader, Object>) | ||
| row -> Objects.requireNonNull(row).getValue(0).getAsString(), | ||
| executor); | ||
| } | ||
| return null; | ||
| }, | ||
| executor); | ||
| // Wait for the ExecuteSqlRequest to land on the mock server. | ||
| mockSpanner.waitForRequestsToContain( | ||
| input -> | ||
| input instanceof ExecuteSqlRequest | ||
| && ((ExecuteSqlRequest) input).getSql().equals(STATEMENT.getSql()), | ||
| 5000L); | ||
| // Now close the transaction. This should (eventually) trigger a rollback, even though the | ||
| // client has not yet received a transaction ID. | ||
| manager.closeAsync(); | ||
| // Unfreeze the mock server and wait for the Rollback request to be received. | ||
| mockSpanner.unfreeze(); | ||
| mockSpanner.waitForLastRequestToBe(RollbackRequest.class, 5000L); | ||
| } finally { | ||
| executor.shutdown(); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testOrphanedTransactionWithFailedFirstQuery() throws Exception { | ||
| ExecutorService executor = Executors.newCachedThreadPool(); | ||
| mockSpanner.setExecuteStreamingSqlExecutionTime( | ||
| SimulatedExecutionTime.ofException( | ||
| Status.INVALID_ARGUMENT.withDescription("table not found").asRuntimeException())); | ||
| try (Spanner spanner = createSpanner()) { | ||
| DatabaseClient client = | ||
| spanner.getDatabaseClient( | ||
| DatabaseId.of("fake-project", "fake-instance", "fake-database")); | ||
| // Freeze the mock server to ensure that the request lands on the mock server before we | ||
| // proceed. | ||
| mockSpanner.freeze(); | ||
| AsyncTransactionManager manager = client.transactionManagerAsync(); | ||
| TransactionContextFuture context = manager.beginAsync(); | ||
| context.then( | ||
| (txn, input) -> { | ||
| try (AsyncResultSet resultSet = txn.executeQueryAsync(STATEMENT)) { | ||
| resultSet.toListAsync( | ||
| (Function<StructReader, Object>) | ||
| row -> Objects.requireNonNull(row).getValue(0).getAsString(), | ||
| executor); | ||
| } | ||
| return null; | ||
| }, | ||
| executor); | ||
| // Wait for the ExecuteSqlRequest to land on the mock server. | ||
| mockSpanner.waitForRequestsToContain( | ||
| input -> | ||
| input instanceof ExecuteSqlRequest | ||
| && ((ExecuteSqlRequest) input).getSql().equals(STATEMENT.getSql()), | ||
| 5000L); | ||
| // Now close the transaction. This will not trigger a Rollback, as the statement failed. | ||
| // The closeResult will be done when the error for the failed statement is returned to the | ||
| // client. | ||
| ApiFuture<Void> closeResult = manager.closeAsync(); | ||
| mockSpanner.unfreeze(); | ||
| assertNull(closeResult.get()); | ||
| } finally { | ||
| executor.shutdown(); | ||
| } | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1. Do we have to update the session last use time?2. Can we move this entire into a new function? Previously we used to have 2 condition.
1. transaction id is present
2. transaction id is null
the fix which we are implementing, we are waiting for beginTransaction RPC, once we get it the response back, based on the success / failure, we are trying to do what we were doing before(if transaction id is there then rollback, else do nothing). Moving this a function and reusing the code might be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, see also my response to Gemini above :-)
Regarding the second point: I don't think there is much code to be shared, as the implementations are relatively different. There are two possible scenarios:
this.transactionIdis not null. How it has been set (e.g. returned by an explicitBeginTransaction, returned by a statement with an inlined-begin, or set in the constructor) is not important. The only important thing is that it is there.this.transactionIdFutureis not null: This means that we are waiting for a transaction ID to be returned, either by an inlined-begin, or by aBeginTransactionRPC. We don't wait for it to be returned, but we add a callback to the future. If the future returns a result, then we use that to execute an async Rollback.I think that the only way that we could share the logic for the two above scenarios, would be by wrapping
this.transactionIdin something like anApiFutures.immediateFuture(..)and then pass that Future in to the method that handles scenario 2. But I'm not sure that simplifies the overall code (and it does add a bit of extra overhead).Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of the following approach.
and just call
waitForTransactionIdbeforeclose()inrollbackAsync().I thought of this way. Maybe if you see any issues, we can proceed with the current approach itself. I am approving the PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would make
rollbackAsync()potentially blocking, which is not something that we want.