Skip to content

Commit

Permalink
Fix sync timeout issue (hyperledger#1149)
Browse files Browse the repository at this point in the history
This PR fixes an error when downloading chain (Async operation failed)

When there are several blocks which are very large (> 12M) and which are requested in the same segment (by default 200) we can have timeouts and never manage to synchronize. This modification will make it possible to gradually reduce the size of the segment with each attempt. Then the segment resumes its default size for the next blocks

If the reduction is not enough at the last attempt we try with a single block

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
  • Loading branch information
matkt authored Jun 29, 2020
1 parent b0115a2 commit 1755648
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,12 @@ private boolean isRetryableError(final Throwable error) {

return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError);
}

public int getRetryCount() {
return retryCount;
}

public int getMaxRetries() {
return maxRetries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
*/
public class CompleteBlocksTask extends AbstractRetryingPeerTask<List<Block>> {
private static final Logger LOG = LogManager.getLogger();

private static final int MIN_SIZE_INCOMPLETE_LIST = 1;
private static final int DEFAULT_RETRIES = 3;

private final EthContext ethContext;
Expand Down Expand Up @@ -136,8 +138,19 @@ private CompletableFuture<List<Block>> processBodiesResult(final List<Block> blo
}

private List<BlockHeader> incompleteHeaders() {
return headers.stream()
.filter(h -> blocks.get(h.getNumber()) == null)
.collect(Collectors.toList());
final List<BlockHeader> collectedHeaders =
headers.stream()
.filter(h -> blocks.get(h.getNumber()) == null)
.collect(Collectors.toList());
if (getRetryCount() > 1) {
final int subSize = (int) Math.ceil((double) collectedHeaders.size() / getRetryCount());
if (getRetryCount() > getMaxRetries()) {
return collectedHeaders.subList(0, MIN_SIZE_INCOMPLETE_LIST);
} else {
return collectedHeaders.subList(0, subSize);
}
}

return collectedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.manager.ethtaskutils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;

import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
Expand Down Expand Up @@ -62,6 +63,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
protected static MetricsSystem metricsSystem = new NoOpMetricsSystem();
protected EthProtocolManager ethProtocolManager;
protected EthContext ethContext;
protected EthPeers ethPeers;
protected TransactionPool transactionPool;
protected AtomicBoolean peersDoTimeout;
protected AtomicInteger peerCountToTimeout;
Expand All @@ -81,7 +83,7 @@ public static void setup() {
public void setupTest() {
peersDoTimeout = new AtomicBoolean(false);
peerCountToTimeout = new AtomicInteger(0);
final EthPeers ethPeers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), metricsSystem);
ethPeers = spy(new EthPeers(EthProtocol.NAME, TestClock.fixed(), metricsSystem));
final EthMessages ethMessages = new EthMessages();
final EthScheduler ethScheduler =
new DeterministicEthScheduler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,39 @@

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.PeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>> {

@Override
protected List<Block> generateDataToBeRequested() {
return generateDataToBeRequested(3);
}

protected List<Block> generateDataToBeRequested(final int nbBlock) {
// Setup data to be requested and expected response
final List<Block> blocks = new ArrayList<>();
for (long i = 0; i < 3; i++) {
for (long i = 0; i < nbBlock; i++) {
final BlockHeader header = blockchain.getBlockHeader(10 + i).get();
final BlockBody body = blockchain.getBlockBody(header.getHash()).get();
blocks.add(new Block(header, body));
Expand Down Expand Up @@ -67,4 +78,50 @@ public void shouldCompleteWithoutPeersWhenAllBlocksAreEmpty() {
final EthTask<List<Block>> task = createTask(blocks);
assertThat(task.run()).isCompletedWithValue(blocks);
}

@SuppressWarnings("unchecked")
@Test
public void shouldReduceTheBlockSegmentSizeAfterEachRetry() {

peerCountToTimeout.set(3);
final List<Block> requestedData = generateDataToBeRequested(10);

final EthTask<List<Block>> task = createTask(requestedData);
final CompletableFuture<List<Block>> future = task.run();

ArgumentCaptor<Long> blockNumbersCaptor = ArgumentCaptor.forClass(Long.class);

verify(ethPeers, times(4))
.executePeerRequest(
any(PeerRequest.class), blockNumbersCaptor.capture(), any(Optional.class));

assertThat(future.isDone()).isFalse();
assertThat(blockNumbersCaptor.getAllValues().get(0)).isEqualTo(19);
assertThat(blockNumbersCaptor.getAllValues().get(1)).isEqualTo(14);
assertThat(blockNumbersCaptor.getAllValues().get(2)).isEqualTo(13);
assertThat(blockNumbersCaptor.getAllValues().get(3)).isEqualTo(10);
}

@SuppressWarnings("unchecked")
@Test
public void shouldNotReduceTheBlockSegmentSizeIfOnlyOneBlockNeeded() {

peerCountToTimeout.set(3);
final List<Block> requestedData = generateDataToBeRequested(1);

final EthTask<List<Block>> task = createTask(requestedData);
final CompletableFuture<List<Block>> future = task.run();

ArgumentCaptor<Long> blockNumbersCaptor = ArgumentCaptor.forClass(Long.class);

verify(ethPeers, times(4))
.executePeerRequest(
any(PeerRequest.class), blockNumbersCaptor.capture(), any(Optional.class));

assertThat(future.isDone()).isFalse();
assertThat(blockNumbersCaptor.getAllValues().get(0)).isEqualTo(10);
assertThat(blockNumbersCaptor.getAllValues().get(1)).isEqualTo(10);
assertThat(blockNumbersCaptor.getAllValues().get(2)).isEqualTo(10);
assertThat(blockNumbersCaptor.getAllValues().get(3)).isEqualTo(10);
}
}

0 comments on commit 1755648

Please sign in to comment.