Skip to content

Commit

Permalink
refactor!: move clock events to clock class (#5446)
Browse files Browse the repository at this point in the history
* Generalize clock for network thread

* Move clock events to clock

---------

Co-authored-by: Cayman <caymannava@gmail.com>
  • Loading branch information
dapplion and wemeetagain authored May 1, 2023
1 parent ed047c3 commit 1e4ca97
Show file tree
Hide file tree
Showing 36 changed files with 189 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {ChainForkConfig} from "@lodestar/config";
import {ErrorAborted, Logger} from "@lodestar/utils";
import {IExecutionEngine} from "../../execution/engine/index.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BeaconClock} from "../clock/index.js";
import {IClock} from "../../util/clock.js";
import {BlockProcessOpts} from "../options.js";
import {ExecutePayloadStatus} from "../../execution/engine/interface.js";
import {IEth1ForBlockProduction} from "../../eth1/index.js";
Expand All @@ -30,7 +30,7 @@ import {ImportBlockOpts} from "./types.js";
export type VerifyBlockExecutionPayloadModules = {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
clock: BeaconClock;
clock: IClock;
logger: Logger;
metrics: Metrics | null;
forkChoice: IForkChoice;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {ChainForkConfig} from "@lodestar/config";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Slot} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";
import {BeaconClock} from "../clock/interface.js";
import {IClock} from "../../util/clock.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockInput, ImportBlockOpts} from "./types.js";

Expand All @@ -20,7 +20,7 @@ import {BlockInput, ImportBlockOpts} from "./types.js";
* - Not already known
*/
export function verifyBlocksSanityChecks(
chain: {forkChoice: IForkChoice; clock: BeaconClock; config: ChainForkConfig},
chain: {forkChoice: IForkChoice; clock: IClock; config: ChainForkConfig},
blocks: BlockInput[],
opts: ImportBlockOpts
): {relevantBlocks: BlockInput[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null} {
Expand Down
12 changes: 6 additions & 6 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import {wrapError} from "../util/wrapError.js";
import {ckzg} from "../util/kzg.js";
import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder, TransitionConfigurationV1} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {CheckpointStateCache, StateContextCache} from "./stateCache/index.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {BeaconClock, LocalClock} from "./clock/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData} from "./interface.js";
import {IChainOptions} from "./options.js";
Expand Down Expand Up @@ -88,7 +88,7 @@ export class BeaconChain implements IBeaconChain {

readonly bls: IBlsVerifier;
readonly forkChoice: IForkChoice;
readonly clock: BeaconClock;
readonly clock: IClock;
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
Expand Down Expand Up @@ -153,7 +153,7 @@ export class BeaconChain implements IBeaconChain {
logger: Logger;
processShutdownCallback: ProcessShutdownCallback;
/** Used for testing to supply fake clock */
clock?: BeaconClock;
clock?: IClock;
metrics: Metrics | null;
anchorState: BeaconStateAllForks;
eth1: IEth1ForBlockProduction;
Expand Down Expand Up @@ -185,7 +185,7 @@ export class BeaconChain implements IBeaconChain {
? new BlsSingleThreadVerifier({metrics})
: new BlsMultiThreadWorkerPool(opts, {logger, metrics});

if (!clock) clock = new LocalClock({config, emitter, genesisTime: this.genesisTime, signal});
if (!clock) clock = new Clock({config, genesisTime: this.genesisTime, signal});

const preAggregateCutOffTime = (2 / 3) * this.config.SECONDS_PER_SLOT;
this.attestationPool = new AttestationPool(clock, preAggregateCutOffTime, this.opts?.preaggregateSlotDistance);
Expand Down Expand Up @@ -271,8 +271,8 @@ export class BeaconChain implements IBeaconChain {
metrics?.opPool.aggregatedAttestationPoolSize.addCollect(() => this.onScrapeMetrics());

// Event handlers. emitter is created internally and dropped on close(). Not need to .removeListener()
emitter.addListener(ChainEvent.clockSlot, this.onClockSlot.bind(this));
emitter.addListener(ChainEvent.clockEpoch, this.onClockEpoch.bind(this));
clock.addListener(ClockEvent.slot, this.onClockSlot.bind(this));
clock.addListener(ClockEvent.epoch, this.onClockEpoch.bind(this));
emitter.addListener(ChainEvent.forkChoiceFinalized, this.onForkChoiceFinalized.bind(this));
emitter.addListener(ChainEvent.forkChoiceJustified, this.onForkChoiceJustified.bind(this));
}
Expand Down
2 changes: 0 additions & 2 deletions packages/beacon-node/src/chain/clock/index.ts

This file was deleted.

36 changes: 0 additions & 36 deletions packages/beacon-node/src/chain/clock/interface.ts

This file was deleted.

17 changes: 1 addition & 16 deletions packages/beacon-node/src/chain/emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {EventEmitter} from "events";
import StrictEventEmitter from "strict-event-emitter-types";

import {routes} from "@lodestar/api";
import {phase0, Epoch, Slot} from "@lodestar/types";
import {phase0} from "@lodestar/types";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";

Expand All @@ -22,18 +22,6 @@ export const enum ChainEvent {
* This event is guaranteed to be called after _any_ checkpoint is processed, including skip-slot checkpoints, checkpoints that are formed as a result of processing blocks, etc.
*/
checkpoint = "checkpoint",
/**
* This event signals the start of a new slot, and that subsequent calls to `clock.currentSlot` will equal `slot`.
*
* This event is guaranteed to be emitted every `SECONDS_PER_SLOT` seconds.
*/
clockSlot = "clock:slot",
/**
* This event signals the start of a new epoch, and that subsequent calls to `clock.currentEpoch` will return `epoch`.
*
* This event is guaranteed to be emitted every `SECONDS_PER_SLOT * SLOTS_PER_EPOCH` seconds.
*/
clockEpoch = "clock:epoch",
/**
* This event signals that the fork choice store has been updated.
*
Expand All @@ -57,9 +45,6 @@ type ApiEvents = {[K in routes.events.EventType]: (data: routes.events.EventData
export type IChainEvents = ApiEvents & {
[ChainEvent.checkpoint]: (checkpoint: phase0.Checkpoint, state: CachedBeaconStateAllForks) => void;

[ChainEvent.clockSlot]: (slot: Slot) => void;
[ChainEvent.clockEpoch]: (epoch: Epoch) => void;

[ChainEvent.forkChoiceJustified]: (checkpoint: CheckpointWithHex) => void;
[ChainEvent.forkChoiceFinalized]: (checkpoint: CheckpointWithHex) => void;
};
Expand Down
1 change: 0 additions & 1 deletion packages/beacon-node/src/chain/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
export * from "./interface.js";
export * from "./emitter.js";
export * from "./chain.js";
export {BeaconClock} from "./clock/index.js";
export * from "./forkChoice/index.js";
export * from "./initState.js";
export * from "./stateCache/index.js";
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Metrics} from "../metrics/metrics.js";
import {BeaconClock} from "./clock/interface.js";
import {IClock} from "../util/clock.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator, RegenCaller} from "./regen/index.js";
import {StateContextCache, CheckpointStateCache} from "./stateCache/index.js";
Expand Down Expand Up @@ -62,7 +62,7 @@ export interface IBeaconChain {

readonly bls: IBlsVerifier;
readonly forkChoice: IForkChoice;
readonly clock: BeaconClock;
readonly clock: IClock;
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {PointFormat, Signature} from "@chainsafe/bls/types";
import bls from "@chainsafe/bls";
import {BitArray, toHexString} from "@chainsafe/ssz";
import {MapDef} from "@lodestar/utils";
import {BeaconClock} from "../clock/interface.js";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

Expand Down Expand Up @@ -62,7 +62,7 @@ export class AttestationPool {
private lowestPermissibleSlot = 0;

constructor(
private readonly clock: BeaconClock,
private readonly clock: IClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {SYNC_COMMITTEE_SIZE, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params
import {altair, Root, Slot, SubcommitteeIndex} from "@lodestar/types";
import {BitArray, toHexString} from "@chainsafe/ssz";
import {MapDef} from "@lodestar/utils";
import {BeaconClock} from "../clock/interface.js";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

Expand Down Expand Up @@ -46,7 +46,7 @@ export class SyncCommitteeMessagePool {
private lowestPermissibleSlot = 0;

constructor(
private readonly clock: BeaconClock,
private readonly clock: IClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
) {}
Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {routes} from "@lodestar/api";
import {GENESIS_SLOT, ZERO_HASH_HEX} from "../constants/constants.js";
import {Metrics} from "../metrics/index.js";
import {TransitionConfigurationV1} from "../execution/engine/interface.js";
import {ChainEvent} from "./emitter.js";
import {ClockEvent} from "../util/clock.js";
import {prepareExecutionPayload, getPayloadAttributesForSSE} from "./produceBlock/produceBlockBody.js";
import {IBeaconChain} from "./interface.js";
import {RegenCaller} from "./regen/index.js";
Expand Down Expand Up @@ -38,11 +38,11 @@ export class PrepareNextSlotScheduler {
private readonly logger: Logger,
private readonly signal: AbortSignal
) {
this.chain.emitter.on(ChainEvent.clockSlot, this.prepareForNextSlot);
this.chain.clock.on(ClockEvent.slot, this.prepareForNextSlot);
this.signal.addEventListener(
"abort",
() => {
this.chain.emitter.off(ChainEvent.clockSlot, this.prepareForNextSlot);
this.chain.clock.off(ClockEvent.slot, this.prepareForNextSlot);
},
{once: true}
);
Expand Down
13 changes: 7 additions & 6 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import {deneb, Epoch, phase0, allForks, altair} from "@lodestar/types";
import {routes} from "@lodestar/api";
import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/score";
import {Metrics} from "../metrics/index.js";
import {ChainEvent, IBeaconChain, BeaconClock} from "../chain/index.js";
import {ClockEvent, IClock} from "../util/clock.js";
import {IBeaconChain} from "../chain/index.js";
import {BlockInput, BlockInputType} from "../chain/blocks/types.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js";
import {formatNodePeer} from "../api/impl/node/utils.js";
Expand Down Expand Up @@ -92,7 +93,7 @@ export class Network implements INetwork {
private readonly libp2p: Libp2p;
private readonly logger: Logger;
private readonly config: BeaconConfig;
private readonly clock: BeaconClock;
private readonly clock: IClock;
private readonly chain: IBeaconChain;
private readonly signal: AbortSignal;

Expand Down Expand Up @@ -136,7 +137,7 @@ export class Network implements INetwork {
this.syncnetsService = syncnetsService;
this.peerManager = peerManager;

this.chain.emitter.on(ChainEvent.clockEpoch, this.onEpoch);
this.chain.clock.on(ClockEvent.epoch, this.onEpoch);
this.chain.emitter.on(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.on(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
modules.signal.addEventListener("abort", this.close.bind(this), {once: true});
Expand Down Expand Up @@ -195,7 +196,7 @@ export class Network implements INetwork {
},
};

const attnetsService = new AttnetsService(config, chain, _gossip, metadata, logger, metricsCore, opts);
const attnetsService = new AttnetsService(config, chain.clock, _gossip, metadata, logger, metricsCore, opts);

gossip = new Eth2Gossipsub(opts, {
config,
Expand All @@ -211,7 +212,7 @@ export class Network implements INetwork {
events: networkEventBus,
});

const syncnetsService = new SyncnetsService(config, chain, gossip, metadata, logger, metricsCore, opts);
const syncnetsService = new SyncnetsService(config, chain.clock, gossip, metadata, logger, metricsCore, opts);

const peerManager = new PeerManager(
{
Expand Down Expand Up @@ -283,7 +284,7 @@ export class Network implements INetwork {
async close(): Promise<void> {
if (this.closed) return;

this.chain.emitter.off(ChainEvent.clockEpoch, this.onEpoch);
this.chain.emitter.off(ClockEvent.epoch, this.onEpoch);
this.chain.emitter.off(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.off(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);

Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {IBeaconChain} from "../../chain/interface.js";
import {Metrics} from "../../metrics/metrics.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
import {GossipType} from "../gossip/interface.js";
import {ChainEvent} from "../../chain/emitter.js";
import {ClockEvent} from "../../util/clock.js";
import {GossipErrorCode} from "../../chain/errors/gossipValidation.js";
import {createGossipQueues} from "./gossipQueues.js";
import {NetworkWorker, NetworkWorkerModules} from "./worker.js";
Expand Down Expand Up @@ -129,7 +129,7 @@ export class NetworkProcessor {

events.on(NetworkEvent.pendingGossipsubMessage, this.onPendingGossipsubMessage.bind(this));
this.chain.emitter.on(routes.events.EventType.block, this.onBlockProcessed.bind(this));
this.chain.emitter.on(ChainEvent.clockSlot, this.onClockSlot.bind(this));
this.chain.clock.on(ClockEvent.slot, this.onClockSlot.bind(this));

this.awaitingGossipsubMessagesByRootBySlot = new MapDef(
() => new MapDef<RootHex, Set<PendingGossipsubMessage>>(() => new Set())
Expand All @@ -154,7 +154,7 @@ export class NetworkProcessor {
async stop(): Promise<void> {
this.events.off(NetworkEvent.pendingGossipsubMessage, this.onPendingGossipsubMessage);
this.chain.emitter.off(routes.events.EventType.block, this.onBlockProcessed);
this.chain.emitter.off(ChainEvent.clockSlot, this.onClockSlot);
this.chain.emitter.off(ClockEvent.slot, this.onClockSlot);
}

dropAllJobs(): void {
Expand Down
Loading

0 comments on commit 1e4ca97

Please sign in to comment.