Skip to content

Commit e0670d4

Browse files
Merge pull request #175 from kaleido-io/existing
Remove unneeded "allowExisting" param from some db methods
2 parents 8b0a67d + 0a31205 commit e0670d4

23 files changed

+148
-200
lines changed

internal/assets/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id
124124
Status: fftypes.OpStatusPending,
125125
}
126126
tx.Hash = tx.Subject.Hash()
127-
err = am.database.UpsertTransaction(ctx, tx, true, false /* should be new, or idempotent replay */)
127+
err = am.database.UpsertTransaction(ctx, tx, false /* should be new, or idempotent replay */)
128128
if err != nil {
129129
return nil, err
130130
}

internal/assets/manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func TestCreateTokenPoolTransactionFail(t *testing.T) {
120120
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
121121
mdm.On("VerifyNamespaceExists", context.Background(), "ns1").Return(nil)
122122
mti.On("CreateTokenPool", context.Background(), mock.Anything, mock.Anything).Return("tx12345", nil)
123-
mdi.On("UpsertTransaction", context.Background(), mock.Anything, true, false).Return(fmt.Errorf("pop"))
123+
mdi.On("UpsertTransaction", context.Background(), mock.Anything, false).Return(fmt.Errorf("pop"))
124124

125125
_, err := am.CreateTokenPool(context.Background(), "ns1", "magic-tokens", &fftypes.TokenPool{}, false)
126126
assert.Regexp(t, "pop", err)
@@ -137,7 +137,7 @@ func TestCreateTokenPoolSuccess(t *testing.T) {
137137
mti.On("CreateTokenPool", context.Background(), mock.Anything, mock.Anything).Return("tx12345", nil)
138138
mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool {
139139
return tx.Subject.Type == fftypes.TransactionTypeTokenPool
140-
}), true, false).Return(nil)
140+
}), false).Return(nil)
141141
mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(nil)
142142

143143
_, err := am.CreateTokenPool(context.Background(), "ns1", "magic-tokens", &fftypes.TokenPool{}, false)
@@ -160,7 +160,7 @@ func TestCreateTokenPoolConfirm(t *testing.T) {
160160
})).Return("tx12345", nil).Times(1)
161161
mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool {
162162
return tx.Subject.Type == fftypes.TransactionTypeTokenPool
163-
}), true, false).Return(nil)
163+
}), false).Return(nil)
164164
mdi.On("UpsertOperation", mock.Anything, mock.MatchedBy(func(op *fftypes.Operation) bool {
165165
return op.BackendID == "tx12345"
166166
}), false).Return(nil).Times(1)

internal/batch/batch_manager_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ func TestE2EDispatchBroadcast(t *testing.T) {
101101
mdm.On("GetMessageData", mock.Anything, mock.Anything, true).Return([]*fftypes.Data{data}, true, nil)
102102
mdi.On("GetMessages", mock.Anything, mock.Anything).Return([]*fftypes.Message{msg}, nil, nil).Once()
103103
mdi.On("GetMessages", mock.Anything, mock.Anything).Return([]*fftypes.Message{}, nil, nil)
104-
mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
105-
mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
104+
mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil)
105+
mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil)
106106
rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything, mock.Anything).Return(nil)
107107
rag.RunFn = func(a mock.Arguments) {
108108
ctx := a.Get(0).(context.Context)
@@ -206,8 +206,8 @@ func TestE2EDispatchPrivate(t *testing.T) {
206206
mdm.On("GetMessageData", mock.Anything, mock.Anything, true).Return([]*fftypes.Data{data}, true, nil)
207207
mdi.On("GetMessages", mock.Anything, mock.Anything).Return([]*fftypes.Message{msg}, nil, nil).Once()
208208
mdi.On("GetMessages", mock.Anything, mock.Anything).Return([]*fftypes.Message{}, nil, nil)
209-
mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
210-
mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
209+
mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil)
210+
mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil)
211211
rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything, mock.Anything).Return(nil)
212212
rag.RunFn = func(a mock.Arguments) {
213213
ctx := a.Get(0).(context.Context)
@@ -433,7 +433,7 @@ func TestMessageSequencerUpdateBatchFail(t *testing.T) {
433433
}, nil, nil)
434434
mdm.On("GetMessageData", mock.Anything, mock.Anything, true).Return([]*fftypes.Data{{ID: dataID}}, true, nil)
435435
mdi.On("UpdateMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil)
436-
mdi.On("UpsertBatch", mock.Anything, mock.Anything, true, mock.Anything).Return(fmt.Errorf("fizzle"))
436+
mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("fizzle"))
437437
rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything, mock.Anything)
438438
rag.RunFn = func(a mock.Arguments) {
439439
ctx := a.Get(0).(context.Context)

internal/batch/batch_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (bp *batchProcessor) persistBatch(batch *fftypes.Batch, newWork []*batchWor
273273
}
274274
if err == nil {
275275
// Persist the batch itself
276-
err = bp.database.UpsertBatch(ctx, batch, true, seal /* we set the hash as it seals */)
276+
err = bp.database.UpsertBatch(ctx, batch, seal /* we set the hash as it seals */)
277277
}
278278
return err
279279
})

internal/batch/batch_processor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ func TestUnfilledBatch(t *testing.T) {
7070
})
7171
mockRunAsGroupPassthrough(mdi)
7272
mdi.On("UpdateMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil)
73-
mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
74-
mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil)
73+
mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil)
74+
mdi.On("UpdateBatch", mock.Anything, mock.Anything).Return(nil)
7575

7676
// Generate the work the work
7777
work := make([]*batchWork, 5)
@@ -120,7 +120,7 @@ func TestFilledBatchSlowPersistence(t *testing.T) {
120120
return nil
121121
})
122122
bp.conf.BatchTimeout = 1 * time.Hour // Must fill the batch
123-
mockUpsert := mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
123+
mockUpsert := mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything)
124124
mockUpsert.ReturnArguments = mock.Arguments{nil}
125125
unblockPersistence := make(chan time.Time)
126126
mockUpsert.WaitFor = unblockPersistence
@@ -195,7 +195,7 @@ func TestCloseToUnblockUpsertBatch(t *testing.T) {
195195
bp.conf.BatchTimeout = 100 * time.Second
196196
mockRunAsGroupPassthrough(mdi)
197197
mdi.On("UpdateMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil)
198-
mup := mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
198+
mup := mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
199199
waitForCall := make(chan bool)
200200
mup.RunFn = func(a mock.Arguments) {
201201
waitForCall <- true

internal/batchpin/batchpin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (bp *batchPinSubmitter) SubmitPinnedBatch(ctx context.Context, batch *fftyp
6767
Status: fftypes.OpStatusPending,
6868
}
6969
tx.Hash = tx.Subject.Hash()
70-
err = bp.database.UpsertTransaction(ctx, tx, true, false /* should be new, or idempotent replay */)
70+
err = bp.database.UpsertTransaction(ctx, tx, false /* should be new, or idempotent replay */)
7171
if err != nil {
7272
return err
7373
}

internal/batchpin/batchpin_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestSubmitPinnedBatchOk(t *testing.T) {
6363

6464
mii.On("Resolve", ctx, "id1").Return(identity, nil)
6565
mbi.On("VerifyIdentitySyntax", ctx, identity).Return(nil)
66-
mdi.On("UpsertTransaction", ctx, mock.Anything, true, false).Return(nil)
66+
mdi.On("UpsertTransaction", ctx, mock.Anything, false).Return(nil)
6767
mdi.On("UpsertOperation", ctx, mock.MatchedBy(func(op *fftypes.Operation) bool {
6868
assert.Equal(t, fftypes.OpTypeBlockchainBatchPin, op.Type)
6969
assert.Equal(t, "ut", op.Plugin)
@@ -103,7 +103,7 @@ func TestSubmitPinnedBatchOpFail(t *testing.T) {
103103

104104
mii.On("Resolve", ctx, "id1").Return(identity, nil)
105105
mbi.On("VerifyIdentitySyntax", ctx, identity).Return(nil)
106-
mdi.On("UpsertTransaction", ctx, mock.Anything, true, false).Return(nil)
106+
mdi.On("UpsertTransaction", ctx, mock.Anything, false).Return(nil)
107107
mdi.On("UpsertOperation", ctx, mock.Anything, false).Return(fmt.Errorf("pop"))
108108

109109
err := bp.SubmitPinnedBatch(ctx, batch, contexts)
@@ -137,7 +137,7 @@ func TestSubmitPinnedBatchTxInsertFail(t *testing.T) {
137137

138138
mii.On("Resolve", ctx, "id1").Return(identity, nil)
139139
mbi.On("VerifyIdentitySyntax", ctx, identity).Return(nil)
140-
mdi.On("UpsertTransaction", ctx, mock.Anything, true, false).Return(fmt.Errorf("pop"))
140+
mdi.On("UpsertTransaction", ctx, mock.Anything, false).Return(fmt.Errorf("pop"))
141141

142142
err := bp.SubmitPinnedBatch(ctx, batch, contexts)
143143
assert.Regexp(t, "pop", err)

internal/broadcast/manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func TestSubmitTXAndUpdateDBUpdateBatchFail(t *testing.T) {
172172
defer cancel()
173173

174174
mdi := bm.database.(*databasemocks.Plugin)
175-
mdi.On("UpsertTransaction", mock.Anything, mock.Anything, true, false).Return(nil)
175+
mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(nil)
176176
mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
177177
bm.blockchain.(*blockchainmocks.Plugin).On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", fmt.Errorf("pop"))
178178

@@ -186,7 +186,7 @@ func TestSubmitTXAndUpdateDBAddOp1Fail(t *testing.T) {
186186

187187
mdi := bm.database.(*databasemocks.Plugin)
188188
mbi := bm.blockchain.(*blockchainmocks.Plugin)
189-
mdi.On("UpsertTransaction", mock.Anything, mock.Anything, true, false).Return(nil)
189+
mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(nil)
190190
mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil)
191191
mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop"))
192192
mbi.On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("txid", nil)
@@ -215,7 +215,7 @@ func TestSubmitTXAndUpdateDBSucceed(t *testing.T) {
215215
mdi := bm.database.(*databasemocks.Plugin)
216216
mbi := bm.blockchain.(*blockchainmocks.Plugin)
217217
mbp := bm.batchpin.(*batchpinmocks.Submitter)
218-
mdi.On("UpsertTransaction", mock.Anything, mock.Anything, true, false).Return(nil)
218+
mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(nil)
219219
mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil)
220220
mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(nil)
221221
mbi.On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

internal/database/sqlcommon/batch_sql.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -51,37 +51,34 @@ var (
5151
}
5252
)
5353

54-
func (s *SQLCommon) UpsertBatch(ctx context.Context, batch *fftypes.Batch, allowExisting, allowHashUpdate bool) (err error) {
54+
func (s *SQLCommon) UpsertBatch(ctx context.Context, batch *fftypes.Batch, allowHashUpdate bool) (err error) {
5555
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
5656
if err != nil {
5757
return err
5858
}
5959
defer s.rollbackTx(ctx, tx, autoCommit)
6060

61-
existing := false
62-
if allowExisting {
63-
// Do a select within the transaction to detemine if the UUID already exists
64-
batchRows, _, err := s.queryTx(ctx, tx,
65-
sq.Select("hash").
66-
From("batches").
67-
Where(sq.Eq{"id": batch.ID}),
68-
)
69-
if err != nil {
70-
return err
71-
}
61+
// Do a select within the transaction to detemine if the UUID already exists
62+
batchRows, _, err := s.queryTx(ctx, tx,
63+
sq.Select("hash").
64+
From("batches").
65+
Where(sq.Eq{"id": batch.ID}),
66+
)
67+
if err != nil {
68+
return err
69+
}
7270

73-
existing = batchRows.Next()
74-
if existing && !allowHashUpdate {
75-
var hash *fftypes.Bytes32
76-
_ = batchRows.Scan(&hash)
77-
if !fftypes.SafeHashCompare(hash, batch.Hash) {
78-
batchRows.Close()
79-
log.L(ctx).Errorf("Existing=%s New=%s", hash, batch.Hash)
80-
return database.HashMismatch
81-
}
71+
existing := batchRows.Next()
72+
if existing && !allowHashUpdate {
73+
var hash *fftypes.Bytes32
74+
_ = batchRows.Scan(&hash)
75+
if !fftypes.SafeHashCompare(hash, batch.Hash) {
76+
batchRows.Close()
77+
log.L(ctx).Errorf("Existing=%s New=%s", hash, batch.Hash)
78+
return database.HashMismatch
8279
}
83-
batchRows.Close()
8480
}
81+
batchRows.Close()
8582

8683
if existing {
8784

internal/database/sqlcommon/batch_sql_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func TestBatch2EWithDB(t *testing.T) {
5858
s.callbacks.On("UUIDCollectionNSEvent", database.CollectionBatches, fftypes.ChangeEventTypeCreated, "ns1", batchID, mock.Anything).Return()
5959
s.callbacks.On("UUIDCollectionNSEvent", database.CollectionBatches, fftypes.ChangeEventTypeUpdated, "ns1", batchID, mock.Anything).Return()
6060

61-
err := s.UpsertBatch(ctx, batch, true, true)
61+
err := s.UpsertBatch(ctx, batch, true)
6262
assert.NoError(t, err)
6363

6464
// Check we get the exact same batch back
@@ -96,10 +96,10 @@ func TestBatch2EWithDB(t *testing.T) {
9696
}
9797

9898
// Rejects hash change
99-
err = s.UpsertBatch(context.Background(), batchUpdated, true, false)
99+
err = s.UpsertBatch(context.Background(), batchUpdated, false)
100100
assert.Equal(t, database.HashMismatch, err)
101101

102-
err = s.UpsertBatch(context.Background(), batchUpdated, true, true)
102+
err = s.UpsertBatch(context.Background(), batchUpdated, true)
103103
assert.NoError(t, err)
104104

105105
// Check we get the exact same message back - note the removal of one of the batch elements
@@ -155,7 +155,7 @@ func TestBatch2EWithDB(t *testing.T) {
155155
func TestUpsertBatchFailBegin(t *testing.T) {
156156
s, mock := newMockProvider().init()
157157
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
158-
err := s.UpsertBatch(context.Background(), &fftypes.Batch{}, true, true)
158+
err := s.UpsertBatch(context.Background(), &fftypes.Batch{}, true)
159159
assert.Regexp(t, "FF10114", err)
160160
assert.NoError(t, mock.ExpectationsWereMet())
161161
}
@@ -166,7 +166,7 @@ func TestUpsertBatchFailSelect(t *testing.T) {
166166
mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop"))
167167
mock.ExpectRollback()
168168
batchID := fftypes.NewUUID()
169-
err := s.UpsertBatch(context.Background(), &fftypes.Batch{ID: batchID}, true, true)
169+
err := s.UpsertBatch(context.Background(), &fftypes.Batch{ID: batchID}, true)
170170
assert.Regexp(t, "FF10115", err)
171171
assert.NoError(t, mock.ExpectationsWereMet())
172172
}
@@ -178,7 +178,7 @@ func TestUpsertBatchFailInsert(t *testing.T) {
178178
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
179179
mock.ExpectRollback()
180180
batchID := fftypes.NewUUID()
181-
err := s.UpsertBatch(context.Background(), &fftypes.Batch{ID: batchID}, true, true)
181+
err := s.UpsertBatch(context.Background(), &fftypes.Batch{ID: batchID}, true)
182182
assert.Regexp(t, "FF10116", err)
183183
assert.NoError(t, mock.ExpectationsWereMet())
184184
}
@@ -190,7 +190,7 @@ func TestUpsertBatchFailUpdate(t *testing.T) {
190190
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(batchID.String()))
191191
mock.ExpectExec("UPDATE .*").WillReturnError(fmt.Errorf("pop"))
192192
mock.ExpectRollback()
193-
err := s.UpsertBatch(context.Background(), &fftypes.Batch{ID: batchID}, true, true)
193+
err := s.UpsertBatch(context.Background(), &fftypes.Batch{ID: batchID}, true)
194194
assert.Regexp(t, "FF10117", err)
195195
assert.NoError(t, mock.ExpectationsWereMet())
196196
}
@@ -202,7 +202,7 @@ func TestUpsertBatchFailCommit(t *testing.T) {
202202
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}))
203203
mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1))
204204
mock.ExpectCommit().WillReturnError(fmt.Errorf("pop"))
205-
err := s.UpsertBatch(context.Background(), &fftypes.Batch{ID: batchID}, true, true)
205+
err := s.UpsertBatch(context.Background(), &fftypes.Batch{ID: batchID}, true)
206206
assert.Regexp(t, "FF10119", err)
207207
assert.NoError(t, mock.ExpectationsWereMet())
208208
}

0 commit comments

Comments
 (0)