From 760e17aae91bb624b7c648308116d697063344ff Mon Sep 17 00:00:00 2001 From: dimitris Date: Tue, 5 Nov 2024 14:01:58 +0200 Subject: [PATCH] Commit plugin improvements (#297) * unexport types and delete redundant oracleID param * named logger for ccipChainSupport * smarter rmn parameters computation --- commit/factory.go | 1 - commit/merkleroot/rmn/controller.go | 5 ++ commit/plugin.go | 76 ++++++++++++++++++++++------- commit/plugin_e2e_test.go | 17 ++++--- commit/plugin_test.go | 46 +++++++++++++++++ commit/validate_observation.go | 6 +-- 6 files changed, 123 insertions(+), 28 deletions(-) create mode 100644 commit/plugin_test.go diff --git a/commit/factory.go b/commit/factory.go index 72c61a591..5a7308926 100644 --- a/commit/factory.go +++ b/commit/factory.go @@ -223,7 +223,6 @@ func (p *PluginFactory) NewReportingPlugin(ctx context.Context, config ocr3types return NewPlugin( p.donID, - config.OracleID, oracleIDToP2PID, offchainConfig, p.ocrConfig.Config.ChainSelector, diff --git a/commit/merkleroot/rmn/controller.go b/commit/merkleroot/rmn/controller.go index 53dc4df26..7b0676d5a 100644 --- a/commit/merkleroot/rmn/controller.go +++ b/commit/merkleroot/rmn/controller.go @@ -127,6 +127,11 @@ func NewController( observationsInitialRequestTimerDuration time.Duration, reportsInitialRequestTimerDuration time.Duration, ) Controller { + + lggr.Infow("creating new RMN controller", + "observationsInitialRequestTimerDuration", observationsInitialRequestTimerDuration, + "reportsInitialRequestTimerDuration", reportsInitialRequestTimerDuration) + return &controller{ lggr: lggr, rmnCrypto: rmnCrypto, diff --git a/commit/plugin.go b/commit/plugin.go index c2ca22c55..983d45c13 100644 --- a/commit/plugin.go +++ b/commit/plugin.go @@ -29,9 +29,9 @@ import ( "github.com/smartcontractkit/chainlink-ccip/pluginconfig" ) -type MerkleRootObservation = plugincommon.AttributedObservation[merkleroot.Observation] -type TokenPricesObservation = plugincommon.AttributedObservation[tokenprice.Observation] -type ChainFeeObservation = plugincommon.AttributedObservation[chainfee.Observation] +type merkleRootObservation = plugincommon.AttributedObservation[merkleroot.Observation] +type tokenPricesObservation = plugincommon.AttributedObservation[tokenprice.Observation] +type chainFeeObservation = plugincommon.AttributedObservation[chainfee.Observation] type Plugin struct { donID plugintypes.DonID @@ -57,7 +57,6 @@ type Plugin struct { func NewPlugin( donID plugintypes.DonID, - oracleID commontypes.OracleID, oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID, offchainCfg pluginconfig.CommitOffchainConfig, destChain cciptypes.ChainSelector, @@ -83,10 +82,10 @@ func NewPlugin( } chainSupport := plugincommon.NewCCIPChainSupport( - lggr, + logger.Named(lggr, "CCIPChainSupport"), homeChain, oracleIDToP2pID, - oracleID, + reportingCfg.OracleID, destChain, ) @@ -96,12 +95,12 @@ func NewPlugin( offchainCfg.SignObservationPrefix, rmnPeerClient, rmnHomeReader, - 2*time.Second, /* observationsInitialRequestTimerDuration */ - 2*time.Second, /* observationsRequestTimerDuration */ + observationsInitialRequestTimerDuration(reportingCfg.MaxDurationQuery), + reportsInitialRequestTimerDuration(reportingCfg.MaxDurationQuery), ) merkleRootProcessor := merkleroot.NewProcessor( - oracleID, + reportingCfg.OracleID, oracleIDToP2pID, logger.Named(lggr, "MerkleRootProcessor"), offchainCfg, @@ -117,7 +116,7 @@ func NewPlugin( ) tokenPriceProcessor := tokenprice.NewProcessor( - oracleID, + reportingCfg.OracleID, lggr, offchainCfg, destChain, @@ -138,7 +137,7 @@ func NewPlugin( chainFeeProcessr := chainfee.NewProcessor( lggr, - oracleID, + reportingCfg.OracleID, destChain, homeChain, ccipReader, @@ -149,7 +148,7 @@ func NewPlugin( return &Plugin{ donID: donID, - oracleID: oracleID, + oracleID: reportingCfg.OracleID, oracleIDToP2PID: oracleIDToP2pID, lggr: lggr, offchainCfg: offchainCfg, @@ -291,9 +290,9 @@ func (p *Plugin) Outcome( return nil, fmt.Errorf("decode query: %w", err) } - var merkleObservations []MerkleRootObservation - var tokensObservations []TokenPricesObservation - var feeObservations []ChainFeeObservation + var merkleObservations []merkleRootObservation + var tokensObservations []tokenPricesObservation + var feeObservations []chainFeeObservation var discoveryObservations []plugincommon.AttributedObservation[dt.Observation] for _, ao := range aos { @@ -304,21 +303,21 @@ func (p *Plugin) Outcome( } p.lggr.Debugw("Commit plugin outcome decoded observation", "observation", obs) merkleObservations = append(merkleObservations, - MerkleRootObservation{ + merkleRootObservation{ OracleID: ao.Observer, Observation: obs.MerkleRootObs, }, ) tokensObservations = append(tokensObservations, - TokenPricesObservation{ + tokenPricesObservation{ OracleID: ao.Observer, Observation: obs.TokenPriceObs, }, ) feeObservations = append(feeObservations, - ChainFeeObservation{ + chainFeeObservation{ OracleID: ao.Observer, Observation: obs.ChainFeeObs, }, @@ -400,5 +399,46 @@ func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) Outcome { return decodedOutcome } +// Assuming that we have to delegate a specific amount of time to the observation requests and the report requests. +// We define some percentages in order to help us calculate the time we have to delegate to each request timer. +const ( + observationDurationPercentage = 0.55 + reportDurationPercentage = 0.4 + // remaining 5% for other query processing + + maxAllowedObservationTimeout = 3 * time.Second + maxAllowedReportTimeout = 2 * time.Second +) + +func observationsInitialRequestTimerDuration(maxQueryDuration time.Duration) time.Duration { + // we have queryCapacityForObservations to make the initial observation request and potentially a secondary request + queryCapacityForObservations := time.Duration(observationDurationPercentage * float64(maxQueryDuration)) + + // we divide in two parts one for the initial observation and one for the retry + queryCapacityForInitialObservations := queryCapacityForObservations / 2 + + // if the capacity is greater than the maximum allowed we return the max allowed + if queryCapacityForInitialObservations < maxAllowedObservationTimeout { + return queryCapacityForObservations + } + + return maxAllowedObservationTimeout +} + +func reportsInitialRequestTimerDuration(maxQueryDuration time.Duration) time.Duration { + // we have queryCapacityForReports to make the initial reports request and potentially a secondary request + queryCapacityForReports := time.Duration(reportDurationPercentage * float64(maxQueryDuration)) + + // we divide in two parts one for the initial signatures request and one for the retry + queryCapacityForInitialObservations := queryCapacityForReports / 2 + + // if the capacity is greater than the maximum allowed we return the max allowed + if queryCapacityForInitialObservations < maxAllowedReportTimeout { + return queryCapacityForInitialObservations + } + + return maxAllowedReportTimeout +} + // Interface compatibility checks. var _ ocr3types.ReportingPlugin[[]byte] = &Plugin{} diff --git a/commit/plugin_e2e_test.go b/commit/plugin_e2e_test.go index 6abbec74b..0b57d4490 100644 --- a/commit/plugin_e2e_test.go +++ b/commit/plugin_e2e_test.go @@ -214,8 +214,10 @@ func TestPlugin_E2E_AllNodesAgree_MerkleRoots(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var reportCodec ccipocr3.CommitPluginCodec for i := range oracleIDs { - params.enableDiscovery = tc.enableDiscovery - n := setupNode(params, oracleIDs[i]) + paramsCp := params + paramsCp.enableDiscovery = tc.enableDiscovery + paramsCp.reportingCfg.OracleID = oracleIDs[i] + n := setupNode(paramsCp) nodes[i] = n.node if i == 0 { reportCodec = n.reportCodec @@ -367,7 +369,9 @@ func TestPlugin_E2E_AllNodesAgree_TokenPrices(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var reportCodec ccipocr3.CommitPluginCodec for i := range oracleIDs { - n := setupNode(params, oracleIDs[i]) + paramsCp := params + paramsCp.reportingCfg.OracleID = oracleIDs[i] + n := setupNode(paramsCp) nodes[i] = n.node if i == 0 { reportCodec = n.reportCodec @@ -592,7 +596,9 @@ func TestPlugin_E2E_AllNodesAgree_ChainFee(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { for i := range oracleIDs { - n := setupNode(params, oracleIDs[i]) + paramsCp := params + paramsCp.reportingCfg.OracleID = oracleIDs[i] + n := setupNode(paramsCp) nodes[i] = n.node prepareCcipReaderMock(params.ctx, n.ccipReader, true, false, false) @@ -699,7 +705,7 @@ type SetupNodeParams struct { } //nolint:gocyclo // todo -func setupNode(params SetupNodeParams, nodeID commontypes.OracleID) nodeSetup { +func setupNode(params SetupNodeParams) nodeSetup { ccipReader := readerpkg_mock.NewMockCCIPReader(params.t) tokenPricesReader := readerpkg_mock.NewMockPriceReader(params.t) reportCodec := mocks.NewCommitPluginJSONReportCodec() @@ -795,7 +801,6 @@ func setupNode(params SetupNodeParams, nodeID commontypes.OracleID) nodeSetup { p := NewPlugin( params.donID, - nodeID, params.oracleIDToP2pID, params.offchainCfg, destChain, diff --git a/commit/plugin_test.go b/commit/plugin_test.go new file mode 100644 index 000000000..ce786532a --- /dev/null +++ b/commit/plugin_test.go @@ -0,0 +1,46 @@ +package commit + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_queryPhaseRmnRelatedTimers(t *testing.T) { + testCases := []struct { + maxQueryDuration time.Duration + expInitialObservationTimer time.Duration + expInitialReportTimer time.Duration + }{ + { + maxQueryDuration: 1 * time.Second, + expInitialObservationTimer: 550 * time.Millisecond, + expInitialReportTimer: 200 * time.Millisecond, + }, + { + maxQueryDuration: 3 * time.Second, + expInitialObservationTimer: 1650 * time.Millisecond, + expInitialReportTimer: 600 * time.Millisecond, + }, + { + maxQueryDuration: 5 * time.Second, + expInitialObservationTimer: 2750 * time.Millisecond, + expInitialReportTimer: 1000 * time.Millisecond, + }, + { + maxQueryDuration: 15 * time.Second, + expInitialObservationTimer: 3 * time.Second, + expInitialReportTimer: 2 * time.Second, + }, + } + + for _, tc := range testCases { + t.Run(tc.maxQueryDuration.String(), func(t *testing.T) { + obsTimer := observationsInitialRequestTimerDuration(tc.maxQueryDuration).Round(time.Millisecond) + sigTimer := reportsInitialRequestTimerDuration(tc.maxQueryDuration).Round(time.Millisecond) + assert.Equal(t, tc.expInitialObservationTimer, obsTimer) + assert.Equal(t, tc.expInitialReportTimer, sigTimer) + }) + } +} diff --git a/commit/validate_observation.go b/commit/validate_observation.go index c46e15a70..a570f0259 100644 --- a/commit/validate_observation.go +++ b/commit/validate_observation.go @@ -32,7 +32,7 @@ func (p *Plugin) ValidateObservation( return fmt.Errorf("failed to validate FChain: %w", err) } - merkleObs := MerkleRootObservation{ + merkleObs := merkleRootObservation{ OracleID: ao.Observer, Observation: obs.MerkleRootObs, } @@ -42,7 +42,7 @@ func (p *Plugin) ValidateObservation( return fmt.Errorf("validate merkle roots observation: %w", err) } - tokenObs := TokenPricesObservation{ + tokenObs := tokenPricesObservation{ OracleID: ao.Observer, Observation: obs.TokenPriceObs, } @@ -51,7 +51,7 @@ func (p *Plugin) ValidateObservation( return fmt.Errorf("validate token prices observation: %w", err) } - gasObs := ChainFeeObservation{ + gasObs := chainFeeObservation{ OracleID: ao.Observer, Observation: obs.ChainFeeObs, }