Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making sure that the resolved DNS lists get used in full #3071

Merged
merged 4 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -376,6 +377,7 @@ public Collection<PeerConnection> getPeers() {
public Stream<DiscoveryPeer> streamDiscoveredPeers() {
List<DiscoveryPeer> peers = dnsPeers.get();
if (peers != null) {
Collections.shuffle(peers);
return Stream.concat(peerDiscoveryAgent.streamDiscoveredPeers(), peers.stream());
}
return peerDiscoveryAgent.streamDiscoveredPeers();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can also shuffle this part ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, unfortunately streams don't have a convenient method to shuffle the elements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shuffling a stream goes against the "lazy" nature of a stream I would say. To shuffle it you would need to process all elements at least once. So you need to collect, shuffle, and convert result to another stream.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,11 @@ public void connect(final Stream<? extends Peer> peerStream) {
if (!localNode.isReady()) {
return;
}
final int availablePeerSlots = Math.max(0, maxConnections - getConnectionCount());
peerStream
.takeWhile(peer -> Math.max(0, maxConnections - getConnectionCount()) > 0)
.filter(peer -> !connectionsById.containsKey(peer.getId()))
.filter(peer -> peer.getEnodeURL().isListening())
.filter(peerPermissions::allowNewOutboundConnectionTo)
.limit(availablePeerSlots)
.forEach(this::connect);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,22 @@ public void connect_largeStreamOfPeers() {
verify(agent, times(maxPeers)).connect(any(Peer.class));
}

@Test
public void connect_largeStreamOfPeersFirstFewImpostors() {
final int maxPeers = 5;
final int impostorsCount = 5;
connectionInitializer.setAutoDisconnectCounter(impostorsCount);
final Stream<Peer> peerStream = Stream.generate(PeerTestHelper::createPeer).limit(20);

startAgentWithMaxPeers(maxPeers);
agent = spy(agent);
agent.connect(peerStream);

assertThat(agent.getConnectionCount()).isEqualTo(maxPeers);
// Check that stream was not fully iterated
verify(agent, times(maxPeers + impostorsCount)).connect(any(Peer.class));
}

@Test
public void disconnect() throws ExecutionException, InterruptedException {
startAgent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.util.Subscribers;

import java.net.InetSocketAddress;
Expand All @@ -32,6 +33,7 @@ public class MockConnectionInitializer implements ConnectionInitializer {
private boolean autocompleteConnections = true;
private final Map<Peer, CompletableFuture<PeerConnection>> incompleteConnections =
new HashMap<>();
private int autoDisconnectCounter = 0;

public MockConnectionInitializer(final PeerConnectionEventDispatcher eventDispatcher) {
this.eventDispatcher = eventDispatcher;
Expand Down Expand Up @@ -72,6 +74,12 @@ public void subscribeIncomingConnect(final ConnectCallback callback) {

@Override
public CompletableFuture<PeerConnection> connect(final Peer peer) {
if (autoDisconnectCounter > 0) {
autoDisconnectCounter--;
MockPeerConnection mockPeerConnection = MockPeerConnection.create(peer, eventDispatcher);
mockPeerConnection.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING);
return CompletableFuture.completedFuture(mockPeerConnection);
}
if (autocompleteConnections) {
return CompletableFuture.completedFuture(MockPeerConnection.create(peer, eventDispatcher));
} else {
Expand All @@ -80,4 +88,8 @@ public CompletableFuture<PeerConnection> connect(final Peer peer) {
return future;
}
}

public void setAutoDisconnectCounter(final int autoDisconnectCounter) {
this.autoDisconnectCounter = autoDisconnectCounter;
}
}