Skip to content

Commit

Permalink
Commit plugin improvements (#297)
Browse files Browse the repository at this point in the history
* unexport types and delete redundant oracleID param

* named logger for ccipChainSupport

* smarter rmn parameters computation
  • Loading branch information
dimkouv authored Nov 5, 2024
1 parent fa3a001 commit 760e17a
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 28 deletions.
1 change: 0 additions & 1 deletion commit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func (p *PluginFactory) NewReportingPlugin(ctx context.Context, config ocr3types

return NewPlugin(
p.donID,
config.OracleID,
oracleIDToP2PID,
offchainConfig,
p.ocrConfig.Config.ChainSelector,
Expand Down
5 changes: 5 additions & 0 deletions commit/merkleroot/rmn/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func NewController(
observationsInitialRequestTimerDuration time.Duration,
reportsInitialRequestTimerDuration time.Duration,
) Controller {

lggr.Infow("creating new RMN controller",
"observationsInitialRequestTimerDuration", observationsInitialRequestTimerDuration,
"reportsInitialRequestTimerDuration", reportsInitialRequestTimerDuration)

return &controller{
lggr: lggr,
rmnCrypto: rmnCrypto,
Expand Down
76 changes: 58 additions & 18 deletions commit/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
)

type MerkleRootObservation = plugincommon.AttributedObservation[merkleroot.Observation]
type TokenPricesObservation = plugincommon.AttributedObservation[tokenprice.Observation]
type ChainFeeObservation = plugincommon.AttributedObservation[chainfee.Observation]
type merkleRootObservation = plugincommon.AttributedObservation[merkleroot.Observation]
type tokenPricesObservation = plugincommon.AttributedObservation[tokenprice.Observation]
type chainFeeObservation = plugincommon.AttributedObservation[chainfee.Observation]

type Plugin struct {
donID plugintypes.DonID
Expand All @@ -57,7 +57,6 @@ type Plugin struct {

func NewPlugin(
donID plugintypes.DonID,
oracleID commontypes.OracleID,
oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID,
offchainCfg pluginconfig.CommitOffchainConfig,
destChain cciptypes.ChainSelector,
Expand All @@ -83,10 +82,10 @@ func NewPlugin(
}

chainSupport := plugincommon.NewCCIPChainSupport(
lggr,
logger.Named(lggr, "CCIPChainSupport"),
homeChain,
oracleIDToP2pID,
oracleID,
reportingCfg.OracleID,
destChain,
)

Expand All @@ -96,12 +95,12 @@ func NewPlugin(
offchainCfg.SignObservationPrefix,
rmnPeerClient,
rmnHomeReader,
2*time.Second, /* observationsInitialRequestTimerDuration */
2*time.Second, /* observationsRequestTimerDuration */
observationsInitialRequestTimerDuration(reportingCfg.MaxDurationQuery),
reportsInitialRequestTimerDuration(reportingCfg.MaxDurationQuery),
)

merkleRootProcessor := merkleroot.NewProcessor(
oracleID,
reportingCfg.OracleID,
oracleIDToP2pID,
logger.Named(lggr, "MerkleRootProcessor"),
offchainCfg,
Expand All @@ -117,7 +116,7 @@ func NewPlugin(
)

tokenPriceProcessor := tokenprice.NewProcessor(
oracleID,
reportingCfg.OracleID,
lggr,
offchainCfg,
destChain,
Expand All @@ -138,7 +137,7 @@ func NewPlugin(

chainFeeProcessr := chainfee.NewProcessor(
lggr,
oracleID,
reportingCfg.OracleID,
destChain,
homeChain,
ccipReader,
Expand All @@ -149,7 +148,7 @@ func NewPlugin(

return &Plugin{
donID: donID,
oracleID: oracleID,
oracleID: reportingCfg.OracleID,
oracleIDToP2PID: oracleIDToP2pID,
lggr: lggr,
offchainCfg: offchainCfg,
Expand Down Expand Up @@ -291,9 +290,9 @@ func (p *Plugin) Outcome(
return nil, fmt.Errorf("decode query: %w", err)
}

var merkleObservations []MerkleRootObservation
var tokensObservations []TokenPricesObservation
var feeObservations []ChainFeeObservation
var merkleObservations []merkleRootObservation
var tokensObservations []tokenPricesObservation
var feeObservations []chainFeeObservation
var discoveryObservations []plugincommon.AttributedObservation[dt.Observation]

for _, ao := range aos {
Expand All @@ -304,21 +303,21 @@ func (p *Plugin) Outcome(
}
p.lggr.Debugw("Commit plugin outcome decoded observation", "observation", obs)
merkleObservations = append(merkleObservations,
MerkleRootObservation{
merkleRootObservation{
OracleID: ao.Observer,
Observation: obs.MerkleRootObs,
},
)

tokensObservations = append(tokensObservations,
TokenPricesObservation{
tokenPricesObservation{
OracleID: ao.Observer,
Observation: obs.TokenPriceObs,
},
)

feeObservations = append(feeObservations,
ChainFeeObservation{
chainFeeObservation{
OracleID: ao.Observer,
Observation: obs.ChainFeeObs,
},
Expand Down Expand Up @@ -400,5 +399,46 @@ func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) Outcome {
return decodedOutcome
}

// Assuming that we have to delegate a specific amount of time to the observation requests and the report requests.
// We define some percentages in order to help us calculate the time we have to delegate to each request timer.
const (
observationDurationPercentage = 0.55
reportDurationPercentage = 0.4
// remaining 5% for other query processing

maxAllowedObservationTimeout = 3 * time.Second
maxAllowedReportTimeout = 2 * time.Second
)

func observationsInitialRequestTimerDuration(maxQueryDuration time.Duration) time.Duration {
// we have queryCapacityForObservations to make the initial observation request and potentially a secondary request
queryCapacityForObservations := time.Duration(observationDurationPercentage * float64(maxQueryDuration))

// we divide in two parts one for the initial observation and one for the retry
queryCapacityForInitialObservations := queryCapacityForObservations / 2

// if the capacity is greater than the maximum allowed we return the max allowed
if queryCapacityForInitialObservations < maxAllowedObservationTimeout {
return queryCapacityForObservations
}

return maxAllowedObservationTimeout
}

func reportsInitialRequestTimerDuration(maxQueryDuration time.Duration) time.Duration {
// we have queryCapacityForReports to make the initial reports request and potentially a secondary request
queryCapacityForReports := time.Duration(reportDurationPercentage * float64(maxQueryDuration))

// we divide in two parts one for the initial signatures request and one for the retry
queryCapacityForInitialObservations := queryCapacityForReports / 2

// if the capacity is greater than the maximum allowed we return the max allowed
if queryCapacityForInitialObservations < maxAllowedReportTimeout {
return queryCapacityForInitialObservations
}

return maxAllowedReportTimeout
}

// Interface compatibility checks.
var _ ocr3types.ReportingPlugin[[]byte] = &Plugin{}
17 changes: 11 additions & 6 deletions commit/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,10 @@ func TestPlugin_E2E_AllNodesAgree_MerkleRoots(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
var reportCodec ccipocr3.CommitPluginCodec
for i := range oracleIDs {
params.enableDiscovery = tc.enableDiscovery
n := setupNode(params, oracleIDs[i])
paramsCp := params
paramsCp.enableDiscovery = tc.enableDiscovery
paramsCp.reportingCfg.OracleID = oracleIDs[i]
n := setupNode(paramsCp)
nodes[i] = n.node
if i == 0 {
reportCodec = n.reportCodec
Expand Down Expand Up @@ -367,7 +369,9 @@ func TestPlugin_E2E_AllNodesAgree_TokenPrices(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
var reportCodec ccipocr3.CommitPluginCodec
for i := range oracleIDs {
n := setupNode(params, oracleIDs[i])
paramsCp := params
paramsCp.reportingCfg.OracleID = oracleIDs[i]
n := setupNode(paramsCp)
nodes[i] = n.node
if i == 0 {
reportCodec = n.reportCodec
Expand Down Expand Up @@ -592,7 +596,9 @@ func TestPlugin_E2E_AllNodesAgree_ChainFee(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
for i := range oracleIDs {
n := setupNode(params, oracleIDs[i])
paramsCp := params
paramsCp.reportingCfg.OracleID = oracleIDs[i]
n := setupNode(paramsCp)
nodes[i] = n.node

prepareCcipReaderMock(params.ctx, n.ccipReader, true, false, false)
Expand Down Expand Up @@ -699,7 +705,7 @@ type SetupNodeParams struct {
}

//nolint:gocyclo // todo
func setupNode(params SetupNodeParams, nodeID commontypes.OracleID) nodeSetup {
func setupNode(params SetupNodeParams) nodeSetup {
ccipReader := readerpkg_mock.NewMockCCIPReader(params.t)
tokenPricesReader := readerpkg_mock.NewMockPriceReader(params.t)
reportCodec := mocks.NewCommitPluginJSONReportCodec()
Expand Down Expand Up @@ -795,7 +801,6 @@ func setupNode(params SetupNodeParams, nodeID commontypes.OracleID) nodeSetup {

p := NewPlugin(
params.donID,
nodeID,
params.oracleIDToP2pID,
params.offchainCfg,
destChain,
Expand Down
46 changes: 46 additions & 0 deletions commit/plugin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package commit

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_queryPhaseRmnRelatedTimers(t *testing.T) {
testCases := []struct {
maxQueryDuration time.Duration
expInitialObservationTimer time.Duration
expInitialReportTimer time.Duration
}{
{
maxQueryDuration: 1 * time.Second,
expInitialObservationTimer: 550 * time.Millisecond,
expInitialReportTimer: 200 * time.Millisecond,
},
{
maxQueryDuration: 3 * time.Second,
expInitialObservationTimer: 1650 * time.Millisecond,
expInitialReportTimer: 600 * time.Millisecond,
},
{
maxQueryDuration: 5 * time.Second,
expInitialObservationTimer: 2750 * time.Millisecond,
expInitialReportTimer: 1000 * time.Millisecond,
},
{
maxQueryDuration: 15 * time.Second,
expInitialObservationTimer: 3 * time.Second,
expInitialReportTimer: 2 * time.Second,
},
}

for _, tc := range testCases {
t.Run(tc.maxQueryDuration.String(), func(t *testing.T) {
obsTimer := observationsInitialRequestTimerDuration(tc.maxQueryDuration).Round(time.Millisecond)
sigTimer := reportsInitialRequestTimerDuration(tc.maxQueryDuration).Round(time.Millisecond)
assert.Equal(t, tc.expInitialObservationTimer, obsTimer)
assert.Equal(t, tc.expInitialReportTimer, sigTimer)
})
}
}
6 changes: 3 additions & 3 deletions commit/validate_observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (p *Plugin) ValidateObservation(
return fmt.Errorf("failed to validate FChain: %w", err)
}

merkleObs := MerkleRootObservation{
merkleObs := merkleRootObservation{
OracleID: ao.Observer,
Observation: obs.MerkleRootObs,
}
Expand All @@ -42,7 +42,7 @@ func (p *Plugin) ValidateObservation(
return fmt.Errorf("validate merkle roots observation: %w", err)
}

tokenObs := TokenPricesObservation{
tokenObs := tokenPricesObservation{
OracleID: ao.Observer,
Observation: obs.TokenPriceObs,
}
Expand All @@ -51,7 +51,7 @@ func (p *Plugin) ValidateObservation(
return fmt.Errorf("validate token prices observation: %w", err)
}

gasObs := ChainFeeObservation{
gasObs := chainFeeObservation{
OracleID: ao.Observer,
Observation: obs.ChainFeeObs,
}
Expand Down

0 comments on commit 760e17a

Please sign in to comment.