-
Notifications
You must be signed in to change notification settings - Fork 76
[ETCM-105] State sync improvements #715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
542a06a
2aced07
e487563
c47db27
f606b82
4189a55
391cbe2
9cbbf56
e0fd3ac
7bcae77
4d1ad0f
2627ea5
11755e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,8 @@ package io.iohk.ethereum.sync | |
|
||
import java.net.{InetSocketAddress, ServerSocket} | ||
import java.nio.file.Files | ||
import java.util.concurrent.TimeoutException | ||
import java.util.concurrent.atomic.AtomicReference | ||
import java.util.concurrent.{ThreadLocalRandom, TimeoutException} | ||
|
||
import akka.actor.{ActorRef, ActorSystem} | ||
import akka.testkit.TestProbe | ||
|
@@ -13,6 +13,7 @@ import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed | |
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor | ||
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock | ||
import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig} | ||
import io.iohk.ethereum.crypto.kec256 | ||
import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages} | ||
import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource} | ||
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} | ||
|
@@ -38,9 +39,9 @@ import io.iohk.ethereum.network.{ | |
ServerActor | ||
} | ||
import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder} | ||
import io.iohk.ethereum.sync.FastSyncItSpec.{FakePeer, IdentityUpdate, updateStateAtBlock} | ||
import io.iohk.ethereum.sync.FastSyncItSpec._ | ||
import io.iohk.ethereum.utils.ServerStatus.Listening | ||
import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus, VmConfig} | ||
import io.iohk.ethereum.utils._ | ||
import io.iohk.ethereum.vm.EvmConfig | ||
import io.iohk.ethereum.{Fixtures, FlatSpecBase, Timeouts} | ||
import monix.eval.Task | ||
|
@@ -79,14 +80,66 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter { | |
_ <- peer1.waitForFastSyncFinish() | ||
} yield { | ||
val trie = peer1.getBestBlockTrie() | ||
val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500) | ||
// due to the fact that the function generating state is deterministic, both peer2 and peer3 end up with exactly the same | ||
// state, so peer1 can get the whole trie from both of them. | ||
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) | ||
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset) | ||
assert(trie.isDefined) | ||
assert(synchronizingPeerHaveAllData) | ||
} | ||
} | ||
|
||
it should "should sync blockchain with state nodes when peers do not respond with full responses" in | ||
customTestCaseResourceM( | ||
FakePeer.start3FakePeersRes( | ||
fakePeerCustomConfig2 = FakePeerCustomConfig(HostConfig()), | ||
fakePeerCustomConfig3 = FakePeerCustomConfig(HostConfig()) | ||
) | ||
) { case (peer1, peer2, peer3) => | ||
for { | ||
_ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500)) | ||
_ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500)) | ||
_ <- peer1.connectToPeers(Set(peer2.node, peer3.node)) | ||
_ <- peer1.startFastSync().delayExecution(50.milliseconds) | ||
_ <- peer1.waitForFastSyncFinish() | ||
} yield { | ||
val trie = peer1.getBestBlockTrie() | ||
val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500) | ||
// due to the fact that the function generating state is deterministic, both peer2 and peer3 end up with exactly the same | ||
// state, so peer1 can get the whole trie from both of them. | ||
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) | ||
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset) | ||
assert(trie.isDefined) | ||
assert(synchronizingPeerHaveAllData) | ||
} | ||
} | ||
|
||
it should "should sync blockchain with state nodes when one of the peers sends empty state responses" in | ||
customTestCaseResourceM( | ||
FakePeer.start3FakePeersRes( | ||
fakePeerCustomConfig2 = FakePeerCustomConfig(HostConfig()), | ||
fakePeerCustomConfig3 = FakePeerCustomConfig(HostConfig().copy(maxMptComponentsPerMessage = 0)) | ||
) | ||
) { case (peer1, peer2, peer3) => | ||
for { | ||
_ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500)) | ||
_ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500)) | ||
_ <- peer1.connectToPeers(Set(peer2.node, peer3.node)) | ||
_ <- peer1.startFastSync().delayExecution(50.milliseconds) | ||
_ <- peer1.waitForFastSyncFinish() | ||
} yield { | ||
val trie = peer1.getBestBlockTrie() | ||
val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500) | ||
// due to the fact that the function generating state is deterministic, both peer2 and peer3 end up with exactly the same | ||
// state, so peer1 can get the whole trie from both of them. | ||
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) | ||
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset) | ||
assert(trie.isDefined) | ||
assert(synchronizingPeerHaveAllData) | ||
} | ||
} | ||
|
||
it should "should update target block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) { | ||
case (peer1, peer2) => | ||
for { | ||
|
@@ -99,6 +152,21 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter { | |
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) | ||
} | ||
} | ||
|
||
it should "should update target block and sync this new target block state" in customTestCaseResourceM( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor: |
||
FakePeer.start2FakePeersRes() | ||
) { case (peer1, peer2) => | ||
for { | ||
_ <- peer2.importBlocksUntil(1000)(IdentityUpdate) | ||
_ <- peer1.connectToPeers(Set(peer2.node)) | ||
_ <- peer2.importBlocksUntil(2000)(updateStateAtBlock(1500)).startAndForget | ||
_ <- peer1.startFastSync().delayExecution(50.milliseconds) | ||
_ <- peer1.waitForFastSyncFinish() | ||
} yield { | ||
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) | ||
} | ||
} | ||
|
||
} | ||
|
||
object FastSyncItSpec { | ||
|
@@ -155,7 +223,32 @@ object FastSyncItSpec { | |
} | ||
} | ||
|
||
class FakePeer(peerName: String) extends SecureRandomBuilder with TestSyncConfig { | ||
case class HostConfig( | ||
maxBlocksHeadersPerMessage: Int, | ||
maxBlocksBodiesPerMessage: Int, | ||
maxReceiptsPerMessage: Int, | ||
maxMptComponentsPerMessage: Int | ||
) extends FastSyncHostConfiguration | ||
|
||
object HostConfig { | ||
def apply(): HostConfig = { | ||
val random: ThreadLocalRandom = ThreadLocalRandom.current() | ||
new HostConfig( | ||
maxBlocksHeadersPerMessage = random.nextInt(100, 201), | ||
maxBlocksBodiesPerMessage = random.nextInt(30, 51), | ||
maxReceiptsPerMessage = random.nextInt(30, 51), | ||
maxMptComponentsPerMessage = random.nextInt(100, 201) | ||
) | ||
} | ||
} | ||
|
||
final case class FakePeerCustomConfig(hostConfig: HostConfig) | ||
|
||
val defaultConfig = FakePeerCustomConfig(HostConfig(200, 200, 200, 200)) | ||
kapke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
class FakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig) | ||
kapke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
extends SecureRandomBuilder | ||
with TestSyncConfig { | ||
implicit val akkaTimeout: Timeout = Timeout(5.second) | ||
|
||
val config = Config.config | ||
|
@@ -231,10 +324,10 @@ object FastSyncItSpec { | |
|
||
val peerConf = new PeerConfiguration { | ||
override val fastSyncHostConfiguration: FastSyncHostConfiguration = new FastSyncHostConfiguration { | ||
val maxBlocksHeadersPerMessage: Int = 200 | ||
val maxBlocksBodiesPerMessage: Int = 200 | ||
val maxReceiptsPerMessage: Int = 200 | ||
val maxMptComponentsPerMessage: Int = 200 | ||
val maxBlocksHeadersPerMessage: Int = fakePeerCustomConfig.hostConfig.maxBlocksHeadersPerMessage | ||
val maxBlocksBodiesPerMessage: Int = fakePeerCustomConfig.hostConfig.maxBlocksBodiesPerMessage | ||
val maxReceiptsPerMessage: Int = fakePeerCustomConfig.hostConfig.maxReceiptsPerMessage | ||
val maxMptComponentsPerMessage: Int = fakePeerCustomConfig.hostConfig.maxMptComponentsPerMessage | ||
} | ||
override val rlpxConfiguration: RLPxConfiguration = new RLPxConfiguration { | ||
override val waitForTcpAckTimeout: FiniteDuration = Timeouts.normalTimeout | ||
|
@@ -445,36 +538,80 @@ object FastSyncItSpec { | |
) | ||
}.toOption | ||
} | ||
|
||
def containsExpectedDataUpToAccountAtBlock(n: BigInt, blockNumber: BigInt): Boolean = { | ||
def go(i: BigInt): Boolean = { | ||
kapke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (i >= n) { | ||
true | ||
} else { | ||
val expectedBalance = i | ||
val accountAddress = Address(i) | ||
val accountExpectedCode = ByteString(i.toByteArray) | ||
val codeHash = kec256(accountExpectedCode) | ||
val accountExpectedStorageAddresses = (i until i + 20).toList | ||
val account = bl.getAccount(accountAddress, blockNumber).get | ||
val code = bl.getEvmCodeByHash(codeHash).get | ||
val storedData = accountExpectedStorageAddresses.map { addr => | ||
ByteUtils.toBigInt(bl.getAccountStorageAt(account.storageRoot, addr, ethCompatibleStorage = true)) | ||
} | ||
val haveAllStoredData = accountExpectedStorageAddresses.zip(storedData).forall { case (address, value) => | ||
address == value | ||
} | ||
|
||
val dataIsCorrect = | ||
account.balance.toBigInt == expectedBalance && code == accountExpectedCode && haveAllStoredData | ||
if (dataIsCorrect) { | ||
go(i + 1) | ||
} else { | ||
false | ||
} | ||
} | ||
} | ||
|
||
go(0) | ||
} | ||
} | ||
|
||
object FakePeer { | ||
def startFakePeer(peerName: String): Task[FakePeer] = { | ||
def startFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig): Task[FakePeer] = { | ||
for { | ||
peer <- Task(new FakePeer(peerName)) | ||
peer <- Task(new FakePeer(peerName, fakePeerCustomConfig)) | ||
_ <- peer.startPeer() | ||
} yield peer | ||
} | ||
|
||
def start1FakePeerRes(): Resource[Task, FakePeer] = { | ||
def start1FakePeerRes(fakePeerCustomConfig: FakePeerCustomConfig = defaultConfig): Resource[Task, FakePeer] = { | ||
Resource.make { | ||
startFakePeer("Peer1") | ||
startFakePeer("Peer1", fakePeerCustomConfig) | ||
} { peer => | ||
peer.shutdown() | ||
} | ||
} | ||
|
||
def start2FakePeersRes() = { | ||
def start2FakePeersRes( | ||
kapke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
fakePeerCustomConfig1: FakePeerCustomConfig = defaultConfig, | ||
fakePeerCustomConfig2: FakePeerCustomConfig = defaultConfig | ||
) = { | ||
Resource.make { | ||
Task.parZip2(startFakePeer("Peer1"), startFakePeer("Peer2")) | ||
Task.parZip2(startFakePeer("Peer1", fakePeerCustomConfig1), startFakePeer("Peer2", fakePeerCustomConfig2)) | ||
} { case (peer, peer1) => Task.parMap2(peer.shutdown(), peer1.shutdown())((_, _) => ()) } | ||
} | ||
|
||
def start3FakePeersRes() = { | ||
def start3FakePeersRes( | ||
fakePeerCustomConfig1: FakePeerCustomConfig = defaultConfig, | ||
fakePeerCustomConfig2: FakePeerCustomConfig = defaultConfig, | ||
fakePeerCustomConfig3: FakePeerCustomConfig = defaultConfig | ||
) = { | ||
Resource.make { | ||
Task.parZip3(startFakePeer("Peer1"), startFakePeer("Peer2"), startFakePeer("Peer3")) | ||
Task.parZip3( | ||
startFakePeer("Peer1", fakePeerCustomConfig1), | ||
startFakePeer("Peer2", fakePeerCustomConfig2), | ||
startFakePeer("Peer3", fakePeerCustomConfig3) | ||
) | ||
} { case (peer, peer1, peer2) => | ||
Task.parMap3(peer.shutdown(), peer1.shutdown(), peer2.shutdown())((_, _, _) => ()) | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -325,7 +325,7 @@ mantis { | |
|
||
# During fast-sync when most up to date block is determined from peers, the actual target block number | ||
# will be decreased by this value | ||
target-block-offset = 128 | ||
target-block-offset = 32 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I assume this value was taken from geth, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is part of my experiments with fast sync. It seems geth tries to have an offset equal to 64 blocks. But their sync is much faster and they process a lot more nodes before updating to a new target. |
||
|
||
# How often to query peers for new blocks after the top of the chain has been reached | ||
check-for-new-block-interval = 10.seconds | ||
|
Uh oh!
There was an error while loading. Please reload this page.