Skip to content

Commit

Permalink
feat: set sequencer mode by hub and not by a flag (#821)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored May 12, 2024
1 parent 426015b commit 5848812
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 158 deletions.
23 changes: 13 additions & 10 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,21 @@ func NewManager(
}

// Start starts the block manager.
func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
func (m *Manager) Start(ctx context.Context) error {
m.logger.Info("Starting the block manager")

// TODO (#283): set aggregator mode by proposer addr on the hub
if isAggregator {
// make sure local signing key is the registered on the hub
slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
localProposerKey, _ := m.ProposerKey.GetPublic().Raw()
if !bytes.Equal(slProposerKey, localProposerKey) {
return fmt.Errorf("proposer key mismatch: settlement proposer key: %s, block manager proposer key: %s", slProposerKey, m.ProposerKey.GetPublic())
}
// Check if proposer key matches to the one in the settlement layer
var isAggregator bool
slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
localProposerKey, err := m.ProposerKey.GetPublic().Raw()
if err != nil {
return fmt.Errorf("get local node public key: %w", err)
}
if bytes.Equal(slProposerKey, localProposerKey) {
m.logger.Info("Starting in aggregator mode")
isAggregator = true
} else {
m.logger.Info("Starting in non-aggregator mode")
}

// Check if InitChain flow is needed
Expand All @@ -172,7 +175,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

err := m.syncBlockManager()
err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
// Capture the error returned by manager.Start.
errChan := make(chan error, 1)
go func() {
errChan <- manager.Start(ctx, true)
errChan <- manager.Start(ctx)
err := <-errChan
assert.NoError(t, err)
}()
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestBlockProductionNodeHealth(t *testing.T) {
// Start the manager
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = manager.Start(ctx, true)
err = manager.Start(ctx)
require.NoError(err)
time.Sleep(100 * time.Millisecond)

Expand Down
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type NodeConfig struct {
P2P P2PConfig
RPC RPCConfig
// parameters below are dymint specific and read from config
Aggregator bool `mapstructure:"aggregator"`
BlockManagerConfig `mapstructure:",squash"`
DALayer string `mapstructure:"da_layer"`
DAConfig string `mapstructure:"da_config"`
Expand Down
2 changes: 0 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func TestViperAndCobra(t *testing.T) {
nc := config.DefaultConfig("", "")
config.EnsureRoot(dir, nc)

assert.NoError(cmd.Flags().Set(config.FlagAggregator, "true"))
assert.NoError(cmd.Flags().Set(config.FlagDALayer, "foobar"))
assert.NoError(cmd.Flags().Set(config.FlagDAConfig, `{"json":true}`))
assert.NoError(cmd.Flags().Set(config.FlagBlockTime, "1234s"))
Expand All @@ -35,7 +34,6 @@ func TestViperAndCobra(t *testing.T) {

assert.NoError(nc.GetViperConfig(cmd, dir))

assert.Equal(true, nc.Aggregator)
assert.Equal("foobar", nc.DALayer)
assert.Equal(`{"json":true}`, nc.DAConfig)
assert.Equal(1234*time.Second, nc.BlockTime)
Expand Down
1 change: 0 additions & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func DefaultConfig(home, chainId string) *NodeConfig {
ListenAddress: DefaultListenAddress,
Seeds: "",
},
Aggregator: true,
BlockManagerConfig: BlockManagerConfig{
BlockTime: 200 * time.Millisecond,
EmptyBlocksMaxTime: 3600 * time.Second,
Expand Down
5 changes: 0 additions & 5 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
)

const (
FlagAggregator = "dymint.aggregator"
FlagDALayer = "dymint.da_layer"
FlagDAConfig = "dymint.da_config"
FlagBlockTime = "dymint.block_time"
Expand Down Expand Up @@ -39,7 +38,6 @@ func AddNodeFlags(cmd *cobra.Command) {

def := DefaultNodeConfig

cmd.Flags().Bool(FlagAggregator, false, "run node in aggregator mode")
cmd.Flags().String(FlagDALayer, def.DALayer, "Data Availability Layer Client name (mock or grpc")
cmd.Flags().String(FlagDAConfig, def.DAConfig, "Data Availability Layer Client config")
cmd.Flags().Duration(FlagBlockTime, def.BlockTime, "block time (for aggregator mode)")
Expand All @@ -61,9 +59,6 @@ func AddNodeFlags(cmd *cobra.Command) {
}

func BindDymintFlags(cmd *cobra.Command, v *viper.Viper) error {
if err := v.BindPFlag("aggregator", cmd.Flags().Lookup(FlagAggregator)); err != nil {
return err
}
if err := v.BindPFlag("da_layer", cmd.Flags().Lookup(FlagDALayer)); err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ const defaultConfigTemplate = `
#######################################################
### Dymint Configuration Options ###
#######################################################
aggregator = "{{ .Aggregator }}"
# block production interval
block_time = "{{ .BlockManagerConfig.BlockTime }}"
# block production interval in case of no transactions ("0s" produces empty blocks)
Expand Down
89 changes: 0 additions & 89 deletions node/integration_test.go

This file was deleted.

17 changes: 8 additions & 9 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ type Node struct {

baseLayerHealth baseLayerHealth

// keep context here only because of API compatibility
// - it's used in `OnStart` (defined in service.Service interface)
Ctx context.Context
// shared context for all dymint components
ctx context.Context
}

// NewNode creates new Dymint node.
Expand Down Expand Up @@ -248,7 +247,7 @@ func NewNode(
TxIndexer: txIndexer,
IndexerService: indexerService,
BlockIndexer: blockIndexer,
Ctx: ctx,
ctx: ctx,
}

node.BaseService = *service.NewBaseService(logger, "Node", node)
Expand Down Expand Up @@ -288,7 +287,7 @@ func (n *Node) initGenesisChunks() error {
// OnStart is a part of Service interface.
func (n *Node) OnStart() error {
n.Logger.Info("starting P2P client")
err := n.P2P.Start(n.Ctx)
err := n.P2P.Start(n.ctx)
if err != nil {
return fmt.Errorf("start P2P client: %w", err)
}
Expand All @@ -313,7 +312,7 @@ func (n *Node) OnStart() error {
n.startEventListener()

// start the block manager
err = n.blockManager.Start(n.Ctx, n.conf.Aggregator)
err = n.blockManager.Start(n.ctx)
if err != nil {
return fmt.Errorf("while starting block manager: %w", err)
}
Expand Down Expand Up @@ -409,8 +408,8 @@ func createAndStartIndexerService(

// All events listeners should be registered here
func (n *Node) startEventListener() {
go uevent.MustSubscribe(n.Ctx, n.PubsubServer, "settlementHealthStatusHandler", settlement.EventQuerySettlementHealthStatus, n.onBaseLayerHealthUpdate, n.Logger)
go uevent.MustSubscribe(n.Ctx, n.PubsubServer, "daHealthStatusHandler", da.EventQueryDAHealthStatus, n.onBaseLayerHealthUpdate, n.Logger)
go uevent.MustSubscribe(n.ctx, n.PubsubServer, "settlementHealthStatusHandler", settlement.EventQuerySettlementHealthStatus, n.onBaseLayerHealthUpdate, n.Logger)
go uevent.MustSubscribe(n.ctx, n.PubsubServer, "daHealthStatusHandler", da.EventQueryDAHealthStatus, n.onBaseLayerHealthUpdate, n.Logger)
}

func (n *Node) onBaseLayerHealthUpdate(event pubsub.Message) {
Expand All @@ -432,7 +431,7 @@ func (n *Node) onBaseLayerHealthUpdate(event pubsub.Message) {
if newStatus != nil {
n.Logger.Error("Node is unhealthy: base layer has problem.", "error", newStatus)
}
uevent.MustPublish(n.Ctx, n.PubsubServer, evt, events.HealthStatusList)
uevent.MustPublish(n.ctx, n.PubsubServer, evt, events.HealthStatusList)
}
}

Expand Down
11 changes: 5 additions & 6 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@ func TestMempoolDirectly(t *testing.T) {
rollappID := "rollapp_1234-1"

nodeConfig := config.NodeConfig{
RootDir: "",
DBPath: "",
P2P: config.P2PConfig{},
RPC: config.RPCConfig{},
Aggregator: false,
RootDir: "",
DBPath: "",
P2P: config.P2PConfig{},
RPC: config.RPCConfig{},
BlockManagerConfig: config.BlockManagerConfig{
BlockTime: 100 * time.Millisecond,
BlockBatchSize: 2,
Expand Down Expand Up @@ -181,7 +180,7 @@ func TestHealthStatusEventHandler(t *testing.T) {
done := make(chan bool, 1)
ready := make(chan bool, 1)
go func() {
HealthSubscription, err := node.PubsubServer.Subscribe(node.Ctx, c.name, events.QueryHealthStatus)
HealthSubscription, err := node.PubsubServer.Subscribe(context.Background(), c.name, events.QueryHealthStatus)
ready <- true
assert.NoError(err)
select {
Expand Down
Loading

0 comments on commit 5848812

Please sign in to comment.