Skip to content

Commit

Permalink
PBFT consensus (#112)
Browse files Browse the repository at this point in the history
* [WIP] Basic skeleton for PBFT consensus (#99)

* Message types are implicit

* Some semblence of a pBFT cluster

* Add execution, sending response to orign, and handle messages on the "b7s" protocol

* Ensure execution is done only once

* Add check that incoming messages are in the correct view

* Add state locking

* Broadcast is concurrent and network operations are limited in duration

* Add state locking for general Blockless protocol too

* [pBFT] View change implementation (#102)

* WIP: View change initialization (sending)

* Processing view change messages (starting view change if condition met)

* Adding a new-view message with valid preprepares

* Handling new view message

* Small tweaks and comment updates

* Determine which messages are allowed in which stage at a single place

* Minor tweaks to request and preprepare handling

* Ensure requests are executed in order (sequence number accounting) + minor tweaks (accounting commits)

* View change/New view logging tweaks

* Splitting commit function into two

* Use 'AfterFunc' for view change instead of timer

* Fix stopping a timer thats not running + broadcast error handling

* Fix commit quorum condition

* Fix log message for not sending a commit

* Fix log message if the request has already been executed

* Logging tweaks - decrease log levels and remove redundant log messages

* Code for byzantine replica plus few small improvements/fixes

* Fix new view preprepare set (starting from 1) + fix JSON handling of new-view/view-change

* Add serialization/deserialization code for message types

* Tidy up marshallers/unmarshallers

* Checking if request is commit-able after a preprepare

* Cache past executions + cleanup old state after transition to a new view

* New primary resubmits request that it received but havent been executed yet

* Delayed view change messages AFTER we successfully transitioned to a new view are no-op

* Several fixes related to view changes

- new primary records the generated preprepares
- sequence number check corrected
- checking for prepared requests corrected

* Introducing PBFT as a consensus option for execution requests (#104)

* Plucking out raft from the "node" package

* FormCluster message specifies the type of consensus expected

* Fix bootstrap check on new Raft cluster

* Minor tweaks, raft returns a multierror on shutdown errors

* Add support for PBFT consensus to node execution

* PBFT replica no longer listens on the Blockless general protocol (only PBFT specific protocol)

* Rename Raft handler => replica

* File rename and const rename

* Tweak how we treat execution requests on the worker node

* Refactor roll call/cluster formation

* Remove mentions of a "quorum" in node code since its imprecise

* PBFT uses a cluster-specific protocol for messaging

* Add callback for caching execution results in PBFT

* Moving definition for the Executor to the `blockless` package to remove local definitions

* Add signing and signature verification of preprepare, prepare and commit messages

* Add signing for view changes and new views

* View change and new view messages validated

* Remove registered stream handler for PBFT

* Fix new view message processing

- signature marshalling
- timestamp fixed - set by the head node

* View change timer corner case fixed - trigerring an obsoleted view change

* Remove obsolete comment

* Result signing + per-consensus result processing

* Correct retcode

* Fix execution output

* Update import paths

* Update test (roll call)

* Add execution request signing

* Fix log message
  • Loading branch information
Maelkum authored Sep 6, 2023
1 parent 9bb2983 commit c03db43
Show file tree
Hide file tree
Showing 64 changed files with 4,330 additions and 657 deletions.
33 changes: 33 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package consensus

import (
"fmt"
)

// Type identifies consensus protocols suported by Blockless.
type Type uint

const (
Raft Type = iota + 1
PBFT
)

func (t Type) String() string {
switch t {
case Raft:
return "Raft"
case PBFT:
return "PBFT"
default:
return fmt.Sprintf("unknown: %d", t)
}
}

func (t Type) Valid() bool {
switch t {
case Raft, PBFT:
return true
default:
return false
}
}
139 changes: 139 additions & 0 deletions consensus/pbft/commit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package pbft

import (
"fmt"

"github.com/libp2p/go-libp2p/core/peer"
)

func (r *Replica) maybeSendCommit(view uint, sequenceNo uint, digest string) error {

log := r.log.With().Uint("view", view).Uint("sequence_number", sequenceNo).Str("digest", digest).Logger()

if !r.shouldSendCommit(view, sequenceNo, digest) {
log.Info().Msg("not sending commit")
return nil
}

log.Info().Msg("request prepared, broadcasting commit")

err := r.sendCommit(view, sequenceNo, digest)
if err != nil {
return fmt.Errorf("could not send commit message: %w", err)
}

if !r.committed(view, sequenceNo, digest) {
log.Info().Msg("request is not yet committed")
return nil
}

log.Info().Msg("request committed, executing")

return r.execute(view, sequenceNo, digest)
}

func (r *Replica) shouldSendCommit(view uint, sequenceNo uint, digest string) bool {

log := r.log.With().Uint("view", view).Uint("sequence_number", sequenceNo).Str("digest", digest).Logger()

if !r.prepared(view, sequenceNo, digest) {
log.Info().Msg("request not yet prepared, commit not due yet")
return false
}

// Have we already sent a commit message?
msgID := getMessageID(view, sequenceNo)
commits, ok := r.commits[msgID]
if ok {
_, sent := commits.m[r.id]
if sent {
log.Info().Msg("commit for this request already broadcast")
return false
}
}

return true
}

func (r *Replica) sendCommit(view uint, sequenceNo uint, digest string) error {

log := r.log.With().Uint("view", view).Uint("sequence_number", sequenceNo).Str("digest", digest).Logger()

log.Info().Msg("broadcasting commit message")

commit := Commit{
View: view,
SequenceNumber: sequenceNo,
Digest: digest,
}

err := r.sign(&commit)
if err != nil {
return fmt.Errorf("could not sign commit message: %w", err)
}

err = r.broadcast(commit)
if err != nil {
return fmt.Errorf("could not broadcast commit message: %w", err)
}

log.Info().Msg("commit message successfully broadcast")

// Record this commit message.
r.recordCommitReceipt(r.id, commit)

return nil
}

func (r *Replica) processCommit(replica peer.ID, commit Commit) error {

log := r.log.With().Str("replica", replica.String()).Uint("view", commit.View).Uint("sequence_no", commit.SequenceNumber).Str("digest", commit.Digest).Logger()

log.Info().Msg("received commit message")

if commit.View != r.view {
return fmt.Errorf("commit has an invalid view value (received: %v, current: %v)", commit.View, r.view)
}

err := r.verifySignature(&commit, replica)
if err != nil {
return fmt.Errorf("could not validate commit signature: %w", err)
}

r.recordCommitReceipt(replica, commit)

if !r.committed(commit.View, commit.SequenceNumber, commit.Digest) {
log.Info().Msg("request is not yet committed")
return nil
}

err = r.execute(commit.View, commit.SequenceNumber, commit.Digest)
if err != nil {
return fmt.Errorf("request execution failed: %w", err)

}

return nil
}

func (r *Replica) recordCommitReceipt(replica peer.ID, commit Commit) {

msgID := getMessageID(commit.View, commit.SequenceNumber)
commits, ok := r.commits[msgID]
if !ok {
r.commits[msgID] = newCommitReceipts()
commits = r.commits[msgID]
}

commits.Lock()
defer commits.Unlock()

// Have we already seen this commit?
_, exists := commits.m[replica]
if exists {
r.log.Warn().Uint("view", commit.View).Uint("sequence", commit.SequenceNumber).Str("digest", commit.Digest).Msg("ignoring duplicate commit")
return
}

commits.m[replica] = commit
}
93 changes: 93 additions & 0 deletions consensus/pbft/conditions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package pbft

func (r *Replica) prePrepared(view uint, sequenceNo uint, digest string) bool {

// NOTE: Digest can be empty (NullRequest).

// Have we seen this request before?
_, seen := r.requests[digest]
if !seen {
return false
}

// Do we have a pre-prepare for this request?
preprepare, seen := r.preprepares[getMessageID(view, sequenceNo)]
if !seen {
return false
}

if preprepare.Digest != digest {
return false
}

return true
}

func (r *Replica) prepared(view uint, sequenceNo uint, digest string) bool {

// Check if we have seen this request before.
// NOTE: This is also checked as part of the pre-prepare check.
_, seen := r.requests[digest]
if !seen {
return false
}

// Is the pre-prepare condition met for this request?
if !r.prePrepared(view, sequenceNo, digest) {
return false
}

prepares, ok := r.prepares[getMessageID(view, sequenceNo)]
if !ok {
return false
}

prepareCount := uint(len(prepares.m))
haveQuorum := prepareCount >= r.prepareQuorum()

r.log.Debug().Str("digest", digest).Uint("view", view).Uint("sequence_no", sequenceNo).
Uint("quorum", prepareCount).Bool("have_quorum", haveQuorum).
Msg("number of prepares for a request")

return haveQuorum
}

func (r *Replica) committed(view uint, sequenceNo uint, digest string) bool {

// Is the prepare condition met for this request?
if !r.prepared(view, sequenceNo, digest) {
return false
}

commits, ok := r.commits[getMessageID(view, sequenceNo)]
if !ok {
return false
}

commitCount := uint(len(commits.m))
haveQuorum := commitCount >= r.commitQuorum()

r.log.Debug().Str("digest", digest).Uint("view", view).Uint("sequence_no", sequenceNo).
Uint("quorum", commitCount).Bool("have_quorum", haveQuorum).
Msg("number of commits for a request")

return haveQuorum
}

func (r *Replica) viewChangeReady(view uint) bool {

vc, ok := r.viewChanges[view]
if !ok {
return false
}

vc.Lock()
defer vc.Unlock()

vcCount := uint(len(vc.m))
haveQuorum := vcCount >= r.commitQuorum()

r.log.Debug().Uint("view", view).Uint("quorum", vcCount).Bool("have_quorum", haveQuorum).Msg("number of view change messages for a view")

return haveQuorum
}
49 changes: 49 additions & 0 deletions consensus/pbft/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pbft

import (
"time"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/models/execute"
)

// Option can be used to set PBFT configuration options.
type Option func(*Config)

// PostProcessFunc is invoked by the replica after execution is done.
type PostProcessFunc func(requestID string, origin peer.ID, request execute.Request, result execute.Result)

var DefaultConfig = Config{
NetworkTimeout: NetworkTimeout,
RequestTimeout: RequestTimeout,
}

type Config struct {
PostProcessors []PostProcessFunc // Callback functions to be invoked after execution is done.
NetworkTimeout time.Duration
RequestTimeout time.Duration
}

// WithNetworkTimeout sets how much time we allow for message sending.
func WithNetworkTimeout(d time.Duration) Option {
return func(cfg *Config) {
cfg.NetworkTimeout = d
}
}

// WithRequestTimeout sets the inactivity period before we trigger a view change.
func WithRequestTimeout(d time.Duration) Option {
return func(cfg *Config) {
cfg.RequestTimeout = d
}
}

// WithPostProcessors sets the callbacks that will be invoked after execution.
func WithPostProcessors(callbacks ...PostProcessFunc) Option {
return func(cfg *Config) {
var fns []PostProcessFunc
fns = append(fns, callbacks...)
cfg.PostProcessors = fns
}
}
73 changes: 73 additions & 0 deletions consensus/pbft/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package pbft

type pbftCore struct {
// Number of replicas in the cluster.
n uint

// Number of byzantine replicas we can tolerate.
f uint

// Sequence number.
sequence uint

// ViewNumber.
view uint
}

func newPbftCore(total uint) pbftCore {

return pbftCore{
sequence: 0,
view: 0,
n: total,
f: calcByzantineTolerance(total),
}
}

// given a view number, return the index of the expected primary.
func (c pbftCore) primary(v uint) uint {
return v % c.n
}

// return the index of the expected primary for the current view.
func (c pbftCore) currentPrimary() uint {
return c.view % c.n
}

func (c pbftCore) prepareQuorum() uint {
return 2 * c.f
}

func (c pbftCore) commitQuorum() uint {
return 2*c.f + 1
}

// MinClusterResults returns the number of identical results client should expect from the
// cluster before accepting the result as valid. The number is f+1.
func MinClusterResults(n uint) uint {
return calcByzantineTolerance(n) + 1
}

// based on the number of replicas, determine how many byzantine replicas we can tolerate.
func calcByzantineTolerance(n uint) uint {

if n <= 1 {
return 0
}

f := (n - 1) / 3
return f
}

// messageID is used to identify a specific point in time as view + sequence number combination.
type messageID struct {
view uint
sequence uint
}

func getMessageID(view uint, sequenceNo uint) messageID {
return messageID{
view: view,
sequence: sequenceNo,
}
}
Loading

0 comments on commit c03db43

Please sign in to comment.