Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Ibft Round to update internal state on reception of NewRound Message #451

Merged
merged 28 commits into from
Dec 19, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Reworked buffering to be generic
  • Loading branch information
tmohay committed Dec 18, 2018
commit f2d3ea39b4a2bef2a0d5542b0180c0b8a15c362e
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public IbftStateMachine(final IbftBlockCreatorFactory blockCreatorFactory) {
/**
* Attempt to consume the event and update the maintained state
*
* @param event the external action that has occurred
* @param event the external Action that has occurred
* @param roundTimer timer that will fire expiry events that are expected to be received back into
* this machine
* @return whether this event was consumed or requires reprocessing later once the state machine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;

import java.util.function.Consumer;
import tech.pegasys.pantheon.consensus.ibft.BlockTimer;
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparedCertificate;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.ProposalPayload;
Expand Down Expand Up @@ -61,7 +63,7 @@ public class IbftBlockHeightManager {
private final Function<ConsensusRoundIdentifier, RoundState> roundStateCreator;
private final IbftFinalState finalState;

private Optional<PreparedCertificate> bestPreparedCertificate = Optional.empty();
private Optional<PreparedCertificate> latestPreparedCertificate = Optional.empty();

private IbftRound currentRound;

Expand Down Expand Up @@ -99,6 +101,17 @@ public void start() {
}
}

public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifier) {
if (roundIdentifier.equals(currentRound.getRoundIdentifier())) {
currentRound.createAndSendProposalMessage(clock.millis() / 1000);
} else {
LOG.info(
"Block timer expired for a round ({}) other than current ({})",
roundIdentifier,
currentRound.getRoundIdentifier());
}
}

public void roundExpired(final RoundExpiry expire) {
if (!expire.getView().equals(currentRound.getRoundIdentifier())) {
LOG.debug("Ignoring Round timer expired which does not match current round.");
Expand All @@ -110,15 +123,15 @@ public void roundExpired(final RoundExpiry expire) {
currentRound.createPrepareCertificate();

if (preparedCertificate.isPresent()) {
bestPreparedCertificate = preparedCertificate;
latestPreparedCertificate = preparedCertificate;
}

startNewRound(currentRound.getRoundIdentifier().getRoundNumber() + 1);

final SignedData<RoundChangePayload> localRoundChange =
messageFactory.createSignedRoundChangePayload(
currentRound.getRoundIdentifier(), bestPreparedCertificate);
transmitter.multicastRoundChange(currentRound.getRoundIdentifier(), bestPreparedCertificate);
currentRound.getRoundIdentifier(), latestPreparedCertificate);
transmitter.multicastRoundChange(currentRound.getRoundIdentifier(), latestPreparedCertificate);

// Its possible the locally created RoundChange triggers the transmission of a NewRound
// message - so it must be handled accordingly.
Expand All @@ -127,50 +140,39 @@ public void roundExpired(final RoundExpiry expire) {

public void handleProposalMessage(final SignedData<ProposalPayload> msg) {
LOG.info("Received a Proposal message.");
final ConsensusRoundIdentifier msgRoundId = msg.getPayload().getRoundIdentifier();
if (msgRoundId.equals(currentRound.getRoundIdentifier())) {
currentRound.handleProposalMessage(msg);
} else if (msgRoundId.getRoundNumber() > currentRound.getRoundIdentifier().getRoundNumber()) {
futureRoundStateBuffer.computeIfAbsent(
msgRoundId.getRoundNumber(), k -> roundStateCreator.apply(msgRoundId));
futureRoundStateBuffer.get(msgRoundId.getRoundNumber()).setProposedBlock(msg);
}
actionOrBufferMessage(msg,
() -> currentRound.handleProposalMessage(msg),
(roundstate) -> roundstate.setProposedBlock(msg));
}

public void handlePrepareMessage(final SignedData<PreparePayload> msg) {
LOG.info("Received a prepare message.");
final ConsensusRoundIdentifier msgRoundId = msg.getPayload().getRoundIdentifier();
if (msgRoundId.equals(currentRound.getRoundIdentifier())) {
currentRound.handlePrepareMessage(msg);
} else if (msgRoundId.getRoundNumber() > currentRound.getRoundIdentifier().getRoundNumber()) {
futureRoundStateBuffer.computeIfAbsent(
msgRoundId.getRoundNumber(), k -> roundStateCreator.apply(msgRoundId));
futureRoundStateBuffer.get(msgRoundId.getRoundNumber()).addPrepareMessage(msg);
}
actionOrBufferMessage(msg,
() -> currentRound.handlePrepareMessage(msg),
(roundstate) -> roundstate.addPrepareMessage(msg));
}

public void handleCommitMessage(final SignedData<CommitPayload> msg) {
LOG.info("Received a commit message.");
actionOrBufferMessage(msg,
() -> currentRound.handleCommitMessage(msg),
(roundstate) -> roundstate.addCommitMessage(msg));
}

private void actionOrBufferMessage(final SignedData<? extends Payload> msg,
final Action handler, final Consumer<RoundState> buffer) {
final ConsensusRoundIdentifier msgRoundId = msg.getPayload().getRoundIdentifier();
if (msgRoundId.equals(currentRound.getRoundIdentifier())) {
currentRound.handleCommitMessage(msg);
handler.apply();
} else if (msgRoundId.getRoundNumber() > currentRound.getRoundIdentifier().getRoundNumber()) {
futureRoundStateBuffer.computeIfAbsent(
final RoundState roundstate = futureRoundStateBuffer.computeIfAbsent(
msgRoundId.getRoundNumber(), k -> roundStateCreator.apply(msgRoundId));
futureRoundStateBuffer.get(msgRoundId.getRoundNumber()).addCommitMessage(msg);
buffer.accept(roundstate);
}
}

public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifier) {
if (roundIdentifier.equals(currentRound.getRoundIdentifier())) {
currentRound.createAndSendProposalMessage(clock.millis() / 1000);
} else {
LOG.info(
"BLock timer expired for a round ({}) other than current ({})",
roundIdentifier,
currentRound.getRoundIdentifier());
}
}
private void upddateState(final RoundState state)


public void handleRoundChangeMessage(final SignedData<RoundChangePayload> msg) {
final Optional<RoundChangeCertificate> result =
Expand Down Expand Up @@ -198,18 +200,26 @@ private void startNewRound(final int roundNumber) {
currentRound = roundFactory.createNewRound(parentHeader, roundNumber);
}
// discard roundChange messages from the current and previous rounds
roundChangeManager.discardCompletedRound(currentRound.getRoundIdentifier());
roundChangeManager.discardRoundsPriorTo(currentRound.getRoundIdentifier());
roundTimer.startTimer(currentRound.getRoundIdentifier());
}

public void handleNewRoundMessage(final SignedData<NewRoundPayload> msg) {
if (newRoundMessageValidator.validateNewRoundMessage(msg)) {
startNewRound(msg.getPayload().getRoundIdentifier().getRoundNumber());
final ConsensusRoundIdentifier targetRound = msg.getPayload().getRoundIdentifier();
if (targetRound.getRoundNumber() > currentRound.getRoundIdentifier().getRoundNumber()) {
startNewRound(targetRound.getRoundNumber());
}
currentRound.handleProposalMessage(msg.getPayload().getProposalPayload());
}
}

public long getChainHeight() {
return currentRound.getRoundIdentifier().getSequenceNumber();
}

@FunctionalInterface
public interface Action {
void apply();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private RoundChangeStatus storeRoundChangeMessage(final SignedData<RoundChangePa
*
* @param completedRoundIdentifier round identifier that has been identified as superseded
*/
public void discardCompletedRound(final ConsensusRoundIdentifier completedRoundIdentifier) {
public void discardRoundsPriorTo(final ConsensusRoundIdentifier completedRoundIdentifier) {
roundChangeCache
.entrySet()
.removeIf(entry -> isAnEarlierRound(entry.getKey(), completedRoundIdentifier));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void discardsRoundPreviousToThatRequested() {
.isEqualTo(Optional.empty());
assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator2))
.isEqualTo(Optional.empty());
manager.discardCompletedRound(ri2);
manager.discardRoundsPriorTo(ri2);
assertThat(manager.roundChangeCache.get(ri1)).isNull();
assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(1);
assertThat(manager.roundChangeCache.get(ri3).receivedMessages.size()).isEqualTo(1);
Expand Down