Skip to content

Commit

Permalink
[FAB-3686] CouchDB timeout causes error upon retry
Browse files Browse the repository at this point in the history
If a state db document update to CouchDB has a http timeout
(currently defaulted at 35s), the transaction may eventually
succeed in CouchDB, but the peer will do a retry using the
previous CouchDB document revision number. The retry will
therefore fail due to revision number conflict. The peer will
think it failed, when in fact it succeeded. Peer panics when
it can't process the block to completion, since subsequent
blocks should not be processed. Need to fix retry logic to
account for this scenario, potentially by introducing a higher
level retry that could get the new revision number for a final
attempt.

Implement a retry in the couchdb SaveDoc/DeleteDoc functions to retry
if the return code is 409 (revision conflict).  If
the return code is 409, then retrieve the current revision
for the document ID and retry.   This logic is separate from
the timeout logic in handleRequest is only used for resolving
document revision conflicts caused by CouchDB timeouts.

Change-Id: I2ef79b63397a8f76e295dc7c562f5cc7f86da73d
Signed-off-by: Chris Elder <chris.elder@us.ibm.com>
Signed-off-by: David Enyeart <enyeart@us.ibm.com>
  • Loading branch information
Chris Elder authored and denyeart committed May 30, 2017
1 parent 3079333 commit b3eef4c
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 44 deletions.
123 changes: 79 additions & 44 deletions core/ledger/util/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ func (dbclient *CouchDatabase) EnsureFullCommit() (*DBOperationResponse, error)
func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc) (string, error) {

logger.Debugf("Entering SaveDoc() id=[%s]", id)

if !utf8.ValidString(id) {
return "", fmt.Errorf("doc id [%x] not a valid utf8 string", id)
}
Expand All @@ -479,19 +480,6 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc
// id can contain a '/', so encode separately
saveURL = &url.URL{Opaque: saveURL.String() + "/" + encodePathElement(id)}

if rev == "" {

//See if the document already exists, we need the rev for save
_, revdoc, err2 := dbclient.ReadDoc(id)
if err2 != nil {
//set the revision to indicate that the document was not found
rev = ""
} else {
//set the revision to the rev returned from the document read
rev = revdoc
}
}

logger.Debugf(" rev=%s", rev)

//Set up a buffer for the data to be pushed to couchdb
Expand Down Expand Up @@ -540,9 +528,10 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries

//handle the request for saving the JSON or attachments
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPut, saveURL.String(), data,
rev, defaultBoundary, maxRetries, keepConnectionOpen)
//handle the request for saving document with a retry if there is a revision conflict
resp, _, err := dbclient.handleRequestWithRevisionRetry(id, http.MethodPut,
*saveURL, data, rev, defaultBoundary, maxRetries, keepConnectionOpen)

if err != nil {
return "", err
}
Expand All @@ -560,6 +549,20 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc

}

//getDocumentRevision will return the revision if the document exists, otherwise it will return ""
func (dbclient *CouchDatabase) getDocumentRevision(id string) string {

var rev = ""

//See if the document already exists, we need the rev for saves and deletes
_, revdoc, err := dbclient.ReadDoc(id)
if err == nil {
//set the revision to the rev returned from the document read
rev = revdoc
}
return rev
}

func createAttachmentPart(couchDoc *CouchDoc, defaultBoundary string) (bytes.Buffer, string, error) {

//Create a buffer for writing the result
Expand Down Expand Up @@ -646,7 +649,8 @@ func getRevisionHeader(resp *http.Response) (string, error) {

}

//ReadDoc method provides function to retrieve a document from the database by id
//ReadDoc method provides function to retrieve a document and its revision
//from the database by id
func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
var couchDoc CouchDoc
attachments := []*Attachment{}
Expand Down Expand Up @@ -906,27 +910,14 @@ func (dbclient *CouchDatabase) DeleteDoc(id, rev string) error {
// id can contain a '/', so encode separately
deleteURL = &url.URL{Opaque: deleteURL.String() + "/" + encodePathElement(id)}

if rev == "" {

//See if the document already exists, we need the rev for delete
_, revdoc, err2 := dbclient.ReadDoc(id)
if err2 != nil {
//set the revision to indicate that the document was not found
rev = ""
} else {
//set the revision to the rev returned from the document read
rev = revdoc
}
}

logger.Debugf(" rev=%s", rev)

//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries

resp, couchDBReturn, err := dbclient.CouchInstance.handleRequest(http.MethodDelete, deleteURL.String(), nil, rev, "", maxRetries, true)
//handle the request for saving document with a retry if there is a revision conflict
resp, couchDBReturn, err := dbclient.handleRequestWithRevisionRetry(id, http.MethodDelete,
*deleteURL, nil, "", "", maxRetries, true)

if err != nil {
fmt.Printf("couchDBReturn=%v", couchDBReturn)
if couchDBReturn != nil && couchDBReturn.StatusCode == 404 {
logger.Debug("Document not found (404), returning nil value instead of 404 error")
// non-existent document should return nil value instead of a 404 error
Expand Down Expand Up @@ -1173,9 +1164,52 @@ func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*B

}

//handleRequestWithRevisionRetry method is a generic http request handler with
//a retry for document revision conflict errors,
//which may be detected during saves or deletes that timed out from client http perspective,
//but which eventually succeeded in couchdb
func (dbclient *CouchDatabase) handleRequestWithRevisionRetry(id, method string, connectURL url.URL, data []byte, rev string,
multipartBoundary string, maxRetries int, keepConnectionOpen bool) (*http.Response, *DBReturn, error) {

//Initialize a flag for the revsion conflict
revisionConflictDetected := false
var resp *http.Response
var couchDBReturn *DBReturn
var errResp error

//attempt the http request for the max number of retries
//In this case, the retry is to catch problems where a client timeout may miss a
//successful CouchDB update and cause a document revision conflict on a retry in handleRequest
for attempts := 0; attempts < maxRetries; attempts++ {

//if the revision was not passed in, or if a revision conflict is detected on prior attempt,
//query CouchDB for the document revision
if rev == "" || revisionConflictDetected {
rev = dbclient.getDocumentRevision(id)
}

//handle the request for saving/deleting the couchdb data
resp, couchDBReturn, errResp = dbclient.CouchInstance.handleRequest(method, connectURL.String(),
data, rev, multipartBoundary, maxRetries, keepConnectionOpen)

//If there was a 409 conflict error during the save/delete, log it and retry it.
//Otherwise, break out of the retry loop
if couchDBReturn != nil && couchDBReturn.StatusCode == 409 {
logger.Warningf("CouchDB document revision conflict detected, retrying. Attempt:%v", attempts+1)
revisionConflictDetected = true
} else {
break
}
}

// return the handleRequest results
return resp, couchDBReturn, errResp
}

//handleRequest method is a generic http request handler.
// if it returns an error, it ensures that the response body is closed, else it is the
// callee's responsibility to close response correctly
// If it returns an error, it ensures that the response body is closed, else it is the
// callee's responsibility to close response correctly.
// Any http error or CouchDB error (4XX or 500) will result in a golang error getting returned
func (couchInstance *CouchInstance) handleRequest(method, connectURL string, data []byte, rev string,
multipartBoundary string, maxRetries int, keepConnectionOpen bool) (*http.Response, *DBReturn, error) {

Expand Down Expand Up @@ -1251,20 +1285,20 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
//Execute http request
resp, errResp = couchInstance.client.Do(req)

//if an error is not detected then drop out of the retry
//if there is no golang http error and no CouchDB 500 error, then drop out of the retry
if errResp == nil && resp != nil && resp.StatusCode < 500 {
break
}

//if this is an error, record the retry error, else this is a 500 error
//if this is an unexpected golang http error, log the error and retry
if errResp != nil {

//Log the error with the retry count and continue
logger.Warningf("Retrying couchdb request in %s. Attempt:%v Error:%v",
waitDuration.String(), attempts+1, errResp.Error())

//otherwise this is an unexpected 500 error from CouchDB. Log the error and retry.
} else {

//Read the response body and close it for next attempt
jsonError, err := ioutil.ReadAll(resp.Body)
closeResponseBody(resp)
Expand All @@ -1288,18 +1322,19 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
//backoff, doubling the retry time for next attempt
waitDuration *= 2

}
} // end retry loop

//if the error present, return the error
//if a golang http error is still present after retries are exhausted, return the error
if errResp != nil {
return nil, nil, errResp
}

//set the return code for the couchDB request
couchDBReturn.StatusCode = resp.StatusCode

//check to see if the status code is 400 or higher
//response codes 4XX and 500 will be treated as errors
//check to see if the status code from couchdb is 400 or higher
//response codes 4XX and 500 will be treated as errors -
//golang error will be created from the couchDBReturn contents and both will be returned
if resp.StatusCode >= 400 {
// close the response before returning error
defer closeResponseBody(resp)
Expand All @@ -1325,7 +1360,7 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat

logger.Debugf("Exiting handleRequest()")

//If no errors, then return the results
//If no errors, then return the http response and the couchdb return object
return resp, couchDBReturn, nil
}

Expand Down
48 changes: 48 additions & 0 deletions core/ledger/util/couchdb/couchdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,54 @@ func TestDBRequestTimeout(t *testing.T) {
}
}

func TestDBTimeoutConflictRetry(t *testing.T) {

if ledgerconfig.IsCouchDBEnabled() {

database := "testdbtimeoutretry"
err := cleanup(database)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to cleanup Error: %s", err))
defer cleanup(database)

// if there was an error upon cleanup, return here
if err != nil {
return
}

//create a new instance and database object
couchInstance, err := CreateCouchInstance(couchDBDef.URL, couchDBDef.Username, couchDBDef.Password,
couchDBDef.MaxRetries, 3, couchDBDef.RequestTimeout)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create couch instance"))
db := CouchDatabase{CouchInstance: *couchInstance, DBName: database}

//create a new database
_, errdb := db.CreateDatabaseIfNotExist()
testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to create database"))

//Retrieve the info for the new database and make sure the name matches
dbResp, _, errdb := db.GetDatabaseInfo()
testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to retrieve database information"))
testutil.AssertEquals(t, dbResp.DbName, database)

//Save the test document
_, saveerr := db.SaveDoc("1", "", &CouchDoc{JSONValue: assetJSON, Attachments: nil})
testutil.AssertNoError(t, saveerr, fmt.Sprintf("Error when trying to save a document"))

//Retrieve the test document
_, _, geterr := db.ReadDoc("1")
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when trying to retrieve a document"))

//Save the test document with an invalid rev. This should cause a retry
_, saveerr = db.SaveDoc("1", "1-11111111111111111111111111111111", &CouchDoc{JSONValue: assetJSON, Attachments: nil})
testutil.AssertNoError(t, saveerr, fmt.Sprintf("Error when trying to save a document with a revision conflict"))

//Delete the test document with an invalid rev. This should cause a retry
deleteerr := db.DeleteDoc("1", "1-11111111111111111111111111111111")
testutil.AssertNoError(t, deleteerr, fmt.Sprintf("Error when trying to delete a document with a revision conflict"))

}
}

func TestDBBadJSON(t *testing.T) {

if ledgerconfig.IsCouchDBEnabled() {
Expand Down

0 comments on commit b3eef4c

Please sign in to comment.