From c3c1d6e52f05b4e4203cde152982964695ad69bb Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Wed, 16 Feb 2022 14:49:22 +0100 Subject: [PATCH] make `SyncQueue` / `SyncManager` generic The libp2p light client sync protocol defines an endpoint for syncing historic data that is structured similarly to `beaconBlocksByRange`, i.e., it uses a start/count/step tuple to sync from finalized to head. See https://github.com/ethereum/consensus-specs/pull/2802 As preparation, this patch extends the `SyncQueue` and `SyncManager` implementations to work with such new `ByRange` endpoints as well. --- AllTests-mainnet.md | 2 +- beacon_chain/beacon_node.nim | 6 +- beacon_chain/networking/peer_scores.nim | 16 +- beacon_chain/nimbus_beacon_node.nim | 4 +- beacon_chain/sync/sync_manager.nim | 543 ++++++++++------- beacon_chain/sync/sync_queue.nim | 766 +++++++++++++----------- tests/test_sync_manager.nim | 203 ++++--- 7 files changed, 866 insertions(+), 674 deletions(-) diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index 56243951ef..9bfef9af56 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -414,7 +414,7 @@ OK: 4/4 Fail: 0/4 Skip: 0/4 + [SyncQueue#Forward] getRewindPoint() test OK + [SyncQueue] checkResponse() test OK + [SyncQueue] contains() test OK -+ [SyncQueue] getLastNonEmptySlot() test OK ++ [SyncQueue] getLastNonEmptyKey() test OK + [SyncQueue] hasEndGap() test OK ``` OK: 19/19 Fail: 0/19 Skip: 0/19 diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 674714d009..c56a3890ec 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2021 Status Research & Development GmbH +# Copyright (c) 2018-2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -59,8 +59,8 @@ type eventBus*: AsyncEventBus vcProcess*: Process requestManager*: RequestManager - syncManager*: SyncManager[Peer, PeerID] - backfiller*: SyncManager[Peer, PeerID] + syncManager*: BeaconBlocksSyncManager[Peer, PeerID] + backfiller*: BeaconBlocksSyncManager[Peer, PeerID] genesisSnapshotContent*: string actionTracker*: ActionTracker processor*: ref Eth2Processor diff --git a/beacon_chain/networking/peer_scores.nim b/beacon_chain/networking/peer_scores.nim index a356bb74dc..dde00c0742 100644 --- a/beacon_chain/networking/peer_scores.nim +++ b/beacon_chain/networking/peer_scores.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2021 Status Research & Development GmbH +# Copyright (c) 2018-2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -18,7 +18,7 @@ const ## This peer is sending malformed or nonsensical data PeerScoreHeadTooNew* = -100 - ## The peer reports a head newer than our wall clock slot + ## The peer reports a head newer than our wall clock PeerScoreNoStatus* = -100 ## Peer did not answer `status` request. PeerScoreStaleStatus* = -50 @@ -28,18 +28,18 @@ const PeerScoreGoodStatus* = 50 ## Peer's `status` answer is fine. PeerScoreNoBlocks* = -100 - ## Peer did not respond in time on `blocksByRange` request. 
+ ## Peer did not respond in time to `ByRange` request. PeerScoreGoodBlocks* = 100 - ## Peer's `blocksByRange` answer is fine. + ## Peer's `ByRange` answer is fine. PeerScoreBadBlocks* = -1000 - ## Peer's response contains incorrect blocks. + ## Peer's response contains incorrect values. PeerScoreBadResponse* = -1000 ## Peer's response is not in requested range. PeerScoreMissingBlocks* = -25 - ## Peer response contains too many empty blocks - this can happen either + ## Peer response contains too many empty values - this can happen either ## because a long reorg happened or the peer is falsely trying to convince ## us that a long reorg happened. - ## Peer's `blocksByRange` answer is fine. + ## Peer's `ByRange` answer is fine. PeerScoreUnviableFork* = -200 - ## Peer responded with blocks from an unviable fork - are they on a + ## Peer responded with values from an unviable fork - are they on a ## different chain? diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index d5eb6d6af4..041446e0c6 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -490,11 +490,11 @@ proc init*(T: type BeaconNode, blockProcessor, validatorMonitor, dag, attestationPool, exitPool, validatorPool, syncCommitteeMsgPool, quarantine, rng, getBeaconTime, taskpool) - syncManager = newSyncManager[Peer, PeerID]( + syncManager = newBeaconBlocksSyncManager[Peer, PeerID]( network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, dag.tail.slot, blockVerifier) - backfiller = newSyncManager[Peer, PeerID]( + backfiller = newBeaconBlocksSyncManager[Peer, PeerID]( network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0) diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 76433010bd..13ea938e94 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -42,24 +42,24 @@ type future: Future[void] status: SyncWorkerStatus - SyncManager*[A, B] = ref object + SyncManager*[A, B; E: SyncEndpoint] = ref object pool: PeerPool[A, B] responseTimeout: chronos.Duration maxHeadAge: uint64 toleranceValue: uint64 - getLocalHeadSlot: GetSlotCallback - getLocalWallSlot: GetSlotCallback - getSafeSlot: GetSlotCallback - getFirstSlot: GetSlotCallback - getLastSlot: GetSlotCallback - progressPivot: Slot + getLocalHeadKey: GetSyncKeyCallback[E.K] + getLocalWallKey: GetSyncKeyCallback[E.K] + getSafeKey: GetSyncKeyCallback[E.K] + getFirstKey: GetSyncKeyCallback[E.K] + getLastKey: GetSyncKeyCallback[E.K] + progressPivot*: E.K workers: array[SyncWorkersCount, SyncWorker[A, B]] notInSyncEvent: AsyncEvent rangeAge: uint64 chunkSize: uint64 - queue: SyncQueue[A] + queue: SyncQueue[A, E] syncFut: Future[void] - blockVerifier: BlockVerifier + valueVerifier: SyncValueVerifier[E.V] inProgress*: bool insSyncSpeed*: float avgSyncSpeed*: float @@ -71,7 +71,32 @@ type slots*: uint64 SyncManagerError* = object of CatchableError - BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]] + SyncValueRes*[R] = NetRes[seq[R]] + +template declareSyncManager(name: untyped): untyped {.dirty.} = + type + `name SyncManager`*[A, B] = SyncManager[A, B, `name SyncEndpoint`] + `name Res`* = SyncValueRes[`name SyncEndpoint`.R] + + template `new name SyncManager`*[A, B]( + pool: PeerPool[A, B], + direction: SyncQueueKind, + getLocalHeadKeyCb: 
GetSyncKeyCallback[`name SyncEndpoint`.K], + getLocalWallKeyCb: GetSyncKeyCallback[`name SyncEndpoint`.K], + getFinalizedKeyCb: GetSyncKeyCallback[`name SyncEndpoint`.K], + getBackfillKeyCb: GetSyncKeyCallback[`name SyncEndpoint`.K], + progressPivot: `name SyncEndpoint`.K, + valueVerifier: SyncValueVerifier[`name SyncEndpoint`.V], + maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), + chunkSize = uint64(SLOTS_PER_EPOCH), + toleranceValue = uint64(1) + ): `name SyncManager`[A, B] = + `name SyncEndpoint`.newSyncManager( + pool, direction, getLocalHeadKeyCb, getLocalWallKeyCb, getFinalizedKeyCb, + getBackfillKeyCb, progressPivot, valueVerifier, maxHeadAge, chunkSize, + toleranceValue) + +declareSyncManager BeaconBlocks proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} = SyncMoment(stamp: now(chronos.Moment), slots: slots) @@ -86,118 +111,128 @@ proc speed*(start, finish: SyncMoment): float {.inline.} = dur = toFloatSeconds(finish.stamp - start.stamp) slots / dur -proc initQueue[A, B](man: SyncManager[A, B]) = +proc initQueue[A, B, E](man: SyncManager[A, B, E]) = case man.direction of SyncQueueKind.Forward: - man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(), - man.getLastSlot(), man.chunkSize, - man.getSafeSlot, man.blockVerifier, 1) + man.queue = E.initSyncQueue(A, man.direction, man.getFirstKey(), + man.getLastKey(), man.chunkSize, + man.getSafeKey, man.valueVerifier, 1) of SyncQueueKind.Backward: let - firstSlot = man.getFirstSlot() - lastSlot = man.getLastSlot() - startSlot = if firstSlot == lastSlot: - # This case should never be happened in real life because - # there is present check `needsBackfill(). - firstSlot - else: - Slot(firstSlot - 1'u64) - man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot, - man.chunkSize, man.getSafeSlot, - man.blockVerifier, 1) - -proc newSyncManager*[A, B](pool: PeerPool[A, B], - direction: SyncQueueKind, - getLocalHeadSlotCb: GetSlotCallback, - getLocalWallSlotCb: GetSlotCallback, - getFinalizedSlotCb: GetSlotCallback, - getBackfillSlotCb: GetSlotCallback, - progressPivot: Slot, - blockVerifier: BlockVerifier, - maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), - chunkSize = uint64(SLOTS_PER_EPOCH), - toleranceValue = uint64(1) - ): SyncManager[A, B] = - let (getFirstSlot, getLastSlot, getSafeSlot) = case direction + firstKey = man.getFirstKey() + lastKey = man.getLastKey() + startKey = if firstKey == lastKey: + # This case should never be happened in real life because + # there is present check `needsBackfill(). 
+ firstKey + else: + E.K(firstKey - 1'u64) + man.queue = E.initSyncQueue(A, man.direction, firstKey, lastKey, + man.chunkSize, man.getSafeKey, + man.valueVerifier, 1) + +proc newSyncManager*[A, B, E](e: typedesc[E], pool: PeerPool[A, B], + direction: SyncQueueKind, + getLocalHeadKeyCb: GetSyncKeyCallback[E.K], + getLocalWallKeyCb: GetSyncKeyCallback[E.K], + getFinalizedKeyCb: GetSyncKeyCallback[E.K], + getBackfillKeyCb: GetSyncKeyCallback[E.K], + progressPivot: E.K, + valueVerifier: SyncValueVerifier[E.V], + maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), + chunkSize = uint64(SLOTS_PER_EPOCH), + toleranceValue = uint64(1) + ): SyncManager[A, B, E] = + let (getFirstKey, getLastKey, getSafeKey) = case direction of SyncQueueKind.Forward: - (getLocalHeadSlotCb, getLocalWallSlotCb, getFinalizedSlotCb) + (getLocalHeadKeyCb, getLocalWallKeyCb, getFinalizedKeyCb) of SyncQueueKind.Backward: - (getBackfillSlotCb, GetSlotCallback(proc(): Slot = Slot(0)), - getBackfillSlotCb) + (getBackfillKeyCb, GetSyncKeyCallback[E.K](proc(): E.K = E.K(0)), + getBackfillKeyCb) - var res = SyncManager[A, B]( + var res = SyncManager[A, B, E]( pool: pool, - getLocalHeadSlot: getLocalHeadSlotCb, - getLocalWallSlot: getLocalWallSlotCb, - getSafeSlot: getSafeSlot, - getFirstSlot: getFirstSlot, - getLastSlot: getLastSlot, + getLocalHeadKey: getLocalHeadKeyCb, + getLocalWallKey: getLocalWallKeyCb, + getSafeKey: getSafeKey, + getFirstKey: getFirstKey, + getLastKey: getLastKey, progressPivot: progressPivot, maxHeadAge: maxHeadAge, chunkSize: chunkSize, - blockVerifier: blockVerifier, + valueVerifier: valueVerifier, notInSyncEvent: newAsyncEvent(), direction: direction ) res.initQueue() res -proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, - req: SyncRequest): Future[BeaconBlocksRes] {.async.} = +proc doRequest*[A, B](man: BeaconBlocksSyncManager[A, B], peer: A, + req: BeaconBlocksSyncRequest[A] + ): Future[BeaconBlocksRes] {.async.} = mixin beaconBlocksByRange, getScore, `==` doAssert(not(req.isEmpty()), "Request must not be empty!") debug "Requesting blocks from peer", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, + slot = req.start, slot_count = req.count, step = req.step, peer_score = peer.getScore(), peer_speed = peer.netKbps(), direction = man.direction, topics = "syncman" try: let res = if peer.useSyncV2(): - await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step) + await beaconBlocksByRange_v2(peer, req.start, req.count, req.step) else: - (await beaconBlocksByRange(peer, req.slot, req.count, req.step)).map( + (await beaconBlocksByRange(peer, req.start, req.count, req.step)).map( proc(blcks: seq[phase0.SignedBeaconBlock]): auto = blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it)))) if res.isErr(): - debug "Error, while reading getBlocks response", - peer = peer, slot = req.slot, count = req.count, + debug "Error, while reading beaconBlocksByRange response", + peer = peer, slot = req.start, count = req.count, step = req.step, peer_speed = peer.netKbps(), direction = man.direction, topics = "syncman", error = $res.error() return return res except CancelledError: - debug "Interrupt, while waiting getBlocks response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, + debug "Interrupt, while waiting beaconBlocksByRange response", peer = peer, + slot = req.start, slot_count = req.count, step = req.step, peer_speed = peer.netKbps(), direction = man.direction, topics = "syncman" return except CatchableError as exc: - debug "Error, while waiting getBlocks 
response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, + debug "Error, while waiting beaconBlocksByRange response", peer = peer, + slot = req.start, slot_count = req.count, step = req.step, errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(), direction = man.direction, topics = "syncman" return -proc remainingSlots(man: SyncManager): uint64 = +proc remainingKeys(man: SyncManager): uint64 = if man.direction == SyncQueueKind.Forward: - man.getLastSlot() - man.getFirstSlot() + man.getLastKey() - man.getFirstKey() else: - man.getFirstSlot() - man.getLastSlot() + man.getFirstKey() - man.getLastKey() -proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = +func slotToKey[E: SyncEndpoint](slot: Slot, e: typedesc[E]): E.K = + when E.K is Slot: + slot + else: static: raiseAssert false + +proc syncStep[A, B, E](man: SyncManager[A, B, E], + index: int, peer: A) {.async.} = var - headSlot = man.getLocalHeadSlot() - wallSlot = man.getLocalWallSlot() + headKey = man.getLocalHeadKey() + wallKey = man.getLocalWallKey() peerSlot = peer.getHeadSlot() + peerKey = peerSlot.slotToKey(E) block: # Check that peer status is recent and relevant - debug "Peer's syncing status", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, local_head_slot = headSlot, - peer_score = peer.getScore(), peer = peer, index = index, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" + when E.K is Slot: + debug "Peer's syncing status", wall_clock_slot = wallKey, + remote_head_slot = peerKey, local_head_slot = headKey, + peer_score = peer.getScore(), peer = peer, index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + else: static: raiseAssert false let peerStatusAge = Moment.now() - peer.state(BeaconSync).statusLastTime @@ -205,21 +240,23 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = # Latest status we got is old peerStatusAge >= StatusExpirationTime or # The point we need to sync is close to where the peer is - man.getFirstSlot() >= peerSlot + man.getFirstKey() >= peerKey if needsUpdate: man.workers[index].status = SyncWorkerStatus.UpdatingStatus # Avoid a stampede of requests, but make them more frequent in case the - # peer is "close" to the slot range of interest + # peer is "close" to the key range of interest if peerStatusAge < StatusExpirationTime div 2: await sleepAsync(StatusExpirationTime div 2 - peerStatusAge) - trace "Updating peer's status information", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, local_head_slot = headSlot, - peer = peer, peer_score = peer.getScore(), index = index, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" + when E.K is Slot: + trace "Updating peer's status information", wall_clock_slot = wallKey, + remote_head_slot = peerKey, local_head_slot = headKey, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + else: static: raiseAssert false try: let res = await peer.updateStatus() @@ -238,67 +275,81 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = topics = "syncman" return - let newPeerSlot = peer.getHeadSlot() + let + newPeerSlot = peer.getHeadSlot() + newPeerKey = newPeerSlot.slotToKey(E) if peerSlot >= newPeerSlot: peer.updateScore(PeerScoreStaleStatus) - debug "Peer's status information is stale", - wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, - 
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, - peer = peer, peer_score = peer.getScore(), index = index, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" + when E.K is Slot: + debug "Peer's status information is stale", + wall_clock_slot = wallKey, remote_old_head_slot = peerSlot, + local_head_slot = headKey, remote_new_head_slot = newPeerSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + else: static: raiseAssert false else: - debug "Peer's status information updated", wall_clock_slot = wallSlot, - remote_old_head_slot = peerSlot, local_head_slot = headSlot, - remote_new_head_slot = newPeerSlot, peer = peer, - peer_score = peer.getScore(), peer_speed = peer.netKbps(), - index = index, direction = man.direction, topics = "syncman" + when E.K is Slot: + debug "Peer's status information updated", + wall_clock_slot = wallKey, + remote_old_head_slot = peerKey, local_head_slot = headKey, + remote_new_head_slot = newPeerKey, peer = peer, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + index = index, direction = man.direction, topics = "syncman" + else: static: raiseAssert false peer.updateScore(PeerScoreGoodStatus) peerSlot = newPeerSlot + peerKey = newPeerKey - # Time passed - enough to move slots, if sleep happened - headSlot = man.getLocalHeadSlot() - wallSlot = man.getLocalWallSlot() + # Time passed - enough to move to newer key, if sleep happened + headKey = man.getLocalHeadKey() + wallKey = man.getLocalWallKey() - if peerSlot > wallSlot + man.toleranceValue: - # If the peer reports a head slot higher than our wall slot, something is + if peerKey > wallKey + man.toleranceValue: + # If the peer reports a head newer than our wall clock, something is # wrong: our clock is off or the peer is on a different network (or # dishonest) peer.updateScore(PeerScoreHeadTooNew) - warn "Peer reports a head newer than our wall clock - clock out of sync?", - wall_clock_slot = wallSlot, remote_head_slot = peerSlot, - local_head_slot = headSlot, peer = peer, index = index, - tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(), - peer_score = peer.getScore(), direction = man.direction, - topics = "syncman" + when E.K is Slot: + warn "Peer reports a head newer than our wall clock - clock out of sync?", + wall_clock_slot = wallKey, remote_head_slot = peerKey, + local_head_slot = headKey, peer = peer, index = index, + tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(), + peer_score = peer.getScore(), direction = man.direction, + topics = "syncman" + else: static: raiseAssert false return - if man.remainingSlots() <= man.maxHeadAge: + if man.remainingKeys() <= man.maxHeadAge: case man.direction of SyncQueueKind.Forward: - info "We are in sync with network", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, local_head_slot = headSlot, - peer = peer, peer_score = peer.getScore(), index = index, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + info "We are in sync with network", wall_clock_slot = wallKey, + remote_head_slot = peerKey, local_head_slot = headKey, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + else: static: raiseAssert false of SyncQueueKind.Backward: - info "Backfill complete", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, 
local_head_slot = headSlot, - peer = peer, peer_score = peer.getScore(), index = index, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + info "Backfill complete", wall_clock_slot = wallKey, + remote_head_slot = peerKey, local_head_slot = headKey, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + else: static: raiseAssert false # We clear SyncManager's `notInSyncEvent` so all the workers will become # sleeping soon. man.notInSyncEvent.clear() return - # Find out if the peer potentially can give useful blocks - in the case of - # forward sync, they can be useful if they have blocks newer than our head - - # in the case of backwards sync, they're useful if they have blocks newer than + # Find out if the peer potentially can give useful values - in the case of + # forward sync, they can be useful if they have values newer than our head - + # in the case of backwards sync, they're useful if they have values newer than # the backfill point - if man.getFirstSlot() >= peerSlot: + if man.getFirstKey() >= peerKey: # This is not very good solution because we should not discriminate and/or # penalize peers which are in sync process too, but their latest head is # lower then our latest head. We should keep connections with such peers @@ -307,96 +358,110 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = # Right now we decreasing peer's score a bit, so it will not be # disconnected due to low peer's score, but new fresh peers could replace # peers with low latest head. - debug "Peer's head slot is lower then local head slot", - wall_clock_slot = wallSlot, remote_head_slot = peerSlot, - local_last_slot = man.getLastSlot(), - local_first_slot = man.getFirstSlot(), peer = peer, - peer_score = peer.getScore(), - peer_speed = peer.netKbps(), index = index, - direction = man.direction, topics = "syncman" + when E.K is Slot: + debug "Peer's head slot is lower then local head slot", + wall_clock_slot = wallKey, remote_head_slot = peerKey, + local_last_slot = man.getLastKey(), + local_first_slot = man.getFirstKey(), peer = peer, + peer_score = peer.getScore(), + peer_speed = peer.netKbps(), index = index, + direction = man.direction, topics = "syncman" + else: static: raiseAssert false peer.updateScore(PeerScoreUseless) return if man.direction == SyncQueueKind.Forward: # Wall clock keeps ticking, so we need to update the queue - man.queue.updateLastSlot(man.getLastSlot()) + man.queue.updateLastKey(man.getLastKey()) man.workers[index].status = SyncWorkerStatus.Requesting - let req = man.queue.pop(peerSlot, peer) + let req = man.queue.pop(peerKey, peer) if req.isEmpty(): # SyncQueue could return empty request in 2 cases: - # 1. There no more slots in SyncQueue to download (we are synced, but + # 1. There no more keys in SyncQueue to download (we are synced, but # our ``notInSyncEvent`` is not yet cleared). - # 2. Current peer's known head slot is too low to satisfy request. + # 2. Current peer's known head key is too low to satisfy request. # # To avoid endless loop we going to wait for RESP_TIMEOUT time here. # This time is enough for all pending requests to finish and it is also # enough for main sync loop to clear ``notInSyncEvent``. 
- debug "Empty request received from queue, exiting", peer = peer, - local_head_slot = headSlot, remote_head_slot = peerSlot, - queue_input_slot = man.queue.inpSlot, - queue_output_slot = man.queue.outSlot, - queue_last_slot = man.queue.finalSlot, - peer_speed = peer.netKbps(), peer_score = peer.getScore(), - index = index, direction = man.direction, topics = "syncman" + when E.K is Slot: + debug "Empty request received from queue, exiting", peer = peer, + local_head_slot = headKey, remote_head_slot = peerKey, + queue_input_slot = man.queue.inpKey, + queue_output_slot = man.queue.outKey, + queue_last_slot = man.queue.finalKey, + peer_speed = peer.netKbps(), peer_score = peer.getScore(), + index = index, direction = man.direction, topics = "syncman" + else: static: raiseAssert false await sleepAsync(RESP_TIMEOUT) return - debug "Creating new request for peer", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, local_head_slot = headSlot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, peer = peer, peer_speed = peer.netKbps(), - peer_score = peer.getScore(), index = index, - direction = man.direction, topics = "syncman" + when E.K is Slot: + debug "Creating new request for peer", wall_clock_slot = wallKey, + remote_head_slot = peerKey, local_head_slot = headKey, + request_slot = req.start, request_count = req.count, + request_step = req.step, peer = peer, peer_speed = peer.netKbps(), + peer_score = peer.getScore(), index = index, + direction = man.direction, topics = "syncman" + else: static: raiseAssert false man.workers[index].status = SyncWorkerStatus.Downloading try: - let blocks = await man.getBlocks(peer, req) - if blocks.isOk(): - let data = blocks.get() - let smap = getShortMap(req, data) - debug "Received blocks on request", blocks_count = len(data), - blocks_map = smap, request_slot = req.slot, - request_count = req.count, request_step = req.step, - peer = peer, peer_score = peer.getScore(), - peer_speed = peer.netKbps(), index = index, - direction = man.direction, topics = "syncman" + let response = await man.doRequest(peer, req) + if response.isOk(): + let data = response.get() + let smap = getShortMap[A, E](req, data) + when E is BeaconBlocksSyncEndpoint: + debug "Received blocks on request", blocks_count = len(data), + blocks_map = smap, request_slot = req.start, + request_count = req.count, request_step = req.step, + peer = peer, peer_score = peer.getScore(), + peer_speed = peer.netKbps(), index = index, + direction = man.direction, topics = "syncman" + else: static: raiseAssert false if not(checkResponse(req, data)): peer.updateScore(PeerScoreBadResponse) - warn "Received blocks sequence is not in requested range", - blocks_count = len(data), blocks_map = smap, - request_slot = req.slot, request_count = req.count, - request_step = req.step, peer = peer, - peer_score = peer.getScore(), peer_speed = peer.netKbps(), - index = index, direction = man.direction, topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + warn "Received blocks sequence is not in requested range", + blocks_count = len(data), blocks_map = smap, + request_slot = req.start, request_count = req.count, + request_step = req.step, peer = peer, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + index = index, direction = man.direction, topics = "syncman" + else: static: raiseAssert false return - # Scoring will happen in `syncUpdate`. + # Scoring will be done inside of SyncQueue. 
man.workers[index].status = SyncWorkerStatus.Queueing await man.queue.push(req, data, proc() = man.workers[index].status = SyncWorkerStatus.Processing) else: peer.updateScore(PeerScoreNoBlocks) man.queue.push(req) - debug "Failed to receive blocks on request", - request_slot = req.slot, request_count = req.count, - request_step = req.step, peer = peer, index = index, - peer_score = peer.getScore(), peer_speed = peer.netKbps(), - direction = man.direction, topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + debug "Failed to receive blocks on request", + request_slot = req.start, request_count = req.count, + request_step = req.step, peer = peer, index = index, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + direction = man.direction, topics = "syncman" + else: static: raiseAssert false return except CatchableError as exc: - debug "Unexpected exception while receiving blocks", - request_slot = req.slot, request_count = req.count, - request_step = req.step, peer = peer, index = index, - peer_score = peer.getScore(), peer_speed = peer.netKbps(), - errName = exc.name, errMsg = exc.msg, direction = man.direction, - topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + debug "Unexpected exception while receiving blocks", + request_slot = req.start, request_count = req.count, + request_step = req.step, peer = peer, index = index, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + errName = exc.name, errMsg = exc.msg, direction = man.direction, + topics = "syncman" + else: static: raiseAssert false return -proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = +proc syncWorker[A, B, E](man: SyncManager[A, B, E], index: int) {.async.} = mixin getKey, getScore, getHeadSlot debug "Starting syncing worker", index = index, direction = man.direction, @@ -431,10 +496,10 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = debug "Sync worker stopped", index = index, direction = man.direction, topics = "syncman" -proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string, - sleeping: int, - waiting: int, - pending: int] = +proc getWorkersStats[A, B, E](man: SyncManager[A, B, E]): tuple[map: string, + sleeping: int, + waiting: int, + pending: int] = var map = newString(len(man.workers)) var sleeping, waiting, pending: int for i in 0 ..< len(man.workers): @@ -464,12 +529,12 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string, map[i] = ch (map, sleeping, waiting, pending) -proc guardTask[A, B](man: SyncManager[A, B]) {.async.} = +proc guardTask[A, B, E](man: SyncManager[A, B, E]) {.async.} = var pending: array[SyncWorkersCount, Future[void]] # Starting all the synchronization workers. 
for i in 0 ..< len(man.workers): - let future = syncWorker[A, B](man, i) + let future = syncWorker[A, B, E](man, i) man.workers[i].future = future pending[i] = future @@ -485,7 +550,7 @@ proc guardTask[A, B](man: SyncManager[A, B]) {.async.} = warn "Synchronization worker stopped working unexpectedly without error", index = index, direction = man.direction - let future = syncWorker[A, B](man, index) + let future = syncWorker[A, B, E](man, index) man.workers[index].future = future pending[index] = future @@ -515,7 +580,13 @@ proc toTimeLeftString*(d: Duration): string = res = res & "00m" res -proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = +func slots[K](keys: K): Slot = + when K is Slot: + keys + else: + static: raiseAssert false + +proc syncLoop[A, B, E](man: SyncManager[A, B, E]) {.async.} = mixin getKey, getScore var pauseTime = 0 @@ -536,14 +607,14 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = await sleepAsync(seconds(SECONDS_PER_SLOT.int64)) var - stamp = SyncMoment.now(man.queue.progress()) + stamp = SyncMoment.now(E.K(man.queue.progress()).slots.uint64) syncCount = 0 while man.inProgress: await sleepAsync(seconds(SECONDS_PER_SLOT.int64)) let - newStamp = SyncMoment.now(man.queue.progress()) + newStamp = SyncMoment.now(E.K(man.queue.progress()).slots.uint64) slotsPerSec = speed(stamp, newStamp) syncCount += 1 @@ -557,40 +628,43 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = var averageSpeedTaskFut = averageSpeedTask() while true: - let wallSlot = man.getLocalWallSlot() - let headSlot = man.getLocalHeadSlot() + let wallKey = man.getLocalWallKey() + let headKey = man.getLocalHeadKey() let (map, sleeping, waiting, pending) = man.getWorkersStats() - debug "Current syncing state", workers_map = map, - sleeping_workers_count = sleeping, - waiting_workers_count = waiting, - pending_workers_count = pending, - wall_head_slot = wallSlot, local_head_slot = headSlot, - pause_time = $chronos.seconds(pauseTime), - avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed, - direction = man.direction, topics = "syncman" + when E.K is Slot: + debug "Current syncing state", workers_map = map, + sleeping_workers_count = sleeping, + waiting_workers_count = waiting, + pending_workers_count = pending, + wall_head_slot = wallKey, local_head_slot = headKey, + pause_time = $chronos.seconds(pauseTime), + avg_sync_speed = man.avgSyncSpeed, + ins_sync_speed = man.insSyncSpeed, + direction = man.direction, topics = "syncman" + else: static: doAssert false let pivot = man.progressPivot progress = float( - if man.queue.kind == SyncQueueKind.Forward: man.queue.outSlot - pivot - else: pivot - man.queue.outSlot) + if man.queue.kind == SyncQueueKind.Forward: man.queue.outKey - pivot + else: pivot - man.queue.outKey) total = float( - if man.queue.kind == SyncQueueKind.Forward: man.queue.finalSlot - pivot - else: pivot - man.queue.finalSlot) + if man.queue.kind == SyncQueueKind.Forward: man.queue.finalKey - pivot + else: pivot - man.queue.finalKey) remaining = total - progress done = if total > 0.0: progress / total else: 1.0 timeleft = if man.avgSyncSpeed >= 0.001: Duration.fromFloatSeconds(remaining / man.avgSyncSpeed) else: InfiniteDuration - currentSlot = Base10.toString( + currentKey = if man.queue.kind == SyncQueueKind.Forward: - max(uint64(man.queue.outSlot), 1'u64) - 1'u64 + max(uint64(man.queue.outKey), 1'u64) - 1'u64 else: - uint64(man.queue.outSlot) + 1'u64 - ) + uint64(man.queue.outKey) + 1'u64 + currentSlot = Base10.toString(E.K(currentKey).slots.uint64) # Update 
status string man.syncStatus = timeLeft.toTimeLeftString() & " (" & @@ -598,17 +672,20 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) & "slots/s (" & map & ":" & currentSlot & ")" - if man.remainingSlots() <= man.maxHeadAge: + if man.remainingKeys() <= man.maxHeadAge: man.notInSyncEvent.clear() # We are marking SyncManager as not working only when we are in sync and # all sync workers are in `Sleeping` state. if pending > 0: - debug "Synchronization loop waits for workers completion", - wall_head_slot = wallSlot, local_head_slot = headSlot, - difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge, - sleeping_workers_count = sleeping, - waiting_workers_count = waiting, pending_workers_count = pending, - direction = man.direction, topics = "syncman" + when E.K is Slot: + debug "Synchronization loop waits for workers completion", + wall_head_slot = wallKey, local_head_slot = headKey, + difference = (wallKey - headKey), max_head_age = man.maxHeadAge, + sleeping_workers_count = sleeping, + waiting_workers_count = waiting, + pending_workers_count = pending, + direction = man.direction, topics = "syncman" + else: static: doAssert false # We already synced, so we should reset all the pending workers from # any state they have. man.queue.clearAndWakeup() @@ -618,17 +695,21 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = of SyncQueueKind.Forward: if man.inProgress: man.inProgress = false - debug "Forward synchronization process finished, sleeping", - wall_head_slot = wallSlot, local_head_slot = headSlot, - difference = (wallSlot - headSlot), - max_head_age = man.maxHeadAge, direction = man.direction, - topics = "syncman" + when E.K is Slot: + debug "Forward synchronization process finished, sleeping", + wall_head_slot = wallKey, local_head_slot = headKey, + difference = (wallKey - headKey), + max_head_age = man.maxHeadAge, direction = man.direction, + topics = "syncman" + else: static: doAssert false else: - debug "Synchronization loop sleeping", wall_head_slot = wallSlot, - local_head_slot = headSlot, - difference = (wallSlot - headSlot), - max_head_age = man.maxHeadAge, direction = man.direction, - topics = "syncman" + when E.K is Slot: + debug "Synchronization loop sleeping", + wall_head_slot = wallKey, local_head_slot = headKey, + difference = (wallKey - headKey), + max_head_age = man.maxHeadAge, direction = man.direction, + topics = "syncman" + else: static: doAssert false of SyncQueueKind.Backward: # Backward syncing is going to be executed only once, so we exit loop # and stop all pending tasks which belongs to this instance (sync @@ -649,11 +730,13 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = res await allFutures(pendingTasks) man.inProgress = false - debug "Backward synchronization process finished, exiting", - wall_head_slot = wallSlot, local_head_slot = headSlot, - backfill_slot = man.getLastSlot(), - max_head_age = man.maxHeadAge, direction = man.direction, - topics = "syncman" + when E.K is Slot: + debug "Backward synchronization process finished, exiting", + wall_head_slot = wallKey, local_head_slot = headKey, + backfill_slot = man.getLastKey(), + max_head_age = man.maxHeadAge, direction = man.direction, + topics = "syncman" + else: static: doAssert false break else: if not(man.notInSyncEvent.isSet()): @@ -662,26 +745,28 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = man.initQueue() man.notInSyncEvent.fire() man.inProgress = true - debug "Node lost sync for more then preset 
period", - period = man.maxHeadAge, wall_head_slot = wallSlot, - local_head_slot = headSlot, - missing_slots = man.remainingSlots(), - progress = float(man.queue.progress()), - topics = "syncman" + when E.K is Slot: + debug "Node lost sync for more then preset period", + period = man.maxHeadAge, wall_head_slot = wallKey, + local_head_slot = headKey, + missing_slots = man.remainingKeys(), + progress = float(man.queue.progress()), + topics = "syncman" + else: static: doAssert false else: man.notInSyncEvent.fire() man.inProgress = true await sleepAsync(chronos.seconds(2)) -proc start*[A, B](man: SyncManager[A, B]) = +proc start*[A, B, E](man: SyncManager[A, B, E]) = ## Starts SyncManager's main loop. man.syncFut = man.syncLoop() -proc getInfo*[A, B](man: SyncManager[A, B]): RpcSyncInfo = +proc getInfo*[A, B, E](man: SyncManager[A, B, E]): RpcSyncInfo = ## Returns current synchronization information for RPC call. - let wallSlot = man.getLocalWallSlot() - let headSlot = man.getLocalHeadSlot() + let wallSlot = man.getLocalWallKey().slots + let headSlot = man.getLocalHeadKey().slots let sync_distance = wallSlot - headSlot ( head_slot: headSlot, diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 04f34c1be1..a08296fc0e 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -24,72 +24,116 @@ logScope: topics = "syncqueue" type - GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].} - ProcessingCallback* = proc() {.gcsafe, raises: [Defect].} - BlockVerifier* = - proc(signedBlock: ForkedSignedBeaconBlock): - Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].} + SyncEndpoint*[K, V, R] = (K, V, R) + + GetSyncKeyCallback*[K] = + proc(): K {.gcsafe, raises: [Defect].} + ProcessingCallback* = + proc() {.gcsafe, raises: [Defect].} + SyncValueVerifier*[V] = + proc(v: V): Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].} SyncQueueKind* {.pure.} = enum Forward, Backward - SyncRequest*[T] = object + SyncRequest*[T; E: SyncEndpoint] = object kind: SyncQueueKind index*: uint64 - slot*: Slot + start*: E.K count*: uint64 step*: uint64 item*: T - SyncResult*[T] = object - request*: SyncRequest[T] - data*: seq[ref ForkedSignedBeaconBlock] + SyncResult*[T; E: SyncEndpoint] = object + request*: SyncRequest[T, E] + data*: seq[E.R] SyncWaiter* = ref object future: Future[void] reset: bool - RewindPoint = object - failSlot: Slot - epochCount: uint64 + RewindPoint[K] = object + failKey: K + count: uint64 - SyncQueue*[T] = ref object + SyncQueue*[T; E: SyncEndpoint] = ref object kind*: SyncQueueKind - inpSlot*: Slot - outSlot*: Slot - startSlot*: Slot - finalSlot*: Slot + inpKey*: E.K + outKey*: E.K + startKey*: E.K + finalKey*: E.K chunkSize*: uint64 queueSize*: int counter*: uint64 - pending*: Table[uint64, SyncRequest[T]] + pending*: Table[uint64, SyncRequest[T, E]] waiters: seq[SyncWaiter] - getSafeSlot*: GetSlotCallback - debtsQueue: HeapQueue[SyncRequest[T]] + getSafeKey*: GetSyncKeyCallback[E.K] + debtsQueue: HeapQueue[SyncRequest[T, E]] debtsCount: uint64 - readyQueue: HeapQueue[SyncResult[T]] - rewind: Option[RewindPoint] - blockVerifier: BlockVerifier - - SyncManagerError* = object of CatchableError - BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]] + readyQueue: HeapQueue[SyncResult[T, E]] + rewind: Option[RewindPoint[E.K]] + valueVerifier: SyncValueVerifier[E.V] chronicles.formatIt SyncQueueKind: $it -proc getShortMap*[T](req: SyncRequest[T], - data: openArray[ref ForkedSignedBeaconBlock]): string = - ## 
Returns all slot numbers in ``data`` as placement map. +template declareSyncKey(K: typedesc): untyped {.dirty.} = + type + `Get K Callback`* = GetSyncKeyCallback[K] + `K RewindPoint`* = RewindPoint[K] + +template declareSyncValue(name: untyped, V: typedesc): untyped {.dirty.} = + type + `name Verifier`* = SyncValueVerifier[V] + +template declareSyncEndpoint(name: untyped, K, V: typedesc, + isRefWrapped = false): untyped {.dirty.} = + when isRefWrapped: + type `name SyncEndpoint`* = SyncEndpoint[K, V, ref V] + else: + type `name SyncEndpoint`* = SyncEndpoint[K, V, V] + + type + `name SyncRequest`*[T] = SyncRequest[T, `name SyncEndpoint`] + `name SyncResult`*[T] = SyncResult[T, `name SyncEndpoint`] + `name SyncQueue`*[T] = SyncQueue[T, `name SyncEndpoint`] + + template `init name SyncQueue`*[T](t2: typedesc[T], + queueKind: SyncQueueKind, + start, final: K, chunkSize: uint64, + getSafeKeyCb: GetSyncKeyCallback[K], + valueVerifier: SyncValueVerifier[V], + syncQueueSize: int = -1 + ): `name SyncQueue`[T] = + `name SyncEndpoint`.initSyncQueue(t2, queueKind, start, final, + chunkSize, getSafeKeyCb, valueVerifier, + syncQueueSize) + +declareSyncKey Slot + +declareSyncValue Block, ForkedSignedBeaconBlock + +declareSyncEndpoint BeaconBlocks, + Slot, ForkedSignedBeaconBlock, isRefWrapped = true + +func key[E](v: E.R, e: typedesc[E]): E.K = + when E is BeaconBlocksSyncEndpoint: + v[].slot + else: static: raiseAssert false + +proc getShortMap*[T, E](req: SyncRequest[T, E], + data: openArray[E.R]): string = + ## Returns all key values in ``data`` as placement map. var res = newStringOfCap(req.count) - var slider = req.slot + var slider = req.start var last = 0 for i in 0 ..< req.count: if last < len(data): for k in last ..< len(data): - if slider == data[k][].slot: + if slider == data[k].key(E): res.add('x') last = k + 1 break - elif slider < data[k][].slot: + elif slider < data[k].key(E): res.add('.') break else: @@ -97,15 +141,15 @@ proc getShortMap*[T](req: SyncRequest[T], slider = slider + req.step res -proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} = - slot >= req.slot and slot < req.slot + req.count * req.step and - ((slot - req.slot) mod req.step == 0) +proc contains*[T, E](req: SyncRequest[T, E], key: E.K): bool {.inline.} = + key >= req.start and key < req.start + req.count * req.step and + ((key - req.start) mod req.step == 0) -proc cmp*[T](a, b: SyncRequest[T]): int = - cmp(uint64(a.slot), uint64(b.slot)) +proc cmp*[T, E](a, b: SyncRequest[T, E]): int = + cmp(uint64(a.start), uint64(b.start)) -proc checkResponse*[T](req: SyncRequest[T], - data: openArray[ref ForkedSignedBeaconBlock]): bool = +proc checkResponse*[T, E](req: SyncRequest[T, E], + data: openArray[E.R]): bool = if len(data) == 0: # Impossible to verify empty response. return true @@ -115,18 +159,18 @@ proc checkResponse*[T](req: SyncRequest[T], # requested blocks. 
return false - var slot = req.slot + var key = req.start var rindex = 0'u64 var dindex = 0 while (rindex < req.count) and (dindex < len(data)): - if slot < data[dindex][].slot: + if key < data[dindex].key(E): discard - elif slot == data[dindex][].slot: + elif key == data[dindex].key(E): inc(dindex) else: return false - slot = slot + req.step + key = key + req.step rindex = rindex + 1'u64 if dindex == len(data): @@ -134,46 +178,56 @@ proc checkResponse*[T](req: SyncRequest[T], else: return false -proc getFullMap*[T](req: SyncRequest[T], - data: openArray[ForkedSignedBeaconBlock]): string = - # Returns all slot numbers in ``data`` as comma-delimeted string. - mapIt(data, $it.message.slot).join(", ") - -proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot, - finish: Slot, t2: typedesc[T]): SyncRequest[T] = - let count = finish - start + 1'u64 - SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64) - -proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, slot: Slot, - count: uint64, item: T): SyncRequest[T] = - SyncRequest[T](kind: kind, slot: slot, count: count, item: item, step: 1'u64) - -proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot, - finish: Slot, item: T): SyncRequest[T] = - let count = finish - start + 1'u64 - SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64, item: item) - -proc empty*[T](t: typedesc[SyncRequest], kind: SyncQueueKind, - t2: typedesc[T]): SyncRequest[T] {.inline.} = - SyncRequest[T](kind: kind, step: 0'u64, count: 0'u64) - -proc setItem*[T](sr: var SyncRequest[T], item: T) = +proc getFullMap*[T, E](req: SyncRequest[T, E], data: openArray[E.R]): string = + # Returns all key values in ``data`` as comma-delimeted string. + mapIt(data, $it.key(E)).join(", ") + +proc init[T, E](t1: typedesc[SyncRequest], kind: SyncQueueKind, + start, final: E.K, t2: typedesc[T], t3: typedesc[E] + ): SyncRequest[T, E] = + let count = final - start + 1'u64 + SyncRequest[T, E]( + kind: kind, start: start, count: count, step: 1'u64) + +proc init[T, E](t1: typedesc[SyncRequest], kind: SyncQueueKind, + start: E.K, count: uint64, item: T, t3: typedesc[E] + ): SyncRequest[T, E] = + SyncRequest[T, E]( + kind: kind, start: start, count: count, step: 1'u64, item: item) + +proc init[T, E](t1: typedesc[SyncRequest], kind: SyncQueueKind, + start, final: E.K, item: T, t3: typedesc[E] + ): SyncRequest[T, E] = + let count = final - start + 1'u64 + SyncRequest[T, E]( + kind: kind, start: start, count: count, step: 1'u64, item: item) + +proc empty*[T, E](t1: typedesc[SyncRequest], kind: SyncQueueKind, + t2: typedesc[T], t3: typedesc[E] + ): SyncRequest[T, E] {.inline.} = + SyncRequest[T, E](kind: kind, count: 0'u64, step: 0'u64) + +proc setItem*[T, E](sr: var SyncRequest[T, E], item: T) = sr.item = item -proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} = +proc isEmpty*[T, E](sr: SyncRequest[T, E]): bool {.inline.} = (sr.step == 0'u64) and (sr.count == 0'u64) -proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], - queueKind: SyncQueueKind, - start, final: Slot, chunkSize: uint64, - getSafeSlotCb: GetSlotCallback, - blockVerifier: BlockVerifier, - syncQueueSize: int = -1): SyncQueue[T] = +proc init[K](t1: typedesc[RewindPoint], + failKey: K, count: uint64): RewindPoint[K] = + RewindPoint[K](failKey: failKey, count: count) + +proc initSyncQueue*[T, E](e: typedesc[E], t2: typedesc[T], + queueKind: SyncQueueKind, + start, final: E.K, chunkSize: uint64, + getSafeKeyCb: GetSyncKeyCallback[E.K], + valueVerifier: 
SyncValueVerifier[E.V], + syncQueueSize: int = -1): SyncQueue[T, E] = ## Create new synchronization queue with parameters ## - ## ``start`` and ``last`` are starting and finishing Slots. + ## ``start`` and ``final`` are starting and final keys. ## - ## ``chunkSize`` maximum number of slots in one request. + ## ``chunkSize`` maximum number of keys in one request. ## ## ``syncQueueSize`` maximum queue size for incoming data. ## If ``syncQueueSize > 0`` queue will help to keep backpressure under @@ -187,10 +241,11 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], # # Joker's problem # - # According to current Ethereum2 network specification + # According to pre-v0.12.0 Ethereum consensus network specification # > Clients MUST respond with at least one block, if they have it and it # > exists in the range. Clients MAY limit the number of blocks in the # > response. + # https://github.com/ethereum/consensus-specs/blob/v0.11.3/specs/phase0/p2p-interface.md#L590 # # Such rule can lead to very uncertain responses, for example let slots from # 10 to 12 will be not empty. Client which follows specification can answer @@ -204,79 +259,85 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], # 6. X - X # 7. X X - # - # If peer answers with `1` everything will be fine and `block_pool` will be - # able to process all 3 blocks. In case of `2`, `3`, `4`, `6` - `block_pool` - # will fail immediately with chunk and report "parent is missing" error. - # But in case of `5` and `7` blocks will be processed by `block_pool` without - # any problems, however it will start producing problems right from this - # uncertain last slot. SyncQueue will start producing requests for next + # If peer answers with `1` everything will be fine and `block_processor` + # will be able to process all 3 blocks. + # In case of `2`, `3`, `4`, `6` - `block_processor` will fail immediately + # with chunk and report "parent is missing" error. + # But in case of `5` and `7` blocks will be processed by `block_processor` + # without any problems, however it will start producing problems right from + # this uncertain last slot. SyncQueue will start producing requests for next # blocks, but all the responses from this point will fail with "parent is # missing" error. Lets call such peers "jokers", because they are joking # with responses. # # To fix "joker" problem we going to perform rollback to the latest finalized # epoch's first slot. + # + # Note that as of spec v0.12.0, well-behaving clients are forbidden from + # answering this way. 
However, it still makes sense to attempt to handle + # this case to increase compatibility (e.g., with weak subjectivity nodes + # that are still backfilling blocks) doAssert(chunkSize > 0'u64, "Chunk size should not be zero") - SyncQueue[T]( + SyncQueue[T, E]( kind: queueKind, - startSlot: start, - finalSlot: final, + startKey: start, + finalKey: final, chunkSize: chunkSize, queueSize: syncQueueSize, - getSafeSlot: getSafeSlotCb, + getSafeKey: getSafeKeyCb, waiters: newSeq[SyncWaiter](), counter: 1'u64, - pending: initTable[uint64, SyncRequest[T]](), - debtsQueue: initHeapQueue[SyncRequest[T]](), - inpSlot: start, - outSlot: start, - blockVerifier: blockVerifier + pending: initTable[uint64, SyncRequest[T, E]](), + debtsQueue: initHeapQueue[SyncRequest[T, E]](), + inpKey: start, + outKey: start, + valueVerifier: valueVerifier ) -proc `<`*[T](a, b: SyncRequest[T]): bool = +proc `<`*[T, E](a, b: SyncRequest[T, E]): bool = doAssert(a.kind == b.kind) case a.kind of SyncQueueKind.Forward: - a.slot < b.slot + a.start < b.start of SyncQueueKind.Backward: - a.slot > b.slot + a.start > b.start -proc `<`*[T](a, b: SyncResult[T]): bool = +proc `<`*[T, E](a, b: SyncResult[T, E]): bool = doAssert(a.request.kind == b.request.kind) case a.request.kind of SyncQueueKind.Forward: - a.request.slot < b.request.slot + a.request.start < b.request.start of SyncQueueKind.Backward: - a.request.slot > b.request.slot + a.request.start > b.request.start -proc `==`*[T](a, b: SyncRequest[T]): bool = - (a.kind == b.kind) and (a.slot == b.slot) and (a.count == b.count) and +proc `==`*[T, E](a, b: SyncRequest[T, E]): bool = + (a.kind == b.kind) and (a.start == b.start) and (a.count == b.count) and (a.step == b.step) -proc lastSlot*[T](req: SyncRequest[T]): Slot = - ## Returns last slot for request ``req``. - req.slot + req.count - 1'u64 +proc lastKey*[T, E](req: SyncRequest[T, E]): E.K = + ## Returns last key for request ``req``. + req.start + req.count - 1'u64 -proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) = +proc makePending*[T, E](sq: SyncQueue[T, E], req: var SyncRequest[T, E]) = req.index = sq.counter sq.counter = sq.counter + 1'u64 sq.pending[req.index] = req -proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} = - ## Update last slot stored in queue ``sq`` with value ``last``. +proc updateLastKey*[T, E](sq: SyncQueue[T, E], last: E.K) {.inline.} = + ## Update last key stored in queue ``sq`` with value ``last``. case sq.kind of SyncQueueKind.Forward: - doAssert(sq.finalSlot <= last, - "Last slot could not be lower then stored one " & - $sq.finalSlot & " <= " & $last) - sq.finalSlot = last + doAssert(sq.finalKey <= last, + "Last key could not be lower then stored one " & + $sq.finalKey & " <= " & $last) + sq.finalKey = last of SyncQueueKind.Backward: - doAssert(sq.finalSlot >= last, - "Last slot could not be higher then stored one " & - $sq.finalSlot & " >= " & $last) - sq.finalSlot = last + doAssert(sq.finalKey >= last, + "Last key could not be higher then stored one " & + $sq.finalKey & " >= " & $last) + sq.finalKey = last -proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) = +proc wakeupWaiters[T, E](sq: SyncQueue[T, E], reset = false) = ## Wakeup one or all blocked waiters. 
for item in sq.waiters: if reset: @@ -285,7 +346,7 @@ proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) = if not(item.future.finished()): item.future.complete() -proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} = +proc waitForChanges[T, E](sq: SyncQueue[T, E]): Future[bool] {.async.} = ## Create new waiter and wait for completion from `wakeupWaiters()`. var waitfut = newFuture[void]("SyncQueue.waitForChanges") let waititem = SyncWaiter(future: waitfut) @@ -296,18 +357,18 @@ proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} = finally: sq.waiters.delete(sq.waiters.find(waititem)) -proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} = - ## This procedure will perform wakeupWaiters(false) and blocks until last +proc wakeupAndWaitWaiters[T, E](sq: SyncQueue[T, E]) {.async.} = + ## This procedure will perform wakeupWaiters(true) and blocks until last ## waiter will be awakened. var waitChanges = sq.waitForChanges() sq.wakeupWaiters(true) discard await waitChanges -proc clearAndWakeup*[T](sq: SyncQueue[T]) = +proc clearAndWakeup*[T, E](sq: SyncQueue[T, E]) = sq.pending.clear() sq.wakeupWaiters(true) -proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = +proc resetWait*[T, E](sq: SyncQueue[T, E], toKey: Option[E.K]) {.async.} = ## Perform reset of all the blocked waiters in SyncQueue. ## ## We adding one more waiter to the waiters sequence and @@ -321,64 +382,64 @@ proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = # you can introduce race problem. sq.pending.clear() - # We calculating minimal slot number to which we will be able to reset, - # without missing any blocks. There 3 sources: + # We calculating minimal key value to which we will be able to reset, + # without missing any values. There 3 sources: # 1. Debts queue. - # 2. Processing queue (`inpSlot`, `outSlot`). - # 3. Requested slot `toSlot`. + # 2. Processing queue (`inpKey`, `outKey`). + # 3. Requested key `toKey`. # - # Queue's `outSlot` is the lowest slot we added to `block_pool`, but - # `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not - # added slot requests, so it can't be bigger then `outSlot` value. - let minSlot = + # Queue's `outKey` is the lowest key we added to value `processor`, but + # `toKey` key can be less then `outKey`. `debtsQueue` holds only not + # added key requests, so it can't be bigger then `outKey` value. + let minKey = case sq.kind of SyncQueueKind.Forward: - if toSlot.isSome(): - min(toSlot.get(), sq.outSlot) + if toKey.isSome(): + min(toKey.get(), sq.outKey) else: - sq.outSlot + sq.outKey of SyncQueueKind.Backward: - if toSlot.isSome(): - toSlot.get() + if toKey.isSome(): + toKey.get() else: - sq.outSlot + sq.outKey sq.debtsQueue.clear() sq.debtsCount = 0 sq.readyQueue.clear() - sq.inpSlot = minSlot - sq.outSlot = minSlot + sq.inpKey = minKey + sq.outKey = minKey # We are going to wakeup all the waiters and wait for last one. await sq.wakeupAndWaitWaiters() -proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} = - ## Returns ``true`` if response chain of blocks is empty (has only empty - ## slots). +proc isEmpty*[T, E](sr: SyncResult[T, E]): bool {.inline.} = + ## Returns ``true`` if response chain of values is empty (has only empty + ## keys). len(sr.data) == 0 -proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} = - ## Returns ``true`` if response chain of blocks has gap at the end. 
- let lastslot = sr.request.slot + sr.request.count - 1'u64 +proc hasEndGap*[T, E](sr: SyncResult[T, E]): bool {.inline.} = + ## Returns ``true`` if response chain of values has gap at the end. + let lastKey = sr.request.start + sr.request.count - 1'u64 if len(sr.data) == 0: return true - if sr.data[^1][].slot != lastslot: + if sr.data[^1].key(E) != lastKey: return true return false -proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} = - ## Returns last non-empty slot from result ``sr``. If response has only - ## empty slots, original request slot will be returned. +proc getLastNonEmptyKey*[T, E](sr: SyncResult[T, E]): E.K {.inline.} = + ## Returns last non-empty key from result ``sr``. If response has only + ## empty keys, original request key will be returned. if len(sr.data) == 0: - # If response has only empty slots we going to use original request slot - sr.request.slot + # If response has only empty keys we going to use original request key + sr.request.start else: - sr.data[^1][].slot + sr.data[^1].key(E) -proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) = +proc toDebtsQueue[T, E](sq: SyncQueue[T, E], sr: SyncRequest[T, E]) = sq.debtsQueue.push(sr) sq.debtsCount = sq.debtsCount + sr.count -proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, - safeSlot: Slot): Slot = +proc getRewindPoint*[T](sq: BeaconBlocksSyncQueue[T], + failSlot, safeSlot: Slot): Slot = case sq.kind of SyncQueueKind.Forward: # Calculate the latest finalized epoch. @@ -391,12 +452,12 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, let epochCount = if sq.rewind.isSome(): let rewind = sq.rewind.get() - if failSlot == rewind.failSlot: + if failSlot == rewind.failKey: # `MissingParent` happened at same slot so we increase rewind point by # factor of 2. if failEpoch > finalizedEpoch: - let rewindPoint = rewind.epochCount shl 1 - if rewindPoint < rewind.epochCount: + let rewindPoint = rewind.count shl 1 + if rewindPoint < rewind.count: # If exponential rewind point produces `uint64` overflow we will # make rewind to latest finalized epoch. failEpoch - finalizedEpoch @@ -412,7 +473,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, warn "Trying to rewind over the last finalized epoch", finalized_slot = safeSlot, fail_slot = failSlot, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, - rewind_epoch_count = rewind.epochCount, + rewind_epoch_count = rewind.count, finalized_epoch = finalizedEpoch, direction = sq.kind, topics = "syncman" 0'u64 @@ -423,7 +484,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, warn "Сould not rewind further than the last finalized epoch", finalized_slot = safeSlot, fail_slot = failSlot, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, - rewind_epoch_count = rewind.epochCount, + rewind_epoch_count = rewind.count, finalized_epoch = finalizedEpoch, direction = sq.kind, topics = "syncman" 0'u64 @@ -453,14 +514,14 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, if sq.rewind.isNone(): finalizedEpoch else: - epoch(sq.rewind.get().failSlot) - sq.rewind.get().epochCount + epoch(sq.rewind.get().failKey) - sq.rewind.get().count rewindEpoch.start_slot() else: # Calculate the rewind epoch, which should not be less than the latest # finalized epoch. let rewindEpoch = failEpoch - epochCount # Update and save new rewind point in SyncQueue. 
- sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount)) + sq.rewind = some(RewindPoint.init(failSlot, epochCount)) rewindEpoch.start_slot() of SyncQueueKind.Backward: # While we perform backward sync, the only possible slot we could rewind is @@ -471,8 +532,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, topics = "syncman" safeSlot -iterator blocks*[T](sq: SyncQueue[T], - sr: SyncResult[T]): ref ForkedSignedBeaconBlock = +iterator values*[T, E](sq: SyncQueue[T, E], sr: SyncResult[T, E]): E.R = case sq.kind of SyncQueueKind.Forward: for i in countup(0, len(sr.data) - 1): @@ -481,30 +541,29 @@ iterator blocks*[T](sq: SyncQueue[T], for i in countdown(len(sr.data) - 1, 0): yield sr.data[i] -proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) = +proc advanceOutput*[T, E](sq: SyncQueue[T, E], number: uint64) = case sq.kind of SyncQueueKind.Forward: - sq.outSlot = sq.outSlot + number + sq.outKey = sq.outKey + number of SyncQueueKind.Backward: - sq.outSlot = sq.outSlot - number + sq.outKey = sq.outKey - number -proc advanceInput[T](sq: SyncQueue[T], number: uint64) = +proc advanceInput[T, E](sq: SyncQueue[T, E], number: uint64) = case sq.kind of SyncQueueKind.Forward: - sq.inpSlot = sq.inpSlot + number + sq.inpKey = sq.inpKey + number of SyncQueueKind.Backward: - sq.inpSlot = sq.inpSlot - number + sq.inpKey = sq.inpKey - number -proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool = +proc notInRange[T, E](sq: SyncQueue[T, E], sr: SyncRequest[T, E]): bool = case sq.kind of SyncQueueKind.Forward: - (sq.queueSize > 0) and (sr.slot != sq.outSlot) + (sq.queueSize > 0) and (sr.start != sq.outKey) of SyncQueueKind.Backward: - (sq.queueSize > 0) and (sr.slot + sr.count - 1'u64 != sq.outSlot) + (sq.queueSize > 0) and (sr.start + sr.count - 1'u64 != sq.outKey) -proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], - data: seq[ref ForkedSignedBeaconBlock], - processingCb: ProcessingCallback = nil) {.async.} = +proc push*[T, E](sq: SyncQueue[T, E], sr: SyncRequest[T, E], data: seq[E.R], + processingCb: ProcessingCallback = nil) {.async.} = ## Push successful result to queue ``sq``. mixin updateScore @@ -518,7 +577,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], sq.pending.del(sr.index) # This is backpressure handling algorithm, this algorithm is blocking - # all pending `push` requests if `request.slot` not in range. + # all pending `push` requests if `request.start` not in range. while true: if sq.notInRange(sr): let reset = await sq.waitForChanges() @@ -526,7 +585,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], # SyncQueue reset happens. We are exiting to wake up sync-worker. 
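+        # Drop the response without advancing `outKey`; the affected range can
+        # be requested again once the queue state has been reset.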
return else: - let syncres = SyncResult[T](request: sr, data: data) + let syncres = SyncResult[T, E](request: sr, data: data) sq.readyQueue.push(syncres) break @@ -534,16 +593,16 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], let reqres = case sq.kind of SyncQueueKind.Forward: - let minSlot = sq.readyQueue[0].request.slot - if sq.outSlot != minSlot: - none[SyncResult[T]]() + let minKey = sq.readyQueue[0].request.start + if sq.outKey != minKey: + none[SyncResult[T, E]]() else: some(sq.readyQueue.pop()) of SyncQueueKind.Backward: - let maxSlot = sq.readyQueue[0].request.slot + - (sq.readyQueue[0].request.count - 1'u64) - if sq.outSlot != maxSlot: - none[SyncResult[T]]() + let maxKey = sq.readyQueue[0].request.start + + (sq.readyQueue[0].request.count - 1'u64) + if sq.outKey != maxKey: + none[SyncResult[T, E]]() else: some(sq.readyQueue.pop()) @@ -551,177 +610,205 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], if reqres.isSome(): reqres.get() else: - let rewindSlot = sq.getRewindPoint(sq.outSlot, sq.getSafeSlot()) - warn "Got incorrect sync result in queue, rewind happens", - request_slot = sq.readyQueue[0].request.slot, - request_count = sq.readyQueue[0].request.count, - request_step = sq.readyQueue[0].request.step, - blocks_map = getShortMap(sq.readyQueue[0].request, - sq.readyQueue[0].data), - blocks_count = len(sq.readyQueue[0].data), - output_slot = sq.outSlot, input_slot = sq.inpSlot, - peer = sq.readyQueue[0].request.item, rewind_to_slot = rewindSlot, - direction = sq.readyQueue[0].request.kind, topics = "syncman" - await sq.resetWait(some(rewindSlot)) + let rewindKey = sq.getRewindPoint(sq.outKey, sq.getSafeKey()) + when E is BeaconBlocksSyncEndpoint: + warn "Got incorrect sync result in queue, rewind happens", + request_slot = sq.readyQueue[0].request.start, + request_count = sq.readyQueue[0].request.count, + request_step = sq.readyQueue[0].request.step, + blocks_map = getShortMap[T, E](sq.readyQueue[0].request, + sq.readyQueue[0].data), + blocks_count = len(sq.readyQueue[0].data), + output_slot = sq.outKey, input_slot = sq.inpKey, + peer = sq.readyQueue[0].request.item, + rewind_to_slot = rewindKey, + direction = sq.readyQueue[0].request.kind, topics = "syncman" + else: static: raiseAssert false + await sq.resetWait(some(rewindKey)) break if processingCb != nil: processingCb() - # Validating received blocks one by one + # Validating received values one by one + type UnviableTuple = (Eth2Digest, E.K) var - hasOkBlock = false - hasInvalidBlock = false - unviableBlock: Option[(Eth2Digest, Slot)] - missingParentSlot: Option[Slot] + hasOkValue = false + hasInvalidValue = false + unviableValue: Option[UnviableTuple] + missingParentKey: Option[E.K] # compiler segfault if this is moved into the for loop, at time of writing res: Result[void, BlockError] - for blk in sq.blocks(item): - res = await sq.blockVerifier(blk[]) + for value in sq.values(item): + res = await sq.valueVerifier(value[]) if res.isOk(): - hasOkBlock = true + hasOkValue = true else: case res.error() of BlockError.MissingParent: - missingParentSlot = some(blk[].slot) + missingParentKey = some(value.key(E)) break of BlockError.Duplicate: # Keep going, happens naturally discard of BlockError.UnviableFork: - # Keep going so as to register other unviable blocks with the - # quarantine - if unviableBlock.isNone: - # Remember the first unviable block, so we can log it - unviableBlock = some((blk[].root, blk[].slot)) - + if unviableValue.isNone: + # Remember the first unviable value, so we can log it + unviableValue 
= some((value[].root, value.key(E))) + when E.V is ForkedSignedBeaconBlock: + # Keep going so as to register other unviable blocks with the + # quarantine + discard + else: static: raiseAssert false of BlockError.Invalid: - hasInvalidBlock = true + hasInvalidValue = true let req = item.request - warn "Received invalid sequence of blocks", peer = req.item, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - direction = req.kind, topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + warn "Received invalid sequence of blocks", peer = req.item, + request_slot = req.start, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap[T, E](req, item.data), + direction = req.kind, topics = "syncman" + else: static: raiseAssert false req.item.updateScore(PeerScoreBadBlocks) break - # When errors happen while processing blocks, we retry the same request + # When errors happen while processing values, we retry the same request # with, hopefully, a different peer let retryRequest = - hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome() + hasInvalidValue or unviableValue.isSome() or missingParentKey.isSome() if not retryRequest: sq.advanceOutput(item.request.count) - if hasOkBlock: + if hasOkValue: # If there no error and response was not empty we should reward peer - # with some bonus score - not for duplicate blocks though. + # with some bonus score - not for duplicate values though. item.request.item.updateScore(PeerScoreGoodBlocks) sq.wakeupWaiters() else: - debug "Block pool rejected peer's response", peer = item.request.item, - request_slot = item.request.slot, - request_count = item.request.count, - request_step = item.request.step, - blocks_map = getShortMap(item.request, item.data), - blocks_count = len(item.data), - ok = hasOkBlock, - unviable = unviableBlock.isSome(), - missing_parent = missingParentSlot.isSome(), - direction = item.request.kind, topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + debug "Block pool rejected peer's response", peer = item.request.item, + request_slot = item.request.start, + request_count = item.request.count, + request_step = item.request.step, + blocks_map = getShortMap[T, E](item.request, item.data), + blocks_count = len(item.data), + ok = hasOkValue, + unviable = unviableValue.isSome(), + missing_parent = missingParentKey.isSome(), + direction = item.request.kind, topics = "syncman" + else: static: raiseAssert false # We need to move failed response to the debts queue. 
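+    # `toDebtsQueue` also adds the request's `count` back to `debtsCount`,
+    # so `pop()` can hand the same range to a different peer later.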
sq.toDebtsQueue(item.request) - if unviableBlock.isSome: + if unviableValue.isSome: let req = item.request - notice "Received blocks from an unviable fork", - blockRoot = unviableBlock.get()[0], - blockSlot = unviableBlock.get()[1], peer = req.item, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - direction = req.kind, topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + notice "Received blocks from an unviable fork", + blockRoot = unviableValue.get()[0], + blockSlot = unviableValue.get()[1], peer = req.item, + request_slot = req.start, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap[T, E](req, item.data), + direction = req.kind, topics = "syncman" + else: static: raiseAssert false req.item.updateScore(PeerScoreUnviableFork) - if missingParentSlot.isSome: + if missingParentKey.isSome: var - resetSlot: Option[Slot] - failSlot = missingParentSlot.get() + resetKey: Option[E.K] + failKey = missingParentKey.get() # If we got `BlockError.MissingParent` it means that peer returns chain - # of blocks with holes or `block_pool` is in incomplete state. We going - # to rewind to the first slot at latest finalized epoch. + # of values with holes or `block_processor` is in incomplete state. + # For blocks we will rewind to the first slot at latest finalized epoch. let req = item.request - safeSlot = sq.getSafeSlot() + safeKey = sq.getSafeKey() case sq.kind of SyncQueueKind.Forward: - if safeSlot < req.slot: - let rewindSlot = sq.getRewindPoint(failSlot, safeSlot) - warn "Unexpected missing parent, rewind happens", - peer = req.item, rewind_to_slot = rewindSlot, - rewind_epoch_count = sq.rewind.get().epochCount, - rewind_fail_slot = failSlot, - finalized_slot = safeSlot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - direction = req.kind, topics = "syncman" - resetSlot = some(rewindSlot) + if safeKey < req.start: + let rewindKey = sq.getRewindPoint(failKey, safeKey) + when E is BeaconBlocksSyncEndpoint: + warn "Unexpected missing parent, rewind happens", + peer = req.item, rewind_to_slot = rewindKey, + rewind_epoch_count = sq.rewind.get().count, + rewind_fail_slot = failKey, + finalized_slot = safeKey, + request_slot = req.start, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap[T, E](req, item.data), + direction = req.kind, topics = "syncman" + else: static: raiseAssert false + resetKey = some(rewindKey) req.item.updateScore(PeerScoreMissingBlocks) else: - error "Unexpected missing parent at finalized epoch slot", - peer = req.item, to_slot = safeSlot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - direction = req.kind, topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + error "Unexpected missing parent at finalized epoch slot", + peer = req.item, to_slot = safeKey, + request_slot = req.start, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap[T, E](req, item.data), + direction = req.kind, topics = "syncman" + else: static: raiseAssert false req.item.updateScore(PeerScoreBadBlocks) of SyncQueueKind.Backward: - if safeSlot > req.slot: - let rewindSlot = sq.getRewindPoint(failSlot, 
safeSlot) - # It's quite common peers give us fewer blocks than we ask for - info "Gap in block range response, rewinding", - peer = req.item, rewind_to_slot = rewindSlot, - rewind_fail_slot = failSlot, - finalized_slot = safeSlot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - direction = req.kind, topics = "syncman" - resetSlot = some(rewindSlot) + if safeKey > req.start: + let rewindKey = sq.getRewindPoint(failKey, safeKey) + when E is BeaconBlocksSyncEndpoint: + # It's quite common peers give us fewer values than we ask for + info "Gap in block range response, rewinding", + peer = req.item, rewind_to_slot = rewindKey, + rewind_fail_slot = failKey, + finalized_slot = safeKey, + request_slot = req.start, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap[T, E](req, item.data), + direction = req.kind, topics = "syncman" + else: static: raiseAssert false + resetKey = some(rewindKey) req.item.updateScore(PeerScoreMissingBlocks) else: - error "Unexpected missing parent at safe slot", - peer = req.item, to_slot = safeSlot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - direction = req.kind, topics = "syncman" + when E is BeaconBlocksSyncEndpoint: + error "Unexpected missing parent at safe slot", + peer = req.item, to_slot = safeKey, + request_slot = req.start, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap[T, E](req, item.data), + direction = req.kind, topics = "syncman" + else: static: raiseAssert false req.item.updateScore(PeerScoreBadBlocks) - if resetSlot.isSome(): - await sq.resetWait(resetSlot) + if resetKey.isSome(): + await sq.resetWait(resetKey) case sq.kind of SyncQueueKind.Forward: - debug "Rewind to slot was happened", reset_slot = reset_slot.get(), - queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, - rewind_epoch_count = sq.rewind.get().epochCount, - rewind_fail_slot = sq.rewind.get().failSlot, - reset_slot = resetSlot, direction = sq.kind, topics = "syncman" + when E.K is Slot: + debug "Rewind to slot has happened", + reset_slot = resetKey.get(), + queue_input_slot = sq.inpKey, + queue_output_slot = sq.outKey, + rewind_epoch_count = sq.rewind.get().count, + rewind_fail_slot = sq.rewind.get().failKey, + direction = sq.kind, topics = "syncman" + else: static: raiseAssert false of SyncQueueKind.Backward: - debug "Rewind to slot was happened", reset_slot = reset_slot.get(), - queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, - reset_slot = resetSlot, direction = sq.kind, topics = "syncman" + when E.K is Slot: + debug "Rewind to slot has happened", + reset_slot = resetKey.get(), + queue_input_slot = sq.inpKey, + queue_output_slot = sq.outKey, + direction = sq.kind, topics = "syncman" + else: static: raiseAssert false break -proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) = +proc push*[T, E](sq: SyncQueue[T, E], sr: SyncRequest[T, E]) = ## Push failed request back to queue. 
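+  ## If the request is still tracked in `pending`, it is moved back to the
+  ## debts queue so that the same range can be retried with a different peer.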
if sr.index notin sq.pending: # If request `sr` not in our pending list, it only means that @@ -732,15 +819,16 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) = sq.pending.del(sr.index) sq.toDebtsQueue(sr) -proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] = +proc pop*[T, E](sq: SyncQueue[T, E], + maxKey: E.K, item: T): SyncRequest[T, E] = ## Create new request according to current SyncQueue parameters. if len(sq.debtsQueue) > 0: - if maxSlot < sq.debtsQueue[0].slot: - # Peer's latest slot is less than starting request's slot. - return SyncRequest.empty(sq.kind, T) - if maxSlot < sq.debtsQueue[0].lastSlot(): - # Peer's latest slot is less than finishing request's slot. - return SyncRequest.empty(sq.kind, T) + if maxKey < sq.debtsQueue[0].start: + # Peer's latest key is less than starting request's key. + return SyncRequest.empty(sq.kind, T, E) + if maxKey < sq.debtsQueue[0].lastKey(): + # Peer's latest key is less than finishing request's key. + return SyncRequest.empty(sq.kind, T, E) var sr = sq.debtsQueue.pop() sq.debtsCount = sq.debtsCount - sr.count sr.setItem(item) @@ -749,71 +837,71 @@ proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] = else: case sq.kind of SyncQueueKind.Forward: - if maxSlot < sq.inpSlot: - # Peer's latest slot is less than queue's input slot. - return SyncRequest.empty(sq.kind, T) - if sq.inpSlot > sq.finalSlot: - # Queue's input slot is bigger than queue's final slot. - return SyncRequest.empty(sq.kind, T) - let lastSlot = min(maxslot, sq.finalSlot) - let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot) - var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item) + if maxKey < sq.inpKey: + # Peer's latest key is less than queue's input key. + return SyncRequest.empty(sq.kind, T, E) + if sq.inpKey > sq.finalKey: + # Queue's input key is bigger than queue's final key. + return SyncRequest.empty(sq.kind, T, E) + let lastKey = min(maxKey, sq.finalKey) + let count = min(sq.chunkSize, lastKey + 1'u64 - sq.inpKey) + var sr = SyncRequest.init(sq.kind, sq.inpKey, count, item, E) sq.advanceInput(count) sq.makePending(sr) sr of SyncQueueKind.Backward: - if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64: - return SyncRequest.empty(sq.kind, T) - if sq.inpSlot < sq.finalSlot: - return SyncRequest.empty(sq.kind, T) - let (slot, count) = + if sq.inpKey == 0xFFFF_FFFF_FFFF_FFFF'u64: + return SyncRequest.empty(sq.kind, T, E) + if sq.inpKey < sq.finalKey: + return SyncRequest.empty(sq.kind, T, E) + let (key, count) = block: - let baseSlot = sq.inpSlot + 1'u64 - if baseSlot - sq.finalSlot < sq.chunkSize: - let count = uint64(baseSlot - sq.finalSlot) - (baseSlot - count, count) + let baseKey = sq.inpKey + 1'u64 + if baseKey - sq.finalKey < sq.chunkSize: + let count = uint64(baseKey - sq.finalKey) + (baseKey - count, count) else: - (baseSlot - sq.chunkSize, sq.chunkSize) - if (maxSlot + 1'u64) < slot + count: - # Peer's latest slot is less than queue's input slot. - return SyncRequest.empty(sq.kind, T) - var sr = SyncRequest.init(sq.kind, slot, count, item) + (baseKey - sq.chunkSize, sq.chunkSize) + if (maxKey + 1'u64) < key + count: + # Peer's latest key is less than queue's input key. 
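+        # (i.e. the peer cannot serve the upper end of the requested range)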
+ return SyncRequest.empty(sq.kind, T, E) + var sr = SyncRequest.init(sq.kind, key, count, item, E) sq.advanceInput(count) sq.makePending(sr) sr -proc debtLen*[T](sq: SyncQueue[T]): uint64 = +proc debtLen*[T, E](sq: SyncQueue[T, E]): uint64 = sq.debtsCount -proc pendingLen*[T](sq: SyncQueue[T]): uint64 = +proc pendingLen*[T, E](sq: SyncQueue[T, E]): uint64 = case sq.kind of SyncQueueKind.Forward: - # When moving forward `outSlot` will be <= of `inpSlot`. - sq.inpSlot - sq.outSlot + # When moving forward `outKey` will be <= of `inpKey`. + sq.inpKey - sq.outKey of SyncQueueKind.Backward: - # When moving backward `outSlot` will be >= of `inpSlot` - sq.outSlot - sq.inpSlot + # When moving backward `outKey` will be >= of `inpKey` + sq.outKey - sq.inpKey -proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} = - ## Returns number of slots left in queue ``sq``. +proc len*[T, E](sq: SyncQueue[T, E]): uint64 {.inline.} = + ## Returns number of keys left in queue ``sq``. case sq.kind of SyncQueueKind.Forward: - sq.finalSlot + 1'u64 - sq.outSlot + sq.finalKey + 1'u64 - sq.outKey of SyncQueueKind.Backward: - sq.outSlot + 1'u64 - sq.finalSlot + sq.outKey + 1'u64 - sq.finalKey -proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} = - ## Returns total number of slots in queue ``sq``. +proc total*[T, E](sq: SyncQueue[T, E]): uint64 {.inline.} = + ## Returns total number of keys in queue ``sq``. case sq.kind of SyncQueueKind.Forward: - sq.finalSlot + 1'u64 - sq.startSlot + sq.finalKey + 1'u64 - sq.startKey of SyncQueueKind.Backward: - sq.startSlot + 1'u64 - sq.finalSlot + sq.startKey + 1'u64 - sq.finalKey -proc progress*[T](sq: SyncQueue[T]): uint64 = - ## How many slots we've synced so far +proc progress*[T, E](sq: SyncQueue[T, E]): uint64 = + ## How many keys we've synced so far case sq.kind of SyncQueueKind.Forward: - sq.outSlot - sq.startSlot + sq.outKey - sq.startKey of SyncQueueKind.Backward: - sq.startSlot - sq.outSlot + sq.startKey - sq.outKey diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index 6ea3fae4b3..f2d2a21527 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -1,3 +1,10 @@ +# beacon_chain +# Copyright (c) 2020-2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + {.used.} import std/strutils @@ -52,11 +59,12 @@ suite "SyncManager test suite": curslot = curslot + 1'u64 res - proc getSlice(chain: openArray[ref ForkedSignedBeaconBlock], startSlot: Slot, - request: SyncRequest[SomeTPeer]): seq[ref ForkedSignedBeaconBlock] = + proc getSlice(chain: openarray[ref ForkedSignedBeaconBlock], startSlot: Slot, + request: BeaconBlocksSyncRequest[SomeTPeer] + ): seq[ref ForkedSignedBeaconBlock] = let - startIndex = int(request.slot - startSlot) - finishIndex = int(request.slot - startSlot) + int(request.count) - 1 + startIndex = int(request.start - startSlot) + finishIndex = int(request.start - startSlot) + int(request.count) - 1 var res = newSeq[ref ForkedSignedBeaconBlock](1 + finishIndex - startIndex) for i in 0..