Skip to content

Commit

Permalink
fix: deadlock with inline begin transaction when first statement resp…
Browse files Browse the repository at this point in the history
…onse is never returned (#7712)
  • Loading branch information
rahul2393 authored Apr 10, 2023
1 parent 7afc98a commit 688d4ab
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
34 changes: 34 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,40 @@ func TestClient_ReadWriteTransaction_BatchDmlWithErrorOnSecondStatement(t *testi
}
}

func TestClient_ReadWriteTransaction_MultipleReadsWithoutNext(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.AddPartialResultSetError(
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(2),
Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
},
)
_, err := client.ReadWriteTransaction(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"})
iter.Stop()
iter = tx.Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"})
iter.Stop()
iter = tx.Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"})
iter.Stop()
iter = tx.Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"})
iter.Stop()
return nil
})
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
}

func testReadWriteTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime, expectedAttempts int) error {
return testReadWriteTransactionWithConfig(t, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, executionTimes, expectedAttempts)
}
Expand Down
5 changes: 5 additions & 0 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1285,10 +1285,15 @@ func (t *ReadWriteTransaction) setTransactionID(tx transactionID) {
func (t *ReadWriteTransaction) release(err error) {
t.mu.Lock()
sh := t.sh
state := t.state
t.mu.Unlock()
if sh != nil && isSessionNotFoundError(err) {
sh.destroy()
}
// if transaction is released during initialization then do explicit begin transaction
if state == txInit {
t.setTransactionID(nil)
}
}

func beginTransaction(ctx context.Context, sid string, client *vkit.Client, opts TransactionOptions) (transactionID, error) {
Expand Down

0 comments on commit 688d4ab

Please sign in to comment.