Skip to content

[ETCM-105] State sync improvements #715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ object Dependencies {
"org.scala-sbt.ipcsocket" % "ipcsocket" % "1.1.0",
"com.google.guava" % "guava" % "29.0-jre",
"org.xerial.snappy" % "snappy-java" % "1.1.7.7",
"org.web3j" % "core" % "5.0.0" % Test
"org.web3j" % "core" % "5.0.0" % Test,
"io.vavr" % "vavr" % "1.0.0-alpha-3"
)

val prometheus: Seq[ModuleID] = {
Expand Down
16 changes: 16 additions & 0 deletions repo.nix
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,22 @@
url = "https://repo1.maven.org/maven2/io/suzaku/boopickle_2.12/1.3.3/boopickle_2.12-1.3.3.pom";
sha256 = "E88B339905B0C67211E08683A95AC7DB5185BF10BC7EC91378D95389D7CD5807";
};
"nix-public/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3-javadoc.jar";
sha256 = "1F27BFD5D6187F5C57B699979ADE958C71D5A00A7C71B90ADA6F46A1FE0EFA78";
};
"nix-public/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3-sources.jar" = {
url = "https://repo1.maven.org/maven2/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3-sources.jar";
sha256 = "CDE11815C879F2ED21437A59905F98C36F1BC4FFE1ACA2F7309A28EDEF60538A";
};
"nix-public/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3.jar" = {
url = "https://repo1.maven.org/maven2/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3.jar";
sha256 = "D68F2A25AF5BDD4D26B2D272304040F39CB5031D91A3A295F13B9DE9EF0946B0";
};
"nix-public/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3.pom" = {
url = "https://repo1.maven.org/maven2/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3.pom";
sha256 = "CF7C124A2C9EA71543EFD99C6A5BB0BB31C44BDF266A28F819AD1509C825ED88";
};
"nix-public/jline/jline/2.14.6/jline-2.14.6-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/jline/jline/2.14.6/jline-2.14.6-javadoc.jar";
sha256 = "EBD162363C0A6CA9E52AF51D7377F78F559D3F2663DC896E0DBF3B16A0188972";
Expand Down
169 changes: 153 additions & 16 deletions src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package io.iohk.ethereum.sync

import java.net.{InetSocketAddress, ServerSocket}
import java.nio.file.Files
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ThreadLocalRandom, TimeoutException}

import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.TestProbe
Expand All @@ -13,6 +13,7 @@ import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig}
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages}
import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource}
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
Expand All @@ -38,9 +39,9 @@ import io.iohk.ethereum.network.{
ServerActor
}
import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder}
import io.iohk.ethereum.sync.FastSyncItSpec.{FakePeer, IdentityUpdate, updateStateAtBlock}
import io.iohk.ethereum.sync.FastSyncItSpec._
import io.iohk.ethereum.utils.ServerStatus.Listening
import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus, VmConfig}
import io.iohk.ethereum.utils._
import io.iohk.ethereum.vm.EvmConfig
import io.iohk.ethereum.{Fixtures, FlatSpecBase, Timeouts}
import monix.eval.Task
Expand Down Expand Up @@ -79,14 +80,66 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
_ <- peer1.waitForFastSyncFinish()
} yield {
val trie = peer1.getBestBlockTrie()
val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500)
// due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
// state, so peer1 can get whole trie from both of them.
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
assert(trie.isDefined)
assert(synchronizingPeerHaveAllData)
}
}

it should "should sync blockchain with state nodes when peer do not response with full responses" in
customTestCaseResourceM(
FakePeer.start3FakePeersRes(
fakePeerCustomConfig2 = FakePeerCustomConfig(HostConfig()),
fakePeerCustomConfig3 = FakePeerCustomConfig(HostConfig())
)
) { case (peer1, peer2, peer3) =>
for {
_ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500))
_ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500))
_ <- peer1.connectToPeers(Set(peer2.node, peer3.node))
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
_ <- peer1.waitForFastSyncFinish()
} yield {
val trie = peer1.getBestBlockTrie()
val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500)
// due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
// state, so peer1 can get whole trie from both of them.
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
assert(trie.isDefined)
assert(synchronizingPeerHaveAllData)
}
}

it should "should sync blockchain with state nodes when one of the peers send empty state responses" in
customTestCaseResourceM(
FakePeer.start3FakePeersRes(
fakePeerCustomConfig2 = FakePeerCustomConfig(HostConfig()),
fakePeerCustomConfig3 = FakePeerCustomConfig(HostConfig().copy(maxMptComponentsPerMessage = 0))
)
) { case (peer1, peer2, peer3) =>
for {
_ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500))
_ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500))
_ <- peer1.connectToPeers(Set(peer2.node, peer3.node))
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
_ <- peer1.waitForFastSyncFinish()
} yield {
val trie = peer1.getBestBlockTrie()
val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500)
// due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
// state, so peer1 can get whole trie from both of them.
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
assert(trie.isDefined)
assert(synchronizingPeerHaveAllData)
}
}

it should "should update target block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
case (peer1, peer2) =>
for {
Expand All @@ -99,6 +152,21 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
}
}

it should "should update target block and sync this new target block state" in customTestCaseResourceM(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: pivot instead of target

FakePeer.start2FakePeersRes()
) { case (peer1, peer2) =>
for {
_ <- peer2.importBlocksUntil(1000)(IdentityUpdate)
_ <- peer1.connectToPeers(Set(peer2.node))
_ <- peer2.importBlocksUntil(2000)(updateStateAtBlock(1500)).startAndForget
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
_ <- peer1.waitForFastSyncFinish()
} yield {
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
}
}

}

object FastSyncItSpec {
Expand Down Expand Up @@ -155,7 +223,32 @@ object FastSyncItSpec {
}
}

class FakePeer(peerName: String) extends SecureRandomBuilder with TestSyncConfig {
case class HostConfig(
maxBlocksHeadersPerMessage: Int,
maxBlocksBodiesPerMessage: Int,
maxReceiptsPerMessage: Int,
maxMptComponentsPerMessage: Int
) extends FastSyncHostConfiguration

object HostConfig {
def apply(): HostConfig = {
val random: ThreadLocalRandom = ThreadLocalRandom.current()
new HostConfig(
maxBlocksHeadersPerMessage = random.nextInt(100, 201),
maxBlocksBodiesPerMessage = random.nextInt(30, 51),
maxReceiptsPerMessage = random.nextInt(30, 51),
maxMptComponentsPerMessage = random.nextInt(100, 201)
)
}
}

final case class FakePeerCustomConfig(hostConfig: HostConfig)

val defaultConfig = FakePeerCustomConfig(HostConfig(200, 200, 200, 200))

class FakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig)
extends SecureRandomBuilder
with TestSyncConfig {
implicit val akkaTimeout: Timeout = Timeout(5.second)

val config = Config.config
Expand Down Expand Up @@ -231,10 +324,10 @@ object FastSyncItSpec {

val peerConf = new PeerConfiguration {
override val fastSyncHostConfiguration: FastSyncHostConfiguration = new FastSyncHostConfiguration {
val maxBlocksHeadersPerMessage: Int = 200
val maxBlocksBodiesPerMessage: Int = 200
val maxReceiptsPerMessage: Int = 200
val maxMptComponentsPerMessage: Int = 200
val maxBlocksHeadersPerMessage: Int = fakePeerCustomConfig.hostConfig.maxBlocksHeadersPerMessage
val maxBlocksBodiesPerMessage: Int = fakePeerCustomConfig.hostConfig.maxBlocksBodiesPerMessage
val maxReceiptsPerMessage: Int = fakePeerCustomConfig.hostConfig.maxReceiptsPerMessage
val maxMptComponentsPerMessage: Int = fakePeerCustomConfig.hostConfig.maxMptComponentsPerMessage
}
override val rlpxConfiguration: RLPxConfiguration = new RLPxConfiguration {
override val waitForTcpAckTimeout: FiniteDuration = Timeouts.normalTimeout
Expand Down Expand Up @@ -445,36 +538,80 @@ object FastSyncItSpec {
)
}.toOption
}

def containsExpectedDataUpToAccountAtBlock(n: BigInt, blockNumber: BigInt): Boolean = {
def go(i: BigInt): Boolean = {
if (i >= n) {
true
} else {
val expectedBalance = i
val accountAddress = Address(i)
val accountExpectedCode = ByteString(i.toByteArray)
val codeHash = kec256(accountExpectedCode)
val accountExpectedStorageAddresses = (i until i + 20).toList
val account = bl.getAccount(accountAddress, blockNumber).get
val code = bl.getEvmCodeByHash(codeHash).get
val storedData = accountExpectedStorageAddresses.map { addr =>
ByteUtils.toBigInt(bl.getAccountStorageAt(account.storageRoot, addr, ethCompatibleStorage = true))
}
val haveAllStoredData = accountExpectedStorageAddresses.zip(storedData).forall { case (address, value) =>
address == value
}

val dataIsCorrect =
account.balance.toBigInt == expectedBalance && code == accountExpectedCode && haveAllStoredData
if (dataIsCorrect) {
go(i + 1)
} else {
false
}
}
}

go(0)
}
}

object FakePeer {
def startFakePeer(peerName: String): Task[FakePeer] = {
def startFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig): Task[FakePeer] = {
for {
peer <- Task(new FakePeer(peerName))
peer <- Task(new FakePeer(peerName, fakePeerCustomConfig))
_ <- peer.startPeer()
} yield peer
}

def start1FakePeerRes(): Resource[Task, FakePeer] = {
def start1FakePeerRes(fakePeerCustomConfig: FakePeerCustomConfig = defaultConfig): Resource[Task, FakePeer] = {
Resource.make {
startFakePeer("Peer1")
startFakePeer("Peer1", fakePeerCustomConfig)
} { peer =>
peer.shutdown()
}
}

def start2FakePeersRes() = {
def start2FakePeersRes(
fakePeerCustomConfig1: FakePeerCustomConfig = defaultConfig,
fakePeerCustomConfig2: FakePeerCustomConfig = defaultConfig
) = {
Resource.make {
Task.parZip2(startFakePeer("Peer1"), startFakePeer("Peer2"))
Task.parZip2(startFakePeer("Peer1", fakePeerCustomConfig1), startFakePeer("Peer2", fakePeerCustomConfig2))
} { case (peer, peer1) => Task.parMap2(peer.shutdown(), peer1.shutdown())((_, _) => ()) }
}

def start3FakePeersRes() = {
def start3FakePeersRes(
fakePeerCustomConfig1: FakePeerCustomConfig = defaultConfig,
fakePeerCustomConfig2: FakePeerCustomConfig = defaultConfig,
fakePeerCustomConfig3: FakePeerCustomConfig = defaultConfig
) = {
Resource.make {
Task.parZip3(startFakePeer("Peer1"), startFakePeer("Peer2"), startFakePeer("Peer3"))
Task.parZip3(
startFakePeer("Peer1", fakePeerCustomConfig1),
startFakePeer("Peer2", fakePeerCustomConfig2),
startFakePeer("Peer3", fakePeerCustomConfig3)
)
} { case (peer, peer1, peer2) =>
Task.parMap3(peer.shutdown(), peer1.shutdown(), peer2.shutdown())((_, _, _) => ())
}
}
}

}
2 changes: 1 addition & 1 deletion src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ mantis {

# During fast-sync when most up to date block is determined from peers, the actual target block number
# will be decreased by this value
target-block-offset = 128
target-block-offset = 32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this value was taken from geth, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is part of my experiments with fast sync. It seems geth tries to have offset equal to 64 blocks. But their sync is much faster and they process a lot more nodes before updating to new target.
128 is definitly to much, as large part of the peers keeps only 128 blocks history, so it can happen that they won;t have target block root and our sync will not even start.


# How often to query peers for new blocks after the top of the chain has been reached
check-for-new-block-interval = 10.seconds
Expand Down
Loading