Skip to content

Commit

Permalink
Add refresh parameter to allow TTL refreshes without firing watch/wai…
Browse files Browse the repository at this point in the history
…t responses
  • Loading branch information
cchamplin committed Feb 8, 2016
1 parent 2be7f7c commit 82778ed
Show file tree
Hide file tree
Showing 16 changed files with 452 additions and 160 deletions.
44 changes: 44 additions & 0 deletions Documentation/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,50 @@ curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl= -d prevExist=t
}
```

### Refreshing key TTL

Keys in etcd can be refreshed notifying watchers
this can be achieved by setting the refresh to true when updating a TTL

You cannot update the value of a key when refreshing it

```sh
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d ttl=5 -d refresh=true -d prevExist=true
```

```json
{
"action": "set",
"node": {
"createdIndex": 5,
"expiration": "2013-12-04T12:01:21.874888581-08:00",
"key": "/foo",
"modifiedIndex": 5,
"ttl": 5,
"value": "bar"
}
}
{
"action":"update",
"node":{
"key":"/foo",
"value":"bar",
"expiration": "2013-12-04T12:01:26.874888581-08:00",
"ttl":5,
"modifiedIndex":6,
"createdIndex":5
},
"prevNode":{
"key":"/foo",
"value":"bar",
"expiration":"2013-12-04T12:01:21.874888581-08:00",
"ttl":3,
"modifiedIndex":5,
"createdIndex":5
}
}
```

### Waiting for a change

Expand Down
11 changes: 11 additions & 0 deletions client/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ type SetOptions struct {
// a TTL of 0.
TTL time.Duration

// When refresh is set to true a TTL value can be updated
// without firing a watch or changing the node value. A
// value must not provided when refreshing a key.
Refresh bool

// Dir specifies whether or not this Node should be created as a directory.
Dir bool
}
Expand Down Expand Up @@ -327,6 +332,7 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions
act.PrevIndex = opts.PrevIndex
act.PrevExist = opts.PrevExist
act.TTL = opts.TTL
act.Refresh = opts.Refresh
act.Dir = opts.Dir
}

Expand Down Expand Up @@ -518,6 +524,7 @@ type setAction struct {
PrevIndex uint64
PrevExist PrevExistType
TTL time.Duration
Refresh bool
Dir bool
}

Expand Down Expand Up @@ -549,6 +556,10 @@ func (a *setAction) HTTPRequest(ep url.URL) *http.Request {
form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10))
}

if a.Refresh {
form.Add("refresh", "true")
}

u.RawQuery = params.Encode()
body := strings.NewReader(form.Encode())

Expand Down
12 changes: 12 additions & 0 deletions client/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,18 @@ func TestSetAction(t *testing.T) {
wantURL: "http://example.com/foo",
wantBody: "ttl=180&value=",
},

// Refresh is set
{
act: setAction{
Key: "foo",
TTL: 3 * time.Minute,
Refresh: true,
},
wantURL: "http://example.com/foo",
wantBody: "refresh=true&ttl=180&value=",
},

// Dir is set
{
act: setAction{
Expand Down
4 changes: 4 additions & 0 deletions error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var errors = map[int]string{
ecodeIndexValueMutex: "Index and value cannot both be specified",
EcodeInvalidField: "Invalid field",
EcodeInvalidForm: "Invalid POST form",
EcodeRefreshValue: "Value provided on refresh",
EcodeRefreshTTLRequired: "A TTL must be provided on refresh",

// raft related errors
EcodeRaftInternal: "Raft Internal Error",
Expand Down Expand Up @@ -99,6 +101,8 @@ const (
ecodeIndexValueMutex = 208
EcodeInvalidField = 209
EcodeInvalidForm = 210
EcodeRefreshValue = 211
EcodeRefreshTTLRequired = 212

EcodeRaftInternal = 300
EcodeLeaderElect = 301
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (c *cluster) AddMember(m *Member) {
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create raftAttributes should never fail: %v", err)
}
c.members[m.ID] = m
Expand All @@ -321,7 +321,7 @@ func (c *cluster) RemoveMember(id types.ID) {
plog.Panicf("delete member should never fail: %v", err)
}
delete(c.members, id)
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create removedMember should never fail: %v", err)
}
c.removed[id] = true
Expand Down Expand Up @@ -352,7 +352,7 @@ func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(memberStoreKey(id), raftAttributesSuffix)
if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("update raftAttributes should never fail: %v", err)
}
c.members[id].RaftAttributes = raftAttr
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func TestClusterAddMember(t *testing.T) {
false,
`{"peerURLs":null}`,
false,
store.Permanent,
store.TTLOptionSet{ExpireTime: store.Permanent},
},
},
}
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestClusterRemoveMember(t *testing.T) {

wactions := []testutil.Action{
{Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}},
{Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
{Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}},
}
if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions)
Expand Down
32 changes: 32 additions & 0 deletions etcdserver/etcdhttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,34 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
pe = &bv
}

// refresh is nullable, so leave it null if not specified
var refresh *bool
if _, ok := r.Form["refresh"]; ok {
bv, err := getBool(r.Form, "refresh")
if err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
"invalid value for refresh",
)
}
refresh = &bv
if refresh != nil && *refresh {
val := r.FormValue("value")
if _, ok := r.Form["value"]; ok && val != "" {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeRefreshValue,
`A value was provided on a refresh`,
)
}
if ttl == nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeRefreshTTLRequired,
`No TTL value set`,
)
}
}
}

rr := etcdserverpb.Request{
Method: r.Method,
Path: p,
Expand All @@ -578,6 +606,10 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
rr.PrevExist = pe
}

if refresh != nil {
rr.Refresh = refresh
}

// Null TTL is equivalent to unset Expiration
if ttl != nil {
expr := time.Duration(*ttl) * time.Second
Expand Down
43 changes: 43 additions & 0 deletions etcdserver/etcdhttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,49 @@ func (w *dummyWatcher) EventChan() chan *store.Event {
func (w *dummyWatcher) StartIndex() uint64 { return w.sidx }
func (w *dummyWatcher) Remove() {}

func TestBadRefreshRequest(t *testing.T) {
tests := []struct {
in *http.Request
wcode int
}{
{
mustNewRequest(t, "foo?refresh=true&value=test"),
etcdErr.EcodeRefreshValue,
},
{
mustNewRequest(t, "foo?refresh=true&value=10"),
etcdErr.EcodeRefreshValue,
},
{
mustNewRequest(t, "foo?refresh=true"),
etcdErr.EcodeRefreshTTLRequired,
},
{
mustNewRequest(t, "foo?refresh=true&ttl="),
etcdErr.EcodeRefreshTTLRequired,
},
}
for i, tt := range tests {
got, err := parseKeyRequest(tt.in, clockwork.NewFakeClock())
if err == nil {
t.Errorf("#%d: unexpected nil error!", i)
continue
}
ee, ok := err.(*etcdErr.Error)
if !ok {
t.Errorf("#%d: err is not etcd.Error!", i)
continue
}
if ee.ErrorCode != tt.wcode {
t.Errorf("#%d: code=%d, want %v", i, ee.ErrorCode, tt.wcode)
t.Logf("cause: %#v", ee.Cause)
}
if !reflect.DeepEqual(got, etcdserverpb.Request{}) {
t.Errorf("#%d: unexpected non-empty Request: %#v", i, got)
}
}
}

func TestBadParseRequest(t *testing.T) {
tests := []struct {
in *http.Request
Expand Down
37 changes: 37 additions & 0 deletions etcdserver/etcdserverpb/etcdserver.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions etcdserver/etcdserverpb/etcdserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ message Request {
optional bool Quorum = 14 [(gogoproto.nullable) = false];
optional int64 Time = 15 [(gogoproto.nullable) = false];
optional bool Stream = 16 [(gogoproto.nullable) = false];
optional bool Refresh = 17 [(gogoproto.nullable) = true];
}

message Metadata {
Expand Down
14 changes: 8 additions & 6 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,23 +1049,25 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
return Response{Event: ev, err: err}
}
expr := timeutil.UnixNanoToTime(r.Expiration)
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := store.TTLOptionSet{ExpireTime: expr, Refresh: refresh}
switch r.Method {
case "POST":
return f(s.store.Create(r.Path, r.Dir, r.Val, true, expr))
return f(s.store.Create(r.Path, r.Dir, r.Val, true, ttlOptions))
case "PUT":
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
if exists {
if r.PrevIndex == 0 && r.PrevValue == "" {
return f(s.store.Update(r.Path, r.Val, expr))
return f(s.store.Update(r.Path, r.Val, ttlOptions))
} else {
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
}
}
return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
return f(s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
case r.PrevIndex > 0 || r.PrevValue != "":
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
default:
// TODO (yicheng): cluster should be the owner of cluster prefix store
// we should not modify cluster store here.
Expand All @@ -1083,7 +1085,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
if r.Path == path.Join(StoreClusterPrefix, "version") {
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
}
return f(s.store.Set(r.Path, r.Dir, r.Val, expr))
return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
}
case "DELETE":
switch {
Expand Down
Loading

0 comments on commit 82778ed

Please sign in to comment.