Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
just about done
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish committed Jan 17, 2019
1 parent a71049e commit d91761c
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public class RecursivePeerRefreshState {

private final SortedMap<BytesValue, MetadataPeer> oneTrueMap;

private final ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(2);
private final ScheduledExecutorService bondingScheduledExecutorService =
Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService neighboursScheduledExecutorService =
Executors.newScheduledThreadPool(1);
private final int timeoutPeriod;

RecursivePeerRefreshState(
Expand Down Expand Up @@ -68,7 +70,8 @@ public SortedMap<BytesValue, MetadataPeer> getOneTrueMap() {
}

void start() {
bondingInitiateRound();
final List<DiscoveryPeer> bondingRoundCandidatesList = bondingRoundCandidates(3, oneTrueMap);
bondingInitiateRound(bondingRoundCandidatesList);
}

void kickstartBootstrapPeers(final List<DiscoveryPeer> bootstrapPeers) {
Expand All @@ -86,20 +89,21 @@ private void bondingCancelOutstandingRequests() {
metadataPeer.setBondCancelled();
}
}
}

private void bondingInitiateRound() {
final List<DiscoveryPeer> bondingRoundCandidatesList =
bondingRoundCandidates(oneTrueMap.size(), oneTrueMap);
if (bondingRoundCandidatesList.size() > 0) {
bondingInitiateRound(bondingRoundCandidatesList);
}
}

private void bondingInitiateRound(final List<DiscoveryPeer> bondingRoundCandidatesList) {
for (DiscoveryPeer discoPeer : bondingRoundCandidatesList) {
final MetadataPeer metadataPeer = oneTrueMap.get(discoPeer.getId());
metadataPeer.setBondQueried();
pingDispatcher.ping(discoPeer);
}

final Runnable bondingCancellationTimerTask = this::bondingCancelOutstandingRequests;
scheduledExecutorService.schedule(
bondingScheduledExecutorService.schedule(
bondingCancellationTimerTask, timeoutPeriod, TimeUnit.SECONDS);
}

Expand All @@ -110,13 +114,34 @@ private void neighboursCancelOutstandingRequests() {
metadataPeer.setNeighbourCancelled();
}
}
final List<DiscoveryPeer> neighboursRoundCandidatesList =
neighboursRoundCandidates(3, oneTrueMap);
if (neighboursRoundCandidatesList.size() > 0) {
neighboursInitiateRound(neighboursRoundCandidatesList);
}
}

private void neighboursInitiateRound() {
// TODO: Terminating condition...
private boolean terminationConditionSatisfied() {
for (Map.Entry<BytesValue, MetadataPeer> candidateEntry : oneTrueMap.entrySet()) {
final MetadataPeer candidate = candidateEntry.getValue();
if (candidate.getBondQueried()) {
if (!(candidate.getBondResponded() || candidate.getBondCancelled())) {
return false;
}
}
if (candidate.getNeighbourQueried()) {
if (!(candidate.getNeighbourResponded() || candidate.getNeighbourCancelled())) {
return false;
}
}
}
return true;
}

final List<DiscoveryPeer> neighboursRoundCandidatesList =
neighboursRoundCandidates(3, oneTrueMap);
private void neighboursInitiateRound(final List<DiscoveryPeer> neighboursRoundCandidatesList) {
if (terminationConditionSatisfied()) {
return;
}

for (DiscoveryPeer discoPeer : neighboursRoundCandidatesList) {
findNeighbourDispatcher.findNeighbours(discoPeer, target);
Expand All @@ -125,7 +150,7 @@ private void neighboursInitiateRound() {
}

final Runnable neighboursCancellationTimerTask = this::neighboursCancelOutstandingRequests;
scheduledExecutorService.schedule(
neighboursScheduledExecutorService.schedule(
neighboursCancellationTimerTask, timeoutPeriod, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -155,15 +180,19 @@ void onNeighboursPacketReceived(
metadataPeer.setNeighbourResponded();

if (neighboursRoundTermination()) {
bondingInitiateRound();
final List<DiscoveryPeer> bondingRoundCandidatesList =
bondingRoundCandidates(oneTrueMap.size(), oneTrueMap);
bondingInitiateRound(bondingRoundCandidatesList);
}
}

void onPongPacketReceived(final DiscoveryPeer peer) {
final MetadataPeer iterationParticipant = oneTrueMap.get(peer.getId());
iterationParticipant.setBondResponded();
if (bondingRoundTermination()) {
neighboursInitiateRound();
final List<DiscoveryPeer> neighboursRoundCandidatesList =
neighboursRoundCandidates(3, oneTrueMap);
neighboursInitiateRound(neighboursRoundCandidatesList);
}
}

Expand Down Expand Up @@ -225,7 +254,11 @@ private List<DiscoveryPeer> neighboursRoundCandidates(
break;
}
final MetadataPeer candidate = candidateEntry.getValue();
if (candidate.getBondQueried() && candidate.getBondResponded()) {
if (candidate.getBondQueried()
&& candidate.getBondResponded()
&& !candidate.getNeighbourCancelled()
&& !candidate.getNeighbourQueried()
&& !candidate.getNeighbourResponded()) {
candidatesList.add(candidate.getPeer());
count++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ public void shouldAddNewPeerWhenReceivedPongAndPeerTableBucketIsFull() {

controller.start();

verify(outboundMessageHandler, times(16)).send(any(), matchPacketOfType(PacketType.PING));
verify(outboundMessageHandler, times(3)).send(any(), matchPacketOfType(PacketType.PING));

final Packet pongPacket =
MockPacketDataFactory.mockPongPacket(peers.get(0), pingPacket.getHash());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ public class RecursivePeerRefreshStateTest {
public void setup() throws Exception {
final JsonNode peers =
MAPPER.readTree(RecursivePeerRefreshStateTest.class.getResource("/peers.json"));
recursivePeerRefreshState =
new RecursivePeerRefreshState(
target,
new PeerBlacklist(),
new NodeWhitelistController(PermissioningConfiguration.createDefault()),
bondingAgent,
neighborFinder,
30);

peer_000 = (TestPeer) generatePeer(peers);

Expand Down Expand Up @@ -199,6 +191,15 @@ private boolean matchPeerToCorrespondingPacketData(

@Test
public void shouldIssueRequestToPeerWithLesserDistanceGreaterHops() {
recursivePeerRefreshState =
new RecursivePeerRefreshState(
target,
new PeerBlacklist(),
new NodeWhitelistController(PermissioningConfiguration.createDefault()),
bondingAgent,
neighborFinder,
30);

recursivePeerRefreshState.kickstartBootstrapPeers(Collections.singletonList(peer_000));
recursivePeerRefreshState.start();

Expand Down Expand Up @@ -267,32 +268,39 @@ public void shouldIssueRequestToPeerWithLesserDistanceGreaterHops() {
}

@Test
public void shouldIssueRequestToPeerWithGreaterDistanceOnExpirationOfLowerDistancePeerRequest() {
// recursivePeerRefreshState.kickstartBootstrapPeers(Collections.singletonList(peer_000));
// recursivePeerRefreshState.neighboursTimeoutEvaluation();
//
// verify(neighborFinder, never()).findNeighbours(peer_000, target);
// verify(bondingAgent).ping(peer_000);
//
// recursivePeerRefreshState.onPongPacketReceived(peer_000);
//
// recursivePeerRefreshState.onNeighboursPacketReceived(neighborsPacketData_000, peer_000);
//
// recursivePeerRefreshState.onPongPacketReceived(peer_010);
// recursivePeerRefreshState.onPongPacketReceived(peer_011);
// recursivePeerRefreshState.onPongPacketReceived(peer_012);
// recursivePeerRefreshState.onPongPacketReceived(peer_013);
//
// recursivePeerRefreshState.neighboursTimeoutEvaluation();
//
// verify(neighborFinder, never()).findNeighbours(peer_010, target);
// verify(neighborFinder).findNeighbours(peer_011, target);
// verify(neighborFinder).findNeighbours(peer_012, target);
// verify(neighborFinder).findNeighbours(peer_013, target);
//
// recursivePeerRefreshState.neighboursTimeoutEvaluation();
//
// verify(neighborFinder).findNeighbours(peer_010, target);
public void shouldIssueRequestToPeerWithGreaterDistanceOnExpirationOfLowerDistancePeerRequest()
throws InterruptedException {
final RecursivePeerRefreshState recursivePeerRefreshState0 =
new RecursivePeerRefreshState(
target,
new PeerBlacklist(),
new NodeWhitelistController(PermissioningConfiguration.createDefault()),
bondingAgent,
neighborFinder,
1);

recursivePeerRefreshState0.kickstartBootstrapPeers(Collections.singletonList(peer_000));
recursivePeerRefreshState0.start();

verify(neighborFinder, never()).findNeighbours(peer_000, target);
verify(bondingAgent).ping(peer_000);

recursivePeerRefreshState0.onPongPacketReceived(peer_000);
recursivePeerRefreshState0.onNeighboursPacketReceived(peer_000, neighborsPacketData_000);

recursivePeerRefreshState0.onPongPacketReceived(peer_010);
recursivePeerRefreshState0.onPongPacketReceived(peer_011);
recursivePeerRefreshState0.onPongPacketReceived(peer_012);
recursivePeerRefreshState0.onPongPacketReceived(peer_013);

verify(neighborFinder, never()).findNeighbours(peer_010, target);
verify(neighborFinder).findNeighbours(peer_011, target);
verify(neighborFinder).findNeighbours(peer_012, target);
verify(neighborFinder).findNeighbours(peer_013, target);

Thread.sleep(2000);

verify(neighborFinder).findNeighbours(peer_010, target);
}

private DiscoveryPeer generatePeer(final JsonNode peer) {
Expand Down

0 comments on commit d91761c

Please sign in to comment.