Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,25 @@ ApiFuture<Empty> rollbackAsync() {
getTransactionChannelHint());
session.markUsed(clock.instant());
return apiFuture;
} else if (transactionIdFuture != null) {
ApiFuture<ByteString> transactionIdOrEmptyFuture =
Copy link
Collaborator

@sakthivelmanii sakthivelmanii Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1. Do we have to update the session last use time?
2. Can we move this entire into a new function? Previously we used to have 2 condition.
1. transaction id is present
2. transaction id is null
the fix which we are implementing, we are waiting for beginTransaction RPC, once we get it the response back, based on the success / failure, we are trying to do what we were doing before(if transaction id is there then rollback, else do nothing). Moving this a function and reusing the code might be better.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to update the session last use time?

No, see also my response to Gemini above :-)

Regarding the second point: I don't think there is much code to be shared, as the implementations are relatively different. There are two possible scenarios:

  1. There is a transaction ID: In that case this.transactionId is not null. How it has been set (e.g. returned by an explicit BeginTransaction, returned by a statement with an inlined-begin, or set in the constructor) is not important. The only important thing is that it is there.
  2. There is no transaction ID, but this.transactionIdFuture is not null: This means that we are waiting for a transaction ID to be returned, either by an inlined-begin, or by a BeginTransaction RPC. We don't wait for it to be returned, but we add a callback to the future. If the future returns a result, then we use that to execute an async Rollback.

I think that the only way that we could share the logic for the two above scenarios, would be by wrapping this.transactionId in something like an ApiFutures.immediateFuture(..) and then pass that Future in to the method that handles scenario 2. But I'm not sure that simplifies the overall code (and it does add a bit of extra overhead).

Copy link
Collaborator

@sakthivelmanii sakthivelmanii Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of the following approach.

public void waitForTransactionId() {
  if (transactionId != null || transactionIdFuture == null) {
    return;
  }
  try {
    transactionId = SpannerApiFutures.get(transactionIdFuture);
  } catch (SpannerException ignored) {
  }
}

and just call waitForTransactionId before close() in rollbackAsync().

I thought of this way. Maybe if you see any issues, we can proceed with the current approach itself. I am approving the PR

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would make rollbackAsync() potentially blocking, which is not something that we want.

ApiFutures.catching(
transactionIdFuture,
Throwable.class,
input -> ByteString.empty(),
MoreExecutors.directExecutor());
return ApiFutures.transformAsync(
transactionIdOrEmptyFuture,
transactionId ->
transactionId.isEmpty()
? ApiFutures.immediateFuture(Empty.getDefaultInstance())
: rpc.rollbackAsync(
RollbackRequest.newBuilder()
.setSession(session.getName())
.setTransactionId(transactionId)
.build(),
getTransactionChannelHint()),
Comment on lines +626 to +634
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The session should be marked as used when a rollback is initiated. This is consistent with other RPC calls in this class and is important for the session pool to correctly manage session liveness, preventing premature eviction of active sessions.

transactionId -> {
  if (transactionId.isEmpty()) {
    return ApiFutures.immediateFuture(Empty.getDefaultInstance());
  }
  session.markUsed(clock.instant());
  return rpc.rollbackAsync(
      RollbackRequest.newBuilder()
          .setSession(session.getName())
          .setTransactionId(transactionId)
          .build(),
      getTransactionChannelHint());
},

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The session.markUsed call is redundant, now that all operations use multiplexed sessions. I will open a follow-up PR to remove all calls to this, as it does not mean anything in the current setup.

MoreExecutors.directExecutor());
} else {
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import static org.junit.Assert.assertNull;

import com.google.api.core.ApiFuture;
import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.connection.AbstractMockServerTest;
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
import com.google.common.base.Function;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RollbackRequest;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class OrphanedTransactionTest extends AbstractMockServerTest {
private static final Statement STATEMENT = Statement.of("SELECT * FROM random");

@BeforeClass
public static void setupReadResult() {
com.google.cloud.spanner.connection.RandomResultSetGenerator generator =
new RandomResultSetGenerator(10);
mockSpanner.putStatementResult(StatementResult.query(STATEMENT, generator.generate()));
}

private Spanner createSpanner() {
return SpannerOptions.newBuilder()
.setProjectId("fake-project")
.setHost("http://localhost:" + getPort())
.setCredentials(NoCredentials.getInstance())
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
.setSessionPoolOption(
SessionPoolOptions.newBuilder().setWaitForMinSessions(Duration.ofSeconds(5L)).build())
.build()
.getService();
}

@Test
public void testOrphanedTransaction() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
try (Spanner spanner = createSpanner()) {
DatabaseClient client =
spanner.getDatabaseClient(
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
// Freeze the mock server to ensure that the request lands on the mock server before we
// proceed.
mockSpanner.freeze();
AsyncTransactionManager manager = client.transactionManagerAsync();
TransactionContextFuture context = manager.beginAsync();
context.then(
(txn, input) -> {
try (AsyncResultSet resultSet = txn.executeQueryAsync(STATEMENT)) {
resultSet.toListAsync(
(Function<StructReader, Object>)
row -> Objects.requireNonNull(row).getValue(0).getAsString(),
executor);
}
return null;
},
executor);
// Wait for the ExecuteSqlRequest to land on the mock server.
mockSpanner.waitForRequestsToContain(
input ->
input instanceof ExecuteSqlRequest
&& ((ExecuteSqlRequest) input).getSql().equals(STATEMENT.getSql()),
5000L);
// Now close the transaction. This should (eventually) trigger a rollback, even though the
// client has not yet received a transaction ID.
manager.closeAsync();
// Unfreeze the mock server and wait for the Rollback request to be received.
mockSpanner.unfreeze();
mockSpanner.waitForLastRequestToBe(RollbackRequest.class, 5000L);
} finally {
executor.shutdown();
}
}

@Test
public void testOrphanedTransactionWithFailedFirstQuery() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofException(
Status.INVALID_ARGUMENT.withDescription("table not found").asRuntimeException()));
try (Spanner spanner = createSpanner()) {
DatabaseClient client =
spanner.getDatabaseClient(
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
// Freeze the mock server to ensure that the request lands on the mock server before we
// proceed.
mockSpanner.freeze();
AsyncTransactionManager manager = client.transactionManagerAsync();
TransactionContextFuture context = manager.beginAsync();
context.then(
(txn, input) -> {
try (AsyncResultSet resultSet = txn.executeQueryAsync(STATEMENT)) {
resultSet.toListAsync(
(Function<StructReader, Object>)
row -> Objects.requireNonNull(row).getValue(0).getAsString(),
executor);
}
return null;
},
executor);
// Wait for the ExecuteSqlRequest to land on the mock server.
mockSpanner.waitForRequestsToContain(
input ->
input instanceof ExecuteSqlRequest
&& ((ExecuteSqlRequest) input).getSql().equals(STATEMENT.getSql()),
5000L);
// Now close the transaction. This will not trigger a Rollback, as the statement failed.
// The closeResult will be done when the error for the failed statement is returned to the
// client.
ApiFuture<Void> closeResult = manager.closeAsync();
mockSpanner.unfreeze();
assertNull(closeResult.get());
} finally {
executor.shutdown();
}
}
}
Loading