[ETCM-105] State sync improvements #715
Conversation
val etcPeerManager: ActorRef,
val syncConfig: SyncConfig,
implicit val scheduler: Scheduler)
val fastSyncStateStorage: FastSyncStateStorage,
Strange formatting
@@ -72,6 +72,10 @@ class FastSync(
s"block to download to ${syncState.safeDownloadTarget}")
val syncingHandler = new SyncingHandler(syncState)
context become syncingHandler.receive
if (syncState.isBlockchainWorkFinished && !syncState.stateSyncFinished) {
// chain has already been downloaded we can start state sync
syncingHandler.startStateSync(syncState.targetBlock)
Maybe a log here would be helpful?
if (blockchainDataToDownload)
processDownloads()
else if (syncState.isBlockchainWorkFinished && !syncState.stateSyncFinished) {
// TODO we are waiting for state sync to finish
When will this TODO be addressed?
I will add the ticket number there. My plan was to address this in ETCM-103, as there I will add monitoring for a stale target block, and will probably know more about how exactly this syncing loop should look in FastSync.
nextBlockToFullyValidate: BigInt = 1,
targetBlockUpdateFailures: Int = 0,
updatingTargetBlock: Boolean = false) {
targetBlock: BlockHeader,
Strange formatting
* If it would valuable, it possible to implement processor which would gather statistics about duplicated or not requested data.
*/
def processResponses(state: SchedulerState, responses: List[SyncResponse]): Either[CriticalError, SchedulerState] = {
def go(currentState: SchedulerState, remaining: Seq[SyncResponse]): Either[CriticalError, SchedulerState] = {
Missing @tailrec
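A self-contained sketch of the shape being asked for; the generic step function stands in for the real response handling, but the point is simply that the annotation makes the compiler reject the method if the recursive call ever stops being in tail position.

    import scala.annotation.tailrec

    // Illustrative only: `go` folds a step function over the remaining items and
    // stops at the first error, mirroring the recursive helper in processResponses.
    def processAll[S, A, E](initial: S, items: List[A])(step: (S, A) => Either[E, S]): Either[E, S] = {
      @tailrec
      def go(current: S, remaining: List[A]): Either[E, S] =
        remaining match {
          case Nil => Right(current)
          case head :: tail =>
            step(current, head) match {
              case Left(err)   => Left(err)
              case Right(next) => go(next, tail)
            }
        }
      go(initial, items)
    }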
requestType match {
case SyncStateScheduler.StateNode =>
import io.iohk.ethereum.network.p2p.messages.PV63.AccountImplicits._
scala.util.Try(n.value.toArray[Byte].toAccount).toEither.left.map(_ => NotAccountLeafNode).map { account =>
We could add a decoder from RLPEncodable to Account and use n.parsedRlp instead of using Try.
I will add a custom apply method for Account to do that, so as not to expose all these details here.
I had in mind that we could use the already parsed LeafNode to avoid unnecessary decoding from bytes.
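A hypothetical sketch of the direction discussed here: hide the byte-level decoding behind a small helper so call sites neither import AccountImplicits nor touch Try directly. The helper name and the String error are invented, the import paths follow the snippet above, and the real change may instead decode from the already parsed RLP of the leaf node, as suggested.

    import scala.util.Try
    import akka.util.ByteString
    import io.iohk.ethereum.domain.Account
    import io.iohk.ethereum.network.p2p.messages.PV63.AccountImplicits._

    // Hypothetical helper (name and error type are assumptions, not the PR's code).
    object AccountDecoding {
      def fromBytes(bytes: ByteString): Either[String, Account] =
        Try(bytes.toArray[Byte].toAccount).toEither.left
          .map(_ => "bytes do not decode to an account leaf value")
    }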
}

private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = {
if (state.memBatch.contains(req.nodeHash)) {
Simpler: private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = state.memBatch.contains(req.nodeHash) || isInDatabase(req)
private val stateNodeRequestComparator = new Comparator[StateNodeRequest] {
override def compare(o1: StateNodeRequest, o2: StateNodeRequest): Int = {
if (o1.nodeDepth > o2.nodeDepth) {
Simpler: o2.nodeDepth compare o1.nodeDepth
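As a concrete drop-in, the comparator above then collapses to a one-liner. StateNodeRequest and nodeDepth come from the snippet; the assumption here is that the intent is the deeper-first ordering that the flipped operands express.

    import java.util.Comparator

    // Descending order by depth: deeper StateNodeRequests sort first.
    private val stateNodeRequestComparator = new Comparator[StateNodeRequest] {
      override def compare(o1: StateNodeRequest, o2: StateNodeRequest): Int =
        o2.nodeDepth compare o1.nodeDepth
    }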
Add tailrec annotation
Simplify known node check
Fix formatting in all files
val (newRequests, newState) =
currentState.assignTasksToPeers(
NonEmptyList.fromListUnsafe(freePeers.toList),
Some(newNodesToGet),
syncConfig.nodesPerRequest
)
log.info(
"Creating {} new state node requests. Current request queue size is {}",
newRequests.size,
newState.nodesToGet.size
)
newRequests.foreach { request =>
requestNodes(request)
}
context.become(downloading(scheduler, newState))
It could be extracted to a separate method. It is the same in both cases.
Ahh, you are right, we probably do not need the check-for-peers message at all.
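For reference, a rough sketch of the extraction being suggested. The helper name is invented and the parameter types are inferred from the snippet, so treat it as the shape of the refactor inside the downloader actor rather than a drop-in change.

    // Hypothetical helper; DownloaderState, requestNodes, syncConfig and log are the
    // downloader's existing members, everything else is assumed.
    private def assignAndRequestNodes(
        currentState: DownloaderState,
        freePeers: NonEmptyList[Peer],
        newNodesToGet: Seq[ByteString]
    ): DownloaderState = {
      val (newRequests, newState) =
        currentState.assignTasksToPeers(freePeers, Some(newNodesToGet), syncConfig.nodesPerRequest)
      log.info(
        "Creating {} new state node requests. Current request queue size is {}",
        newRequests.size,
        newState.nodesToGet.size
      )
      newRequests.foreach(requestNodes)
      newState
    }

    // Each call site would then become something like:
    //   context.become(downloading(scheduler, assignAndRequestNodes(currentState, freePeers, newNodesToGet)))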
if (nextRequested == receivedHash) {
go(requestedRemaining.tail, receivedRemaining.tail, SyncResponse(receivedHash, nextReceived) :: processed)
} else {
// hash of next element does not match return what what we have processed, and remaing hashes to get
Very minor: typo remaing
} else {
val (notReceived, received) = process(requestedHashes.toList, receivedMessage.values.toList)
if (received.isEmpty) {
val rescheduleRequestedHashes = notReceived.foldLeft(nodesToGet) { case (map, hash) =>
Simpler (?): nodesToGet ++ notReceived.map(_ -> None)
But that is at least two traversals of the notReceived collection: one for map and one for the addition to the map. I would leave it as it is.
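For completeness, the two formulations side by side. They build the same map; the disagreement is only about how many passes over notReceived that costs. The Option[String] value type is a stand-in for whatever nodesToGet really stores.

    import akka.util.ByteString

    val nodesToGet: Map[ByteString, Option[String]] = Map(ByteString(1) -> Some("peer-1"))
    val notReceived: List[ByteString] = List(ByteString(2), ByteString(3))

    // single pass over notReceived
    val viaFold = notReceived.foldLeft(nodesToGet) { case (map, hash) => map + (hash -> None) }

    // shorter, but materialises an intermediate list and traverses twice
    val viaConcat = nodesToGet ++ notReceived.map(_ -> None)

    assert(viaFold == viaConcat)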
// so we can ignore those errors.
sync.processResponses(currentState, nodes) match {
case Left(value) =>
log.info(s"Critical error while state syncing ${value}, stopping state sync")
log.error
if (parentsToCheck.isEmpty) {
(currentRequests, currentBatch)
} else {
val parent = parentsToCheck.head
WDYT about adding some meaningful exception here? E.g. val parent = parentsToCheck.headOption.getOrElse(throw new IllegalStateException("Critical exception. Cannot find parent"))
It sounds like a good idea; it will be instantly known that some invariant has been broken.
Very minor comments, will continue reviewing tomorrow
src/main/resources/application.conf
@@ -325,7 +325,7 @@ mantis {

# During fast-sync when most up to date block is determined from peers, the actual target block number
# will be decreased by this value
target-block-offset = 128
target-block-offset = 32
I assume this value was taken from geth, right?
It is part of my experiments with fast sync. It seems Geth tries to keep an offset of 64 blocks, but their sync is much faster and they process a lot more nodes before updating to a new target. 128 is definitely too much, as a large part of the peers keeps only 128 blocks of history, so it can happen that they won't have the target block root and our sync will not even start.
}

def getMissingHashes(max: Int): (List[ByteString], SchedulerState) = {
def go(
Missing @tailrec
/**
* Default responses processor which ignores duplicated or not requested hashes, but informs the caller about critical
* errors.
* If it would valuable, it possible to implement processor which would gather statistics about duplicated or not requested data.
Maybe we could log them for now?
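A minimal sketch of the "just log them for now" idea, reusing only the field names that ProcessingStatistics exposes further down in this PR; the wording of the message and where it is emitted are assumptions.

    import akka.event.LoggingAdapter

    // Redeclared here only to make the sketch self-contained.
    final case class ProcessingStatistics(duplicatedHashes: Long, notRequestedHashes: Long, saved: Long)

    // Hypothetical helper: emit the counters whenever a batch of responses is processed.
    def logStatistics(log: LoggingAdapter, stats: ProcessingStatistics): Unit =
      log.info(
        "State sync progress: saved={} duplicated={} notRequested={}",
        stats.saved,
        stats.duplicatedHashes,
        stats.notRequestedHashes
      )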
Add statistics logging
Remove unnecessary CheckPeers messages from downloader
override def run(sender: ActorRef, msg: Any): AutoPilot = {
msg match {
case SendMessage(msg, peer) if msg.underlyingMsg.isInstanceOf[GetNodeData] =>
val msgToGet = msg.underlyingMsg.asInstanceOf[GetNodeData]
pattern matching instead?
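One way to express that suggestion inside the same test AutoPilot, as a sketch: a typed pattern binds the GetNodeData directly, so both the isInstanceOf guard and the asInstanceOf cast disappear. The response-building part is elided, and peerId is only kept because the elided reply would presumably use it.

    override def run(sender: ActorRef, msg: Any): AutoPilot =
      msg match {
        case SendMessage(wrapped, peerId) =>
          wrapped.underlyingMsg match {
            case msgToGet: GetNodeData =>
              // ... build and send the NodeData response for msgToGet here ...
              this
            case _ => this
          }
        case _ => this
      }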
src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala
src/test/scala/io/iohk/ethereum/blockchain/sync/SyncSchedulerSpec.scala
val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler()
val header = Fixtures.Blocks.ValidBlock.header.copy(stateRoot = worldHash, number = 1)
schedulerBlockchain.storeBlockHeader(header).commit()
var state = scheduler.initState(worldHash).get
Minor: foldLeft?
Maybe I am missing something, but this is not necessarily a fold, as we do not have any collection to summarise into one value; we need to process some stuff until some condition holds.
Ouch, I missed the fact that the condition is checked on a different state with each loop, which makes it much more elaborate to express purely than with a while loop.
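A generic illustration of the trade-off in this thread (not the actual test code): when the loop condition depends on the state produced by the previous iteration, the pure formulation needs a recursive helper, while the spec simply uses a var and a while loop.

    import scala.annotation.tailrec

    @tailrec
    def loopUntil[S](state: S)(done: S => Boolean)(step: S => S): S =
      if (done(state)) state else loopUntil(step(state))(done)(step)

    // Imperative equivalent, as used in the spec:
    //   var state = initial
    //   while (!done(state)) { state = step(state) }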
src/test/scala/io/iohk/ethereum/blockchain/sync/SyncStateDownloaderStateSpec.scala
}
}

private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = {
Isn't this more of an isResponseAlreadyKnown? To differentiate it from isRequestAlreadyKnownOrResolved.
case n: BranchNode =>
Right(n.children.collect { case HashNode(childHash) =>
StateNodeRequest(
ByteString.fromArrayUnsafe(childHash),
I assume the ByteString.fromArrayUnsafe is here for performance reasons, right?
Yup, it can be used when we know that the provided array won't be mutated (as in this case).
Refactor FastSyncIt tests
Properly close actor system in StateSyncSpec
Minor comments only. If it syncs with the mainnet it is ready to merge
}

private def isRequestedHashAlreadyCommitted(state: SchedulerState, req: StateNodeRequest): Boolean = {
// TODO add bloom filter step before data base to speed things up. Bloomfilter will need to be reloaded after node
Minor: please add JIRA ticket to the comment
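A sketch of what that bloom-filter step might look like, assuming Guava is available on the classpath; the class and method names are invented, and reloading the filter after a restart (the second half of the TODO) is not shown.

    import akka.util.ByteString
    import com.google.common.hash.{BloomFilter, Funnels}

    // Hypothetical pre-check in front of the database lookup: a negative answer is
    // definitive and lets us skip the read, a positive answer still has to be
    // confirmed against the database because of false positives.
    class KnownNodesFilter(expectedNodes: Long) {
      private val filter: BloomFilter[Array[Byte]] =
        BloomFilter.create[Array[Byte]](Funnels.byteArrayFunnel(), expectedNodes, 0.001)

      def markKnown(hash: ByteString): Unit = filter.put(hash.toArray)

      def mightBeKnown(hash: ByteString): Boolean = filter.mightContain(hash.toArray)
    }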
case object AlreadyProcessedItem extends NotCriticalError

final case class ProcessingStatistics(duplicatedHashes: Long, notRequestedHashes: Long, saved: Long) {
Do we want to expose ProcessingStatistics as a metric?
It is possible, we can even save it to the DB to keep stats between shutdowns. I will probably do this when sync is ready for its prime time (i.e. when it works with mainnet).
with BeforeAndAfterAll
with ScalaCheckPropertyChecks {

override def afterAll(): Unit = {
Minor: You could use the WithActorSystemShutDown trait
with Matchers
with BeforeAndAfterAll {

override def afterAll(): Unit = {
ditto
Task.raiseError(new TimeoutException("Task time out after all retries"))
}
}
it should "should update target block and sync this new target block state" in customTestCaseResourceM(
Minor: pivot instead of target
Minor stuff only. LGTM if it syncs with mainnet (I can test Mordor over the weekend if you want to)
newNodes: Option[Seq[ByteString]],
nodesPerPeerCapacity: Int
): (Seq[PeerRequest], DownloaderState) = {
def go(
Minor: missing @tailrec?
import org.scalatest.matchers.must.Matchers
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

class SyncSchedulerSpec extends AnyFlatSpec with Matchers with EitherValues with ScalaCheckPropertyChecks {
Minor - names are out of sync (SyncSchedulerSpec, SyncSchedulerState, SyncStateScheduler)
val goodResponse = peerRequest.nodes.toList.take(perPeerCapacity / 2).map(h => hashNodeMap(h))
val badResponse = (200 until 210).map(ByteString(_)).toList
val (result, newState2) = newState1.handleRequestSuccess(requests(0).peer, NodeData(goodResponse ++ badResponse))
assert(result.isInstanceOf[UsefulData])
pattern match instead?
@kapke It would be great if you could try a Mordor sync. As for mainnet, syncing with it will be possible after the next ticket in line.
Description
PR which does all the grunt work necessary for implementing fast sync restarting at an arbitrary new target block. Notable changes:
- State sync has been extracted from the FastSync actor and split into two separate components: StateSyncScheduler, which traverses the MPT trie in DFS fashion and creates requests for currently missing nodes, and StateSyncDownloader, which retrieves those nodes from remote peers and provides them to the Scheduler.
- State sync progress is no longer persisted via syncStateStorageActor. That process was highly non-deterministic and could probably lose data (I did not have any test case for it, but it is easy to imagine a situation where the node is killed during node processing with not-correctly-updated queues). With this PR, state sync starts from scratch after a restart, i.e. from the already known target block, but it does not request nodes which are already saved in the database.
The order of traversal here is important: the fact that we have a node at level n implies that we have all subtries at deeper levels, so ultimately we only need to traverse the unknown paths from the root. The whole solution is heavily influenced by the way Geth handles state sync in FastSync.
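A toy sketch of that pruning property, using invented types rather than the real scheduler or MPT structures: a node whose hash is already persisted is skipped together with its whole subtrie, so only unknown paths from the root are ever scheduled.

    import scala.annotation.tailrec

    final case class Node(hash: String, children: List[Node])

    def missingNodes(root: Node, alreadyPersisted: Set[String]): List[String] = {
      @tailrec
      def go(toVisit: List[Node], missing: List[String]): List[String] =
        toVisit match {
          case Nil => missing.reverse
          case node :: rest =>
            if (alreadyPersisted.contains(node.hash))
              go(rest, missing)                               // known node => whole subtrie is known
            else
              go(node.children ++ rest, node.hash :: missing) // descend depth-first into unknown paths
        }
      go(List(root), Nil)
    }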
Future Tasks
- Enable StateSyncScheduler to request a restart, waiting for the downloader to finish its download tasks.
- Gather several MissingNodes messages and do the processing in the background. Then, when the first message with downloaderCapacity > 0 arrives, the scheduler would send missing nodes in between processing of MissingNodes messages.

TODO