Skip to content

Commit

Permalink
refactor separate etcd index from raft index
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Nov 10, 2013
1 parent 0372cde commit 6156d5c
Show file tree
Hide file tree
Showing 30 changed files with 334 additions and 407 deletions.
5 changes: 1 addition & 4 deletions error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,14 @@ type Error struct {
Message string `json:"message"`
Cause string `json:"cause,omitempty"`
Index uint64 `json:"index"`
Term uint64 `json:"term"`
}

func NewError(errorCode int, cause string, index uint64, term uint64) *Error {
func NewError(errorCode int, cause string, index uint64) *Error {
return &Error{
ErrorCode: errorCode,
Message: errors[errorCode],
Cause: cause,
Index: index,
Term: term,
}
}

Expand All @@ -93,7 +91,6 @@ func (e Error) toJsonString() string {

func (e Error) Write(w http.ResponseWriter) {
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(e.Term))
// 3xx is reft internal error
if e.ErrorCode/100 == 3 {
http.Error(w, e.toJsonString(), http.StatusInternalServerError)
Expand Down
20 changes: 10 additions & 10 deletions server/join_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ func init() {

// The JoinCommand adds a node to the cluster.
type JoinCommand struct {
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
}

func NewJoinCommand(minVersion int, maxVersion int, name, raftUrl, etcdUrl string) *JoinCommand {
return &JoinCommand{
MinVersion: minVersion,
MaxVersion: maxVersion,
Name: name,
RaftURL: raftUrl,
EtcdURL: etcdUrl,
Name: name,
RaftURL: raftUrl,
EtcdURL: etcdUrl,
}
}

Expand All @@ -54,11 +54,11 @@ func (c *JoinCommand) Apply(server raft.Server) (interface{}, error) {
// Check machine number in the cluster
if ps.registry.Count() == ps.MaxClusterSize {
log.Debug("Reject join request from ", c.Name)
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term())
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex())
}

// Add to shared machine registry.
ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL, server.CommitIndex(), server.Term())
ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL)

// Add peer in raft
err := server.AddPeer(c.Name, "")
Expand Down
18 changes: 9 additions & 9 deletions server/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,35 @@ func NewRegistry(s store.Store) *Registry {
}

// Adds a node to the registry.
func (r *Registry) Register(name string, peerURL string, url string, commitIndex uint64, term uint64) error {
func (r *Registry) Register(name string, peerURL string, url string) error {
r.Lock()
defer r.Unlock()

// Write data to store.
key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url)
_, err := r.store.Create(key, value, false, store.Permanent, commitIndex, term)
_, err := r.store.Create(key, value, false, store.Permanent)
log.Debugf("Register: %s", name)
return err
}

// Removes a node from the registry.
func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error {
func (r *Registry) Unregister(name string) error {
r.Lock()
defer r.Unlock()

// Remove from cache.
// delete(r.nodes, name)

// Remove the key from the store.
_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
_, err := r.store.Delete(path.Join(RegistryKey, name), false)
log.Debugf("Unregister: %s", name)
return err
}

// Returns the number of nodes in the cluster.
func (r *Registry) Count() int {
e, err := r.store.Get(RegistryKey, false, false, 0, 0)
e, err := r.store.Get(RegistryKey, false, false)
if err != nil {
return 0
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (r *Registry) urls(leaderName, selfName string, url func(name string) (stri
}

// Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
if e, _ := r.store.Get(RegistryKey, false, false); e != nil {
// Lookup the URL for each one.
for _, pair := range e.KVPairs {
_, name := filepath.Split(pair.Key)
Expand All @@ -160,7 +160,7 @@ func (r *Registry) load(name string) {
}

// Retrieve from store.
e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0)
e, err := r.store.Get(path.Join(RegistryKey, name), false, false)
if err != nil {
return
}
Expand All @@ -173,7 +173,7 @@ func (r *Registry) load(name string) {

// Create node.
r.nodes[name] = &node{
url: m["etcd"][0],
peerURL: m["raft"][0],
url: m["etcd"][0],
peerURL: m["raft"][0],
}
}
2 changes: 1 addition & 1 deletion server/remove_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *RemoveCommand) Apply(server raft.Server) (interface{}, error) {
ps, _ := server.Context().(*PeerServer)

// Remove node from the shared registry.
err := ps.registry.Unregister(c.Name, server.CommitIndex(), server.Term())
err := ps.registry.Unregister(c.Name)

// Delete from stats
delete(ps.followersStats.Followers, c.Name)
Expand Down
8 changes: 4 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
}

if result == nil {
return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
}

// response for raft related commands[join/remove]
Expand Down Expand Up @@ -268,7 +268,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque

// No leader available.
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(300, "", s.Store().Index())
}

var url string
Expand Down Expand Up @@ -317,7 +317,7 @@ func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) err
func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
leader := s.peerServer.RaftServer().Leader()
if leader == "" {
return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index())
}
w.WriteHeader(http.StatusOK)
url, _ := s.registry.PeerURL(leader)
Expand Down Expand Up @@ -348,7 +348,7 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request)

leader := s.peerServer.RaftServer().Leader()
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(300, "", s.Store().Index())
}
hostname, _ := s.registry.ClientURL(leader)
redirect(hostname, w, req)
Expand Down
2 changes: 1 addition & 1 deletion server/v1/get_key_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
key := "/" + vars["key"]

// Retrieve the key from the store.
event, err := s.Store().Get(key, false, false, s.CommitIndex(), s.Term())
event, err := s.Store().Get(key, false, false)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions server/v1/set_key_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// Parse non-blank value.
value := req.Form.Get("value")
if len(value) == 0 {
return etcdErr.NewError(200, "Set", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(200, "Set", s.Store().Index())
}

// Convert time-to-live to an expiration time.
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(202, "Set", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(202, "Set", s.Store().Index())
}

// If the "prevValue" is specified then test-and-set. Otherwise create a new key.
Expand Down
7 changes: 3 additions & 4 deletions server/v1/watch_key_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"

etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
)

Expand All @@ -21,14 +20,14 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
if req.Method == "POST" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("index")), 10, 64)
if err != nil {
return etcdErr.NewError(203, "Watch From Index", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(203, "Watch From Index", s.Store().Index())
}
}

// Start the watcher on the store.
c, err := s.Store().Watch(key, false, sinceIndex, s.CommitIndex(), s.Term())
c, err := s.Store().Watch(key, false, sinceIndex)
if err != nil {
return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(500, key, s.Store().Index())
}
event := <-c

Expand Down
36 changes: 10 additions & 26 deletions server/v2/get_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
var err error
var event *store.Event
events := make([]*store.Event, 0)

vars := mux.Vars(req)
key := "/" + vars["key"]
Expand All @@ -42,51 +41,36 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
if waitIndex != "" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64)
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
}
}

// Start the watcher on the store.
eventChan, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term())
eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
if err != nil {
return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(500, key, s.Store().Index())
}

cn, _ := w.(http.CloseNotifier)
closeChan := cn.CloseNotify()

eventLoop:
for {
select {
case <-closeChan:
return nil
case event = <-eventChan:
// for events other than expire, just one event for one watch
// for expire event, we might have a stream of events
// we use a nil item to terminate the expire event stream
if event != nil && event.Action == store.Expire {
events = append(events, event)
} else {
events = append(events, event)
break eventLoop
}
}
select {
case <-closeChan:
return nil
case event = <-eventChan:
}

} else { //get
// Retrieve the key from the store.
event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term())
event, err = s.Store().Get(key, recursive, sorted)
if err != nil {
return err
}
}

var b []byte

w.Header().Add("X-Etcd-Index", fmt.Sprint(events[0].Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(events[0].Term))
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.WriteHeader(http.StatusOK)
b, _ = json.Marshal(events)
b, _ := json.Marshal(event)

w.Write(b)

Expand Down
2 changes: 1 addition & 1 deletion server/v2/post_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
value := req.FormValue("value")
expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
}

c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, true)
Expand Down
8 changes: 4 additions & 4 deletions server/v2/put_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
value := req.Form.Get("value")
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store().Index())
}

_, valueOk := req.Form["prevValue"]
Expand Down Expand Up @@ -59,15 +59,15 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {

// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store().Index())
}
} else {
prevIndex = 0
}

if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store().Index())
}
}

Expand All @@ -88,7 +88,7 @@ func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, valu
func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
// Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store().Index())
}

c := s.Store().CommandFactory().CreateUpdateCommand(key, value, expireTime)
Expand Down
2 changes: 1 addition & 1 deletion server/v2/tests/delete_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ func TestV2DeleteKey(t *testing.T) {
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":4,"term":0}`, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":2}`, "")
})
}
Loading

0 comments on commit 6156d5c

Please sign in to comment.