Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/actions/run-load-test/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,15 @@ runs:
JD_IMAGE: ${{ inputs.jd-image }}
run: |
cd cmd/ccv && go install . && cd -
ccv u ${{ inputs.env_name }} && ccv obs up
ccv u ${{ inputs.env_name }}.toml && ccv obs up --mode full
- name: Run load tests
id: load_test
shell: bash
working-directory: build/devenv/tests/e2e
env:
LOAD_TEST_OUT_FILE: ../../${{ inputs.env_name }}-out.toml
PROM_URL: http://localhost:9099
run: |
set -o pipefail
LOAD_TEST_OUT_FILE=../../${{ inputs.env_name_out }} go test -v -timeout ${{ inputs.timeout }} -count=1 -run 'TestE2ELoad/${{ inputs.subtest }}'
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/test-load-nightly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ jobs:
test:
- subtest: rpc_latency
timeout: 1h10m
- subtest: gas
timeout: 10m
- subtest: burst
timeout: 10m
steps:
- name: Enable S3 Cache for Self-Hosted Runners
uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # v2.0.3
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- +goose Up
-- +goose StatementBegin

-- Add source_chain_block_timestamp column to commit_verification_records
-- This represents the timestamp when the message was included in a source chain block
-- Default to current time in milliseconds for existing rows
ALTER TABLE commit_verification_records
ADD COLUMN source_chain_block_timestamp TIMESTAMP NOT NULL DEFAULT now();

-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin

ALTER TABLE commit_verification_records
DROP COLUMN source_chain_block_timestamp;

-- +goose StatementEnd
4 changes: 4 additions & 0 deletions aggregator/pkg/model/commit_aggregated_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (c *CommitAggregatedReport) GetMessageExecutorAddress() protocol.UnknownAdd
return c.Verifications[0].MessageExecutorAddress
}

func (c *CommitAggregatedReport) GetSourceChainBlockTimestamp() time.Time {
return c.Verifications[0].SourceChainBlockTimestamp
}

// It is assumed that all verifications in the report have the same message since otherwise the message ID would not match.
func (c *CommitAggregatedReport) GetProtoMessage() *pb.Message {
if len(c.Verifications) > 0 && c.Verifications[0].Message != nil {
Expand Down
17 changes: 9 additions & 8 deletions aggregator/pkg/model/commit_verification_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ func (c CommitVerificationRecordIdentifier) ToIdentifier() string {

// CommitVerificationRecord represents a record of a commit verification.
type CommitVerificationRecord struct {
MessageID MessageID
Message *protocol.Message
CCVVersion []byte
Signature []byte
MessageCCVAddresses []protocol.UnknownAddress
MessageExecutorAddress protocol.UnknownAddress
IdentifierSigner *IdentifierSigner
createdAt time.Time // Internal field for tracking creation time from DB
MessageID MessageID
Message *protocol.Message
CCVVersion []byte
Signature []byte
MessageCCVAddresses []protocol.UnknownAddress
MessageExecutorAddress protocol.UnknownAddress
SourceChainBlockTimestamp time.Time // Timestamp when message was included in source chain block (milliseconds)
IdentifierSigner *IdentifierSigner
createdAt time.Time // Internal field for tracking creation time from DB
}

// GetID retrieves the unique identifier for the commit verification record.
Expand Down
25 changes: 14 additions & 11 deletions aggregator/pkg/model/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ func MapAggregatedReportToCCVDataProto(report *CommitAggregatedReport, c *Commit
MessageExecutorAddress: []byte(report.GetMessageExecutorAddress()),
CcvData: ccvData,
Metadata: &pb.VerifierResultMetadata{
Timestamp: timeToTimestampMillis(report.WrittenAt),
VerifierSourceAddress: quorumConfig.GetSourceVerifierAddressBytes(),
VerifierDestAddress: quorumConfig.GetDestVerifierAddressBytes(),
Timestamp: timeToTimestampMillis(report.WrittenAt),
VerifierSourceAddress: quorumConfig.GetSourceVerifierAddressBytes(),
VerifierDestAddress: quorumConfig.GetDestVerifierAddressBytes(),
SourceChainBlockTimestamp: report.GetSourceChainBlockTimestamp().UnixMilli(),
},
}, nil
}
Expand All @@ -118,10 +119,11 @@ func CommitVerificationRecordFromProto(proto *pb.CommitteeVerifierNodeResult) (*
}

record := &CommitVerificationRecord{
CCVVersion: proto.CcvVersion,
Signature: proto.Signature,
MessageCCVAddresses: ccvAddresses,
MessageExecutorAddress: protocol.UnknownAddress(proto.ExecutorAddress),
CCVVersion: proto.CcvVersion,
Signature: proto.Signature,
MessageCCVAddresses: ccvAddresses,
MessageExecutorAddress: protocol.UnknownAddress(proto.ExecutorAddress),
SourceChainBlockTimestamp: time.UnixMilli(proto.SourceChainBlockTimestamp),
}
record.SetTimestampFromMillis(time.Now().UnixMilli())

Expand Down Expand Up @@ -152,10 +154,11 @@ func CommitVerificationRecordToProto(record *CommitVerificationRecord) *pb.Commi
}

proto := &pb.CommitteeVerifierNodeResult{
CcvVersion: record.CCVVersion,
Signature: record.Signature,
CcvAddresses: ccvAddresses,
ExecutorAddress: []byte(record.MessageExecutorAddress),
CcvVersion: record.CCVVersion,
Signature: record.Signature,
CcvAddresses: ccvAddresses,
ExecutorAddress: []byte(record.MessageExecutorAddress),
SourceChainBlockTimestamp: record.SourceChainBlockTimestamp.UnixMilli(),
}

if record.Message != nil {
Expand Down
9 changes: 7 additions & 2 deletions aggregator/pkg/storage/postgres/database_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func (d *DatabaseStorage) SaveCommitVerification(ctx context.Context, record *mo
stmt := `INSERT INTO commit_verification_records
(message_id, signer_address,
signature_r, signature_s, aggregation_key,
ccv_version, signature, message_ccv_addresses, message_executor_address, message_data)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ccv_version, signature, message_ccv_addresses, message_executor_address, message_data,
source_chain_block_timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (message_id, signer_address, aggregation_key)
DO NOTHING
RETURNING id`
Expand All @@ -125,6 +126,7 @@ func (d *DatabaseStorage) SaveCommitVerification(ctx context.Context, record *mo
params["message_ccv_addresses"],
params["message_executor_address"],
params["message_data"],
params["source_chain_block_timestamp"],
)
if err != nil {
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -241,6 +243,7 @@ func (d *DatabaseStorage) QueryAggregatedReports(ctx context.Context, sinceSeque
&verRow.MessageExecutorAddress,
&verRow.MessageData,
&verRow.ID,
&verRow.SourceChainBlockTimestamp,
&verRow.CreatedAt,
)
if err != nil {
Expand Down Expand Up @@ -353,6 +356,7 @@ func (d *DatabaseStorage) GetCCVData(ctx context.Context, messageID model.Messag
&verRow.MessageExecutorAddress,
&verRow.MessageData,
&verRow.ID,
&verRow.SourceChainBlockTimestamp,
&verRow.CreatedAt,
)
if err != nil {
Expand Down Expand Up @@ -454,6 +458,7 @@ func (d *DatabaseStorage) GetBatchCCVData(ctx context.Context, messageIDs []mode
&verRow.MessageExecutorAddress,
&verRow.MessageData,
&verRow.ID,
&verRow.SourceChainBlockTimestamp,
&verRow.CreatedAt,
)
if err != nil {
Expand Down
67 changes: 35 additions & 32 deletions aggregator/pkg/storage/postgres/database_storage_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@ import (
)

type commitVerificationRecordRow struct {
ID int64 `db:"id"`
SeqNum int64 `db:"seq_num"`
MessageID string `db:"message_id"`
SignerAddress string `db:"signer_address"`
SignatureR []byte `db:"signature_r"`
SignatureS []byte `db:"signature_s"`
AggregationKey string `db:"aggregation_key"`
CCVVersion []byte `db:"ccv_version"`
Signature []byte `db:"signature"`
MessageCCVAddresses pq.StringArray `db:"message_ccv_addresses"`
MessageExecutorAddress string `db:"message_executor_address"`
MessageData []byte `db:"message_data"`
CreatedAt time.Time `db:"created_at"`
ID int64 `db:"id"`
SeqNum int64 `db:"seq_num"`
MessageID string `db:"message_id"`
SignerAddress string `db:"signer_address"`
SignatureR []byte `db:"signature_r"`
SignatureS []byte `db:"signature_s"`
AggregationKey string `db:"aggregation_key"`
CCVVersion []byte `db:"ccv_version"`
Signature []byte `db:"signature"`
MessageCCVAddresses pq.StringArray `db:"message_ccv_addresses"`
MessageExecutorAddress string `db:"message_executor_address"`
MessageData []byte `db:"message_data"`
SourceChainBlockTimestamp time.Time `db:"source_chain_block_timestamp"`
CreatedAt time.Time `db:"created_at"`
}

func rowToCommitVerificationRecord(row *commitVerificationRecordRow) (*model.CommitVerificationRecord, error) {
Expand Down Expand Up @@ -68,13 +69,14 @@ func rowToCommitVerificationRecord(row *commitVerificationRecordRow) (*model.Com
}

record := &model.CommitVerificationRecord{
MessageID: messageID,
Message: &message,
CCVVersion: row.CCVVersion,
Signature: row.Signature,
MessageCCVAddresses: messageCCVAddresses,
MessageExecutorAddress: messageExecutorAddress,
IdentifierSigner: identifierSigner,
MessageID: messageID,
Message: &message,
CCVVersion: row.CCVVersion,
Signature: row.Signature,
MessageCCVAddresses: messageCCVAddresses,
MessageExecutorAddress: messageExecutorAddress,
SourceChainBlockTimestamp: row.SourceChainBlockTimestamp,
IdentifierSigner: identifierSigner,
}
record.SetTimestampFromMillis(row.CreatedAt.UnixMilli())
return record, nil
Expand Down Expand Up @@ -109,28 +111,29 @@ func recordToInsertParams(record *model.CommitVerificationRecord, aggregationKey
messageExecutorAddressHex := record.MessageExecutorAddress.String()

params := map[string]any{
"message_id": messageIDHex,
"signer_address": signerAddressHex,
"signature_r": record.IdentifierSigner.SignatureR[:],
"signature_s": record.IdentifierSigner.SignatureS[:],
"aggregation_key": aggregationKey,
"ccv_version": record.CCVVersion,
"signature": record.Signature,
"message_ccv_addresses": pq.Array(messageCCVAddressesHex),
"message_executor_address": messageExecutorAddressHex,
"message_data": messageDataJSON,
"message_id": messageIDHex,
"signer_address": signerAddressHex,
"signature_r": record.IdentifierSigner.SignatureR[:],
"signature_s": record.IdentifierSigner.SignatureS[:],
"aggregation_key": aggregationKey,
"ccv_version": record.CCVVersion,
"signature": record.Signature,
"message_ccv_addresses": pq.Array(messageCCVAddressesHex),
"message_executor_address": messageExecutorAddressHex,
"message_data": messageDataJSON,
"source_chain_block_timestamp": record.SourceChainBlockTimestamp,
}

return params, nil
}

const allVerificationRecordColumns = `message_id, signer_address,
signature_r, signature_s, aggregation_key,
ccv_version, signature, message_ccv_addresses, message_executor_address, message_data, id, created_at`
ccv_version, signature, message_ccv_addresses, message_executor_address, message_data, id, source_chain_block_timestamp, created_at`

const allVerificationRecordColumnsQualified = `cvr.message_id, cvr.signer_address,
cvr.signature_r, cvr.signature_s, cvr.aggregation_key,
cvr.ccv_version, cvr.signature, cvr.message_ccv_addresses, cvr.message_executor_address, cvr.message_data, cvr.id, cvr.created_at`
cvr.ccv_version, cvr.signature, cvr.message_ccv_addresses, cvr.message_executor_address, cvr.message_data, cvr.id, cvr.source_chain_block_timestamp, cvr.created_at`

func mustParseUint64(s string) uint64 {
var result uint64
Expand Down
30 changes: 16 additions & 14 deletions build/devenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
DefaultAnvilKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
DefaultLokiURL = "http://localhost:3030/loki/api/v1/push"
DefaultTempoURL = "http://localhost:4318/v1/traces"
DefaultPromURL = "http://localhost:9099"
)

var L = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}).Level(zerolog.InfoLevel)
Expand Down Expand Up @@ -101,23 +102,24 @@ func LoadOutput[T any](outputPath string) (*T, error) {
}

// Load addresses into the datastore so that tests can query them appropriately.
if c, ok := any(config).(*Cfg); ok {
if len(c.CLDF.Addresses) > 0 {
ds := datastore.NewMemoryDataStore()
for _, addrRefJSON := range c.CLDF.Addresses {
var addrs []datastore.AddressRef
if err := json.Unmarshal([]byte(addrRefJSON), &addrs); err != nil {
return nil, fmt.Errorf("failed to unmarshal addresses from config: %w", err)
}
for _, addr := range addrs {
if err := ds.Addresses().Add(addr); err != nil {
return nil, fmt.Errorf("failed to set address in datastore: %w", err)
}
}
c, ok := any(config).(*Cfg)
if !ok || len(c.CLDF.Addresses) == 0 {
return config, nil
}

ds := datastore.NewMemoryDataStore()
for _, addrRefJSON := range c.CLDF.Addresses {
var addrs []datastore.AddressRef
if err := json.Unmarshal([]byte(addrRefJSON), &addrs); err != nil {
return nil, fmt.Errorf("failed to unmarshal addresses from config: %w", err)
}
for _, addr := range addrs {
if err := ds.Addresses().Add(addr); err != nil {
return nil, fmt.Errorf("failed to set address in datastore: %w", err)
}
c.CLDF.DataStore = ds.Seal()
}
}
c.CLDF.DataStore = ds.Seal()

return config, nil
}
Expand Down
4 changes: 2 additions & 2 deletions build/devenv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ require (
github.com/go-resty/resty/v2 v2.16.5
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/prometheus/common v1.20.99
github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm v0.0.0-20251127040717-30244f57ea7a
github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/deployment v0.0.0-20251127040717-30244f57ea7a
github.com/smartcontractkit/chainlink-ccv v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a
github.com/smartcontractkit/chainlink-testing-framework/wasp v1.51.1
google.golang.org/grpc v1.76.0
)
Expand Down Expand Up @@ -302,7 +303,6 @@ require (
github.com/pressly/goose/v3 v3.26.0 // indirect
github.com/prometheus/alertmanager v0.28.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v1.20.99 // indirect
github.com/prometheus/exporter-toolkit v0.13.2 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/prometheus/prometheus v0.302.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions build/devenv/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1071,8 +1071,8 @@ github.com/smartcontractkit/chainlink-deployments-framework v0.66.0 h1:tJvjPiQsC
github.com/smartcontractkit/chainlink-deployments-framework v0.66.0/go.mod h1:8EXTqTr/5T5WLZpWg6npDvmcR3/wLl1A8eculNCn5GA=
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3 h1:mJP6yJq2woOZchX0KvhLiKxDPaS0Vy4vTDFH4nnFkXs=
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3/go.mod h1:3Lsp38qxen9PABVF+O5eocveQev+hyo9HLAgRodBD4Q=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb h1:vGLgImXYmzK8eow7kShHGZO948818NAI3FPihEH1v7c=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251126123859-d079d6815edb/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a h1:vGPF2Tlg1SBcCBPUP51m/PTO9IokKBuSaTjRV3CKyQ0=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251127174315-8ee658a4af6a/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q=
github.com/smartcontractkit/chainlink-protos/job-distributor v0.17.0 h1:xHPmFDhff7QpeFxKsZfk+24j4AlnQiFjjRh5O87Peu4=
Expand Down
Loading
Loading