Skip to content

Commit 329e392

Browse files
author
Chris Elder
committed
[FAB-5854] Add additional unit tests ApplyUpdates()
Add unit tests for additional ApplyUpdates() scenarios: 1) If an individual update in the CouchDB bulk update fails, we fall back to one-by-one processing of the failed updates with SaveDoc(). This scenario needs additional unit tests. Change-Id: I817f6672d5f1336a468e12e6875bf12841a1aaae Signed-off-by: Chris Elder <chris.elder@us.ibm.com>
1 parent 7f4f74d commit 329e392

File tree

4 files changed

+260
-76
lines changed

4 files changed

+260
-76
lines changed

core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,5 +614,113 @@ func TestSmallBatchSize(t *testing.T, dbProvider statedb.VersionedDBProvider) {
614614

615615
vv, _ = db.GetState("ns1", "key11")
616616
testutil.AssertEquals(t, vv.Value, jsonValue11)
617+
}
618+
619+
// TestBatchWithIndividualRetry tests a single failure in a batch
620+
func TestBatchWithIndividualRetry(t *testing.T, dbProvider statedb.VersionedDBProvider) {
621+
622+
db, err := dbProvider.GetDBHandle("testbatchretry")
623+
testutil.AssertNoError(t, err, "")
624+
625+
batch := statedb.NewUpdateBatch()
626+
vv1 := statedb.VersionedValue{Value: []byte("value1"), Version: version.NewHeight(1, 1)}
627+
vv2 := statedb.VersionedValue{Value: []byte("value2"), Version: version.NewHeight(1, 2)}
628+
vv3 := statedb.VersionedValue{Value: []byte("value3"), Version: version.NewHeight(1, 3)}
629+
vv4 := statedb.VersionedValue{Value: []byte("value4"), Version: version.NewHeight(1, 4)}
630+
631+
batch.Put("ns", "key1", vv1.Value, vv1.Version)
632+
batch.Put("ns", "key2", vv2.Value, vv2.Version)
633+
batch.Put("ns", "key3", vv3.Value, vv3.Version)
634+
batch.Put("ns", "key4", vv4.Value, vv4.Version)
635+
savePoint := version.NewHeight(1, 5)
636+
err = db.ApplyUpdates(batch, savePoint)
637+
testutil.AssertNoError(t, err, "")
638+
639+
// Clear the cache for the next batch, in place of simulation
640+
if bulkdb, ok := db.(statedb.BulkOptimizable); ok {
641+
//clear the cached versions, this will force a read when getVerion is called
642+
bulkdb.ClearCachedVersions()
643+
}
644+
645+
batch = statedb.NewUpdateBatch()
646+
batch.Put("ns", "key1", vv1.Value, vv1.Version)
647+
batch.Put("ns", "key2", vv2.Value, vv2.Version)
648+
batch.Put("ns", "key3", vv3.Value, vv3.Version)
649+
batch.Put("ns", "key4", vv4.Value, vv4.Version)
650+
savePoint = version.NewHeight(1, 6)
651+
err = db.ApplyUpdates(batch, savePoint)
652+
testutil.AssertNoError(t, err, "")
653+
654+
// Update document key3
655+
batch = statedb.NewUpdateBatch()
656+
batch.Delete("ns", "key2", vv2.Version)
657+
batch.Put("ns", "key3", vv3.Value, vv3.Version)
658+
savePoint = version.NewHeight(1, 7)
659+
err = db.ApplyUpdates(batch, savePoint)
660+
testutil.AssertNoError(t, err, "")
661+
662+
// This should force a retry for couchdb revision conflict for both delete and update
663+
// Retry logic should correct the update and prevent delete from throwing an error
664+
batch = statedb.NewUpdateBatch()
665+
batch.Delete("ns", "key2", vv2.Version)
666+
batch.Put("ns", "key3", vv3.Value, vv3.Version)
667+
savePoint = version.NewHeight(1, 8)
668+
err = db.ApplyUpdates(batch, savePoint)
669+
testutil.AssertNoError(t, err, "")
670+
671+
//Create a new set of values that use JSONs instead of binary
672+
jsonValue5 := []byte(`{"asset_name": "marble5","color": "blue","size": 5,"owner": "fred"}`)
673+
jsonValue6 := []byte(`{"asset_name": "marble6","color": "blue","size": 6,"owner": "elaine"}`)
674+
jsonValue7 := []byte(`{"asset_name": "marble7","color": "blue","size": 7,"owner": "fred"}`)
675+
jsonValue8 := []byte(`{"asset_name": "marble8","color": "blue","size": 8,"owner": "elaine"}`)
676+
677+
// Clear the cache for the next batch, in place of simulation
678+
if bulkdb, ok := db.(statedb.BulkOptimizable); ok {
679+
//clear the cached versions, this will force a read when getVersion is called
680+
bulkdb.ClearCachedVersions()
681+
}
682+
683+
batch = statedb.NewUpdateBatch()
684+
batch.Put("ns1", "key5", jsonValue5, version.NewHeight(1, 9))
685+
batch.Put("ns1", "key6", jsonValue6, version.NewHeight(1, 10))
686+
batch.Put("ns1", "key7", jsonValue7, version.NewHeight(1, 11))
687+
batch.Put("ns1", "key8", jsonValue8, version.NewHeight(1, 12))
688+
savePoint = version.NewHeight(1, 6)
689+
err = db.ApplyUpdates(batch, savePoint)
690+
testutil.AssertNoError(t, err, "")
691+
692+
// Clear the cache for the next batch, in place of simulation
693+
if bulkdb, ok := db.(statedb.BulkOptimizable); ok {
694+
//clear the cached versions, this will force a read when getVersion is called
695+
bulkdb.ClearCachedVersions()
696+
}
697+
698+
//Send the batch through again to test updates
699+
batch = statedb.NewUpdateBatch()
700+
batch.Put("ns1", "key5", jsonValue5, version.NewHeight(1, 9))
701+
batch.Put("ns1", "key6", jsonValue6, version.NewHeight(1, 10))
702+
batch.Put("ns1", "key7", jsonValue7, version.NewHeight(1, 11))
703+
batch.Put("ns1", "key8", jsonValue8, version.NewHeight(1, 12))
704+
savePoint = version.NewHeight(1, 6)
705+
err = db.ApplyUpdates(batch, savePoint)
706+
testutil.AssertNoError(t, err, "")
707+
708+
// Update document key3
709+
// this will cause an inconsistent cache entry for connection db2
710+
batch = statedb.NewUpdateBatch()
711+
batch.Delete("ns1", "key6", version.NewHeight(1, 13))
712+
batch.Put("ns1", "key7", jsonValue7, version.NewHeight(1, 14))
713+
savePoint = version.NewHeight(1, 15)
714+
err = db.ApplyUpdates(batch, savePoint)
715+
testutil.AssertNoError(t, err, "")
716+
717+
// This should force a retry for couchdb revision conflict for both delete and update
718+
// Retry logic should correct the update and prevent delete from throwing an error
719+
batch = statedb.NewUpdateBatch()
720+
batch.Delete("ns1", "key6", version.NewHeight(1, 16))
721+
batch.Put("ns1", "key7", jsonValue7, version.NewHeight(1, 17))
722+
savePoint = version.NewHeight(1, 18)
723+
err = db.ApplyUpdates(batch, savePoint)
724+
testutil.AssertNoError(t, err, "")
617725

618726
}

core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ var binaryWrapper = "valueBytes"
3434
// currently defaulted to 0 and is not used
3535
var querySkip = 0
3636

37-
// BatchDocument defines a document for a batch
37+
//BatchableDocument defines a document for a batch
3838
type BatchableDocument struct {
3939
CouchDoc couchdb.CouchDoc
4040
Deleted bool
@@ -158,7 +158,7 @@ func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.Version
158158
return &statedb.VersionedValue{Value: returnValue, Version: returnVersion}, nil
159159
}
160160

161-
// GetVersion implements method in VersionedDB interface
161+
//GetCachedVersion implements method in VersionedDB interface
162162
func (vdb *VersionedDB) GetCachedVersion(namespace string, key string) (*version.Height, bool) {
163163

164164
logger.Debugf("Retrieving cached version: %s~%s", key, namespace)
@@ -214,9 +214,6 @@ func removeDataWrapper(wrappedValue []byte, attachments []*couchdb.AttachmentInf
214214
// initialize the return value
215215
returnValue := []byte{}
216216

217-
// initialize a default return version
218-
returnVersion := version.NewHeight(0, 0)
219-
220217
// create a generic map for the json
221218
jsonResult := make(map[string]interface{})
222219

@@ -242,7 +239,7 @@ func removeDataWrapper(wrappedValue []byte, attachments []*couchdb.AttachmentInf
242239

243240
}
244241

245-
returnVersion = createVersionHeightFromVersionString(jsonResult["version"].(string))
242+
returnVersion := createVersionHeightFromVersionString(jsonResult["version"].(string))
246243

247244
return returnValue, returnVersion
248245

@@ -417,7 +414,7 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis
417414

418415
// Use the batchUpdateMap for tracking couchdb updates by ID
419416
// this will be used in case there are retries required
420-
batchUpdateMap := make(map[string]interface{})
417+
batchUpdateMap := make(map[string]*BatchableDocument)
421418

422419
namespaces := updateBatch.GetUpdatedNamespaces()
423420
for _, ns := range namespaces {
@@ -465,8 +462,8 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis
465462
}
466463
}
467464

468-
// Add the current docment to the update map
469-
batchUpdateMap[string(compositeKey)] = BatchableDocument{CouchDoc: *couchDoc, Deleted: isDelete}
465+
// Add the current docment, revision and delete flag to the update map
466+
batchUpdateMap[string(compositeKey)] = &BatchableDocument{CouchDoc: *couchDoc, Deleted: isDelete}
470467

471468
}
472469
}
@@ -476,7 +473,7 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis
476473
//Add the documents to the batch update array
477474
batchUpdateDocs := []*couchdb.CouchDoc{}
478475
for _, updateDocument := range batchUpdateMap {
479-
batchUpdateDocument := updateDocument.(BatchableDocument)
476+
batchUpdateDocument := updateDocument
480477
batchUpdateDocs = append(batchUpdateDocs, &batchUpdateDocument.CouchDoc)
481478
}
482479

@@ -495,17 +492,35 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis
495492
// If the document returned an error, retry the individual document
496493
if respDoc.Ok != true {
497494

498-
batchUpdateDocument := batchUpdateMap[respDoc.ID].(BatchableDocument)
495+
batchUpdateDocument := batchUpdateMap[respDoc.ID]
499496

500497
var err error
501498

499+
//Remove the "_rev" from the JSON before saving
500+
//this will allow the CouchDB retry logic to retry revisions without encountering
501+
//a mismatch between the "If-Match" and the "_rev" tag in the JSON
502+
if batchUpdateDocument.CouchDoc.JSONValue != nil {
503+
err = removeJSONRevision(&batchUpdateDocument.CouchDoc.JSONValue)
504+
if err != nil {
505+
return err
506+
}
507+
}
508+
502509
// Check to see if the document was added to the batch as a delete type document
503510
if batchUpdateDocument.Deleted {
511+
512+
//Log the warning message that a retry is being attempted for batch delete issue
513+
logger.Warningf("CouchDB batch document delete encountered an problem. Retrying delete for document ID:%s", respDoc.ID)
514+
504515
// If this is a deleted document, then retry the delete
505516
// If the delete fails due to a document not being found (404 error),
506517
// the document has already been deleted and the DeleteDoc will not return an error
507518
err = vdb.db.DeleteDoc(respDoc.ID, "")
508519
} else {
520+
521+
//Log the warning message that a retry is being attempted for batch update issue
522+
logger.Warningf("CouchDB batch document update encountered an problem. Retrying update for document ID:%s", respDoc.ID)
523+
509524
// Save the individual document to couchdb
510525
// Note that this will do retries as needed
511526
_, err = vdb.db.SaveDoc(respDoc.ID, "", &batchUpdateDocument.CouchDoc)
@@ -674,6 +689,32 @@ func createCouchdbDocJSON(id, revision string, value []byte, chaincodeID string,
674689
return documentJSON
675690
}
676691

692+
// removeJSONRevision removes the "_rev" if this is a JSON
693+
func removeJSONRevision(jsonValue *[]byte) error {
694+
695+
jsonMap := make(map[string]interface{})
696+
697+
//Unmarshal the value into a map
698+
err := json.Unmarshal(*jsonValue, &jsonMap)
699+
if err != nil {
700+
logger.Errorf("Failed to unmarshal couchdb JSON data %s\n", err.Error())
701+
return err
702+
}
703+
704+
//delete the "_rev" entry
705+
delete(jsonMap, "_rev")
706+
707+
//marshal the updated map back into the byte array
708+
*jsonValue, err = json.Marshal(jsonMap)
709+
if err != nil {
710+
logger.Errorf("Failed to marshal couchdb JSON data %s\n", err.Error())
711+
return err
712+
}
713+
714+
return nil
715+
716+
}
717+
677718
// Savepoint docid (key) for couchdb
678719
const savepointDocID = "statedb_savepoint"
679720

0 commit comments

Comments
 (0)