Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCF-3052 - Job Based KV Store and juelsFeePerCoin reboot persistence #12392

Merged
merged 22 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add ds cache test for cache value persistence
  • Loading branch information
ilija42 committed Mar 12, 2024
commit 4d65dac5589e20e7ab1193c8d99e83e5a05cd1aa
10 changes: 6 additions & 4 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
Expand Down Expand Up @@ -256,7 +257,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.latestUpdateErr = latestUpdateErr
// raise log severity
if previousUpdateErr != nil {
ds.lggr.Errorf("consecutive cache updates errored: previous err: %w new err: %w", previousUpdateErr, ds.latestUpdateErr)
ds.lggr.Errorf("consecutive cache updates errored: previous err: %s new err: %s", previousUpdateErr.Error(), ds.latestUpdateErr.Error())
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
}
Expand All @@ -269,7 +270,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
}

// persist the latest value as a backup in case the data source fails continuously and the node gets rebooted
if err = ds.kvStore.Store(dataSourceCacheKey, value); err != nil {
if err = ds.kvStore.Store(dataSourceCacheKey, serializablebig.New(value)); err != nil {
ds.lggr.Errorf("failed to persist latest task run value", err)
}

Expand All @@ -294,11 +295,12 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
return ds.latestResult, ds.latestTrrs
}

func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (val *big.Int, err error) {
func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) {
var val serializablebig.Big
latestResult, latestTrrs := ds.get(ctx)
if latestTrrs == nil {
ds.lggr.Errorf("cache is empty, returning persisted value now")
ilija42 marked this conversation as resolved.
Show resolved Hide resolved
return val, ds.kvStore.Get(dataSourceCacheKey, val)
return val.ToInt(), ds.kvStore.Get(dataSourceCacheKey, &val)
}

setEATelemetry(ds.inMemoryDataSource, latestResult, latestTrrs, ObservationTimestamp{
Expand Down
92 changes: 69 additions & 23 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/job/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
pipelinemocks "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/mocks"
Expand Down Expand Up @@ -47,13 +49,8 @@
}

func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
runner := pipelinemocks.NewRunner(t)

ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, time.Second*2)
require.NoError(t, err)

changeResultValue := func(value string, returnErr, once bool) {
changeResultValue := func(runner *pipelinemocks.Runner, value string, returnErr, once bool) {
result := pipeline.Result{
Value: value,
Error: nil,
Expand All @@ -74,23 +71,72 @@
call.Once()
}
}

mockVal := int64(1)
// Test if Observe notices that cache updater failed and can refresh the cache on its own
// 1. Set initial value
changeResultValue(fmt.Sprint(mockVal), false, true)
time.Sleep(time.Millisecond * 100)
val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
require.NoError(t, err)
assert.Equal(t, mockVal, val.Int64())
// 2. Set values again, but make it error in updater
changeResultValue(fmt.Sprint(mockVal+1), true, true)
time.Sleep(time.Second*2 + time.Millisecond*100)
// 3. Set value in between updates and call Observe (shouldn't flake because of huge wait time)
changeResultValue(fmt.Sprint(mockVal+2), false, false)
val, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
require.NoError(t, err)
assert.Equal(t, mockVal+2, val.Int64())
t.Run("test normal cache updater fail recovery", func(t *testing.T) {
runner := pipelinemocks.NewRunner(t)
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
mockKVStore := mocks.KVStore{}

Check failure on line 77 in core/services/ocrcommon/data_source_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

undefined: mocks.KVStore
mockKVStore.On("Store", mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
require.NoError(t, err)

mockVal := int64(1)
// Test if Observe notices that cache updater failed and can refresh the cache on its own
// 1. Set initial value
changeResultValue(runner, fmt.Sprint(mockVal), false, true)
time.Sleep(time.Millisecond * 100)
val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
require.NoError(t, err)
assert.Equal(t, mockVal, val.Int64())
// 2. Set values again, but make it error in updater
changeResultValue(runner, fmt.Sprint(mockVal+1), true, true)
time.Sleep(time.Second*2 + time.Millisecond*100)
// 3. Set value in between updates and call Observe (shouldn't flake because of huge wait time)
changeResultValue(runner, fmt.Sprint(mockVal+2), false, false)
val, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
require.NoError(t, err)
assert.Equal(t, mockVal+2, val.Int64())
})

t.Run("test total updater fail with persisted value recovery", func(t *testing.T) {
persistedVal := big.NewInt(1337)
runner := pipelinemocks.NewRunner(t)
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}

Check failure on line 106 in core/services/ocrcommon/data_source_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

undefined: mocks.KVStore
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*serializablebig.Big)
arg.ToInt().Set(persistedVal)
})

// give the updater a very long duration so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)

time.Sleep(time.Millisecond * 100)
val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
require.NoError(t, err)
assert.Equal(t, persistedVal.String(), val.String())

})

t.Run("test total updater fail with no persisted value ", func(t *testing.T) {
runner := pipelinemocks.NewRunner(t)
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}

Check failure on line 128 in core/services/ocrcommon/data_source_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

undefined: mocks.KVStore
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Return(assert.AnError)

// give the updater a very long duration so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)

time.Sleep(time.Millisecond * 100)
_, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
require.Error(t, err)
})
}

func Test_InMemoryDataSourceWithProm(t *testing.T) {
Expand Down
Loading