Skip to content

Commit

Permalink
Changed endpoint to return the whole tx result, added msg index check
Browse files Browse the repository at this point in the history
  • Loading branch information
AndriiDiachuk committed Sep 27, 2024
1 parent 42f9d46 commit 64d91e2
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
2 changes: 1 addition & 1 deletion access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo
func (c *Client) SendAndSubscribeTransactionStatuses(
ctx context.Context,
tx flow.Transaction,
) (<-chan flow.TransactionStatus, <-chan error, error) {
) (<-chan flow.TransactionResult, <-chan error, error) {
return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx)
}

Expand Down
27 changes: 20 additions & 7 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses(
ctx context.Context,
tx flow.Transaction,
opts ...grpc.CallOption,
) (<-chan flow.TransactionStatus, <-chan error, error) {
) (<-chan flow.TransactionResult, <-chan error, error) {
txMsg, err := convert.TransactionToMessage(tx)
if err != nil {
return nil, nil, newEntityToMessageError(entityTransaction, err)
Expand All @@ -1149,7 +1149,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses(
return nil, nil, newRPCError(err)
}

txStatusChan := make(chan flow.TransactionStatus)
txStatusChan := make(chan flow.TransactionResult)
errChan := make(chan error)

sendErr := func(err error) {
Expand All @@ -1163,24 +1163,37 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses(
defer close(txStatusChan)
defer close(errChan)

messageIndex := uint64(0)

for {
// Receive the next txStatus response
txStatusResponse, err := subscribeClient.Recv()
// Receive the next txResult response
txResultsResponse, err := subscribeClient.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}
sendErr(fmt.Errorf("error receiving blockHeader: %w", err))
sendErr(fmt.Errorf("error receiving transaction result: %w", err))
return
}

if messageIndex != txResultsResponse.GetMessageIndex() {
sendErr(fmt.Errorf("tx result response was lost"))
return
}

txResult, err := convert.MessageToTransactionResult(txResultsResponse.GetTransactionResults(), c.jsonOptions)
if err != nil {
sendErr(fmt.Errorf("error converting transaction result: %w", err))
return
}

txStatus := flow.TransactionStatus(txStatusResponse.GetTransactionResults().Status)
messageIndex++

select {
case <-ctx.Done():
return
case txStatusChan <- txStatus:
case txStatusChan <- txResult:
}
}
}()
Expand Down
42 changes: 27 additions & 15 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,7 +2154,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) {
transactions := test.TransactionGenerator()

generateTransactionStatusResponses := func(count uint64, encodingVersion flow.EventEncodingVersion) []*access.SendAndSubscribeTransactionStatusesResponse {
var resTransactionStatuses []*access.SendAndSubscribeTransactionStatusesResponse
var resTransactionResults []*access.SendAndSubscribeTransactionStatusesResponse
results := test.TransactionResultGenerator(encodingVersion)

for i := uint64(0); i < count; i++ {
Expand All @@ -2163,12 +2163,13 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) {

response := &access.SendAndSubscribeTransactionStatusesResponse{
TransactionResults: transactionResult,
MessageIndex: i,
}

resTransactionStatuses = append(resTransactionStatuses, response)
resTransactionResults = append(resTransactionResults, response)
}

return resTransactionStatuses
return resTransactionResults
}

t.Run("Happy Path - CCF", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
Expand All @@ -2183,17 +2184,23 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) {

rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil)

txStatusesCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx)
txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

expectedCounter := uint64(0)

for i := uint64(0); i < responseCount; i++ {
actualTxStatus := <-txStatusesCh
expectedTxStatus := flow.TransactionStatus(stream.responses[i].GetTransactionResults().Status)
require.Equal(t, expectedTxStatus, actualTxStatus)
actualTxResult := <-txResultCh
expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions)
require.NoError(t, err)
require.Equal(t, expectedTxResult, actualTxResult)
require.Equal(t, expectedCounter, stream.responses[i].MessageIndex)

expectedCounter++
}
cancel()

Expand All @@ -2212,17 +2219,22 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) {

rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil)

txStatusesCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx)
txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

expectedCounter := uint64(0)
for i := uint64(0); i < responseCount; i++ {
actualTxStatus := <-txStatusesCh
expectedTxStatus := flow.TransactionStatus(stream.responses[i].GetTransactionResults().Status)
require.Equal(t, expectedTxStatus, actualTxStatus)
actualTxResult := <-txResultCh
expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions)
require.NoError(t, err)
require.Equal(t, expectedTxResult, actualTxResult)
require.Equal(t, expectedCounter, stream.responses[i].MessageIndex)

expectedCounter++
}
cancel()

Expand All @@ -2240,12 +2252,12 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) {
On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).
Return(stream, nil)

txStatusChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{})
txResultChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{})
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoTxStatuses(t, txStatusChan, wg.Done)
go assertNoTxResults(t, txResultChan, wg.Done)

errorCount := 0
for e := range errCh {
Expand Down Expand Up @@ -2285,9 +2297,9 @@ func (m *mockTransactionStatusesClientStream) Recv() (*access.SendAndSubscribeTr
return m.responses[m.offset], nil
}

func assertNoTxStatuses[TxStatus any](t *testing.T, txStatusChan <-chan TxStatus, done func()) {
func assertNoTxResults[TxStatus any](t *testing.T, txResultChan <-chan TxStatus, done func()) {
defer done()
for range txStatusChan {
for range txResultChan {
require.FailNow(t, "should not receive txStatus")
}
}

0 comments on commit 64d91e2

Please sign in to comment.