|
| 1 | +/* |
| 2 | +Copyright IBM Corp. All Rights Reserved. |
| 3 | +SPDX-License-Identifier: Apache-2.0 |
| 4 | +*/ |
| 5 | + |
| 6 | +package statecouchdb |
| 7 | + |
| 8 | +import ( |
| 9 | + "errors" |
| 10 | + "fmt" |
| 11 | + |
| 12 | + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" |
| 13 | + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" |
| 14 | + "github.com/hyperledger/fabric/core/ledger/util/couchdb" |
| 15 | +) |
| 16 | + |
| 17 | +// nsCommittersBuilder implements `batch` interface. Each batch operates on a specific namespace in the updates and |
| 18 | +// builds one or more batches of type subNsCommitter. |
| 19 | +type nsCommittersBuilder struct { |
| 20 | + updates map[string]*statedb.VersionedValue |
| 21 | + db *couchdb.CouchDatabase |
| 22 | + revisions map[string]string |
| 23 | + subNsCommitters []batch |
| 24 | +} |
| 25 | + |
| 26 | +// subNsCommitter implements `batch` interface. Each batch commits the portion of updates within a namespace assigned to it |
| 27 | +type subNsCommitter struct { |
| 28 | + db *couchdb.CouchDatabase |
| 29 | + batchUpdateMap map[string]*batchableDocument |
| 30 | +} |
| 31 | + |
| 32 | +// buildCommitters build the batches of type subNsCommitter. This functions processes different namespaces in parallel |
| 33 | +func (vdb *VersionedDB) buildCommitters(updates *statedb.UpdateBatch) ([]batch, error) { |
| 34 | + namespaces := updates.GetUpdatedNamespaces() |
| 35 | + var nsCommitterBuilder []batch |
| 36 | + for _, ns := range namespaces { |
| 37 | + nsUpdates := updates.GetUpdates(ns) |
| 38 | + db, err := vdb.getNamespaceDBHandle(ns) |
| 39 | + if err != nil { |
| 40 | + return nil, err |
| 41 | + } |
| 42 | + nsRevs := vdb.committedDataCache.revs[ns] |
| 43 | + if nsRevs == nil { |
| 44 | + nsRevs = make(nsRevisions) |
| 45 | + } |
| 46 | + // for each namespace, construct one builder with the corresponding couchdb handle and couch revisions |
| 47 | + // that are already loaded into cache (during validation phase) |
| 48 | + nsCommitterBuilder = append(nsCommitterBuilder, &nsCommittersBuilder{updates: nsUpdates, db: db, revisions: nsRevs}) |
| 49 | + } |
| 50 | + if err := executeBatches(nsCommitterBuilder); err != nil { |
| 51 | + return nil, err |
| 52 | + } |
| 53 | + // accumulate results across namespaces (one or more batches of `subNsCommitter` for a namespace from each builder) |
| 54 | + var combinedSubNsCommitters []batch |
| 55 | + for _, b := range nsCommitterBuilder { |
| 56 | + combinedSubNsCommitters = append(combinedSubNsCommitters, b.(*nsCommittersBuilder).subNsCommitters...) |
| 57 | + } |
| 58 | + return combinedSubNsCommitters, nil |
| 59 | +} |
| 60 | + |
| 61 | +// execute implements the function in `batch` interface. This function builds one or more `subNsCommitter`s that |
| 62 | +// cover the updates for a namespace |
| 63 | +func (builder *nsCommittersBuilder) execute() error { |
| 64 | + if err := addRevisionsForMissingKeys(builder.revisions, builder.db, builder.updates); err != nil { |
| 65 | + return err |
| 66 | + } |
| 67 | + maxBacthSize := ledgerconfig.GetMaxBatchUpdateSize() |
| 68 | + batchUpdateMap := make(map[string]*batchableDocument) |
| 69 | + for key, vv := range builder.updates { |
| 70 | + couchDoc, err := keyValToCouchDoc(&keyValue{key: key, VersionedValue: vv}, builder.revisions[key]) |
| 71 | + if err != nil { |
| 72 | + return err |
| 73 | + } |
| 74 | + batchUpdateMap[key] = &batchableDocument{CouchDoc: *couchDoc, Deleted: vv.Value == nil} |
| 75 | + if len(batchUpdateMap) == maxBacthSize { |
| 76 | + builder.subNsCommitters = append(builder.subNsCommitters, &subNsCommitter{builder.db, batchUpdateMap}) |
| 77 | + batchUpdateMap = make(map[string]*batchableDocument) |
| 78 | + } |
| 79 | + } |
| 80 | + if len(batchUpdateMap) > 0 { |
| 81 | + builder.subNsCommitters = append(builder.subNsCommitters, &subNsCommitter{builder.db, batchUpdateMap}) |
| 82 | + } |
| 83 | + return nil |
| 84 | +} |
| 85 | + |
| 86 | +// execute implements the function in `batch` interface. This function commits the updates managed by a `subNsCommitter` |
| 87 | +func (committer *subNsCommitter) execute() error { |
| 88 | + return commitUpdates(committer.db, committer.batchUpdateMap) |
| 89 | +} |
| 90 | + |
| 91 | +// commitUpdates commits the given updates to couchdb |
| 92 | +func commitUpdates(db *couchdb.CouchDatabase, batchUpdateMap map[string]*batchableDocument) error { |
| 93 | + //Add the documents to the batch update array |
| 94 | + batchUpdateDocs := []*couchdb.CouchDoc{} |
| 95 | + for _, updateDocument := range batchUpdateMap { |
| 96 | + batchUpdateDocument := updateDocument |
| 97 | + batchUpdateDocs = append(batchUpdateDocs, &batchUpdateDocument.CouchDoc) |
| 98 | + } |
| 99 | + |
| 100 | + // Do the bulk update into couchdb. Note that this will do retries if the entire bulk update fails or times out |
| 101 | + batchUpdateResp, err := db.BatchUpdateDocuments(batchUpdateDocs) |
| 102 | + if err != nil { |
| 103 | + return err |
| 104 | + } |
| 105 | + // IF INDIVIDUAL DOCUMENTS IN THE BULK UPDATE DID NOT SUCCEED, TRY THEM INDIVIDUALLY |
| 106 | + // iterate through the response from CouchDB by document |
| 107 | + for _, respDoc := range batchUpdateResp { |
| 108 | + // If the document returned an error, retry the individual document |
| 109 | + if respDoc.Ok != true { |
| 110 | + batchUpdateDocument := batchUpdateMap[respDoc.ID] |
| 111 | + var err error |
| 112 | + //Remove the "_rev" from the JSON before saving |
| 113 | + //this will allow the CouchDB retry logic to retry revisions without encountering |
| 114 | + //a mismatch between the "If-Match" and the "_rev" tag in the JSON |
| 115 | + if batchUpdateDocument.CouchDoc.JSONValue != nil { |
| 116 | + err = removeJSONRevision(&batchUpdateDocument.CouchDoc.JSONValue) |
| 117 | + if err != nil { |
| 118 | + return err |
| 119 | + } |
| 120 | + } |
| 121 | + // Check to see if the document was added to the batch as a delete type document |
| 122 | + if batchUpdateDocument.Deleted { |
| 123 | + logger.Warningf("CouchDB batch document delete encountered an problem. Retrying delete for document ID:%s", respDoc.ID) |
| 124 | + // If this is a deleted document, then retry the delete |
| 125 | + // If the delete fails due to a document not being found (404 error), |
| 126 | + // the document has already been deleted and the DeleteDoc will not return an error |
| 127 | + err = db.DeleteDoc(respDoc.ID, "") |
| 128 | + } else { |
| 129 | + logger.Warningf("CouchDB batch document update encountered an problem. Retrying update for document ID:%s", respDoc.ID) |
| 130 | + // Save the individual document to couchdb |
| 131 | + // Note that this will do retries as needed |
| 132 | + _, err = db.SaveDoc(respDoc.ID, "", &batchUpdateDocument.CouchDoc) |
| 133 | + } |
| 134 | + |
| 135 | + // If the single document update or delete returns an error, then throw the error |
| 136 | + if err != nil { |
| 137 | + errorString := fmt.Sprintf("Error occurred while saving document ID = %v Error: %s Reason: %s\n", |
| 138 | + respDoc.ID, respDoc.Error, respDoc.Reason) |
| 139 | + |
| 140 | + logger.Errorf(errorString) |
| 141 | + return fmt.Errorf(errorString) |
| 142 | + } |
| 143 | + } |
| 144 | + } |
| 145 | + return nil |
| 146 | +} |
| 147 | + |
| 148 | +// nsFlusher implements `batch` interface and a batch executes the function `couchdb.EnsureFullCommit()` for the given namespace |
| 149 | +type nsFlusher struct { |
| 150 | + db *couchdb.CouchDatabase |
| 151 | +} |
| 152 | + |
| 153 | +func (vdb *VersionedDB) ensureFullCommit(dbs []*couchdb.CouchDatabase) error { |
| 154 | + var flushers []batch |
| 155 | + for _, db := range dbs { |
| 156 | + flushers = append(flushers, &nsFlusher{db}) |
| 157 | + } |
| 158 | + return executeBatches(flushers) |
| 159 | +} |
| 160 | + |
| 161 | +func (f *nsFlusher) execute() error { |
| 162 | + dbResponse, err := f.db.EnsureFullCommit() |
| 163 | + if err != nil || dbResponse.Ok != true { |
| 164 | + logger.Errorf("Failed to perform full commit\n") |
| 165 | + return errors.New("Failed to perform full commit") |
| 166 | + } |
| 167 | + return nil |
| 168 | +} |
| 169 | + |
| 170 | +func addRevisionsForMissingKeys(revisions map[string]string, db *couchdb.CouchDatabase, nsUpdates map[string]*statedb.VersionedValue) error { |
| 171 | + var missingKeys []string |
| 172 | + for key := range nsUpdates { |
| 173 | + _, ok := revisions[key] |
| 174 | + if !ok { |
| 175 | + missingKeys = append(missingKeys, key) |
| 176 | + } |
| 177 | + } |
| 178 | + retrievedMetadata, err := retrieveNsMetadata(db, missingKeys) |
| 179 | + if err != nil { |
| 180 | + return err |
| 181 | + } |
| 182 | + for _, metadata := range retrievedMetadata { |
| 183 | + revisions[metadata.ID] = revisions[metadata.Rev] |
| 184 | + } |
| 185 | + return nil |
| 186 | +} |
| 187 | + |
| 188 | +//batchableDocument defines a document for a batch |
| 189 | +type batchableDocument struct { |
| 190 | + CouchDoc couchdb.CouchDoc |
| 191 | + Deleted bool |
| 192 | +} |
0 commit comments