Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCF-3052 - Job Based KV Store and juelsFeePerCoin reboot persistence #12392

Merged
merged 22 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add kv store implementation
  • Loading branch information
ilija42 committed Mar 11, 2024
commit 8742a210a5ca8a871ba2493247b6a047f05bc8ea
68 changes: 68 additions & 0 deletions core/services/job/kv_orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package job

import (
"encoding/json"
"fmt"
"time"

"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

//go:generate mockery --quiet --name KVStore --output ./mocks/ --case=underscore

type KVStore interface {
Store(key string, val interface{}) error
Get(key string, dest interface{}) error
}

type jobKVStore struct {
ilija42 marked this conversation as resolved.
Show resolved Hide resolved
jobID int32
q pg.Q
lggr logger.SugaredLogger
cfg pg.QConfig
}

var _ KVStore = (*jobKVStore)(nil)

func NewJobKVStore(jobID int32, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) KVStore {
ilija42 marked this conversation as resolved.
Show resolved Hide resolved
ilija42 marked this conversation as resolved.
Show resolved Hide resolved
namedLogger := logger.Sugared(lggr.Named("JobORM"))
return &jobKVStore{
jobID: jobID,
q: pg.NewQ(db, namedLogger, cfg),
lggr: namedLogger,
}
}

func (kv jobKVStore) Store(key string, val interface{}) error {
jsonVal, err := json.Marshal(val)
if err != nil {
return err
}

sql := `INSERT INTO job_kv_store (id, key, val)
ilija42 marked this conversation as resolved.
Show resolved Hide resolved
VALUES ($1, $2, $3)
ON CONFLICT (id, key) DO UPDATE SET
val = EXCLUDED.val,
updated_at = $4
RETURNING id;`
ilija42 marked this conversation as resolved.
Show resolved Hide resolved

if err = kv.q.ExecQ(sql, kv.jobID, key, types.JSONText(jsonVal), time.Now()); err != nil {
return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(jsonVal), key, kv.jobID, err)
}
return nil
}

func (kv jobKVStore) Get(key string, dest interface{}) error {
var ret types.JSONText

sql := "SELECT val FROM job_kv_store WHERE id = $1 AND key = $2"
if err := kv.q.Get(&ret, sql, kv.jobID, key); err != nil {
return fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err)
}

return ret.Unmarshal(dest)
}
56 changes: 56 additions & 0 deletions core/services/job/kv_orm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package job_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/directrequest"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
)

func TestJobKVStore_Store(t *testing.T) {
config := configtest.NewTestGeneralConfig(t)
db := pgtest.NewSqlxDB(t)

lggr := logger.TestLogger(t)

pipelineORM := pipeline.NewORM(db, logger.TestLogger(t), config.Database(), config.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db, logger.TestLogger(t), config.Database())

jobID := int32(1337)
kvStore := job.NewJobKVStore(jobID, db, config.Database(), lggr)
jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, cltest.NewKeyStore(t, db, config.Database()), config.Database())

jb, err := directrequest.ValidatedDirectRequestSpec(testspecs.GetDirectRequestSpec())
require.NoError(t, err)
jb.ID = jobID
require.NoError(t, jobORM.CreateJob(&jb))

key := "test_key"

type testData struct {
Test string `json:"test"`
}

td1 := testData{Test: "value1"}
td2 := testData{Test: "value2"}

var retData testData
require.NoError(t, kvStore.Store(key, td1))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td1, retData)

require.NoError(t, kvStore.Store(key, td2))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td2, retData)
ilija42 marked this conversation as resolved.
Show resolved Hide resolved

t.Fail()
}