ETCM-1023: Block fetching and -building flow #1108

Open · wants to merge 15 commits into base: develop
@@ -180,7 +180,8 @@ object RegularSyncItSpecUtils {
ommersPool,
pendingTransactionsManager,
system.scheduler,
-this
+this,
+newFlow = true
)
)

@@ -137,7 +137,8 @@ class SyncController(
ommersPool,
pendingTransactionsManager,
scheduler,
-configBuilder
+configBuilder,
+newFlow = false
),
"regular-sync"
)
@@ -52,7 +52,8 @@ class BlockFetcher(
val supervisor: ClassicActorRef,
val syncConfig: SyncConfig,
val blockValidator: BlockValidator,
-context: ActorContext[BlockFetcher.FetchCommand]
+context: ActorContext[BlockFetcher.FetchCommand],
+newFlow: Boolean
) extends AbstractBehavior[BlockFetcher.FetchCommand](context) {

import BlockFetcher._
@@ -286,7 +287,9 @@ class BlockFetcher(
)(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState =
pickResult
.tap { case (blocks, _) =>
-replyTo ! PickedBlocks(blocks)
+if (!newFlow) {
+  replyTo ! PickedBlocks(blocks)
+}
}
.fold(state)(_._2)

@@ -330,10 +333,19 @@ object BlockFetcher {
peerEventBus: ClassicActorRef,
supervisor: ClassicActorRef,
syncConfig: SyncConfig,
-blockValidator: BlockValidator
+blockValidator: BlockValidator,
+newFlow: Boolean = false
): Behavior[FetchCommand] =
Behaviors.setup(context =>
-new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator, context)
+new BlockFetcher(
+  peersClient,
+  peerEventBus,
+  supervisor,
+  syncConfig,
+  blockValidator,
+  context,
+  newFlow
+)
)

sealed trait FetchCommand
@@ -0,0 +1,48 @@ (new file)
package io.iohk.ethereum.blockchain.sync.regular

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

import cats.data.NonEmptyList

import scala.annotation.tailrec
import scala.collection.immutable.Queue

import io.iohk.ethereum.blockchain.sync.regular.BranchBuffer._
import io.iohk.ethereum.domain.Block
import io.iohk.ethereum.domain.BlockchainReader
import io.iohk.ethereum.domain.branch.BestBranch
import io.iohk.ethereum.domain.branch.Branch

/** Naive & temporary branch buffer implementation with some serious limitations:
* - it is not able to store multiple competing branches at the same time.
* - it will only find branches from the best branch tip.
*
* @param byParent in-memory buffer of blocks retrievable by parentHash. Only one block per parent is kept, last received wins.
* @param branchFound branch found from given best branch, as a list of blocks. They are removed from the buffer.
*/
case class BranchBuffer(byParent: Map[Hash, Block] = Map.empty, branchFound: Queue[Block] = Queue.empty) {
Contributor: why do we need a branch buffer if we immediately store everything?

Contributor Author (@jvdp, Aug 25, 2021): We don't immediately store everything, only rooted blocks and branches; the reasoning is that these blocks have passed the proof-of-work threshold and so should be relatively safe to store without DoS concerns.

(With the caveat that we should also not store super old blocks; we have been discussing a "settled block" concept that we can also use for this.)

Contributor: Got it. How much work do you think it takes to turn this into a structure that supports parallel branches? And can the current one be used in any way as is?

Contributor Author: The current one is able to sync Sagano fully, but then might get stuck on a dead branch.

I think I'm close to one that will work for parallel branches for my current ticket (1059).

def handle(trunk: Branch, block: Block): BranchBuffer =
copy(byParent = byParent + (block.parentHash -> block), branchFound = Queue.empty)
.branchFrom(trunk match {
case BestBranch(tipBlockHash, _) => tipBlockHash
case _ => ByteString.empty
})

@tailrec
private def branchFrom(hash: Hash): BranchBuffer =
byParent.get(hash) match {
case None => this
case Some(block) => copy(byParent.removed(hash), branchFound = branchFound :+ block).branchFrom(block.hash)
}
}

object BranchBuffer {
type Hash = ByteString

def flow(blockchainReader: BlockchainReader): Flow[Block, NonEmptyList[Block], NotUsed] =
Flow[Block]
.scan(BranchBuffer()) { case (buffer, block) => buffer.handle(blockchainReader.getBestBranch(), block) }
.collect { case BranchBuffer(_, head +: tail) => NonEmptyList(head, tail.toList) }
}
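For orientation, here is how this buffer is meant to sit in the pipeline. It is a condensed mirror of startNewFlow in RegularSync further down (blockSource, blockchainReader, and importer are the values defined there), not new API:

blockSource                                  // Source[Block, NotUsed], fed by FetcherService's queue
  .via(BranchBuffer.flow(blockchainReader))  // emits NonEmptyList[Block] branches rooted at the best branch
  .runWith(Sink.foreach { branch =>
    importer ! BlockFetcher.PickedBlocks(branch)
  })

Since byParent keeps only one block per parent hash, a later competing child silently replaces a buffered one; that is the single-branch limitation the thread above discusses.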
@@ -1,5 +1,11 @@
package io.iohk.ethereum.blockchain.sync.regular

import akka.NotUsed
import akka.stream.QueueOfferResult.Enqueued
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.SourceQueue
import akka.util.ByteString

import cats.data.EitherT
@@ -11,17 +17,27 @@ import scala.annotation.tailrec

import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.RequestFailed
import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.consensus.validators.std.StdBlockValidator
import io.iohk.ethereum.domain.Block
import io.iohk.ethereum.domain.BlockBody
import io.iohk.ethereum.domain.BlockHeader
import io.iohk.ethereum.domain.BlockchainReader
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.ETH62.BlockBodies
import io.iohk.ethereum.network.p2p.messages.ETH62.BlockHeaders
import io.iohk.ethereum.utils.Config.SyncConfig

//not used atm, a part of the future ExecutionSync
-class FetcherService(validator: BlockValidator, blockchainReader: BlockchainReader, syncConfig: SyncConfig) {
+class FetcherService(
+    blockchainReader: BlockchainReader,
+    syncConfig: SyncConfig,
+    sourceQueue: SourceQueue[Block]
+) {

import FetcherService._

val batchSize = syncConfig.blockHeadersPerRequest

private def requestHeaders(
@@ -33,7 +49,11 @@ class FetcherService

//TODO: add private def requestStateNode(hash: ByteString): Task[Either[RequestFailed, Seq[ByteString]]] = ???

-private def placeBlockInPeerStream(block: Block, peer: Peer): Peer = ???
+def placeBlockInPeerStream(block: Block): Task[Either[String, Unit]] =
+  Task.deferFuture(sourceQueue.offer(block)).map {
+    case Enqueued => Right(())
+    case reason   => Left(s"SourceQueue.offer failed: $reason")
+  }

def fetchBlocksUntil(
peer: Peer,
@@ -66,27 +86,55 @@ class FetcherService
for {
headers <- EitherT(requestHeaders(block, amount))
bodies <- EitherT(requestBodies(headers.headers.map(_.hash)))
-blocks <- EitherT.fromOption[Task](
-  bodiesAreOrderedSubsetOfRequested(headers.headers, bodies.bodies),
-  RequestFailed(peer, "Unmatching bodies")
-)
-_ = blocks.foreach(placeBlockInPeerStream(_, peer))
+blocks = buildBlocks(headers.headers, bodies.bodies)
+_ <- EitherT.cond[Task](blocks.length == headers.headers.length, (), RequestFailed(peer, "Unmatching bodies"))
+_ <- blocks.traverse(block => EitherT(placeBlockInPeerStream(block)).leftMap(RequestFailed(peer, _)))
} yield peer
}

-// Checks that the received block bodies are an ordered subset of the ones requested
-@tailrec
-private def bodiesAreOrderedSubsetOfRequested(
-    requestedHeaders: Seq[BlockHeader],
-    respondedBodies: Seq[BlockBody],
-    matchedBlocks: Seq[Block] = Nil
-): Option[Seq[Block]] =
-  (requestedHeaders, respondedBodies) match {
-    case (Seq(), _ +: _) => None
-    case (_, Seq()) => Some(matchedBlocks)
-    case (header +: remainingHeaders, body +: remainingBodies) =>
-      if (validator.validateHeaderAndBody(header, body).isRight)
-        bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block(header, body))
-      else
-        bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks)
-  }
object FetcherService {
case class BlockIdentifier(transactionsRoot: ByteString, ommersHash: ByteString)
object BlockIdentifier {
def apply(blockHeader: BlockHeader): BlockIdentifier =
BlockIdentifier(blockHeader.transactionsRoot, blockHeader.ommersHash)

def apply(blockBody: BlockBody): BlockIdentifier =
BlockIdentifier(
ByteString(StdBlockValidator.transactionsRootHash(blockBody).toIterable),
ByteString(StdBlockValidator.blockBodyOmmersHash(blockBody).toIterable)
)
}

/** Pairs each header with a received body whose (transactionsRoot, ommersHash) matches;
  * headers with no matching body are dropped, and header order is preserved.
  */
def buildBlocks(headers: Seq[BlockHeader], bodies: Seq[BlockBody]): Seq[Block] = {
val bodyById = bodies.view.map(body => BlockIdentifier(body) -> body).toMap
for {
header <- headers
body <- bodyById.get(BlockIdentifier(header))
} yield Block(header, body)
}

/** State of block fetching stream after processing a given incoming message with block headers or bodies
*
* @param outstanding headers that are yet to be matched to bodies
* @param result blocks produced by matching received headers with received bodies
*/
case class FetchState(
Contributor: So in the end we still can't get rid of the state? This was one of the goals we defined. We can still do it by using sync processing.

Contributor Author: This isn't state in the imperative 'shared mutable state' sense; it's kept by the scan operator. This is a more functional approach.

outstanding: Set[BlockHeader],
result: Seq[Block]
)
object FetchState {
val initial: FetchState = FetchState(Set.empty, Nil)
}

// TODO: remove once we have the FetcherService instance integrated
val tempFlow: Flow[MessageFromPeer, Seq[Block], NotUsed] =
Flow[MessageFromPeer]
.scan(FetchState.initial) {
case (FetchState(outstanding, _), MessageFromPeer(BlockHeaders(headers), peerId)) =>
FetchState(outstanding.concat(headers), Nil)
case (FetchState(outstanding, _), MessageFromPeer(BlockBodies(bodies), _)) =>
val blocks = buildBlocks(outstanding.toSeq, bodies)
FetchState(outstanding.removedAll(blocks.map(_.header)), blocks)
}
.collect { case FetchState(_, blocks) if blocks.nonEmpty => blocks }
}
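To make the review exchange about FetchState concrete, here is a self-contained toy (plain Akka Streams, nothing mantis-specific; all names are illustrative) showing the two patterns this file leans on: a pre-materialized queue that producers offer into, and state folded inside the stream by scan rather than held in a shared mutable field:

import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult.Enqueued
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable.Queue
import scala.concurrent.ExecutionContext

object StreamStateSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val ec: ExecutionContext = system.dispatcher

  // (1) preMaterialize hands back the queue before the stream runs,
  //     the same trick RegularSync uses for blockSourceQueue/blockSource.
  val (queue, source) = Source.queue[Int](16, OverflowStrategy.fail).preMaterialize()

  // (2) the "state" is only the scan accumulator, invisible outside the stream.
  source
    .scan(Queue.empty[Int]) { case (acc, n) => acc :+ n }
    .runWith(Sink.foreach(acc => println(s"buffered so far: $acc")))

  // Producers offer asynchronously and inspect the result, as placeBlockInPeerStream does.
  (1 to 3).foreach { n =>
    queue.offer(n).foreach {
      case Enqueued => ()
      case other    => println(s"offer failed: $other")
    }
  }
}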
@@ -3,13 +3,25 @@ package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.AllForOneStrategy
import akka.actor.Cancellable
import akka.actor.Props
import akka.actor.Scheduler
import akka.actor.SupervisorStrategy
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorRef => TypedActorRef}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString

import cats.data.NonEmptyList

import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.ExecutionContext

import io.iohk.ethereum.blockchain.sync.Blacklist
import io.iohk.ethereum.blockchain.sync.SyncProtocol
@@ -24,7 +36,14 @@ import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.db.storage.StateStorage
import io.iohk.ethereum.domain.Block
import io.iohk.ethereum.domain.BlockchainReader
import io.iohk.ethereum.domain.branch.BestBranch
import io.iohk.ethereum.domain.branch.Branch
import io.iohk.ethereum.ledger.BranchResolution
import io.iohk.ethereum.network.EtcPeerManagerActor
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.PeerEventBusActor
import io.iohk.ethereum.network.p2p.messages.Codes
import io.iohk.ethereum.network.p2p.messages.ETH62
import io.iohk.ethereum.nodebuilder.BlockchainConfigBuilder
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
@@ -43,13 +62,14 @@ class RegularSync(
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
scheduler: Scheduler,
-configBuilder: BlockchainConfigBuilder
+configBuilder: BlockchainConfigBuilder,
+newFlow: Boolean
) extends Actor
with ActorLogging {

val fetcher: TypedActorRef[BlockFetcher.FetchCommand] =
context.spawn(
-BlockFetcher(peersClient, peerEventBus, self, syncConfig, blockValidator),
+BlockFetcher(peersClient, peerEventBus, self, syncConfig, blockValidator, newFlow),
"block-fetcher"
)

@@ -78,22 +98,64 @@ class RegularSync(
"block-importer"
)

implicit val system: ActorSystem = context.system
implicit val ec: ExecutionContext = context.dispatcher

val printFetcherSchedule: Cancellable =
scheduler.scheduleWithFixedDelay(
syncConfig.printStatusInterval,
syncConfig.printStatusInterval,
fetcher.toClassic,
BlockFetcher.PrintStatus
-)(context.dispatcher)
+)

val (blockSourceQueue, blockSource) = Source.queue[Block](256, OverflowStrategy.fail).preMaterialize()
val fetcherService = new FetcherService(blockchainReader, syncConfig, blockSourceQueue)

override def receive: Receive = running(
ProgressState(startedFetching = false, initialBlock = 0, currentBlock = 0, bestKnownNetworkBlock = 0)
)

private def startTemporaryBlockProducer() = {
import monix.execution.Scheduler.Implicits.global
PeerEventBusActor
.messageSource(
peerEventBus,
PeerEventBusActor.SubscriptionClassifier
Contributor: if you're replacing the PeerRequestHandler here, then the PeerDisconnectedClassifier is missing.

Contributor Author: It's not replacing PeerRequestHandler yet. What's missing is a piece of Flow that manages the sending of header requests and handles those disconnection and best-block messages (and more?).

Contributor: But is it going to replace it in the future? Just pointing out what should be there if we proceed with this.

Contributor Author (@jvdp, Aug 25, 2021): I sort of misspoke. It's largely replacing PeerRequestHandler, but by replacing BlockFetcher. FastSync (until rewritten) still uses PeerRequestHandler, so it's not replacing it in that sense.

.MessageClassifier(
Set(Codes.BlockBodiesCode, Codes.BlockHeadersCode),
PeerEventBusActor.PeerSelector.AllPeers
)
)
.via(FetcherService.tempFlow)
.buffer(256, OverflowStrategy.fail)
.mapConcat(identity)
.runWith(Sink.foreachAsync(1) { block =>
fetcherService
.placeBlockInPeerStream(block)
.runToFuture
.collect { case Right(()) => () }

})
}

private def startNewFlow() =
blockSource
.via(BranchBuffer.flow(blockchainReader))
.runWith(Sink.foreach { blocks =>
importer ! BlockFetcher.PickedBlocks(blocks)
})
.onComplete(res => log.error(res.toString))

def running(progressState: ProgressState): Receive = {
case SyncProtocol.Start =>
log.info("Starting regular sync")
importer ! BlockImporter.Start
if (newFlow) {
startNewFlow()
startTemporaryBlockProducer()
}

case SyncProtocol.MinedBlock(block) =>
log.info(s"Block mined [number = {}, hash = {}]", block.number, block.header.hashAsHexString)
importer ! BlockImporter.MinedBlock(block)
@@ -147,7 +209,8 @@ object RegularSync {
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
scheduler: Scheduler,
-configBuilder: BlockchainConfigBuilder
+configBuilder: BlockchainConfigBuilder,
+newFlow: Boolean
): Props =
Props(
new RegularSync(
Expand All @@ -164,7 +227,8 @@ object RegularSync {
ommersPool,
pendingTransactionsManager,
scheduler,
-configBuilder
+configBuilder,
+newFlow
)
)
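
Putting the diffs together, the experimental path enabled by newFlow reads roughly as follows; this is a condensed summary of the code above, not a diagram from the PR:

// PeerEventBus messages (BlockHeaders / BlockBodies, all peers)
//   -> FetcherService.tempFlow                 // match headers to bodies (scan over FetchState)
//   -> fetcherService.placeBlockInPeerStream   // offer each Block to the queue
//   -> blockSource                             // pre-materialized Source.queue[Block](256)
//   -> BranchBuffer.flow(blockchainReader)     // emit branches rooted at the best branch
//   -> importer ! BlockFetcher.PickedBlocks(_) // BlockImporter consumes as before

With newFlow = false (the production default set in SyncController), none of this runs and the classic actor path is unchanged.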
