-
Notifications
You must be signed in to change notification settings - Fork 75
ETCM-1023: Block fetching and -building flow #1108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
6c48f70
b75140e
20cef79
70f7865
3512e08
b9e1e34
7f48cef
8405533
26ffde0
3f5bc3f
02e973f
4dee3df
4890acd
b6673ef
d0f1195
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package io.iohk.ethereum.blockchain.sync.regular | ||
|
||
import akka.NotUsed | ||
import akka.stream.scaladsl.Flow | ||
import akka.util.ByteString | ||
|
||
import cats.data.NonEmptyList | ||
|
||
import scala.annotation.tailrec | ||
import scala.collection.immutable.Queue | ||
|
||
import io.iohk.ethereum.blockchain.sync.regular.BranchBuffer._ | ||
import io.iohk.ethereum.domain.Block | ||
import io.iohk.ethereum.domain.BlockchainReader | ||
import io.iohk.ethereum.domain.branch.BestBranch | ||
import io.iohk.ethereum.domain.branch.Branch | ||
|
||
/** Naive & temporary branch buffer implementation with some serious limitations:
  *   - it is not able to store multiple competing branches at the same time.
  *   - it will only find branches from the best branch tip.
  *
  * @param byParent    in-memory buffer of blocks retrievable by parentHash. Only one block per
  *                    parent is kept; the last received one wins.
  * @param branchFound branch found from the given best branch, as a list of blocks. Found blocks
  *                    are removed from the buffer.
  */
case class BranchBuffer(byParent: Map[Hash, Block] = Map.empty, branchFound: Queue[Block] = Queue.empty) {

  /** Buffers the incoming block, then searches for a branch starting from the tip of `trunk`.
    * Any previously found branch is discarded before the new search.
    */
  def handle(trunk: Branch, block: Block): BranchBuffer = {
    // Only a BestBranch exposes a tip hash to search from; the empty hash matches no
    // buffered block, so a non-best branch yields no result.
    val searchRoot = trunk match {
      case BestBranch(tipBlockHash, _) => tipBlockHash
      case _                           => ByteString.empty
    }
    copy(byParent = byParent + (block.parentHash -> block), branchFound = Queue.empty)
      .branchFrom(searchRoot)
  }

  /** Walks the buffer starting at `hash`, moving each chained block out of `byParent`
    * and appending it to `branchFound`, until no child of the current hash is buffered.
    */
  @tailrec
  private def branchFrom(hash: Hash): BranchBuffer =
    byParent.get(hash) match {
      case None => this
      case Some(block) =>
        copy(byParent.removed(hash), branchFound = branchFound :+ block).branchFrom(block.hash)
    }
}
|
||
object BranchBuffer {
  // Block hashes are raw byte strings.
  type Hash = ByteString

  /** Stream stage turning a sequence of (possibly out-of-order) blocks into rooted branches.
    *
    * Every incoming block is folded into a [[BranchBuffer]] against the current best branch;
    * whenever the buffer yields a non-empty chain of blocks, that chain is emitted downstream
    * as a `NonEmptyList`.
    */
  def flow(blockchainReader: BlockchainReader): Flow[Block, NonEmptyList[Block], NotUsed] =
    Flow[Block]
      .scan(BranchBuffer()) { (buffer, block) =>
        buffer.handle(blockchainReader.getBestBranch(), block)
      }
      .collect {
        // Emit only when a branch was actually found (queue non-empty).
        case BranchBuffer(_, head +: tail) => NonEmptyList(head, tail.toList)
      }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,11 @@ | ||
package io.iohk.ethereum.blockchain.sync.regular | ||
|
||
import akka.NotUsed | ||
import akka.stream.QueueOfferResult.Enqueued | ||
import akka.stream.scaladsl.Flow | ||
import akka.stream.scaladsl.Keep | ||
import akka.stream.scaladsl.Sink | ||
import akka.stream.scaladsl.SourceQueue | ||
import akka.util.ByteString | ||
|
||
import cats.data.EitherT | ||
|
@@ -11,17 +17,27 @@ import scala.annotation.tailrec | |
|
||
import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.RequestFailed | ||
import io.iohk.ethereum.consensus.validators.BlockValidator | ||
import io.iohk.ethereum.consensus.validators.std.StdBlockValidator | ||
import io.iohk.ethereum.domain.Block | ||
import io.iohk.ethereum.domain.BlockBody | ||
import io.iohk.ethereum.domain.BlockHeader | ||
import io.iohk.ethereum.domain.BlockchainReader | ||
import io.iohk.ethereum.network.Peer | ||
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer | ||
import io.iohk.ethereum.network.PeerId | ||
import io.iohk.ethereum.network.p2p.messages.ETH62.BlockBodies | ||
import io.iohk.ethereum.network.p2p.messages.ETH62.BlockHeaders | ||
import io.iohk.ethereum.utils.Config.SyncConfig | ||
|
||
// Not used at the moment; will become part of the future ExecutionSync. | ||
class FetcherService(validator: BlockValidator, blockchainReader: BlockchainReader, syncConfig: SyncConfig) { | ||
class FetcherService( | ||
blockchainReader: BlockchainReader, | ||
syncConfig: SyncConfig, | ||
sourceQueue: SourceQueue[Block] | ||
) { | ||
|
||
import FetcherService._ | ||
|
||
val batchSize = syncConfig.blockHeadersPerRequest | ||
|
||
private def requestHeaders( | ||
|
@@ -33,7 +49,11 @@ class FetcherService(validator: BlockValidator, blockchainReader: BlockchainRead | |
|
||
//TODO: add private def requestStateNode(hash: ByteString): Task[Either[RequestFailed, Seq[ByteString]]] = ??? | ||
|
||
private def placeBlockInPeerStream(block: Block, peer: Peer): Peer = ??? | ||
def placeBlockInPeerStream(block: Block): Task[Either[String, Unit]] = | ||
Task.deferFuture(sourceQueue.offer(block)).map { | ||
case Enqueued => Right(()) | ||
case reason => Left(s"SourceQueue.offer failed: $reason") | ||
} | ||
|
||
def fetchBlocksUntil( | ||
peer: Peer, | ||
|
@@ -66,27 +86,55 @@ class FetcherService(validator: BlockValidator, blockchainReader: BlockchainRead | |
for { | ||
headers <- EitherT(requestHeaders(block, amount)) | ||
bodies <- EitherT(requestBodies(headers.headers.map(_.hash))) | ||
blocks <- EitherT.fromOption[Task]( | ||
bodiesAreOrderedSubsetOfRequested(headers.headers, bodies.bodies), | ||
RequestFailed(peer, "Unmatching bodies") | ||
) | ||
_ = blocks.foreach(placeBlockInPeerStream(_, peer)) | ||
blocks = buildBlocks(headers.headers, bodies.bodies) | ||
_ <- EitherT.cond[Task](blocks.length == headers.headers.length, (), RequestFailed(peer, "Unmatching bodies")) | ||
_ <- blocks.traverse(block => EitherT(placeBlockInPeerStream(block)).leftMap(RequestFailed(peer, _))) | ||
} yield peer | ||
} | ||
|
||
// Checks that the received block bodies are an ordered subset of the ones requested | ||
@tailrec | ||
private def bodiesAreOrderedSubsetOfRequested( | ||
requestedHeaders: Seq[BlockHeader], | ||
respondedBodies: Seq[BlockBody], | ||
matchedBlocks: Seq[Block] = Nil | ||
): Option[Seq[Block]] = | ||
(requestedHeaders, respondedBodies) match { | ||
case (Seq(), _ +: _) => None | ||
case (_, Seq()) => Some(matchedBlocks) | ||
case (header +: remainingHeaders, body +: remainingBodies) => | ||
if (validator.validateHeaderAndBody(header, body).isRight) | ||
bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block(header, body)) | ||
else | ||
bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks) | ||
} | ||
object FetcherService { | ||
case class BlockIdentifier(transactionsRoot: ByteString, ommersHash: ByteString) | ||
object BlockIdentifier { | ||
def apply(blockHeader: BlockHeader): BlockIdentifier = | ||
BlockIdentifier(blockHeader.transactionsRoot, blockHeader.ommersHash) | ||
|
||
def apply(blockBody: BlockBody): BlockIdentifier = | ||
BlockIdentifier( | ||
ByteString(StdBlockValidator.transactionsRootHash(blockBody).toIterable), | ||
ByteString(StdBlockValidator.blockBodyOmmersHash(blockBody).toIterable) | ||
) | ||
} | ||
|
||
def buildBlocks(headers: Seq[BlockHeader], bodies: Seq[BlockBody]): Seq[Block] = { | ||
val bodyById = bodies.view.map(body => BlockIdentifier(body) -> body).toMap | ||
for { | ||
header <- headers | ||
body <- bodyById.get(BlockIdentifier(header)) | ||
} yield Block(header, body) | ||
} | ||
|
||
/** State of block fetching stream after processing a given incoming message with block headers or bodies | ||
* | ||
* @param outstanding headers that are yet to be matched to bodies | ||
* @param result blocks produced by matching received headers with received bodies | ||
*/ | ||
case class FetchState( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so in the end we still can't get rid of the state? this was one of the goals we defined. We still can do it by using sync processing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this isn't a state in the imperative 'shared mutable state' sense; it's kept by the |
||
outstanding: Set[BlockHeader], | ||
result: Seq[Block] | ||
) | ||
object FetchState { | ||
val initial: FetchState = FetchState(Set.empty, Nil) | ||
} | ||
|
||
// TODO: remove once we have the FetcherService instance integrated | ||
val tempFlow: Flow[MessageFromPeer, Seq[Block], NotUsed] = | ||
Flow[MessageFromPeer] | ||
.scan(FetchState.initial) { | ||
case (FetchState(outstanding, _), MessageFromPeer(BlockHeaders(headers), peerId)) => | ||
FetchState(outstanding.concat(headers), Nil) | ||
case (FetchState(outstanding, _), MessageFromPeer(BlockBodies(bodies), _)) => | ||
val blocks = buildBlocks(outstanding.toSeq, bodies) | ||
FetchState(outstanding.removedAll(blocks.map(_.header)), blocks) | ||
} | ||
.collect { case FetchState(_, blocks) if blocks.nonEmpty => blocks } | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,25 @@ package io.iohk.ethereum.blockchain.sync.regular | |
import akka.actor.Actor | ||
import akka.actor.ActorLogging | ||
import akka.actor.ActorRef | ||
import akka.actor.ActorSystem | ||
import akka.actor.AllForOneStrategy | ||
import akka.actor.Cancellable | ||
import akka.actor.Props | ||
import akka.actor.Scheduler | ||
import akka.actor.SupervisorStrategy | ||
import akka.actor.typed.scaladsl.adapter._ | ||
import akka.actor.typed.{ActorRef => TypedActorRef} | ||
import akka.stream.OverflowStrategy | ||
import akka.stream.scaladsl.Flow | ||
import akka.stream.scaladsl.Sink | ||
import akka.stream.scaladsl.Source | ||
import akka.util.ByteString | ||
|
||
import cats.data.NonEmptyList | ||
|
||
import scala.annotation.tailrec | ||
import scala.collection.immutable.Queue | ||
import scala.concurrent.ExecutionContext | ||
|
||
import io.iohk.ethereum.blockchain.sync.Blacklist | ||
import io.iohk.ethereum.blockchain.sync.SyncProtocol | ||
|
@@ -24,7 +36,14 @@ import io.iohk.ethereum.consensus.validators.BlockValidator | |
import io.iohk.ethereum.db.storage.StateStorage | ||
import io.iohk.ethereum.domain.Block | ||
import io.iohk.ethereum.domain.BlockchainReader | ||
import io.iohk.ethereum.domain.branch.BestBranch | ||
import io.iohk.ethereum.domain.branch.Branch | ||
import io.iohk.ethereum.ledger.BranchResolution | ||
import io.iohk.ethereum.network.EtcPeerManagerActor | ||
import io.iohk.ethereum.network.Peer | ||
import io.iohk.ethereum.network.PeerEventBusActor | ||
import io.iohk.ethereum.network.p2p.messages.Codes | ||
import io.iohk.ethereum.network.p2p.messages.ETH62 | ||
import io.iohk.ethereum.nodebuilder.BlockchainConfigBuilder | ||
import io.iohk.ethereum.utils.ByteStringUtils | ||
import io.iohk.ethereum.utils.Config.SyncConfig | ||
|
@@ -43,13 +62,14 @@ class RegularSync( | |
ommersPool: ActorRef, | ||
pendingTransactionsManager: ActorRef, | ||
scheduler: Scheduler, | ||
configBuilder: BlockchainConfigBuilder | ||
configBuilder: BlockchainConfigBuilder, | ||
newFlow: Boolean | ||
) extends Actor | ||
with ActorLogging { | ||
|
||
val fetcher: TypedActorRef[BlockFetcher.FetchCommand] = | ||
context.spawn( | ||
BlockFetcher(peersClient, peerEventBus, self, syncConfig, blockValidator), | ||
BlockFetcher(peersClient, peerEventBus, self, syncConfig, blockValidator, newFlow), | ||
"block-fetcher" | ||
) | ||
|
||
|
@@ -78,22 +98,64 @@ class RegularSync( | |
"block-importer" | ||
) | ||
|
||
implicit val system: ActorSystem = context.system | ||
implicit val ec: ExecutionContext = context.dispatcher | ||
|
||
val printFetcherSchedule: Cancellable = | ||
scheduler.scheduleWithFixedDelay( | ||
syncConfig.printStatusInterval, | ||
syncConfig.printStatusInterval, | ||
fetcher.toClassic, | ||
BlockFetcher.PrintStatus | ||
)(context.dispatcher) | ||
) | ||
|
||
val (blockSourceQueue, blockSource) = Source.queue[Block](256, OverflowStrategy.fail).preMaterialize() | ||
val fetcherService = new FetcherService(blockchainReader, syncConfig, blockSourceQueue) | ||
|
||
override def receive: Receive = running( | ||
ProgressState(startedFetching = false, initialBlock = 0, currentBlock = 0, bestKnownNetworkBlock = 0) | ||
) | ||
|
||
private def startTemporaryBlockProducer() = { | ||
import monix.execution.Scheduler.Implicits.global | ||
PeerEventBusActor | ||
.messageSource( | ||
peerEventBus, | ||
PeerEventBusActor.SubscriptionClassifier | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you're replacing the PeerRequestHandler here then PeerDisconnectedClassifier is missing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not replacing PeerRequestHandler yet. What's missing is a piece of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but it's gonna replace it in future? just pointing out what should be there if we proceed with this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I sorta misspoke. It's largely replacing |
||
.MessageClassifier( | ||
Set(Codes.BlockBodiesCode, Codes.BlockHeadersCode), | ||
PeerEventBusActor.PeerSelector.AllPeers | ||
) | ||
) | ||
.via(FetcherService.tempFlow) | ||
.buffer(256, OverflowStrategy.fail) | ||
jvdp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.mapConcat(identity) | ||
.runWith(Sink.foreachAsync(1) { block => | ||
fetcherService | ||
.placeBlockInPeerStream(block) | ||
.runToFuture | ||
.collect { case Right(()) => () } | ||
|
||
}) | ||
} | ||
|
||
private def startNewFlow() = | ||
blockSource | ||
.via(BranchBuffer.flow(blockchainReader)) | ||
.runWith(Sink.foreach { blocks => | ||
importer ! BlockFetcher.PickedBlocks(blocks) | ||
}) | ||
.onComplete(res => log.error(res.toString)) | ||
|
||
def running(progressState: ProgressState): Receive = { | ||
case SyncProtocol.Start => | ||
log.info("Starting regular sync") | ||
importer ! BlockImporter.Start | ||
if (newFlow) { | ||
startNewFlow() | ||
startTemporaryBlockProducer() | ||
} | ||
|
||
case SyncProtocol.MinedBlock(block) => | ||
log.info(s"Block mined [number = {}, hash = {}]", block.number, block.header.hashAsHexString) | ||
importer ! BlockImporter.MinedBlock(block) | ||
|
@@ -147,7 +209,8 @@ object RegularSync { | |
ommersPool: ActorRef, | ||
pendingTransactionsManager: ActorRef, | ||
scheduler: Scheduler, | ||
configBuilder: BlockchainConfigBuilder | ||
configBuilder: BlockchainConfigBuilder, | ||
newFlow: Boolean | ||
): Props = | ||
Props( | ||
new RegularSync( | ||
|
@@ -164,7 +227,8 @@ object RegularSync { | |
ommersPool, | ||
pendingTransactionsManager, | ||
scheduler, | ||
configBuilder | ||
configBuilder, | ||
newFlow | ||
) | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need a branch buffer if we immediately store everything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't immediately store everything; only rooted blocks / branches, with the reasoning that these blocks have passed the proof of work threshold and so should be relatively safe to store without DoS concerns.
(With the caveat that we should also not store super old blocks; we have been discussing a "settled block" concept that we can also use for this.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. How much work do you think it would take to turn this into a structure that supports parallel branches? And can the current one be used as-is in any way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current one is able to sync Sagano fully, but it might then get stuck on a dead branch.
I think I'm close to a version that will support parallel branches for my current ticket (1059).