Skip to content

Commit b9a3c0e

Browse files
Convert consensus+decision dispatchers into block+tx+vertex dispatchers (ava-labs#2745)
1 parent 401073a commit b9a3c0e

File tree

13 files changed

+214
-139
lines changed

13 files changed

+214
-139
lines changed

chains/manager.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,9 @@ type ManagerConfig struct {
164164
Log logging.Logger
165165
LogFactory logging.Factory
166166
VMManager vms.Manager // Manage mappings from vm ID --> vm
167-
DecisionAcceptorGroup snow.AcceptorGroup
168-
ConsensusAcceptorGroup snow.AcceptorGroup
167+
BlockAcceptorGroup snow.AcceptorGroup
168+
TxAcceptorGroup snow.AcceptorGroup
169+
VertexAcceptorGroup snow.AcceptorGroup
169170
DBManager dbManager.Manager
170171
MsgCreator message.OutboundMsgBuilder // message creator, shared with network
171172
Router router.Router // Routes incoming messages to the appropriate chain
@@ -466,8 +467,9 @@ func (m *manager) buildChain(chainParams ChainParameters, sb subnets.Subnet) (*c
466467
ValidatorState: m.validatorState,
467468
ChainDataDir: chainDataDir,
468469
},
469-
DecisionAcceptor: m.DecisionAcceptorGroup,
470-
ConsensusAcceptor: m.ConsensusAcceptorGroup,
470+
BlockAcceptor: m.BlockAcceptorGroup,
471+
TxAcceptor: m.TxAcceptorGroup,
472+
VertexAcceptor: m.VertexAcceptorGroup,
471473
Registerer: consensusMetrics,
472474
AvalancheRegisterer: avalancheConsensusMetrics,
473475
}
@@ -631,7 +633,7 @@ func (m *manager) createAvalancheChain(
631633
messageSender = sender.Trace(messageSender, m.Tracer)
632634
}
633635

634-
if err := m.ConsensusAcceptorGroup.RegisterAcceptor(ctx.ChainID, "gossip", messageSender, false); err != nil { // Set up the event dipatcher
636+
if err := m.VertexAcceptorGroup.RegisterAcceptor(ctx.ChainID, "gossip", messageSender, false); err != nil { // Set up the event dispatcher
635637
return nil, fmt.Errorf("problem initializing event dispatcher: %w", err)
636638
}
637639

@@ -847,7 +849,7 @@ func (m *manager) createSnowmanChain(
847849
messageSender = sender.Trace(messageSender, m.Tracer)
848850
}
849851

850-
if err := m.ConsensusAcceptorGroup.RegisterAcceptor(ctx.ChainID, "gossip", messageSender, false); err != nil { // Set up the event dipatcher
852+
if err := m.BlockAcceptorGroup.RegisterAcceptor(ctx.ChainID, "gossip", messageSender, false); err != nil { // Set up the event dispatcher
851853
return nil, fmt.Errorf("problem initializing event dispatcher: %w", err)
852854
}
853855

indexer/indexer.go

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,15 @@ var (
5757

5858
// Config for an indexer
5959
type Config struct {
60-
DB database.Database
61-
Log logging.Logger
62-
IndexingEnabled bool
63-
AllowIncompleteIndex bool
64-
DecisionAcceptorGroup snow.AcceptorGroup
65-
ConsensusAcceptorGroup snow.AcceptorGroup
66-
APIServer server.PathAdder
67-
ShutdownF func()
60+
DB database.Database
61+
Log logging.Logger
62+
IndexingEnabled bool
63+
AllowIncompleteIndex bool
64+
BlockAcceptorGroup snow.AcceptorGroup
65+
TxAcceptorGroup snow.AcceptorGroup
66+
VertexAcceptorGroup snow.AcceptorGroup
67+
APIServer server.PathAdder
68+
ShutdownF func()
6869
}
6970

7071
// Indexer causes accepted containers for a given chain
@@ -80,18 +81,19 @@ type Indexer interface {
8081
// NewIndexer returns a new Indexer and registers a new endpoint on the given API server.
8182
func NewIndexer(config Config) (Indexer, error) {
8283
indexer := &indexer{
83-
codec: codec.NewManager(codecMaxSize),
84-
log: config.Log,
85-
db: config.DB,
86-
allowIncompleteIndex: config.AllowIncompleteIndex,
87-
indexingEnabled: config.IndexingEnabled,
88-
decisionAcceptorGroup: config.DecisionAcceptorGroup,
89-
consensusAcceptorGroup: config.ConsensusAcceptorGroup,
90-
txIndices: map[ids.ID]Index{},
91-
vtxIndices: map[ids.ID]Index{},
92-
blockIndices: map[ids.ID]Index{},
93-
pathAdder: config.APIServer,
94-
shutdownF: config.ShutdownF,
84+
codec: codec.NewManager(codecMaxSize),
85+
log: config.Log,
86+
db: config.DB,
87+
allowIncompleteIndex: config.AllowIncompleteIndex,
88+
indexingEnabled: config.IndexingEnabled,
89+
blockAcceptorGroup: config.BlockAcceptorGroup,
90+
txAcceptorGroup: config.TxAcceptorGroup,
91+
vertexAcceptorGroup: config.VertexAcceptorGroup,
92+
txIndices: map[ids.ID]Index{},
93+
vtxIndices: map[ids.ID]Index{},
94+
blockIndices: map[ids.ID]Index{},
95+
pathAdder: config.APIServer,
96+
shutdownF: config.ShutdownF,
9597
}
9698

9799
if err := indexer.codec.RegisterCodec(
@@ -139,10 +141,12 @@ type indexer struct {
139141
// Chain ID --> index of txs of that chain (if applicable)
140142
txIndices map[ids.ID]Index
141143

144+
// Notifies of newly accepted blocks
145+
blockAcceptorGroup snow.AcceptorGroup
142146
// Notifies of newly accepted transactions
143-
decisionAcceptorGroup snow.AcceptorGroup
144-
// Notifies of newly accepted blocks and vertices
145-
consensusAcceptorGroup snow.AcceptorGroup
147+
txAcceptorGroup snow.AcceptorGroup
148+
// Notifies of newly accepted vertices
149+
vertexAcceptorGroup snow.AcceptorGroup
146150
}
147151

148152
// Assumes [ctx.Lock] is not held
@@ -262,7 +266,7 @@ func (i *indexer) RegisterChain(chainName string, ctx *snow.ConsensusContext, vm
262266

263267
switch vm.(type) {
264268
case vertex.DAGVM:
265-
vtxIndex, err := i.registerChainHelper(chainID, vtxPrefix, chainName, "vtx", i.consensusAcceptorGroup)
269+
vtxIndex, err := i.registerChainHelper(chainID, vtxPrefix, chainName, "vtx", i.vertexAcceptorGroup)
266270
if err != nil {
267271
i.log.Fatal("couldn't create index",
268272
zap.String("chainName", chainName),
@@ -278,7 +282,7 @@ func (i *indexer) RegisterChain(chainName string, ctx *snow.ConsensusContext, vm
278282
}
279283
i.vtxIndices[chainID] = vtxIndex
280284

281-
txIndex, err := i.registerChainHelper(chainID, txPrefix, chainName, "tx", i.decisionAcceptorGroup)
285+
txIndex, err := i.registerChainHelper(chainID, txPrefix, chainName, "tx", i.txAcceptorGroup)
282286
if err != nil {
283287
i.log.Fatal("couldn't create index",
284288
zap.String("chainName", chainName),
@@ -294,7 +298,7 @@ func (i *indexer) RegisterChain(chainName string, ctx *snow.ConsensusContext, vm
294298
}
295299
i.txIndices[chainID] = txIndex
296300
case block.ChainVM:
297-
index, err := i.registerChainHelper(chainID, blockPrefix, chainName, "block", i.consensusAcceptorGroup)
301+
index, err := i.registerChainHelper(chainID, blockPrefix, chainName, "block", i.blockAcceptorGroup)
298302
if err != nil {
299303
i.log.Fatal("failed to create index",
300304
zap.String("chainName", chainName),
@@ -382,19 +386,19 @@ func (i *indexer) close() error {
382386
for chainID, txIndex := range i.txIndices {
383387
errs.Add(
384388
txIndex.Close(),
385-
i.decisionAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
389+
i.txAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
386390
)
387391
}
388392
for chainID, vtxIndex := range i.vtxIndices {
389393
errs.Add(
390394
vtxIndex.Close(),
391-
i.consensusAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
395+
i.vertexAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
392396
)
393397
}
394398
for chainID, blockIndex := range i.blockIndices {
395399
errs.Add(
396400
blockIndex.Close(),
397-
i.consensusAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
401+
i.blockAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
398402
)
399403
}
400404
errs.Add(i.db.Close())

indexer/indexer_test.go

Lines changed: 50 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,15 @@ func (*apiServerMock) AddAliases(string, ...string) error {
5454
func TestNewIndexer(t *testing.T) {
5555
require := require.New(t)
5656
config := Config{
57-
IndexingEnabled: true,
58-
AllowIncompleteIndex: true,
59-
Log: logging.NoLog{},
60-
DB: memdb.New(),
61-
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
62-
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
63-
APIServer: &apiServerMock{},
64-
ShutdownF: func() {},
57+
IndexingEnabled: true,
58+
AllowIncompleteIndex: true,
59+
Log: logging.NoLog{},
60+
DB: memdb.New(),
61+
BlockAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
62+
TxAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
63+
VertexAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
64+
APIServer: &apiServerMock{},
65+
ShutdownF: func() {},
6566
}
6667

6768
idxrIntf, err := NewIndexer(config)
@@ -81,8 +82,9 @@ func TestNewIndexer(t *testing.T) {
8182
require.Len(idxr.txIndices, 0)
8283
require.NotNil(idxr.vtxIndices)
8384
require.Len(idxr.vtxIndices, 0)
84-
require.NotNil(idxr.consensusAcceptorGroup)
85-
require.NotNil(idxr.decisionAcceptorGroup)
85+
require.NotNil(idxr.blockAcceptorGroup)
86+
require.NotNil(idxr.txAcceptorGroup)
87+
require.NotNil(idxr.vertexAcceptorGroup)
8688
require.NotNil(idxr.shutdownF)
8789
require.False(idxr.hasRunBefore)
8890
}
@@ -95,13 +97,14 @@ func TestMarkHasRunAndShutdown(t *testing.T) {
9597
shutdown := &sync.WaitGroup{}
9698
shutdown.Add(1)
9799
config := Config{
98-
IndexingEnabled: true,
99-
Log: logging.NoLog{},
100-
DB: db,
101-
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
102-
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
103-
APIServer: &apiServerMock{},
104-
ShutdownF: shutdown.Done,
100+
IndexingEnabled: true,
101+
Log: logging.NoLog{},
102+
DB: db,
103+
BlockAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
104+
TxAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
105+
VertexAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
106+
APIServer: &apiServerMock{},
107+
ShutdownF: shutdown.Done,
105108
}
106109

107110
idxrIntf, err := NewIndexer(config)
@@ -132,14 +135,15 @@ func TestIndexer(t *testing.T) {
132135
baseDB := memdb.New()
133136
db := versiondb.New(baseDB)
134137
config := Config{
135-
IndexingEnabled: true,
136-
AllowIncompleteIndex: false,
137-
Log: logging.NoLog{},
138-
DB: db,
139-
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
140-
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
141-
APIServer: &apiServerMock{},
142-
ShutdownF: func() {},
138+
IndexingEnabled: true,
139+
AllowIncompleteIndex: false,
140+
Log: logging.NoLog{},
141+
DB: db,
142+
BlockAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
143+
TxAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
144+
VertexAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
145+
APIServer: &apiServerMock{},
146+
ShutdownF: func() {},
143147
}
144148

145149
// Create indexer
@@ -185,7 +189,7 @@ func TestIndexer(t *testing.T) {
185189
Timestamp: now.UnixNano(),
186190
}
187191

188-
require.NoError(config.ConsensusAcceptorGroup.Accept(chain1Ctx, blkID, blkBytes))
192+
require.NoError(config.BlockAcceptorGroup.Accept(chain1Ctx, blkID, blkBytes))
189193

190194
blkIdx := idxr.blockIndices[chain1Ctx.ChainID]
191195
require.NotNil(blkIdx)
@@ -283,7 +287,7 @@ func TestIndexer(t *testing.T) {
283287
Timestamp: now.UnixNano(),
284288
}
285289

286-
require.NoError(config.ConsensusAcceptorGroup.Accept(chain2Ctx, vtxID, vtxBytes))
290+
require.NoError(config.VertexAcceptorGroup.Accept(chain2Ctx, vtxID, vtxBytes))
287291

288292
vtxIdx := idxr.vtxIndices[chain2Ctx.ChainID]
289293
require.NotNil(vtxIdx)
@@ -332,7 +336,7 @@ func TestIndexer(t *testing.T) {
332336
}, nil,
333337
).AnyTimes()
334338

335-
require.NoError(config.DecisionAcceptorGroup.Accept(chain2Ctx, txID, blkBytes))
339+
require.NoError(config.TxAcceptorGroup.Accept(chain2Ctx, txID, blkBytes))
336340

337341
txIdx := idxr.txIndices[chain2Ctx.ChainID]
338342
require.NotNil(txIdx)
@@ -409,14 +413,15 @@ func TestIncompleteIndex(t *testing.T) {
409413

410414
baseDB := memdb.New()
411415
config := Config{
412-
IndexingEnabled: false,
413-
AllowIncompleteIndex: false,
414-
Log: logging.NoLog{},
415-
DB: versiondb.New(baseDB),
416-
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
417-
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
418-
APIServer: &apiServerMock{},
419-
ShutdownF: func() {},
416+
IndexingEnabled: false,
417+
AllowIncompleteIndex: false,
418+
Log: logging.NoLog{},
419+
DB: versiondb.New(baseDB),
420+
BlockAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
421+
TxAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
422+
VertexAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
423+
APIServer: &apiServerMock{},
424+
ShutdownF: func() {},
420425
}
421426
idxrIntf, err := NewIndexer(config)
422427
require.NoError(err)
@@ -492,14 +497,15 @@ func TestIgnoreNonDefaultChains(t *testing.T) {
492497
baseDB := memdb.New()
493498
db := versiondb.New(baseDB)
494499
config := Config{
495-
IndexingEnabled: true,
496-
AllowIncompleteIndex: false,
497-
Log: logging.NoLog{},
498-
DB: db,
499-
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
500-
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
501-
APIServer: &apiServerMock{},
502-
ShutdownF: func() {},
500+
IndexingEnabled: true,
501+
AllowIncompleteIndex: false,
502+
Log: logging.NoLog{},
503+
DB: db,
504+
BlockAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
505+
TxAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
506+
VertexAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
507+
APIServer: &apiServerMock{},
508+
ShutdownF: func() {},
503509
}
504510

505511
// Create indexer

ipcs/chainipc.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,33 @@ type context struct {
3535
// ChainIPCs maintains IPCs for a set of chains
3636
type ChainIPCs struct {
3737
context
38-
chains map[ids.ID]*EventSockets
39-
consensusAcceptorGroup snow.AcceptorGroup
40-
decisionAcceptorGroup snow.AcceptorGroup
38+
chains map[ids.ID]*EventSockets
39+
blockAcceptorGroup snow.AcceptorGroup
40+
txAcceptorGroup snow.AcceptorGroup
41+
vertexAcceptorGroup snow.AcceptorGroup
4142
}
4243

4344
// NewChainIPCs creates a new *ChainIPCs that writes consensus and decision
4445
// events to IPC sockets
45-
func NewChainIPCs(log logging.Logger, path string, networkID uint32, consensusAcceptorGroup, decisionAcceptorGroup snow.AcceptorGroup, defaultChainIDs []ids.ID) (*ChainIPCs, error) {
46+
func NewChainIPCs(
47+
log logging.Logger,
48+
path string,
49+
networkID uint32,
50+
blockAcceptorGroup snow.AcceptorGroup,
51+
txAcceptorGroup snow.AcceptorGroup,
52+
vertexAcceptorGroup snow.AcceptorGroup,
53+
defaultChainIDs []ids.ID,
54+
) (*ChainIPCs, error) {
4655
cipcs := &ChainIPCs{
4756
context: context{
4857
log: log,
4958
networkID: networkID,
5059
path: path,
5160
},
52-
chains: make(map[ids.ID]*EventSockets),
53-
consensusAcceptorGroup: consensusAcceptorGroup,
54-
decisionAcceptorGroup: decisionAcceptorGroup,
61+
chains: make(map[ids.ID]*EventSockets),
62+
blockAcceptorGroup: blockAcceptorGroup,
63+
txAcceptorGroup: txAcceptorGroup,
64+
vertexAcceptorGroup: vertexAcceptorGroup,
5565
}
5666
for _, chainID := range defaultChainIDs {
5767
if _, err := cipcs.Publish(chainID); err != nil {
@@ -70,7 +80,13 @@ func (cipcs *ChainIPCs) Publish(chainID ids.ID) (*EventSockets, error) {
7080
return es, nil
7181
}
7282

73-
es, err := newEventSockets(cipcs.context, chainID, cipcs.consensusAcceptorGroup, cipcs.decisionAcceptorGroup)
83+
es, err := newEventSockets(
84+
cipcs.context,
85+
chainID,
86+
cipcs.blockAcceptorGroup,
87+
cipcs.txAcceptorGroup,
88+
cipcs.vertexAcceptorGroup,
89+
)
7490
if err != nil {
7591
cipcs.log.Error("can't create ipcs",
7692
zap.Error(err),

0 commit comments

Comments
 (0)