Skip to content

Commit

Permalink
Allow the FSM to return a response to Apply
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Nov 18, 2013
1 parent 5bbbe6d commit b89c454
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
2 changes: 1 addition & 1 deletion fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// clients to make use of the replicated log
type FSM interface {
// Apply log is invoked once a log entry is commited
Apply([]byte)
Apply([]byte) interface{}

// Snapshot is used to support log compaction. This call should
// return an FSMSnapshot which can be used to save a point-in-time
Expand Down
21 changes: 18 additions & 3 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ import (
"time"
)

// Future is used to represent an application that may occur in the future
// Future is used to represent an action that may occur in the future
type Future interface {
Error() error
}

// ApplyFuture is used for Apply() and can returns the FSM response
type ApplyFuture interface {
Future
Response() interface{}
}

// errorFuture is used to return a static error
type errorFuture struct {
err error
Expand All @@ -19,6 +25,10 @@ func (e errorFuture) Error() error {
return e.err
}

func (e errorFuture) Response() interface{} {
return nil
}

// deferError can be embedded to allow a future
// to provide an error in the future
type deferError struct {
Expand Down Expand Up @@ -53,8 +63,13 @@ func (d *deferError) respond(err error) {
// the log is considered committed
type logFuture struct {
deferError
log Log
policy quorumPolicy
log Log
policy quorumPolicy
response interface{}
}

func (l *logFuture) Response() interface{} {
return l.response
}

type shutdownFuture struct {
Expand Down
7 changes: 5 additions & 2 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) Future {
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
Expand Down Expand Up @@ -319,14 +319,15 @@ func (r *Raft) runFSM() {

case commitTuple := <-r.fsmCommitCh:
// Apply the log
r.fsm.Apply(commitTuple.log.Data)
resp := r.fsm.Apply(commitTuple.log.Data)

// Update the indexes
lastIndex = commitTuple.log.Index
lastTerm = commitTuple.log.Term

// Invoke the future if given
if commitTuple.future != nil {
commitTuple.future.response = resp
commitTuple.future.respond(nil)
}
case <-r.shutdownCh:
Expand Down Expand Up @@ -944,6 +945,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
case r.fsmRestoreCh <- future:
case <-r.shutdownCh:
future.respond(RaftShutdown)
return
}

// Wait for the restore to happen
Expand All @@ -968,6 +970,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
log.Printf("[ERR] Failed to compact logs: %v", err)
}

log.Printf("[INFO] Installed remote snapshot")
resp.Success = true
return
}
Expand Down
5 changes: 5 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ func TestRaft_SingleNode(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Check the response
if future.Response().(int) != 1 {
t.Fatalf("bad response: %v", future.Response())
}

// Check that it is applied to the FSM
if len(c.fsms[0].logs) != 1 {
t.Fatalf("did not apply to FSM!")
Expand Down

0 comments on commit b89c454

Please sign in to comment.