Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func (a *AggSender) Info() types.AggsenderInfo {
return res
}

func (a *AggSender) ForceTriggerCertitificate() {
a.certificateSendTrigger.ForceTriggerEvent()
}

// GetRPCServices returns the list of services that the RPC provider exposes
func (a *AggSender) GetRPCServices() []jRPC.Service {
if !a.cfg.EnableRPC {
Expand Down
27 changes: 26 additions & 1 deletion aggsender/certificate_send_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/agglayer/aggkit/aggsender/types"
aggkitcommon "github.com/agglayer/aggkit/common"
"github.com/agglayer/aggkit/log"
aggkitsync "github.com/agglayer/aggkit/sync"
aggkittypes "github.com/agglayer/aggkit/types"
)

Expand Down Expand Up @@ -139,11 +140,17 @@ func (r *epochBasedTrigger) Setup(ctx context.Context) {
go r.epochNotifier.Start(ctx)
}

// ForceTriggerEvent forces the epoch notifier to publish an epoch event immediately.
func (r *epochBasedTrigger) ForceTriggerEvent() {
r.epochNotifier.ForcePublishEpochEvent()
}

// preconfTrigger handles preconfirmation operations by listening to L2 bridge synchronization
// and maintaining subscription state to the synchronized L2 bridge events.
type preconfTrigger struct {
log aggkitcommon.Logger
l2BridgeSync types.L2BridgeSyncer
ch chan types.CertificateTriggerEvent
}

// newPreconfTrigger creates and initializes a new preconfTrigger instance.
Expand Down Expand Up @@ -181,13 +188,18 @@ func (r *preconfTrigger) Setup(ctx context.Context) {
// notifications. Each value is a sync.Block (which implements CertificateTriggerEvent).
// The returned channel will be closed when the provided context is canceled.
func (r *preconfTrigger) TriggerCh(ctx context.Context) <-chan types.CertificateTriggerEvent {
ch := make(chan types.CertificateTriggerEvent)
syncSub := r.l2BridgeSync.SubscribeToSync("aggsender")

ch := make(chan types.CertificateTriggerEvent)
r.ch = ch

go func() {
for {
select {
case <-ctx.Done():
r.ch = nil
close(ch)

return
case epochEvent := <-syncSub:
ch <- epochEvent
Expand All @@ -197,3 +209,16 @@ func (r *preconfTrigger) TriggerCh(ctx context.Context) <-chan types.Certificate

return ch
}

// ForceTriggerEvent forces the preconf trigger to emit a synchronization event.
func (r *preconfTrigger) ForceTriggerEvent() {
blockNumber, err := r.l2BridgeSync.GetLastProcessedBlock(context.Background())
if err != nil {
r.log.Errorf("ForceTriggerEvent: Failed to get last processed block: %v", err)
return
}
if r.ch == nil {
return
}
r.ch <- aggkitsync.Block{Num: blockNumber}
}
69 changes: 69 additions & 0 deletions aggsender/certificate_send_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/agglayer/aggkit/aggsender/config"
"github.com/agglayer/aggkit/aggsender/mocks"
"github.com/agglayer/aggkit/aggsender/types"
"github.com/agglayer/aggkit/log"
"github.com/agglayer/aggkit/sync"
ethmanmocks "github.com/agglayer/aggkit/types/mocks"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -313,6 +314,7 @@ func TestPreconfRunner_TriggerCh(t *testing.T) {
mockEvent3 := sync.Block{Num: 126, Events: []any{}, Hash: common.HexToHash("0x4")}

// Send multiple events

syncCh <- mockEvent1
syncCh <- mockEvent2
syncCh <- mockEvent3
Expand Down Expand Up @@ -444,3 +446,70 @@ func TestEpochBasedRunner_TriggerCh(t *testing.T) {
require.Equal(t, mockEvent3, receivedEvents[2])
})
}

func TestEpochBasedTriggerForceTriggerEvent(t *testing.T) {
mockBlockNotifier := mocks.NewBlockNotifier(t)
logger := log.WithFields("test", "test")
mockEpochNotifier, err := NewEpochNotifierPerBlock(
mockBlockNotifier,
logger,
ConfigEpochNotifierPerBlock{
StartingEpochBlock: 1000,
NumBlockPerEpoch: 100,
EpochNotificationPercentage: 80.0,
},
nil,
)
require.NoError(t, err)
runner := &epochBasedTrigger{
epochNotifier: mockEpochNotifier,
blockNotifier: mockBlockNotifier,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

triggerCh := runner.TriggerCh(ctx)
mockBlockNotifier.EXPECT().GetCurrentBlockNumber().Return(uint64(1100)).Times(1)
runner.ForceTriggerEvent()

// Verify all events are forwarded
receivedEvents := make([]types.CertificateTriggerEvent, 0, 1)
for i := 0; i < 1; i++ {
select {
case event := <-triggerCh:
receivedEvents = append(receivedEvents, event)
case <-time.After(1 * time.Second):
t.Fatalf("Expected event was not received after 1 sec")
}
}

require.Len(t, receivedEvents, 1)
}

func TestPreconfTriggerForceTriggerEvent(t *testing.T) {
logger := log.WithFields("test", "test")
mockL2BridgeSync := mocks.NewL2BridgeSyncer(t)

// Create a mock subscription channel
syncCh := make(chan sync.Block, 3)
mockL2BridgeSync.EXPECT().SubscribeToSync("aggsender").Return(syncCh)
mockL2BridgeSync.EXPECT().GetLastProcessedBlock(mock.Anything).Return(uint64(12345), nil).Once()
sut := newPreconfTrigger(
logger,
mockL2BridgeSync,
)

ctx, cancel := context.WithTimeout(t.Context(), time.Second)
defer cancel()
triggerCh := sut.TriggerCh(ctx)
go sut.ForceTriggerEvent()

select {
case event := <-triggerCh:
t.Logf("Received event: %+v", event)
break
case <-ctx.Done():
t.Fatalf("Expected event was not received after 1 sec")
}
}
10 changes: 10 additions & 0 deletions aggsender/epoch_notifier_per_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ func (e *EpochNotifierPerBlock) GetEpochStatus() types.EpochStatus {
}
}

func (e *EpochNotifierPerBlock) ForcePublishEpochEvent() {
currentBlock := e.blockNotifier.GetCurrentBlockNumber()
info := e.infoEpoch(currentBlock, e.epochNumber(currentBlock))
event := &types.EpochEvent{
Epoch: e.epochNumber(currentBlock),
ExtraInfo: info,
}
e.Publish(*event)
}

func (e *EpochNotifierPerBlock) startInternal(ctx context.Context, eventNewBlockChannel <-chan types.EventNewBlock) {
status := internalStatus{
lastBlockSeen: e.Config.StartingEpochBlock,
Expand Down
12 changes: 12 additions & 0 deletions aggsender/epoch_notifier_per_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,18 @@ func TestBlockBeforeEpoch(t *testing.T) {
require.Equal(t, uint64(114), newStatus.lastBlockSeen)
}

func TestForcePublishEpochEvent(t *testing.T) {
testData := newNotifierPerBlockTestData(t, nil)
ch := testData.sut.Subscribe("test")
testData.blockNotifierMock.EXPECT().GetCurrentBlockNumber().Return(uint64(145))
testData.blockNotifierMock.EXPECT().Subscribe(mock.Anything).Return(make(chan types.EventNewBlock))
testData.sut.StartAsync(testData.ctx)
testData.sut.ForcePublishEpochEvent()
epochEvent := <-ch
require.Equal(t, uint64(15), epochEvent.Epoch)
testData.ctx.Done()
}

func TestGetEpochStatus(t *testing.T) {
testData := newNotifierPerBlockTestData(t, nil)
testData.blockNotifierMock.EXPECT().GetCurrentBlockNumber().Return(uint64(105))
Expand Down
32 changes: 32 additions & 0 deletions aggsender/mocks/mock_aggsender_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions aggsender/mocks/mock_certificate_send_trigger.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions aggsender/mocks/mock_epoch_notifier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions aggsender/rpc/aggsender_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type AggsenderStorer interface {

type AggsenderInterface interface {
Info() types.AggsenderInfo
ForceTriggerCertitificate()
}

// AggsenderRPC is the RPC interface for the aggsender
Expand Down Expand Up @@ -44,6 +45,14 @@ func (b *AggsenderRPC) Status() (interface{}, rpc.Error) {
return info, nil
}

// TriggerCertitificate forces the publication of an epoch event to trigger certificate creation
// curl -X POST http://localhost:5576/ "Content-Type: application/json" \
// -d '{"method":"aggsender_triggerCertitificate", "params":[], "id":1}'
func (b *AggsenderRPC) TriggerCertitificate() (interface{}, rpc.Error) {
b.aggsender.ForceTriggerCertitificate()
return nil, nil
}

// GetCertificateHeaderPerHeight returns the certificate header for the given height
// if param is `nil` it returns the last sent certificate
// latest:
Expand Down
7 changes: 7 additions & 0 deletions aggsender/rpc/aggsender_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ func TestAggsenderRPCStatus(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)
}
func TestAggsenderRPCTriggerCertitificate(t *testing.T) {
testData := newAggsenderData(t)
testData.mockAggsender.EXPECT().ForceTriggerCertitificate().Return()
res, err := testData.sut.TriggerCertitificate()
require.NoError(t, err)
require.Nil(t, res)
}

func TestAggsenderRPCGetCertificateHeaderPerHeight(t *testing.T) {
testData := newAggsenderData(t)
Expand Down
14 changes: 3 additions & 11 deletions aggsender/types/certificate_build_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,9 @@ func (c *CertificateBuildParams) EstimatedSize() uint {
if c == nil {
return 0
}
sizeBridges := float64(0)
for _, bridge := range c.Bridges {
sizeBridges += agglayertypes.EstimatedBridgeExitSize
sizeBridges += float64(len(bridge.Metadata))
}

sizeClaims := float64(0)
for _, claim := range c.Claims {
sizeClaims += agglayertypes.EstimatedImportedBridgeExitSize
sizeClaims += float64(len(claim.Metadata))
}
// common.HashLength represents the size of a metadata hash in bytes
sizeBridges := (agglayertypes.EstimatedBridgeExitSize + common.HashLength) * float64(len(c.Bridges))
sizeClaims := (agglayertypes.EstimatedImportedBridgeExitSize + common.HashLength) * float64(len(c.Claims))

sizeAggchainData := float64(0)
switch c.CertificateType {
Expand Down
13 changes: 13 additions & 0 deletions aggsender/types/certificate_build_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,16 @@ func TestAdjustToBlock(t *testing.T) {
})
}
}

func TestEstimateSize(t *testing.T) {
sut := &CertificateBuildParams{
FromBlock: 100,
ToBlock: 200,
Bridges: make([]bridgesync.Bridge, 50),
Claims: make([]bridgesync.Claim, 150),
}

estimatedSize := sut.EstimatedSize()

require.Equal(t, uint(0x6bb47), estimatedSize, "Estimated size should match expected size")
}
1 change: 1 addition & 0 deletions aggsender/types/epoch_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ type EpochNotifier interface {
// GetEpochStatus returns the current status of the epoch
GetEpochStatus() EpochStatus
String() string
ForcePublishEpochEvent()
}
Loading
Loading