Merged
6 changes: 6 additions & 0 deletions internal/batch/batch_processor.go
@@ -633,6 +633,12 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error {
}
}
}
conflictErr, conflictTestOk := err.(operations.ConflictError)
if conflictTestOk && conflictErr.IsConflictError() {
// We know that the connector has received our batch, so we shouldn't need to retry
payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent)
Contributor
I think we need the whole block here, and we can extract it to a function:

if core.IsPinned(payload.Batch.TX.Type) {
   payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent)
} else {
   payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateConfirmed)
}

For pinned batches we have to wait for the blockchain TX, which is why we set them as sent; you might have assumed that was always the case in the conflict path. But this batch processor is completely agnostic to how the batch is handled: the conflict could come from data exchange or any other plugin used to dispatch the batch. In the case where it's not pinned, we don't need to wait for a TX, so we can set it as confirmed.
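
A minimal sketch of what that extraction might look like (the helper name markMessagesDispatched is an assumption for illustration, not part of this PR):

func markMessagesDispatched(payload *DispatchPayload) {
	// Pinned batches still wait for the blockchain TX, so their messages move to "sent";
	// unpinned batches are complete at this point, so they go straight to "confirmed".
	if core.IsPinned(payload.Batch.TX.Type) {
		payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent)
	} else {
		payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateConfirmed)
	}
}

Both the success path and the conflict path could then call this helper instead of duplicating the if/else.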

Contributor
Furthermore, there is a snippet of code above that handles a cancelled batch pin. It's an edge case, but if the last error before the cancel is a conflict, we will incorrectly update the state of the initial batch from cancelled to sent.
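
A hedged sketch of one way to guard that edge case (the cancelled flag is purely illustrative; the real cancellation tracking in dispatchBatch may be shaped differently):

// Only treat a conflict as a successful dispatch if the batch was not already
// cancelled earlier in this attempt; otherwise leave its state untouched.
if conflictTestOk && conflictErr.IsConflictError() && !cancelled {
	payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent)
	return true, nil
}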

return true, nil
}
} else {
if core.IsPinned(payload.Batch.TX.Type) {
payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent)
23 changes: 23 additions & 0 deletions internal/batch/batch_processor_test.go
@@ -75,6 +75,18 @@ func mockRunAsGroupPassthrough(mdi *databasemocks.Plugin) {
}
}

type testConflictError struct {
err error
}

func (tce *testConflictError) Error() string {
return tce.err.Error()
}

func (tce *testConflictError) IsConflictError() bool {
return true
}

func TestUnfilledBatch(t *testing.T) {
log.SetLevel("debug")
coreconfig.Reset()
Expand Down Expand Up @@ -129,6 +141,17 @@ func TestUnfilledBatch(t *testing.T) {
mim.AssertExpectations(t)
}

func TestHandleDispatchConflictError(t *testing.T) {
cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error {
conflictErr := testConflictError{err: fmt.Errorf("pop")}
return &conflictErr
})
defer cancel()
bp.dispatchBatch(&DispatchPayload{})
bp.cancelCtx()
<-bp.done
}

func TestBatchSizeOverflow(t *testing.T) {
log.SetLevel("debug")
coreconfig.Reset()