diff --git a/conduit/plugins/importers/algod/algod_importer.go b/conduit/plugins/importers/algod/algod_importer.go index 7c2b579a..f705428a 100644 --- a/conduit/plugins/importers/algod/algod_importer.go +++ b/conduit/plugins/importers/algod/algod_importer.go @@ -4,6 +4,7 @@ import ( "bufio" "context" _ "embed" // used to embed config + "errors" "fmt" "io" "net/http" @@ -40,9 +41,8 @@ const ( followerMode ) -// Retry -const ( - retries = 5 +var ( + waitForRoundTimeout = 5 * time.Second ) const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt" @@ -320,11 +320,14 @@ func (algodImp *algodImporter) catchupNode(network string, targetRound uint64) e } } - _, err := algodImp.aclient.StatusAfterBlock(targetRound).Do(algodImp.ctx) + status, err := algodImp.aclient.StatusAfterBlock(targetRound).Do(algodImp.ctx) algodImp.logger.Tracef("importer algod.catchupNode() called StatusAfterBlock(targetRound=%d) err: %v", targetRound, err) if err != nil { err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: %w", err) } + if status.LastRound < targetRound { + err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round %d != %d", status.LastRound, targetRound) + } return err } @@ -414,69 +417,125 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error return delta, nil } -func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) { - var blockbytes []byte - var err error - var status models.NodeStatus - var blk data.BlockData +// SyncError is used to indicate algod and conduit are not synchronized. +type SyncError struct { + rnd uint64 + expected uint64 +} - for r := 0; r < retries; r++ { - status, err = algodImp.aclient.StatusAfterBlock(rnd - 1).Do(algodImp.ctx) - algodImp.logger.Tracef("importer algod.GetBlock() called StatusAfterBlock(%d) err: %v", rnd-1, err) - if err != nil { - // If context has expired. - if algodImp.ctx.Err() != nil { - return blk, fmt.Errorf("GetBlock ctx error: %w", err) - } - err = fmt.Errorf("error getting status for round: %w", err) - algodImp.logger.Errorf("error getting status for round %d (attempt %d): %s", rnd, r, err.Error()) - continue +func (e *SyncError) Error() string { + return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected) +} + +func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) { + ctxWithTimeout, cf := context.WithTimeout(ctx, to) + defer cf() + status, err := c.StatusAfterBlock(rnd - 1).Do(ctxWithTimeout) + l.Tracef("importer algod.waitForRoundWithTimeout() called StatusAfterBlock(%d) err: %v", rnd-1, err) + + if err == nil { + // When c.StatusAfterBlock has a server-side timeout it returns the current status. + // We use a context with timeout and the algod default timeout is 1 minute, so technically + // with the current versions, this check should never be required. + if rnd <= status.LastRound { + return status.LastRound, nil } - start := time.Now() - blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx) - algodImp.logger.Tracef("importer algod.GetBlock() called BlockRaw(%d) err: %v", rnd, err) - dt := time.Since(start) - getAlgodRawBlockTimeSeconds.Observe(dt.Seconds()) - if err != nil { - algodImp.logger.Errorf("error getting block for round %d (attempt %d): %s", rnd, r, err.Error()) - continue + return 0, &SyncError{ + rnd: status.LastRound, + expected: rnd, } - tmpBlk := new(models.BlockResponse) - err = msgpack.Decode(blockbytes, tmpBlk) - if err != nil { - return blk, err + } + + // If there was a different error and the node is responsive, call status before returning a SyncError. + status2, err2 := c.Status().Do(ctx) + l.Tracef("importer algod.waitForRoundWithTimeout() called Status() err: %v", err2) + if err2 != nil { + // If there was an error getting status, return the original error. + return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2)) + } + if status2.LastRound < rnd { + return 0, &SyncError{ + rnd: status.LastRound, + expected: rnd, + } + } + + // This is probably a connection error, not a SyncError. + return 0, fmt.Errorf("unknown errors: StatusAfterBlock(%w), Status(%w)", err, err2) +} + +func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) { + var blockbytes []byte + var blk data.BlockData + + nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout) + if err != nil { + // If context has expired. + if algodImp.ctx.Err() != nil { + return blk, fmt.Errorf("GetBlock ctx error: %w", err) } + algodImp.logger.Errorf(err.Error()) + return data.BlockData{}, err + } + start := time.Now() + blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx) + algodImp.logger.Tracef("importer algod.GetBlock() called BlockRaw(%d) err: %v", rnd, err) + dt := time.Since(start) + getAlgodRawBlockTimeSeconds.Observe(dt.Seconds()) + if err != nil { + algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error()) + return data.BlockData{}, err + } + tmpBlk := new(models.BlockResponse) + err = msgpack.Decode(blockbytes, tmpBlk) + if err != nil { + return blk, err + } + + blk.BlockHeader = tmpBlk.Block.BlockHeader + blk.Payset = tmpBlk.Block.Payset + blk.Certificate = tmpBlk.Cert - blk.BlockHeader = tmpBlk.Block.BlockHeader - blk.Payset = tmpBlk.Block.Payset - blk.Certificate = tmpBlk.Cert - - if algodImp.mode == followerMode { - // Round 0 has no delta associated with it - if rnd != 0 { - var delta sdk.LedgerStateDelta - delta, err = algodImp.getDelta(rnd) - if err != nil { - if status.LastRound < rnd { - err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", status.LastRound, rnd, err) - } else { - err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", status.LastRound, rnd, err) - } - algodImp.logger.Error(err.Error()) - return data.BlockData{}, err + if algodImp.mode == followerMode { + // Round 0 has no delta associated with it + if rnd != 0 { + var delta sdk.LedgerStateDelta + delta, err = algodImp.getDelta(rnd) + if err != nil { + if nodeRound < rnd { + err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err) + } else { + err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err) } - blk.Delta = &delta + algodImp.logger.Error(err.Error()) + return data.BlockData{}, err } + blk.Delta = &delta } - - return blk, err } - err = fmt.Errorf("failed to get block for round %d after %d attempts, check node configuration: %s", rnd, retries, err) - algodImp.logger.Errorf(err.Error()) return blk, err } +func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) { + blk, err := algodImp.getBlockInner(rnd) + + if err != nil { + target := &SyncError{} + if errors.As(err, &target) { + algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error()) + _, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx) + } else { + err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err) + algodImp.logger.Errorf(err.Error()) + } + return data.BlockData{}, err + } + + return blk, nil + +} + func (algodImp *algodImporter) ProvideMetrics(subsystem string) []prometheus.Collector { getAlgodRawBlockTimeSeconds = initGetAlgodRawBlockTimeSeconds(subsystem) return []prometheus.Collector{ diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go index c6308e32..49dc3fd7 100644 --- a/conduit/plugins/importers/algod/algod_importer_test.go +++ b/conduit/plugins/importers/algod/algod_importer_test.go @@ -3,16 +3,16 @@ package algodimporter import ( "context" "fmt" - "net/http" - "net/http/httptest" - "strings" - "testing" - "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" "github.com/algorand/go-algorand-sdk/v2/client/v2/algod" "github.com/algorand/go-algorand-sdk/v2/client/v2/common/models" @@ -277,10 +277,11 @@ func TestInitCatchup(t *testing.T) { algodServer: NewAlgodServer( GenesisResponder, MakePostSyncRoundResponder(http.StatusOK), + MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}), MakeJsonResponderSeries("/v2/status", []int{http.StatusOK, http.StatusOK, http.StatusBadRequest}, []interface{}{models.NodeStatus{LastRound: 1235}}), MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, nil)), netAddr: "", - errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: HTTP 400", + errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round", errGetGen: "", logs: []string{}, }, @@ -291,6 +292,7 @@ func TestInitCatchup(t *testing.T) { catchpoint: "1236#abcd", algodServer: NewAlgodServer( GenesisResponder, + MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}), MakePostSyncRoundResponder(http.StatusOK), MakeJsonResponderSeries("/v2/status", []int{http.StatusOK}, []interface{}{ models.NodeStatus{LastRound: 1235}, @@ -302,6 +304,33 @@ func TestInitCatchup(t *testing.T) { }), MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, "")), netAddr: "", + errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round", + errGetGen: "", + logs: []string{ + "catchup phase Processed Accounts: 1 / 1", + "catchup phase Verified Accounts: 1 / 1", + "catchup phase Acquired Blocks: 1 / 1", + "catchup phase Verified Blocks", + }}, + { + name: "monitor catchup success", + adminToken: "admin", + targetRound: 1237, + catchpoint: "1236#abcd", + algodServer: NewAlgodServer( + GenesisResponder, + MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}), + MakePostSyncRoundResponder(http.StatusOK), + MakeJsonResponderSeries("/v2/status", []int{http.StatusOK}, []interface{}{ + models.NodeStatus{LastRound: 1235}, + models.NodeStatus{Catchpoint: "1236#abcd", CatchpointProcessedAccounts: 1, CatchpointTotalAccounts: 1}, + models.NodeStatus{Catchpoint: "1236#abcd", CatchpointVerifiedAccounts: 1, CatchpointTotalAccounts: 1}, + models.NodeStatus{Catchpoint: "1236#abcd", CatchpointAcquiredBlocks: 1, CatchpointTotalBlocks: 1}, + models.NodeStatus{Catchpoint: "1236#abcd"}, + models.NodeStatus{LastRound: 1237}, // this is the only difference from the previous test + }), + MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, "")), + netAddr: "", errInit: "", errGetGen: "", logs: []string{ @@ -581,6 +610,10 @@ netaddr: %s } func TestGetBlockFailure(t *testing.T) { + // Note: There are panics in the log because the init function in these tests calls the + // delta endpoint and causes a panic in most cases. This causes the "needs catchup" + // function to send out a sync request at which point logic continues as normal and + // the GetBlock function is able to run for the test. tests := []struct { name string algodServer *httptest.Server @@ -634,6 +667,7 @@ func TestAlgodImporter_ProvideMetrics(t *testing.T) { } func TestGetBlockErrors(t *testing.T) { + waitForRoundTimeout = time.Hour testcases := []struct { name string rnd uint64 @@ -644,28 +678,29 @@ func TestGetBlockErrors(t *testing.T) { err string }{ { - name: "Cannot get status", + name: "Cannot wait for block", rnd: 123, - blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{}}), - err: fmt.Sprintf("error getting status for round"), - logs: []string{"error getting status for round 123", "failed to get block for round 123 "}, + blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}), + err: fmt.Sprintf("error getting block for round 123"), + logs: []string{"error getting block for round 123"}, }, { name: "Cannot get block", rnd: 123, blockAfterResponder: BlockAfterResponder, + deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}), blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""), - err: fmt.Sprintf("failed to get block"), - logs: []string{"error getting block for round 123", "failed to get block for round 123 "}, + err: fmt.Sprintf("error getting block for round 123"), + logs: []string{"error getting block for round 123"}, }, { - name: "Cannot get delta (node behind)", + name: "Cannot get delta (node behind, re-send sync)", rnd: 200, blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}), blockResponder: BlockResponder, deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""), - err: fmt.Sprintf("ledger state delta not found: node round (50) is behind required round (200)"), - logs: []string{"ledger state delta not found: node round (50) is behind required round (200)"}, + err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"), + logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"}, }, { name: "Cannot get delta (caught up)", @@ -721,6 +756,7 @@ func TestGetBlockErrors(t *testing.T) { for _, log := range tc.logs { found := false for _, entry := range hook.AllEntries() { + fmt.Println(strings.Contains(entry.Message, log)) found = found || strings.Contains(entry.Message, log) } noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log) diff --git a/conduit/plugins/importers/algod/mock_algod_test.go b/conduit/plugins/importers/algod/mock_algod_test.go index f3b94c93..f5b5bed5 100644 --- a/conduit/plugins/importers/algod/mock_algod_test.go +++ b/conduit/plugins/importers/algod/mock_algod_test.go @@ -107,7 +107,17 @@ func MakeNodeStatusResponder(status models.NodeStatus) algodCustomHandler { return MakeJsonResponder("/v2/status", status) } -var BlockAfterResponder = MakeBlockAfterResponder(models.NodeStatus{}) +// BlockAfterResponder handles /v2/status requests and returns a NodeStatus object with the provided last round +func BlockAfterResponder(r *http.Request, w http.ResponseWriter) bool { + if strings.Contains(r.URL.Path, "/wait-for-block-after") { + rnd, _ := strconv.Atoi(path.Base(r.URL.Path)) + w.WriteHeader(http.StatusOK) + status := models.NodeStatus{LastRound: uint64(rnd + 1)} + _, _ = w.Write(json.Encode(status)) + return true + } + return false +} func MakeLedgerStateDeltaResponder(delta types.LedgerStateDelta) algodCustomHandler { return MakeMsgpResponder("/v2/deltas/", delta)