Skip to content

Commit

Permalink
algod importer: Update sync on WaitForBlock error. (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Jul 21, 2023
1 parent bddfb21 commit f2711fa
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 70 deletions.
167 changes: 113 additions & 54 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
_ "embed" // used to embed config
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -40,9 +41,8 @@ const (
followerMode
)

// Retry
const (
retries = 5
var (
waitForRoundTimeout = 5 * time.Second
)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
Expand Down Expand Up @@ -320,11 +320,14 @@ func (algodImp *algodImporter) catchupNode(network string, targetRound uint64) e
}
}

_, err := algodImp.aclient.StatusAfterBlock(targetRound).Do(algodImp.ctx)
status, err := algodImp.aclient.StatusAfterBlock(targetRound).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.catchupNode() called StatusAfterBlock(targetRound=%d) err: %v", targetRound, err)
if err != nil {
err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: %w", err)
}
if status.LastRound < targetRound {
err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round %d != %d", status.LastRound, targetRound)
}
return err
}

Expand Down Expand Up @@ -414,69 +417,125 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error
return delta, nil
}

func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
var blockbytes []byte
var err error
var status models.NodeStatus
var blk data.BlockData
// SyncError is used to indicate algod and conduit are not synchronized.
type SyncError struct {
rnd uint64
expected uint64
}

for r := 0; r < retries; r++ {
status, err = algodImp.aclient.StatusAfterBlock(rnd - 1).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.GetBlock() called StatusAfterBlock(%d) err: %v", rnd-1, err)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
err = fmt.Errorf("error getting status for round: %w", err)
algodImp.logger.Errorf("error getting status for round %d (attempt %d): %s", rnd, r, err.Error())
continue
func (e *SyncError) Error() string {
return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected)
}

func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) {
ctxWithTimeout, cf := context.WithTimeout(ctx, to)
defer cf()
status, err := c.StatusAfterBlock(rnd - 1).Do(ctxWithTimeout)
l.Tracef("importer algod.waitForRoundWithTimeout() called StatusAfterBlock(%d) err: %v", rnd-1, err)

if err == nil {
// When c.StatusAfterBlock has a server-side timeout it returns the current status.
// We use a context with timeout and the algod default timeout is 1 minute, so technically
// with the current versions, this check should never be required.
if rnd <= status.LastRound {
return status.LastRound, nil
}
start := time.Now()
blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.GetBlock() called BlockRaw(%d) err: %v", rnd, err)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d (attempt %d): %s", rnd, r, err.Error())
continue
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
}

// If there was a different error and the node is responsive, call status before returning a SyncError.
status2, err2 := c.Status().Do(ctx)
l.Tracef("importer algod.waitForRoundWithTimeout() called Status() err: %v", err2)
if err2 != nil {
// If there was an error getting status, return the original error.
return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2))
}
if status2.LastRound < rnd {
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
}

// This is probably a connection error, not a SyncError.
return 0, fmt.Errorf("unknown errors: StatusAfterBlock(%w), Status(%w)", err, err2)
}

func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) {
var blockbytes []byte
var blk data.BlockData

nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
start := time.Now()
blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.GetBlock() called BlockRaw(%d) err: %v", rnd, err)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error())
return data.BlockData{}, err
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
}

blk.BlockHeader = tmpBlk.Block.BlockHeader
blk.Payset = tmpBlk.Block.Payset
blk.Certificate = tmpBlk.Cert

blk.BlockHeader = tmpBlk.Block.BlockHeader
blk.Payset = tmpBlk.Block.Payset
blk.Certificate = tmpBlk.Cert

if algodImp.mode == followerMode {
// Round 0 has no delta associated with it
if rnd != 0 {
var delta sdk.LedgerStateDelta
delta, err = algodImp.getDelta(rnd)
if err != nil {
if status.LastRound < rnd {
err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", status.LastRound, rnd, err)
} else {
err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", status.LastRound, rnd, err)
}
algodImp.logger.Error(err.Error())
return data.BlockData{}, err
if algodImp.mode == followerMode {
// Round 0 has no delta associated with it
if rnd != 0 {
var delta sdk.LedgerStateDelta
delta, err = algodImp.getDelta(rnd)
if err != nil {
if nodeRound < rnd {
err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)
} else {
err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
}
blk.Delta = &delta
algodImp.logger.Error(err.Error())
return data.BlockData{}, err
}
blk.Delta = &delta
}

return blk, err
}

err = fmt.Errorf("failed to get block for round %d after %d attempts, check node configuration: %s", rnd, retries, err)
algodImp.logger.Errorf(err.Error())
return blk, err
}

func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
blk, err := algodImp.getBlockInner(rnd)

if err != nil {
target := &SyncError{}
if errors.As(err, &target) {
algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
_, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx)
} else {
err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err)
algodImp.logger.Errorf(err.Error())
}
return data.BlockData{}, err
}

return blk, nil

}

func (algodImp *algodImporter) ProvideMetrics(subsystem string) []prometheus.Collector {
getAlgodRawBlockTimeSeconds = initGetAlgodRawBlockTimeSeconds(subsystem)
return []prometheus.Collector{
Expand Down
66 changes: 51 additions & 15 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package algodimporter
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
Expand Down Expand Up @@ -277,10 +277,11 @@ func TestInitCatchup(t *testing.T) {
algodServer: NewAlgodServer(
GenesisResponder,
MakePostSyncRoundResponder(http.StatusOK),
MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
MakeJsonResponderSeries("/v2/status", []int{http.StatusOK, http.StatusOK, http.StatusBadRequest}, []interface{}{models.NodeStatus{LastRound: 1235}}),
MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, nil)),
netAddr: "",
errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: HTTP 400",
errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round",
errGetGen: "",
logs: []string{},
},
Expand All @@ -291,6 +292,7 @@ func TestInitCatchup(t *testing.T) {
catchpoint: "1236#abcd",
algodServer: NewAlgodServer(
GenesisResponder,
MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
MakePostSyncRoundResponder(http.StatusOK),
MakeJsonResponderSeries("/v2/status", []int{http.StatusOK}, []interface{}{
models.NodeStatus{LastRound: 1235},
Expand All @@ -302,6 +304,33 @@ func TestInitCatchup(t *testing.T) {
}),
MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, "")),
netAddr: "",
errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round",
errGetGen: "",
logs: []string{
"catchup phase Processed Accounts: 1 / 1",
"catchup phase Verified Accounts: 1 / 1",
"catchup phase Acquired Blocks: 1 / 1",
"catchup phase Verified Blocks",
}},
{
name: "monitor catchup success",
adminToken: "admin",
targetRound: 1237,
catchpoint: "1236#abcd",
algodServer: NewAlgodServer(
GenesisResponder,
MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
MakePostSyncRoundResponder(http.StatusOK),
MakeJsonResponderSeries("/v2/status", []int{http.StatusOK}, []interface{}{
models.NodeStatus{LastRound: 1235},
models.NodeStatus{Catchpoint: "1236#abcd", CatchpointProcessedAccounts: 1, CatchpointTotalAccounts: 1},
models.NodeStatus{Catchpoint: "1236#abcd", CatchpointVerifiedAccounts: 1, CatchpointTotalAccounts: 1},
models.NodeStatus{Catchpoint: "1236#abcd", CatchpointAcquiredBlocks: 1, CatchpointTotalBlocks: 1},
models.NodeStatus{Catchpoint: "1236#abcd"},
models.NodeStatus{LastRound: 1237}, // this is the only difference from the previous test
}),
MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, "")),
netAddr: "",
errInit: "",
errGetGen: "",
logs: []string{
Expand Down Expand Up @@ -581,6 +610,10 @@ netaddr: %s
}

func TestGetBlockFailure(t *testing.T) {
// Note: There are panics in the log because the init function in these tests calls the
// delta endpoint and causes a panic in most cases. This causes the "needs catchup"
// function to send out a sync request at which point logic continues as normal and
// the GetBlock function is able to run for the test.
tests := []struct {
name string
algodServer *httptest.Server
Expand Down Expand Up @@ -634,6 +667,7 @@ func TestAlgodImporter_ProvideMetrics(t *testing.T) {
}

func TestGetBlockErrors(t *testing.T) {
waitForRoundTimeout = time.Hour
testcases := []struct {
name string
rnd uint64
Expand All @@ -644,28 +678,29 @@ func TestGetBlockErrors(t *testing.T) {
err string
}{
{
name: "Cannot get status",
name: "Cannot wait for block",
rnd: 123,
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{}}),
err: fmt.Sprintf("error getting status for round"),
logs: []string{"error getting status for round 123", "failed to get block for round 123 "},
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}),
err: fmt.Sprintf("error getting block for round 123"),
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get block",
rnd: 123,
blockAfterResponder: BlockAfterResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""),
err: fmt.Sprintf("failed to get block"),
logs: []string{"error getting block for round 123", "failed to get block for round 123 "},
err: fmt.Sprintf("error getting block for round 123"),
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get delta (node behind)",
name: "Cannot get delta (node behind, re-send sync)",
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("ledger state delta not found: node round (50) is behind required round (200)"),
logs: []string{"ledger state delta not found: node round (50) is behind required round (200)"},
err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"),
logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"},
},
{
name: "Cannot get delta (caught up)",
Expand Down Expand Up @@ -721,6 +756,7 @@ func TestGetBlockErrors(t *testing.T) {
for _, log := range tc.logs {
found := false
for _, entry := range hook.AllEntries() {
fmt.Println(strings.Contains(entry.Message, log))
found = found || strings.Contains(entry.Message, log)
}
noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log)
Expand Down
12 changes: 11 additions & 1 deletion conduit/plugins/importers/algod/mock_algod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,17 @@ func MakeNodeStatusResponder(status models.NodeStatus) algodCustomHandler {
return MakeJsonResponder("/v2/status", status)
}

var BlockAfterResponder = MakeBlockAfterResponder(models.NodeStatus{})
// BlockAfterResponder handles /v2/status requests and returns a NodeStatus object with the provided last round
func BlockAfterResponder(r *http.Request, w http.ResponseWriter) bool {
if strings.Contains(r.URL.Path, "/wait-for-block-after") {
rnd, _ := strconv.Atoi(path.Base(r.URL.Path))
w.WriteHeader(http.StatusOK)
status := models.NodeStatus{LastRound: uint64(rnd + 1)}
_, _ = w.Write(json.Encode(status))
return true
}
return false
}

func MakeLedgerStateDeltaResponder(delta types.LedgerStateDelta) algodCustomHandler {
return MakeMsgpResponder("/v2/deltas/", delta)
Expand Down

0 comments on commit f2711fa

Please sign in to comment.