Skip to content

Commit 4aba4a8

Browse files
manish-sethiryjones
authored andcommitted
[FAB-7692] Refactor statecouchdb impl
This CR refactors the code for statecouchdb implementation. This includes splitting the code into different files and making the code more modular by spliting it into high level structures and functions so to make the code more readble and maintainable The unit tests are not changed in this CR so as to make sure that things are not lost in the refactoring Change-Id: I5c937f7d3c541be0b89591175b6946ae32c5833f Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 2892119 commit 4aba4a8

File tree

7 files changed

+832
-965
lines changed

7 files changed

+832
-965
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package statecouchdb
7+
8+
import (
9+
"sync"
10+
)
11+
12+
// batch is executed in a separate goroutine.
13+
type batch interface {
14+
execute() error
15+
}
16+
17+
// executeBatches executes each batch in a separate goroutine and returns error if
18+
// any of the batches return error during its execution
19+
func executeBatches(batches []batch) error {
20+
logger.Debugf("Executing batches = %s", batches)
21+
numBatches := len(batches)
22+
if numBatches == 0 {
23+
return nil
24+
}
25+
if numBatches == 1 {
26+
return batches[0].execute()
27+
}
28+
var batchWG sync.WaitGroup
29+
batchWG.Add(numBatches)
30+
errsChan := make(chan error, numBatches)
31+
defer close(errsChan)
32+
for _, b := range batches {
33+
go func(b batch) {
34+
defer batchWG.Done()
35+
if err := b.execute(); err != nil {
36+
errsChan <- err
37+
return
38+
}
39+
}(b)
40+
}
41+
batchWG.Wait()
42+
if len(errsChan) > 0 {
43+
return <-errsChan
44+
}
45+
return nil
46+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package statecouchdb
7+
8+
import (
9+
"errors"
10+
"fmt"
11+
12+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
13+
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
14+
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
15+
)
16+
17+
// nsCommittersBuilder implements `batch` interface. Each batch operates on a specific namespace in the updates and
18+
// builds one or more batches of type subNsCommitter.
19+
type nsCommittersBuilder struct {
20+
updates map[string]*statedb.VersionedValue
21+
db *couchdb.CouchDatabase
22+
revisions map[string]string
23+
subNsCommitters []batch
24+
}
25+
26+
// subNsCommitter implements `batch` interface. Each batch commits the portion of updates within a namespace assigned to it
27+
type subNsCommitter struct {
28+
db *couchdb.CouchDatabase
29+
batchUpdateMap map[string]*batchableDocument
30+
}
31+
32+
// buildCommitters build the batches of type subNsCommitter. This functions processes different namespaces in parallel
33+
func (vdb *VersionedDB) buildCommitters(updates *statedb.UpdateBatch) ([]batch, error) {
34+
namespaces := updates.GetUpdatedNamespaces()
35+
var nsCommitterBuilder []batch
36+
for _, ns := range namespaces {
37+
nsUpdates := updates.GetUpdates(ns)
38+
db, err := vdb.getNamespaceDBHandle(ns)
39+
if err != nil {
40+
return nil, err
41+
}
42+
nsRevs := vdb.committedDataCache.revs[ns]
43+
if nsRevs == nil {
44+
nsRevs = make(nsRevisions)
45+
}
46+
// for each namespace, construct one builder with the corresponding couchdb handle and couch revisions
47+
// that are already loaded into cache (during validation phase)
48+
nsCommitterBuilder = append(nsCommitterBuilder, &nsCommittersBuilder{updates: nsUpdates, db: db, revisions: nsRevs})
49+
}
50+
if err := executeBatches(nsCommitterBuilder); err != nil {
51+
return nil, err
52+
}
53+
// accumulate results across namespaces (one or more batches of `subNsCommitter` for a namespace from each builder)
54+
var combinedSubNsCommitters []batch
55+
for _, b := range nsCommitterBuilder {
56+
combinedSubNsCommitters = append(combinedSubNsCommitters, b.(*nsCommittersBuilder).subNsCommitters...)
57+
}
58+
return combinedSubNsCommitters, nil
59+
}
60+
61+
// execute implements the function in `batch` interface. This function builds one or more `subNsCommitter`s that
62+
// cover the updates for a namespace
63+
func (builder *nsCommittersBuilder) execute() error {
64+
if err := addRevisionsForMissingKeys(builder.revisions, builder.db, builder.updates); err != nil {
65+
return err
66+
}
67+
maxBacthSize := ledgerconfig.GetMaxBatchUpdateSize()
68+
batchUpdateMap := make(map[string]*batchableDocument)
69+
for key, vv := range builder.updates {
70+
couchDoc, err := keyValToCouchDoc(&keyValue{key: key, VersionedValue: vv}, builder.revisions[key])
71+
if err != nil {
72+
return err
73+
}
74+
batchUpdateMap[key] = &batchableDocument{CouchDoc: *couchDoc, Deleted: vv.Value == nil}
75+
if len(batchUpdateMap) == maxBacthSize {
76+
builder.subNsCommitters = append(builder.subNsCommitters, &subNsCommitter{builder.db, batchUpdateMap})
77+
batchUpdateMap = make(map[string]*batchableDocument)
78+
}
79+
}
80+
if len(batchUpdateMap) > 0 {
81+
builder.subNsCommitters = append(builder.subNsCommitters, &subNsCommitter{builder.db, batchUpdateMap})
82+
}
83+
return nil
84+
}
85+
86+
// execute implements the function in `batch` interface. This function commits the updates managed by a `subNsCommitter`
87+
func (committer *subNsCommitter) execute() error {
88+
return commitUpdates(committer.db, committer.batchUpdateMap)
89+
}
90+
91+
// commitUpdates commits the given updates to couchdb
92+
func commitUpdates(db *couchdb.CouchDatabase, batchUpdateMap map[string]*batchableDocument) error {
93+
//Add the documents to the batch update array
94+
batchUpdateDocs := []*couchdb.CouchDoc{}
95+
for _, updateDocument := range batchUpdateMap {
96+
batchUpdateDocument := updateDocument
97+
batchUpdateDocs = append(batchUpdateDocs, &batchUpdateDocument.CouchDoc)
98+
}
99+
100+
// Do the bulk update into couchdb. Note that this will do retries if the entire bulk update fails or times out
101+
batchUpdateResp, err := db.BatchUpdateDocuments(batchUpdateDocs)
102+
if err != nil {
103+
return err
104+
}
105+
// IF INDIVIDUAL DOCUMENTS IN THE BULK UPDATE DID NOT SUCCEED, TRY THEM INDIVIDUALLY
106+
// iterate through the response from CouchDB by document
107+
for _, respDoc := range batchUpdateResp {
108+
// If the document returned an error, retry the individual document
109+
if respDoc.Ok != true {
110+
batchUpdateDocument := batchUpdateMap[respDoc.ID]
111+
var err error
112+
//Remove the "_rev" from the JSON before saving
113+
//this will allow the CouchDB retry logic to retry revisions without encountering
114+
//a mismatch between the "If-Match" and the "_rev" tag in the JSON
115+
if batchUpdateDocument.CouchDoc.JSONValue != nil {
116+
err = removeJSONRevision(&batchUpdateDocument.CouchDoc.JSONValue)
117+
if err != nil {
118+
return err
119+
}
120+
}
121+
// Check to see if the document was added to the batch as a delete type document
122+
if batchUpdateDocument.Deleted {
123+
logger.Warningf("CouchDB batch document delete encountered an problem. Retrying delete for document ID:%s", respDoc.ID)
124+
// If this is a deleted document, then retry the delete
125+
// If the delete fails due to a document not being found (404 error),
126+
// the document has already been deleted and the DeleteDoc will not return an error
127+
err = db.DeleteDoc(respDoc.ID, "")
128+
} else {
129+
logger.Warningf("CouchDB batch document update encountered an problem. Retrying update for document ID:%s", respDoc.ID)
130+
// Save the individual document to couchdb
131+
// Note that this will do retries as needed
132+
_, err = db.SaveDoc(respDoc.ID, "", &batchUpdateDocument.CouchDoc)
133+
}
134+
135+
// If the single document update or delete returns an error, then throw the error
136+
if err != nil {
137+
errorString := fmt.Sprintf("Error occurred while saving document ID = %v Error: %s Reason: %s\n",
138+
respDoc.ID, respDoc.Error, respDoc.Reason)
139+
140+
logger.Errorf(errorString)
141+
return fmt.Errorf(errorString)
142+
}
143+
}
144+
}
145+
return nil
146+
}
147+
148+
// nsFlusher implements `batch` interface and a batch executes the function `couchdb.EnsureFullCommit()` for the given namespace
149+
type nsFlusher struct {
150+
db *couchdb.CouchDatabase
151+
}
152+
153+
func (vdb *VersionedDB) ensureFullCommit(dbs []*couchdb.CouchDatabase) error {
154+
var flushers []batch
155+
for _, db := range dbs {
156+
flushers = append(flushers, &nsFlusher{db})
157+
}
158+
return executeBatches(flushers)
159+
}
160+
161+
func (f *nsFlusher) execute() error {
162+
dbResponse, err := f.db.EnsureFullCommit()
163+
if err != nil || dbResponse.Ok != true {
164+
logger.Errorf("Failed to perform full commit\n")
165+
return errors.New("Failed to perform full commit")
166+
}
167+
return nil
168+
}
169+
170+
func addRevisionsForMissingKeys(revisions map[string]string, db *couchdb.CouchDatabase, nsUpdates map[string]*statedb.VersionedValue) error {
171+
var missingKeys []string
172+
for key := range nsUpdates {
173+
_, ok := revisions[key]
174+
if !ok {
175+
missingKeys = append(missingKeys, key)
176+
}
177+
}
178+
retrievedMetadata, err := retrieveNsMetadata(db, missingKeys)
179+
if err != nil {
180+
return err
181+
}
182+
for _, metadata := range retrievedMetadata {
183+
revisions[metadata.ID] = revisions[metadata.Rev]
184+
}
185+
return nil
186+
}
187+
188+
//batchableDocument defines a document for a batch
189+
type batchableDocument struct {
190+
CouchDoc couchdb.CouchDoc
191+
Deleted bool
192+
}

0 commit comments

Comments
 (0)