[ETCM-105] State sync improvements #715


Merged: 13 commits merged into develop from etcm-105/improve-fast-sync on Oct 9, 2020

Conversation

@KonradStaniec (Contributor) commented Oct 1, 2020

Description

This PR does all the groundwork necessary for implementing fast-sync restarts at an arbitrary new target block. Notable changes:

  • Moves state sync out of the FastSync actor and splits it into two separate components: StateSyncScheduler, which traverses the MPT trie in DFS fashion and creates requests for currently missing nodes, and StateSyncDownloader, which retrieves those nodes from remote peers and provides them to the scheduler.
  • Adds a lot of tests to the state sync process. Until now the state sync process was almost untested, and making any change to it was a lottery.
  • Fixes a bug with the target block update. Until now state sync started right after the safe target header was downloaded (without receipts and bodies); with this PR state sync starts only when the whole blockchain data has been retrieved.
  • Changes how state sync behaves after node restart. Until now the state of the download queues was persisted periodically by syncStateStorageActor. This process was highly nondeterministic and could probably lose data (I do not have a test case for it, but it is easy to imagine the node being killed mid-processing with the queues not correctly updated).
    With this PR, after a restart state sync starts from scratch, i.e. from the already known target block, but it does not request nodes which are already saved in the database.
    The order of traversal is important here: having a node at level n implies that we have all of its subtries at deeper levels, so ultimately we only need to traverse the unknown paths from the root (see the sketch below).

The whole solution is heavily influenced by the way Geth handles state sync during fast sync.
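A minimal sketch of that restart idea, with hypothetical names (ReceivedNode, Frontier) rather than the actual scheduler types:

final case class ReceivedNode(hash: Seq[Byte], childHashes: List[Seq[Byte]])

final case class Frontier(pending: List[Seq[Byte]]) {
  // Schedule only children that are not already committed: a committed node
  // implies that its whole subtrie is already in the database.
  def schedule(node: ReceivedNode, isInDatabase: Seq[Byte] => Boolean): Frontier =
    Frontier(node.childHashes.filterNot(isInDatabase) ++ pending)
}

// After a restart the traversal simply begins again from the known target state
// root, e.g. Frontier(List(targetStateRoot)); nothing below already-saved nodes
// is ever requested.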

Future Tasks

  • Restarting state sync to a new target - with the improvements from this PR, it will just be a matter of sending a signal to
    StateSyncScheduler to request a restart and waiting for the downloader to finish its download tasks.
  • Currently we sync the blockchain first and then the state; with the improvements from this PR it would be possible to start both concurrently and update the target to some final block at some point. The changes necessary to do that require changing how we handle the list of handshaked peers, to avoid sending concurrent state and blockchain requests to the same peers.
  • The protocol between the scheduler and the downloader could be improved: currently, when there is a large batch of branch nodes, the scheduler can take a long time to process them while the downloader sits idle. The solution would be to have an explicit queue of MissingNodes messages and do the processing in the background. Then, when the first message with downloaderCapacity > 0 arrives, the scheduler would send missing nodes in between processing MissingNodes messages (see the sketch below).
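A rough sketch of that proposed queueing approach; MissingNodes is the message name used above, but its fields and the SchedulerQueue type here are guesses, not existing code:

import scala.collection.immutable.Queue

final case class MissingNodes(nodes: List[Seq[Byte]], downloaderCapacity: Int)

final case class SchedulerQueue(backlog: Queue[MissingNodes], ready: List[Seq[Byte]]) {
  // New batches go to the backlog and are processed in the background.
  def enqueue(msg: MissingNodes): SchedulerQueue = copy(backlog = backlog.enqueue(msg))

  // When a message reports downloaderCapacity > 0, hand out whatever hashes are
  // already discovered instead of waiting for the current batch to be processed.
  def handOut(capacity: Int): (List[Seq[Byte]], SchedulerQueue) = {
    val (toSend, rest) = ready.splitAt(capacity)
    (toSend, copy(ready = rest))
  }
}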

TODO

  • Fix existing tests
  • Add docs
  • Run new state sync on a real net - Mordor - completed the whole Mordor state sync in 4 minutes
  • Run new state sync on a real net - mainnet - partially completed, as remote peers ran out of nodes; to complete this we need stale target detection, which is planned as the next feature.

@KonradStaniec force-pushed the etcm-105/improve-fast-sync branch from 6fc329e to 542a06a on October 1, 2020 13:31
@KonradStaniec force-pushed the etcm-105/improve-fast-sync branch 2 times, most recently from 1c56ac7 to 8090650 on October 2, 2020 10:25
@KonradStaniec force-pushed the etcm-105/improve-fast-sync branch from 8090650 to e487563 on October 2, 2020 11:00
@KonradStaniec marked this pull request as ready for review on October 2, 2020 11:01
val etcPeerManager: ActorRef,
val syncConfig: SyncConfig,
implicit val scheduler: Scheduler)
val fastSyncStateStorage: FastSyncStateStorage,
Contributor

Strange formatting

@@ -72,6 +72,10 @@ class FastSync(
s"block to download to ${syncState.safeDownloadTarget}")
val syncingHandler = new SyncingHandler(syncState)
context become syncingHandler.receive
if (syncState.isBlockchainWorkFinished && !syncState.stateSyncFinished) {
// chain has already been downloaded we can start state sync
syncingHandler.startStateSync(syncState.targetBlock)
Contributor

Maybe some log here would be helpful?

if (blockchainDataToDownload)
processDownloads()
else if (syncState.isBlockchainWorkFinished && !syncState.stateSyncFinished) {
// TODO we are waiting for state sync to finish
Contributor

When will this TODO be addressed?

Contributor Author

I will add the ticket number there. My plan was to address this in ETCM-103, as there I will add monitoring for a stale target block and will probably know more about how exactly this syncing loop should look in FastSync.

nextBlockToFullyValidate: BigInt = 1,
targetBlockUpdateFailures: Int = 0,
updatingTargetBlock: Boolean = false) {
targetBlock: BlockHeader,
Contributor

Strange formatting

* If it would be valuable, it is possible to implement a processor which would gather statistics about duplicated or not-requested data.
*/
def processResponses(state: SchedulerState, responses: List[SyncResponse]): Either[CriticalError, SchedulerState] = {
def go(currentState: SchedulerState, remaining: Seq[SyncResponse]): Either[CriticalError, SchedulerState] = {
Contributor

Missing @tailrec
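For reference, a minimal, generic illustration of the suggestion; with @tailrec the compiler rejects the method if go ever stops being tail recursive:

import scala.annotation.tailrec

object TailrecExample {
  def sumAll(xs: List[Int]): Int = {
    @tailrec
    def go(remaining: List[Int], acc: Int): Int = remaining match {
      case Nil          => acc
      case head :: tail => go(tail, acc + head)
    }
    go(xs, 0)
  }
}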

requestType match {
case SyncStateScheduler.StateNode =>
import io.iohk.ethereum.network.p2p.messages.PV63.AccountImplicits._
scala.util.Try(n.value.toArray[Byte].toAccount).toEither.left.map(_ => NotAccountLeafNode).map { account =>
Contributor

We could add some decoder from RLPEncodable to Account and use n.parsedRlp instead of using Try

@KonradStaniec (Contributor Author) Oct 5, 2020

I will add a custom apply method for Account to do that, so as not to expose all these details here.

@mmrozek (Contributor) Oct 6, 2020

I had in mind that we could use the already parsed LeafNode to avoid unnecessary decoding from bytes.

}

private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = {
if (state.memBatch.contains(req.nodeHash)) {
Contributor

Simpler: private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = state.memBatch.contains(req.nodeHash) || isInDatabase(req)


private val stateNodeRequestComparator = new Comparator[StateNodeRequest] {
override def compare(o1: StateNodeRequest, o2: StateNodeRequest): Int = {
if (o1.nodeDepth > o2.nodeDepth) {
Contributor

Simpler o2.nodeDepth compare o1.nodeDepth
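For completeness, a sketch of the suggested form; this StateNodeRequest is a stand-in assuming nodeDepth is an Int, and swapping o1/o2 keeps the original deeper-nodes-first order:

import java.util.Comparator

object ComparatorSketch {
  final case class StateNodeRequest(nodeDepth: Int)

  val stateNodeRequestComparator: Comparator[StateNodeRequest] = new Comparator[StateNodeRequest] {
    override def compare(o1: StateNodeRequest, o2: StateNodeRequest): Int =
      o2.nodeDepth compare o1.nodeDepth
  }
}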

Add tailrec annotation
Simplify known node check
Fix formatting in all files
Comment on lines 114 to 128
val (newRequests, newState) =
currentState.assignTasksToPeers(
NonEmptyList.fromListUnsafe(freePeers.toList),
Some(newNodesToGet),
syncConfig.nodesPerRequest
)
log.info(
"Creating {} new state node requests. Current request queue size is {}",
newRequests.size,
newState.nodesToGet.size
)
newRequests.foreach { request =>
requestNodes(request)
}
context.become(downloading(scheduler, newState))
Contributor

It could be extracted to a separate method. It is the same in both cases.

Contributor Author

Ahh, you are right, we probably do not need the check-for-peers message at all.

if (nextRequested == receivedHash) {
go(requestedRemaining.tail, receivedRemaining.tail, SyncResponse(receivedHash, nextReceived) :: processed)
} else {
// hash of next element does not match return what what we have processed, and remaing hashes to get
Contributor

Very minor: typo remaing

} else {
val (notReceived, received) = process(requestedHashes.toList, receivedMessage.values.toList)
if (received.isEmpty) {
val rescheduleRequestedHashes = notReceived.foldLeft(nodesToGet) { case (map, hash) =>
Contributor

Simpler (?): nodesToGet ++ notReceived.map(_ -> None)

Contributor Author

But that is at least two traversals of the notReceived collection, one for the map and one for the addition to the map. I would leave it as it is.
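For context, a sketch of the two variants being weighed; the hash and value types here are placeholders for whatever nodesToGet actually stores:

object RescheduleSketch {
  type Hash = Seq[Byte]

  // One pass over notReceived, adding each entry to the map directly.
  def viaFold(nodesToGet: Map[Hash, Option[String]], notReceived: List[Hash]): Map[Hash, Option[String]] =
    notReceived.foldLeft(nodesToGet) { case (map, hash) => map + (hash -> None) }

  // Shorter, but materialises an intermediate collection of pairs before adding them.
  def viaConcat(nodesToGet: Map[Hash, Option[String]], notReceived: List[Hash]): Map[Hash, Option[String]] =
    nodesToGet ++ notReceived.map(_ -> None)
}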

// so we can ignore those errors.
sync.processResponses(currentState, nodes) match {
case Left(value) =>
log.info(s"Critical error while state syncing ${value}, stopping state sync")
Contributor

log.error

if (parentsToCheck.isEmpty) {
(currentRequests, currentBatch)
} else {
val parent = parentsToCheck.head
Contributor

WDYT about adding some meaningful exception here? eg. val parent = parentsToCheck.headOption.getOrElse(throw new IllegalStateException("Critical exception. Cannot find parent"))

Contributor Author

It sounds like a good idea; it will be instantly known that some invariant has been broken.

@ntallar previously requested changes Oct 6, 2020
@ntallar left a comment

Very minor comments, will continue reviewing tomorrow

@@ -325,7 +325,7 @@ mantis {

# During fast-sync when most up to date block is determined from peers, the actual target block number
# will be decreased by this value
target-block-offset = 128
target-block-offset = 32

I assume this value was taken from geth, right?

Contributor Author

It is part of my experiments with fast sync. It seems Geth tries to keep the offset equal to 64 blocks, but their sync is much faster and they process a lot more nodes before updating to a new target.
128 is definitely too much, as a large part of the peers keep only 128 blocks of history, so it can happen that they won't have the target block root and our sync will not even start.

}

def getMissingHashes(max: Int): (List[ByteString], SchedulerState) = {
def go(

Missing @tailrec

/**
* Default responses processor which ignores duplicated or not requested hashes, but informs the caller about critical
* errors.
* If it would be valuable, it is possible to implement a processor which would gather statistics about duplicated or not-requested data.

Maybe we could log them for now?

Add statistics logging
Remove unnecessary CheckPeers Messages from downloader
override def run(sender: ActorRef, msg: Any): AutoPilot = {
msg match {
case SendMessage(msg, peer) if msg.underlyingMsg.isInstanceOf[GetNodeData] =>
val msgToGet = msg.underlyingMsg.asInstanceOf[GetNodeData]
Contributor

pattern matching instead?
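A sketch of the suggested pattern-match form, reusing names from the snippet above (so not a standalone example); the isInstanceOf/asInstanceOf pair becomes a single typed match on the underlying message:

override def run(sender: ActorRef, msg: Any): AutoPilot =
  msg match {
    case SendMessage(message, peer) =>
      message.underlyingMsg match {
        case msgToGet: GetNodeData =>
          // ... build and send the NodeData response for msgToGet, as before ...
          TestActor.KeepRunning
        case _ => TestActor.KeepRunning
      }
    case _ => TestActor.KeepRunning
  }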

val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler()
val header = Fixtures.Blocks.ValidBlock.header.copy(stateRoot = worldHash, number = 1)
schedulerBlockchain.storeBlockHeader(header).commit()
var state = scheduler.initState(worldHash).get
Contributor

minor - foldLeft?

Contributor Author

Maybe I am missing something, but this is not necessarily a fold, as we do not have any collection to summarise into one value; rather, we need to process things until some condition holds.

Contributor

Ouch, I missed the fact that the condition is being checked on a different state with each loop, which makes it much more elaborate to express purely than with a while loop.

}
}

private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = {

Isn't this more of an isResponseAlreadyKnown? To differentiate it from isRequestAlreadyKnownOrResolved.

case n: BranchNode =>
Right(n.children.collect { case HashNode(childHash) =>
StateNodeRequest(
ByteString.fromArrayUnsafe(childHash),

I assume the ByteString.fromArrayUnsafe is here for performance reasons, right?

Contributor Author

Yup, it can be used when we know that the provided array won't be mutated (as in this case).

@mmrozek (Contributor) left a comment

Minor comments only. If it syncs with the mainnet it is ready to merge

}

private def isRequestedHashAlreadyCommitted(state: SchedulerState, req: StateNodeRequest): Boolean = {
// TODO add bloom filter step before data base to speed things up. Bloomfilter will need to be reloaded after node
Contributor

Minor: please add JIRA ticket to the comment
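For context, a hedged sketch of the bloom-filter pre-check described in the TODO above; this is not part of the PR, and Guava's BloomFilter is used here only as an example implementation:

import com.google.common.hash.{BloomFilter, Funnels}

object BloomFilterSketch {
  // Rebuilt from the database after a node restart, then updated as nodes are saved.
  val knownNodeHashes: BloomFilter[Array[Byte]] =
    BloomFilter.create(Funnels.byteArrayFunnel(), 10000000)

  // A miss means the hash is definitely not stored yet; a hit may be a false
  // positive, so it still has to be confirmed against the database.
  def isAlreadyCommitted(hash: Array[Byte], isInDatabase: Array[Byte] => Boolean): Boolean =
    knownNodeHashes.mightContain(hash) && isInDatabase(hash)

  def markCommitted(hash: Array[Byte]): Unit = {
    knownNodeHashes.put(hash)
    ()
  }
}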


case object AlreadyProcessedItem extends NotCriticalError

final case class ProcessingStatistics(duplicatedHashes: Long, notRequestedHashes: Long, saved: Long) {
Contributor

Do we want to expose ProcessingStatistics as a metric?

Contributor Author

It is possible; we could even persist it to the DB to keep stats between shutdowns. I will probably do this when the sync is ready for its prime time (i.e. it works with mainnet).

with BeforeAndAfterAll
with ScalaCheckPropertyChecks {

override def afterAll(): Unit = {
Contributor

Minor: You could use WithActorSystemShutDown trait

with Matchers
with BeforeAndAfterAll {

override def afterAll(): Unit = {
Contributor

ditto

Task.raiseError(new TimeoutException("Task time out after all retries"))
}
}
it should "should update target block and sync this new target block state" in customTestCaseResourceM(
Contributor

Minor: pivot instead of target

@ntallar dismissed their stale review on October 9, 2020 11:26

There are already 2 other reviewers

@kapke (Contributor) left a comment

Minor stuff only. LGTM if it syncs with mainnet (I can test Mordor over the weekend if you want to)

newNodes: Option[Seq[ByteString]],
nodesPerPeerCapacity: Int
): (Seq[PeerRequest], DownloaderState) = {
def go(
Contributor

minor: missing @tailrec?


import org.scalatest.matchers.must.Matchers
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

class SyncSchedulerSpec extends AnyFlatSpec with Matchers with EitherValues with ScalaCheckPropertyChecks {
Contributor

Minor - names are out of sync (SyncSchedulerSpec, SyncSchedulerState, SyncStateScheduler)

val goodResponse = peerRequest.nodes.toList.take(perPeerCapacity / 2).map(h => hashNodeMap(h))
val badResponse = (200 until 210).map(ByteString(_)).toList
val (result, newState2) = newState1.handleRequestSuccess(requests(0).peer, NodeData(goodResponse ++ badResponse))
assert(result.isInstanceOf[UsefulData])
Contributor

pattern match instead?
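A sketch of the suggested style, reusing result from the snippet above (the internals of UsefulData are not shown here); the match checks the type without an explicit cast, and ScalaTest's result shouldBe a [UsefulData] would be another cast-free option:

result match {
  case _: UsefulData => succeed
  case other         => fail(s"expected UsefulData, got $other")
}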

@KonradStaniec (Contributor Author)

@kapke It would be great if you could try the Mordor sync. As for mainnet, syncing with it will be possible after the next ticket in line, ETCM-103, which will add two things:

  • Pivot block updates while state syncing
  • A bloom filter in the sync scheduler (without it, syncing would take 2-3 days)

@KonradStaniec merged commit 11755e2 into develop on Oct 9, 2020
@KonradStaniec deleted the etcm-105/improve-fast-sync branch on October 9, 2020 14:13