diff --git a/Documentation/api.md b/Documentation/api.md index 76fda75ab92..7ebffbd6f4b 100644 --- a/Documentation/api.md +++ b/Documentation/api.md @@ -234,6 +234,50 @@ curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl= -d prevExist=t } ``` +### Refreshing key TTL + +Keys in etcd can be refreshed notifying watchers +this can be achieved by setting the refresh to true when updating a TTL + +You cannot update the value of a key when refreshing it + +```sh +curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5 +curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d ttl=5 -d refresh=true -d prevExist=true +``` + +```json +{ + "action": "set", + "node": { + "createdIndex": 5, + "expiration": "2013-12-04T12:01:21.874888581-08:00", + "key": "/foo", + "modifiedIndex": 5, + "ttl": 5, + "value": "bar" + } +} +{ + "action":"update", + "node":{ + "key":"/foo", + "value":"bar", + "expiration": "2013-12-04T12:01:26.874888581-08:00", + "ttl":5, + "modifiedIndex":6, + "createdIndex":5 + }, + "prevNode":{ + "key":"/foo", + "value":"bar", + "expiration":"2013-12-04T12:01:21.874888581-08:00", + "ttl":3, + "modifiedIndex":5, + "createdIndex":5 + } +} +``` ### Waiting for a change diff --git a/client/keys.go b/client/keys.go index 97e27500883..c209cab37f1 100644 --- a/client/keys.go +++ b/client/keys.go @@ -184,6 +184,11 @@ type SetOptions struct { // a TTL of 0. TTL time.Duration + // When refresh is set to true a TTL value can be updated + // without firing a watch or changing the node value. A + // value must not provided when refreshing a key. + Refresh bool + // Dir specifies whether or not this Node should be created as a directory. Dir bool } @@ -327,6 +332,7 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions act.PrevIndex = opts.PrevIndex act.PrevExist = opts.PrevExist act.TTL = opts.TTL + act.Refresh = opts.Refresh act.Dir = opts.Dir } @@ -518,6 +524,7 @@ type setAction struct { PrevIndex uint64 PrevExist PrevExistType TTL time.Duration + Refresh bool Dir bool } @@ -549,6 +556,10 @@ func (a *setAction) HTTPRequest(ep url.URL) *http.Request { form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10)) } + if a.Refresh { + form.Add("refresh", "true") + } + u.RawQuery = params.Encode() body := strings.NewReader(form.Encode()) diff --git a/client/keys_test.go b/client/keys_test.go index cafe6bea4cc..14bc45b2b86 100644 --- a/client/keys_test.go +++ b/client/keys_test.go @@ -356,6 +356,18 @@ func TestSetAction(t *testing.T) { wantURL: "http://example.com/foo", wantBody: "ttl=180&value=", }, + + // Refresh is set + { + act: setAction{ + Key: "foo", + TTL: 3 * time.Minute, + Refresh: true, + }, + wantURL: "http://example.com/foo", + wantBody: "refresh=true&ttl=180&value=", + }, + // Dir is set { act: setAction{ diff --git a/error/error.go b/error/error.go index 499823dbbd9..4b84b3c9ad2 100644 --- a/error/error.go +++ b/error/error.go @@ -48,6 +48,8 @@ var errors = map[int]string{ ecodeIndexValueMutex: "Index and value cannot both be specified", EcodeInvalidField: "Invalid field", EcodeInvalidForm: "Invalid POST form", + EcodeRefreshValue: "Value provided on refresh", + EcodeRefreshTTLRequired: "A TTL must be provided on refresh", // raft related errors EcodeRaftInternal: "Raft Internal Error", @@ -99,6 +101,8 @@ const ( ecodeIndexValueMutex = 208 EcodeInvalidField = 209 EcodeInvalidForm = 210 + EcodeRefreshValue = 211 + EcodeRefreshTTLRequired = 212 EcodeRaftInternal = 300 EcodeLeaderElect = 301 diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index c6ebfda52c5..6d0d60156bc 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -306,7 +306,7 @@ func (c *cluster) AddMember(m *Member) { plog.Panicf("marshal raftAttributes should never fail: %v", err) } p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix) - if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil { + if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { plog.Panicf("create raftAttributes should never fail: %v", err) } c.members[m.ID] = m @@ -321,7 +321,7 @@ func (c *cluster) RemoveMember(id types.ID) { plog.Panicf("delete member should never fail: %v", err) } delete(c.members, id) - if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil { + if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { plog.Panicf("create removedMember should never fail: %v", err) } c.removed[id] = true @@ -352,7 +352,7 @@ func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { plog.Panicf("marshal raftAttributes should never fail: %v", err) } p := path.Join(memberStoreKey(id), raftAttributesSuffix) - if _, err := c.store.Update(p, string(b), store.Permanent); err != nil { + if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { plog.Panicf("update raftAttributes should never fail: %v", err) } c.members[id].RaftAttributes = raftAttr diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 906bfea2905..96745a73bad 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -460,7 +460,7 @@ func TestClusterAddMember(t *testing.T) { false, `{"peerURLs":null}`, false, - store.Permanent, + store.TTLOptionSet{ExpireTime: store.Permanent}, }, }, } @@ -499,7 +499,7 @@ func TestClusterRemoveMember(t *testing.T) { wactions := []testutil.Action{ {Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}}, - {Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}}, + {Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}}, } if !reflect.DeepEqual(st.Action(), wactions) { t.Errorf("actions = %v, want %v", st.Action(), wactions) diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 10ebb51e539..e9ccf59383b 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -558,6 +558,34 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque pe = &bv } + // refresh is nullable, so leave it null if not specified + var refresh *bool + if _, ok := r.Form["refresh"]; ok { + bv, err := getBool(r.Form, "refresh") + if err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + "invalid value for refresh", + ) + } + refresh = &bv + if refresh != nil && *refresh { + val := r.FormValue("value") + if _, ok := r.Form["value"]; ok && val != "" { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeRefreshValue, + `A value was provided on a refresh`, + ) + } + if ttl == nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeRefreshTTLRequired, + `No TTL value set`, + ) + } + } + } + rr := etcdserverpb.Request{ Method: r.Method, Path: p, @@ -578,6 +606,10 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque rr.PrevExist = pe } + if refresh != nil { + rr.Refresh = refresh + } + // Null TTL is equivalent to unset Expiration if ttl != nil { expr := time.Duration(*ttl) * time.Second diff --git a/etcdserver/etcdhttp/client_test.go b/etcdserver/etcdhttp/client_test.go index fcb66482678..84efb419355 100644 --- a/etcdserver/etcdhttp/client_test.go +++ b/etcdserver/etcdhttp/client_test.go @@ -172,6 +172,49 @@ func (w *dummyWatcher) EventChan() chan *store.Event { func (w *dummyWatcher) StartIndex() uint64 { return w.sidx } func (w *dummyWatcher) Remove() {} +func TestBadRefreshRequest(t *testing.T) { + tests := []struct { + in *http.Request + wcode int + }{ + { + mustNewRequest(t, "foo?refresh=true&value=test"), + etcdErr.EcodeRefreshValue, + }, + { + mustNewRequest(t, "foo?refresh=true&value=10"), + etcdErr.EcodeRefreshValue, + }, + { + mustNewRequest(t, "foo?refresh=true"), + etcdErr.EcodeRefreshTTLRequired, + }, + { + mustNewRequest(t, "foo?refresh=true&ttl="), + etcdErr.EcodeRefreshTTLRequired, + }, + } + for i, tt := range tests { + got, err := parseKeyRequest(tt.in, clockwork.NewFakeClock()) + if err == nil { + t.Errorf("#%d: unexpected nil error!", i) + continue + } + ee, ok := err.(*etcdErr.Error) + if !ok { + t.Errorf("#%d: err is not etcd.Error!", i) + continue + } + if ee.ErrorCode != tt.wcode { + t.Errorf("#%d: code=%d, want %v", i, ee.ErrorCode, tt.wcode) + t.Logf("cause: %#v", ee.Cause) + } + if !reflect.DeepEqual(got, etcdserverpb.Request{}) { + t.Errorf("#%d: unexpected non-empty Request: %#v", i, got) + } + } +} + func TestBadParseRequest(t *testing.T) { tests := []struct { in *http.Request diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 974cea4c28b..e97bdb5edf1 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -85,6 +85,7 @@ type Request struct { Quorum bool `protobuf:"varint,14,opt,name=Quorum" json:"Quorum"` Time int64 `protobuf:"varint,15,opt,name=Time" json:"Time"` Stream bool `protobuf:"varint,16,opt,name=Stream" json:"Stream"` + Refresh *bool `protobuf:"varint,17,opt,name=Refresh" json:"Refresh,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -212,6 +213,18 @@ func (m *Request) MarshalTo(data []byte) (int, error) { data[i] = 0 } i++ + if m.Refresh != nil { + data[i] = 0x88 + i++ + data[i] = 0x1 + i++ + if *m.Refresh { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } @@ -297,6 +310,9 @@ func (m *Request) Size() (n int) { n += 2 n += 1 + sovEtcdserver(uint64(m.Time)) n += 3 + if m.Refresh != nil { + n += 3 + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -708,6 +724,27 @@ func (m *Request) Unmarshal(data []byte) error { } } m.Stream = bool(v != 0) + case 17: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Refresh", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEtcdserver + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + b := bool(v != 0) + m.Refresh = &b default: iNdEx = preIndex skippy, err := skipEtcdserver(data[iNdEx:]) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 024c3037f10..25e0aca5d9f 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -25,6 +25,7 @@ message Request { optional bool Quorum = 14 [(gogoproto.nullable) = false]; optional int64 Time = 15 [(gogoproto.nullable) = false]; optional bool Stream = 16 [(gogoproto.nullable) = false]; + optional bool Refresh = 17 [(gogoproto.nullable) = true]; } message Metadata { diff --git a/etcdserver/server.go b/etcdserver/server.go index 6c1ccb6dca2..c1208c6059c 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1049,23 +1049,25 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { return Response{Event: ev, err: err} } expr := timeutil.UnixNanoToTime(r.Expiration) + refresh, _ := pbutil.GetBool(r.Refresh) + ttlOptions := store.TTLOptionSet{ExpireTime: expr, Refresh: refresh} switch r.Method { case "POST": - return f(s.store.Create(r.Path, r.Dir, r.Val, true, expr)) + return f(s.store.Create(r.Path, r.Dir, r.Val, true, ttlOptions)) case "PUT": exists, existsSet := pbutil.GetBool(r.PrevExist) switch { case existsSet: if exists { if r.PrevIndex == 0 && r.PrevValue == "" { - return f(s.store.Update(r.Path, r.Val, expr)) + return f(s.store.Update(r.Path, r.Val, ttlOptions)) } else { - return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)) + return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) } } - return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr)) + return f(s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) case r.PrevIndex > 0 || r.PrevValue != "": - return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)) + return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) default: // TODO (yicheng): cluster should be the owner of cluster prefix store // we should not modify cluster store here. @@ -1083,7 +1085,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { if r.Path == path.Join(StoreClusterPrefix, "version") { s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) } - return f(s.store.Set(r.Path, r.Dir, r.Val, expr)) + return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) } case "DELETE": switch { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 537c03188e0..ebd45c2860a 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -229,7 +229,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "Create", - Params: []interface{}{"", false, "", true, time.Time{}}, + Params: []interface{}{"", false, "", true, store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -240,7 +240,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "Create", - Params: []interface{}{"", false, "", true, time.Unix(0, 1337)}, + Params: []interface{}{"", false, "", true, store.TTLOptionSet{ExpireTime: time.Unix(0, 1337)}}, }, }, }, @@ -251,7 +251,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "Create", - Params: []interface{}{"", true, "", true, time.Time{}}, + Params: []interface{}{"", true, "", true, store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -262,7 +262,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "Set", - Params: []interface{}{"", false, "", time.Time{}}, + Params: []interface{}{"", false, "", store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -273,7 +273,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "Set", - Params: []interface{}{"", true, "", time.Time{}}, + Params: []interface{}{"", true, "", store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -284,7 +284,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "Update", - Params: []interface{}{"", "", time.Time{}}, + Params: []interface{}{"", "", store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -295,7 +295,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "Create", - Params: []interface{}{"", false, "", false, time.Time{}}, + Params: []interface{}{"", false, "", false, store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -306,7 +306,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "CompareAndSwap", - Params: []interface{}{"", "", uint64(1), "", time.Time{}}, + Params: []interface{}{"", "", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -317,7 +317,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "Create", - Params: []interface{}{"", false, "", false, time.Time{}}, + Params: []interface{}{"", false, "", false, store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -328,7 +328,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "CompareAndSwap", - Params: []interface{}{"", "", uint64(1), "", time.Time{}}, + Params: []interface{}{"", "", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -339,7 +339,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "CompareAndSwap", - Params: []interface{}{"", "bar", uint64(0), "", time.Time{}}, + Params: []interface{}{"", "bar", uint64(0), "", store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, @@ -350,7 +350,7 @@ func TestApplyRequest(t *testing.T) { []testutil.Action{ { Name: "CompareAndSwap", - Params: []interface{}{"", "bar", uint64(1), "", time.Time{}}, + Params: []interface{}{"", "bar", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}}, }, }, }, diff --git a/store/stats_test.go b/store/stats_test.go index da1bac077d8..28d096f15df 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -24,7 +24,7 @@ import ( // Ensure that a successful Get is recorded in the stats. func TestStoreStatsGetSuccess(t *testing.T) { s := newStore() - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) s.Get("/foo", false, false) assert.Equal(t, uint64(1), s.Stats.GetSuccess, "") } @@ -32,7 +32,7 @@ func TestStoreStatsGetSuccess(t *testing.T) { // Ensure that a failed Get is recorded in the stats. func TestStoreStatsGetFail(t *testing.T) { s := newStore() - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) s.Get("/no_such_key", false, false) assert.Equal(t, uint64(1), s.Stats.GetFail, "") } @@ -40,53 +40,53 @@ func TestStoreStatsGetFail(t *testing.T) { // Ensure that a successful Create is recorded in the stats. func TestStoreStatsCreateSuccess(t *testing.T) { s := newStore() - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) assert.Equal(t, uint64(1), s.Stats.CreateSuccess, "") } // Ensure that a failed Create is recorded in the stats. func TestStoreStatsCreateFail(t *testing.T) { s := newStore() - s.Create("/foo", true, "", false, Permanent) - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) assert.Equal(t, uint64(1), s.Stats.CreateFail, "") } // Ensure that a successful Update is recorded in the stats. func TestStoreStatsUpdateSuccess(t *testing.T) { s := newStore() - s.Create("/foo", false, "bar", false, Permanent) - s.Update("/foo", "baz", Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent}) assert.Equal(t, uint64(1), s.Stats.UpdateSuccess, "") } // Ensure that a failed Update is recorded in the stats. func TestStoreStatsUpdateFail(t *testing.T) { s := newStore() - s.Update("/foo", "bar", Permanent) + s.Update("/foo", "bar", TTLOptionSet{ExpireTime: Permanent}) assert.Equal(t, uint64(1), s.Stats.UpdateFail, "") } // Ensure that a successful CAS is recorded in the stats. func TestStoreStatsCompareAndSwapSuccess(t *testing.T) { s := newStore() - s.Create("/foo", false, "bar", false, Permanent) - s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + s.CompareAndSwap("/foo", "bar", 0, "baz", TTLOptionSet{ExpireTime: Permanent}) assert.Equal(t, uint64(1), s.Stats.CompareAndSwapSuccess, "") } // Ensure that a failed CAS is recorded in the stats. func TestStoreStatsCompareAndSwapFail(t *testing.T) { s := newStore() - s.Create("/foo", false, "bar", false, Permanent) - s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + s.CompareAndSwap("/foo", "wrong_value", 0, "baz", TTLOptionSet{ExpireTime: Permanent}) assert.Equal(t, uint64(1), s.Stats.CompareAndSwapFail, "") } // Ensure that a successful Delete is recorded in the stats. func TestStoreStatsDeleteSuccess(t *testing.T) { s := newStore() - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) s.Delete("/foo", false, false) assert.Equal(t, uint64(1), s.Stats.DeleteSuccess, "") } @@ -104,7 +104,7 @@ func TestStoreStatsExpireCount(t *testing.T) { fc := newFakeClock() s.clock = fc - s.Create("/foo", false, "bar", false, fc.Now().Add(500*time.Millisecond)) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) assert.Equal(t, uint64(0), s.Stats.ExpireCount, "") fc.Advance(600 * time.Millisecond) s.DeleteExpiredKeys(fc.Now()) diff --git a/store/store.go b/store/store.go index 99022141695..72d466e0182 100644 --- a/store/store.go +++ b/store/store.go @@ -43,12 +43,12 @@ type Store interface { Index() uint64 Get(nodePath string, recursive, sorted bool) (*Event, error) - Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) - Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) + Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) + Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) Create(nodePath string, dir bool, value string, unique bool, - expireTime time.Time) (*Event, error) + expireOpts TTLOptionSet) (*Event, error) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, - value string, expireTime time.Time) (*Event, error) + value string, expireOpts TTLOptionSet) (*Event, error) Delete(nodePath string, dir, recursive bool) (*Event, error) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) @@ -64,6 +64,11 @@ type Store interface { DeleteExpiredKeys(cutoff time.Time) } +type TTLOptionSet struct { + ExpireTime time.Time + Refresh bool +} + type store struct { Root *node WatcherHub *watcherHub @@ -154,7 +159,7 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) { // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl. // If the node has already existed, create will fail. // If any node on the path is a file, create will fail. -func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) { +func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) { var err *etcdErr.Error s.worldLock.Lock() @@ -171,7 +176,7 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp reportWriteFailure(Create) }() - e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create) + e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create) if err != nil { return nil, err } @@ -183,7 +188,7 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp } // Set creates or replace the node at nodePath. -func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) { +func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) { var err *etcdErr.Error s.worldLock.Lock() @@ -207,8 +212,17 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim return nil, err } + if expireOpts.Refresh { + if getErr != nil { + err = getErr + return nil, err + } else { + value = n.Value + } + } + // Set new value - e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set) + e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set) if err != nil { return nil, err } @@ -221,7 +235,9 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim e.PrevNode = prev.Node } - s.WatcherHub.notify(e) + if !expireOpts.Refresh { + s.WatcherHub.notify(e) + } return e, nil } @@ -239,7 +255,7 @@ func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) } func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, - value string, expireTime time.Time) (*Event, error) { + value string, expireOpts TTLOptionSet) (*Event, error) { var err *etcdErr.Error @@ -290,14 +306,16 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint // if test succeed, write the value n.Write(value, s.CurrentIndex) - n.UpdateTTL(expireTime) + n.UpdateTTL(expireOpts.ExpireTime) // copy the value for safety valueCopy := value eNode.Value = &valueCopy eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock) - s.WatcherHub.notify(e) + if !expireOpts.Refresh { + s.WatcherHub.notify(e) + } return e, nil } @@ -462,7 +480,7 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string // Update updates the value/ttl of the node. // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. -func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) { +func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) { var err *etcdErr.Error s.worldLock.Lock() @@ -496,6 +514,10 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) ( return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex) } + if expireOpts.Refresh { + newValue = n.Value + } + e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex) e.EtcdIndex = nextIndex e.PrevNode = n.Repr(false, false, s.clock) @@ -512,11 +534,13 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) ( } // update ttl - n.UpdateTTL(expireTime) + n.UpdateTTL(expireOpts.ExpireTime) eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock) - s.WatcherHub.notify(e) + if !expireOpts.Refresh { + s.WatcherHub.notify(e) + } s.CurrentIndex = nextIndex @@ -778,31 +802,31 @@ func (s *storeRecorder) Get(path string, recursive, sorted bool) (*Event, error) }) return &Event{}, nil } -func (s *storeRecorder) Set(path string, dir bool, val string, expr time.Time) (*Event, error) { +func (s *storeRecorder) Set(path string, dir bool, val string, expireOpts TTLOptionSet) (*Event, error) { s.Record(testutil.Action{ Name: "Set", - Params: []interface{}{path, dir, val, expr}, + Params: []interface{}{path, dir, val, expireOpts}, }) return &Event{}, nil } -func (s *storeRecorder) Update(path, val string, expr time.Time) (*Event, error) { +func (s *storeRecorder) Update(path, val string, expireOpts TTLOptionSet) (*Event, error) { s.Record(testutil.Action{ Name: "Update", - Params: []interface{}{path, val, expr}, + Params: []interface{}{path, val, expireOpts}, }) return &Event{}, nil } -func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp time.Time) (*Event, error) { +func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, expireOpts TTLOptionSet) (*Event, error) { s.Record(testutil.Action{ Name: "Create", - Params: []interface{}{path, dir, val, uniq, exp}, + Params: []interface{}{path, dir, val, uniq, expireOpts}, }) return &Event{}, nil } -func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expr time.Time) (*Event, error) { +func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expireOpts TTLOptionSet) (*Event, error) { s.Record(testutil.Action{ Name: "CompareAndSwap", - Params: []interface{}{path, prevVal, prevIdx, val, expr}, + Params: []interface{}{path, prevVal, prevIdx, val, expireOpts}, }) return &Event{}, nil } diff --git a/store/store_bench_test.go b/store/store_bench_test.go index 3922116f0e6..4e89354dece 100644 --- a/store/store_bench_test.go +++ b/store/store_bench_test.go @@ -56,7 +56,7 @@ func BenchmarkStoreDelete(b *testing.B) { runtime.ReadMemStats(memStats) for i := 0; i < b.N; i++ { - _, err := s.Set(kvs[i][0], false, kvs[i][1], Permanent) + _, err := s.Set(kvs[i][0], false, kvs[i][1], TTLOptionSet{ExpireTime: Permanent}) if err != nil { panic(err) } @@ -132,7 +132,7 @@ func BenchmarkWatchWithSet(b *testing.B) { for i := 0; i < b.N; i++ { w, _ := s.Watch(kvs[i][0], false, false, 0) - s.Set(kvs[i][0], false, "test", Permanent) + s.Set(kvs[i][0], false, "test", TTLOptionSet{ExpireTime: Permanent}) <-w.EventChan() } } @@ -150,7 +150,7 @@ func BenchmarkWatchWithSetBatch(b *testing.B) { } for i := 0; i < b.N; i++ { - s.Set(kvs[i][0], false, "test", Permanent) + s.Set(kvs[i][0], false, "test", TTLOptionSet{ExpireTime: Permanent}) } for i := 0; i < b.N; i++ { @@ -167,7 +167,7 @@ func BenchmarkWatchOneKey(b *testing.B) { watchers[i], _ = s.Watch("/foo", false, false, 0) } - s.Set("/foo", false, "", Permanent) + s.Set("/foo", false, "", TTLOptionSet{ExpireTime: Permanent}) for i := 0; i < b.N; i++ { <-watchers[i].EventChan() @@ -181,7 +181,7 @@ func benchStoreSet(b *testing.B, valueSize int, process func(interface{}) ([]byt b.StartTimer() for i := 0; i < b.N; i++ { - resp, err := s.Set(kvs[i][0], false, kvs[i][1], Permanent) + resp, err := s.Set(kvs[i][0], false, kvs[i][1], TTLOptionSet{ExpireTime: Permanent}) if err != nil { panic(err) } diff --git a/store/store_test.go b/store/store_test.go index b2f6040b835..81f8b23a07e 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -35,7 +35,7 @@ func TestNewStoreWithNamespaces(t *testing.T) { // Ensure that the store can retrieve an existing value. func TestStoreGetValue(t *testing.T) { s := newStore() - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) var eidx uint64 = 1 e, err := s.Get("/foo", false, false) assert.Nil(t, err, "") @@ -52,7 +52,7 @@ func TestMinExpireTime(t *testing.T) { s.clock = fc // FakeClock starts at 0, so minExpireTime should be far in the future.. but just in case assert.True(t, minExpireTime.After(fc.Now()), "minExpireTime should be ahead of FakeClock!") - s.Create("/foo", false, "Y", false, fc.Now().Add(3*time.Second)) + s.Create("/foo", false, "Y", false, TTLOptionSet{ExpireTime: fc.Now().Add(3 * time.Second)}) fc.Advance(5 * time.Second) // Ensure it hasn't expired s.DeleteExpiredKeys(fc.Now()) @@ -71,13 +71,13 @@ func TestStoreGetDirectory(t *testing.T) { s := newStore() fc := newFakeClock() s.clock = fc - s.Create("/foo", true, "", false, Permanent) - s.Create("/foo/bar", false, "X", false, Permanent) - s.Create("/foo/_hidden", false, "*", false, Permanent) - s.Create("/foo/baz", true, "", false, Permanent) - s.Create("/foo/baz/bat", false, "Y", false, Permanent) - s.Create("/foo/baz/_hidden", false, "*", false, Permanent) - s.Create("/foo/baz/ttl", false, "Y", false, fc.Now().Add(time.Second*3)) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/bar", false, "X", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/_hidden", false, "*", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/baz", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/baz/bat", false, "Y", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/baz/_hidden", false, "*", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/baz/ttl", false, "Y", false, TTLOptionSet{ExpireTime: fc.Now().Add(time.Second * 3)}) var eidx uint64 = 7 e, err := s.Get("/foo", true, false) assert.Nil(t, err, "") @@ -117,12 +117,12 @@ func TestStoreGetDirectory(t *testing.T) { // Ensure that the store can retrieve a directory in sorted order. func TestStoreGetSorted(t *testing.T) { s := newStore() - s.Create("/foo", true, "", false, Permanent) - s.Create("/foo/x", false, "0", false, Permanent) - s.Create("/foo/z", false, "0", false, Permanent) - s.Create("/foo/y", true, "", false, Permanent) - s.Create("/foo/y/a", false, "0", false, Permanent) - s.Create("/foo/y/b", false, "0", false, Permanent) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/x", false, "0", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/z", false, "0", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/y", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/y/a", false, "0", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/y/b", false, "0", false, TTLOptionSet{ExpireTime: Permanent}) var eidx uint64 = 6 e, err := s.Get("/foo", true, true) assert.Nil(t, err, "") @@ -153,7 +153,7 @@ func TestSet(t *testing.T) { // Set /foo="" var eidx uint64 = 1 - e, err := s.Set("/foo", false, "", Permanent) + e, err := s.Set("/foo", false, "", TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "set", "") @@ -167,7 +167,7 @@ func TestSet(t *testing.T) { // Set /foo="bar" eidx = 2 - e, err = s.Set("/foo", false, "bar", Permanent) + e, err = s.Set("/foo", false, "bar", TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "set", "") @@ -185,7 +185,7 @@ func TestSet(t *testing.T) { assert.Equal(t, e.PrevNode.ModifiedIndex, uint64(1), "") // Set /foo="baz" (for testing prevNode) eidx = 3 - e, err = s.Set("/foo", false, "baz", Permanent) + e, err = s.Set("/foo", false, "baz", TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "set", "") @@ -204,7 +204,7 @@ func TestSet(t *testing.T) { // Set /dir as a directory eidx = 4 - e, err = s.Set("/dir", true, "", Permanent) + e, err = s.Set("/dir", true, "", TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "set", "") @@ -222,7 +222,7 @@ func TestStoreCreateValue(t *testing.T) { s := newStore() // Create /foo=bar var eidx uint64 = 1 - e, err := s.Create("/foo", false, "bar", false, Permanent) + e, err := s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "create", "") @@ -236,7 +236,7 @@ func TestStoreCreateValue(t *testing.T) { // Create /empty="" eidx = 2 - e, err = s.Create("/empty", false, "", false, Permanent) + e, err = s.Create("/empty", false, "", false, TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "create", "") @@ -254,7 +254,7 @@ func TestStoreCreateValue(t *testing.T) { func TestStoreCreateDirectory(t *testing.T) { s := newStore() var eidx uint64 = 1 - e, err := s.Create("/foo", true, "", false, Permanent) + e, err := s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "create", "") @@ -266,10 +266,10 @@ func TestStoreCreateDirectory(t *testing.T) { func TestStoreCreateFailsIfExists(t *testing.T) { s := newStore() // create /foo as dir - s.Create("/foo", true, "", false, Permanent) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) // create /foo as dir again - e, _err := s.Create("/foo", true, "", false, Permanent) + e, _err := s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) err := _err.(*etcdErr.Error) assert.Equal(t, err.ErrorCode, etcdErr.EcodeNodeExist, "") assert.Equal(t, err.Message, "Key already exists", "") @@ -282,10 +282,10 @@ func TestStoreCreateFailsIfExists(t *testing.T) { func TestStoreUpdateValue(t *testing.T) { s := newStore() // create /foo=bar - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) // update /foo="bzr" var eidx uint64 = 2 - e, err := s.Update("/foo", "baz", Permanent) + e, err := s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "update", "") @@ -306,7 +306,7 @@ func TestStoreUpdateValue(t *testing.T) { // update /foo="" eidx = 3 - e, err = s.Update("/foo", "", Permanent) + e, err = s.Update("/foo", "", TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "update", "") @@ -329,8 +329,8 @@ func TestStoreUpdateValue(t *testing.T) { // Ensure that the store cannot update a directory. func TestStoreUpdateFailsIfDirectory(t *testing.T) { s := newStore() - s.Create("/foo", true, "", false, Permanent) - e, _err := s.Update("/foo", "baz", Permanent) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + e, _err := s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent}) err := _err.(*etcdErr.Error) assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "") assert.Equal(t, err.Message, "Not a file", "") @@ -345,8 +345,8 @@ func TestStoreUpdateValueTTL(t *testing.T) { s.clock = fc var eidx uint64 = 2 - s.Create("/foo", false, "bar", false, Permanent) - _, err := s.Update("/foo", "baz", fc.Now().Add(500*time.Millisecond)) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + _, err := s.Update("/foo", "baz", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) e, _ := s.Get("/foo", false, false) assert.Equal(t, *e.Node.Value, "baz", "") assert.Equal(t, e.EtcdIndex, eidx, "") @@ -364,9 +364,9 @@ func TestStoreUpdateDirTTL(t *testing.T) { s.clock = fc var eidx uint64 = 3 - s.Create("/foo", true, "", false, Permanent) - s.Create("/foo/bar", false, "baz", false, Permanent) - e, err := s.Update("/foo", "", fc.Now().Add(500*time.Millisecond)) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) + e, err := s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) assert.Equal(t, e.Node.Dir, true, "") assert.Equal(t, e.EtcdIndex, eidx, "") e, _ = s.Get("/foo/bar", false, false) @@ -384,7 +384,7 @@ func TestStoreUpdateDirTTL(t *testing.T) { func TestStoreDeleteValue(t *testing.T) { s := newStore() var eidx uint64 = 2 - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) e, err := s.Delete("/foo", false, false) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") @@ -400,7 +400,7 @@ func TestStoreDeleteDiretory(t *testing.T) { s := newStore() // create directory /foo var eidx uint64 = 2 - s.Create("/foo", true, "", false, Permanent) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) // delete /foo with dir = true and recursive = false // this should succeed, since the directory is empty e, err := s.Delete("/foo", true, false) @@ -413,7 +413,7 @@ func TestStoreDeleteDiretory(t *testing.T) { assert.Equal(t, e.PrevNode.Dir, true, "") // create directory /foo and directory /foo/bar - s.Create("/foo/bar", true, "", false, Permanent) + s.Create("/foo/bar", true, "", false, TTLOptionSet{ExpireTime: Permanent}) // delete /foo with dir = true and recursive = false // this should fail, since the directory is not empty _, err = s.Delete("/foo", true, false) @@ -433,7 +433,7 @@ func TestStoreDeleteDiretory(t *testing.T) { // and dir are not specified. func TestStoreDeleteDiretoryFailsIfNonRecursiveAndDir(t *testing.T) { s := newStore() - s.Create("/foo", true, "", false, Permanent) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) e, _err := s.Delete("/foo", false, false) err := _err.(*etcdErr.Error) assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "") @@ -445,19 +445,19 @@ func TestRootRdOnly(t *testing.T) { s := newStore("/0") for _, tt := range []string{"/", "/0"} { - _, err := s.Set(tt, true, "", Permanent) + _, err := s.Set(tt, true, "", TTLOptionSet{ExpireTime: Permanent}) assert.NotNil(t, err, "") _, err = s.Delete(tt, true, true) assert.NotNil(t, err, "") - _, err = s.Create(tt, true, "", false, Permanent) + _, err = s.Create(tt, true, "", false, TTLOptionSet{ExpireTime: Permanent}) assert.NotNil(t, err, "") - _, err = s.Update(tt, "", Permanent) + _, err = s.Update(tt, "", TTLOptionSet{ExpireTime: Permanent}) assert.NotNil(t, err, "") - _, err = s.CompareAndSwap(tt, "", 0, "", Permanent) + _, err = s.CompareAndSwap(tt, "", 0, "", TTLOptionSet{ExpireTime: Permanent}) assert.NotNil(t, err, "") } } @@ -465,7 +465,7 @@ func TestRootRdOnly(t *testing.T) { func TestStoreCompareAndDeletePrevValue(t *testing.T) { s := newStore() var eidx uint64 = 2 - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) e, err := s.CompareAndDelete("/foo", "bar", 0) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") @@ -483,7 +483,7 @@ func TestStoreCompareAndDeletePrevValue(t *testing.T) { func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) e, _err := s.CompareAndDelete("/foo", "baz", 0) err := _err.(*etcdErr.Error) assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "") @@ -497,7 +497,7 @@ func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) { func TestStoreCompareAndDeletePrevIndex(t *testing.T) { s := newStore() var eidx uint64 = 2 - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) e, err := s.CompareAndDelete("/foo", "", 1) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") @@ -513,7 +513,7 @@ func TestStoreCompareAndDeletePrevIndex(t *testing.T) { func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) e, _err := s.CompareAndDelete("/foo", "", 100) assert.NotNil(t, _err, "") err := _err.(*etcdErr.Error) @@ -528,7 +528,7 @@ func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) { // Ensure that the store cannot delete a directory. func TestStoreCompareAndDeleteDiretoryFail(t *testing.T) { s := newStore() - s.Create("/foo", true, "", false, Permanent) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) _, _err := s.CompareAndDelete("/foo", "", 0) assert.NotNil(t, _err, "") err := _err.(*etcdErr.Error) @@ -539,8 +539,8 @@ func TestStoreCompareAndDeleteDiretoryFail(t *testing.T) { func TestStoreCompareAndSwapPrevValue(t *testing.T) { s := newStore() var eidx uint64 = 2 - s.Create("/foo", false, "bar", false, Permanent) - e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "compareAndSwap", "") @@ -560,8 +560,8 @@ func TestStoreCompareAndSwapPrevValue(t *testing.T) { func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo", false, "bar", false, Permanent) - e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", TTLOptionSet{ExpireTime: Permanent}) err := _err.(*etcdErr.Error) assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "") assert.Equal(t, err.Message, "Compare failed", "") @@ -575,8 +575,8 @@ func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) { func TestStoreCompareAndSwapPrevIndex(t *testing.T) { s := newStore() var eidx uint64 = 2 - s.Create("/foo", false, "bar", false, Permanent) - e, err := s.CompareAndSwap("/foo", "", 1, "baz", Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + e, err := s.CompareAndSwap("/foo", "", 1, "baz", TTLOptionSet{ExpireTime: Permanent}) assert.Nil(t, err, "") assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "compareAndSwap", "") @@ -597,8 +597,8 @@ func TestStoreCompareAndSwapPrevIndex(t *testing.T) { func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo", false, "bar", false, Permanent) - e, _err := s.CompareAndSwap("/foo", "", 100, "baz", Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + e, _err := s.CompareAndSwap("/foo", "", 100, "baz", TTLOptionSet{ExpireTime: Permanent}) err := _err.(*etcdErr.Error) assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "") assert.Equal(t, err.Message, "Compare failed", "") @@ -615,7 +615,7 @@ func TestStoreWatchCreate(t *testing.T) { w, _ := s.Watch("/foo", false, false, 0) c := w.EventChan() assert.Equal(t, w.StartIndex(), eidx, "") - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) eidx = 1 e := nbselect(c) assert.Equal(t, e.EtcdIndex, eidx, "") @@ -632,7 +632,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) { w, _ := s.Watch("/foo", true, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") eidx = 1 - s.Create("/foo/bar", false, "baz", false, Permanent) + s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "create", "") @@ -643,11 +643,11 @@ func TestStoreWatchRecursiveCreate(t *testing.T) { func TestStoreWatchUpdate(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/foo", false, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") eidx = 2 - s.Update("/foo", "baz", Permanent) + s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "update", "") @@ -658,11 +658,11 @@ func TestStoreWatchUpdate(t *testing.T) { func TestStoreWatchRecursiveUpdate(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo/bar", false, "baz", false, Permanent) + s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/foo", true, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") eidx = 2 - s.Update("/foo/bar", "baz", Permanent) + s.Update("/foo/bar", "baz", TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "update", "") @@ -673,7 +673,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) { func TestStoreWatchDelete(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/foo", false, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") eidx = 2 @@ -688,7 +688,7 @@ func TestStoreWatchDelete(t *testing.T) { func TestStoreWatchRecursiveDelete(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo/bar", false, "baz", false, Permanent) + s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/foo", true, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") eidx = 2 @@ -703,11 +703,11 @@ func TestStoreWatchRecursiveDelete(t *testing.T) { func TestStoreWatchCompareAndSwap(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/foo", false, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") eidx = 2 - s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) + s.CompareAndSwap("/foo", "bar", 0, "baz", TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "compareAndSwap", "") @@ -718,11 +718,11 @@ func TestStoreWatchCompareAndSwap(t *testing.T) { func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) { s := newStore() var eidx uint64 = 1 - s.Create("/foo/bar", false, "baz", false, Permanent) + s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/foo", true, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") eidx = 2 - s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent) + s.CompareAndSwap("/foo/bar", "baz", 0, "bat", TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "compareAndSwap", "") @@ -736,8 +736,8 @@ func TestStoreWatchExpire(t *testing.T) { s.clock = fc var eidx uint64 = 2 - s.Create("/foo", false, "bar", false, fc.Now().Add(500*time.Millisecond)) - s.Create("/foofoo", false, "barbarbar", false, fc.Now().Add(500*time.Millisecond)) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) + s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) w, _ := s.Watch("/", true, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") @@ -760,13 +760,95 @@ func TestStoreWatchExpire(t *testing.T) { assert.Equal(t, e.Node.Key, "/foofoo", "") } +// Ensure that the store can watch for key expiration when refreshing. +func TestStoreWatchExpireRefresh(t *testing.T) { + s := newStore() + fc := newFakeClock() + s.clock = fc + + var eidx uint64 = 2 + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(1200 * time.Millisecond), Refresh: true}) + + // Make sure we set watch updates when Refresh is true for newly created keys + w, _ := s.Watch("/", true, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + c := w.EventChan() + e := nbselect(c) + assert.Nil(t, e, "") + fc.Advance(600 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + eidx = 3 + e = nbselect(c) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foo", "") + + s.Update("/foofoo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + w, _ = s.Watch("/", true, false, 4) + fc.Advance(700 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + eidx = 5 // We should skip 4 because a TTL update should occur with no watch notification + assert.Equal(t, w.StartIndex(), eidx-1, "") + e = nbselect(w.EventChan()) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foofoo", "") +} + +// Ensure that the store can watch for key expiration when refreshing with an empty value. +func TestStoreWatchExpireEmptyRefresh(t *testing.T) { + s := newStore() + fc := newFakeClock() + s.clock = fc + + var eidx uint64 = 1 + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + // Should be no-op + fc.Advance(200 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + + s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + w, _ := s.Watch("/", true, false, 2) + fc.Advance(700 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + eidx = 3 // We should skip 2 because a TTL update should occur with no watch notification + assert.Equal(t, w.StartIndex(), eidx-1, "") + e := nbselect(w.EventChan()) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foo", "") + assert.Equal(t, *e.PrevNode.Value, "bar", "") +} + +// Ensure that the store can update the TTL on a value with refresh. +func TestStoreRefresh(t *testing.T) { + s := newStore() + fc := newFakeClock() + s.clock = fc + + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) + s.Create("/bar", true, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) + _, err := s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + assert.Nil(t, err, "") + + _, err = s.Set("/foo", false, "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + assert.Nil(t, err, "") + + _, err = s.Update("/bar", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + assert.Nil(t, err, "") + + _, err = s.CompareAndSwap("/foo", "bar", 0, "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + assert.Nil(t, err, "") +} + // Ensure that the store can watch in streaming mode. func TestStoreWatchStream(t *testing.T) { s := newStore() var eidx uint64 = 1 w, _ := s.Watch("/foo", false, true, 0) // first modification - s.Create("/foo", false, "bar", false, Permanent) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "create", "") @@ -776,7 +858,7 @@ func TestStoreWatchStream(t *testing.T) { assert.Nil(t, e, "") // second modification eidx = 2 - s.Update("/foo", "baz", Permanent) + s.Update("/foo", "baz", TTLOptionSet{ExpireTime: Permanent}) e = nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "update", "") @@ -790,10 +872,10 @@ func TestStoreWatchStream(t *testing.T) { func TestStoreRecover(t *testing.T) { s := newStore() var eidx uint64 = 4 - s.Create("/foo", true, "", false, Permanent) - s.Create("/foo/x", false, "bar", false, Permanent) - s.Update("/foo/x", "barbar", Permanent) - s.Create("/foo/y", false, "baz", false, Permanent) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/x", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + s.Update("/foo/x", "barbar", TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/y", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) b, err := s.Save() s2 := newStore() @@ -820,9 +902,9 @@ func TestStoreRecoverWithExpiration(t *testing.T) { fc := newFakeClock() var eidx uint64 = 4 - s.Create("/foo", true, "", false, Permanent) - s.Create("/foo/x", false, "bar", false, Permanent) - s.Create("/foo/y", false, "baz", false, fc.Now().Add(5*time.Millisecond)) + s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/x", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) + s.Create("/foo/y", false, "baz", false, TTLOptionSet{ExpireTime: fc.Now().Add(5 * time.Millisecond)}) b, err := s.Save() time.Sleep(10 * time.Millisecond) @@ -850,7 +932,7 @@ func TestStoreWatchCreateWithHiddenKey(t *testing.T) { s := newStore() var eidx uint64 = 1 w, _ := s.Watch("/_foo", false, false, 0) - s.Create("/_foo", false, "bar", false, Permanent) + s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "create", "") @@ -863,14 +945,14 @@ func TestStoreWatchCreateWithHiddenKey(t *testing.T) { func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) { s := newStore() w, _ := s.Watch("/foo", true, false, 0) - s.Create("/foo/_bar", false, "baz", false, Permanent) + s.Create("/foo/_bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Nil(t, e, "") w, _ = s.Watch("/foo", true, false, 0) - s.Create("/foo/_baz", true, "", false, Permanent) + s.Create("/foo/_baz", true, "", false, TTLOptionSet{ExpireTime: Permanent}) e = nbselect(w.EventChan()) assert.Nil(t, e, "") - s.Create("/foo/_baz/quux", false, "quux", false, Permanent) + s.Create("/foo/_baz/quux", false, "quux", false, TTLOptionSet{ExpireTime: Permanent}) e = nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -878,9 +960,9 @@ func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) { // Ensure that the store doesn't see hidden key updates. func TestStoreWatchUpdateWithHiddenKey(t *testing.T) { s := newStore() - s.Create("/_foo", false, "bar", false, Permanent) + s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/_foo", false, false, 0) - s.Update("/_foo", "baz", Permanent) + s.Update("/_foo", "baz", TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Node.Key, "/_foo", "") @@ -891,9 +973,9 @@ func TestStoreWatchUpdateWithHiddenKey(t *testing.T) { // Ensure that the store doesn't see hidden key updates without an exact path match in recursive mode. func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) { s := newStore() - s.Create("/foo/_bar", false, "baz", false, Permanent) + s.Create("/foo/_bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/foo", true, false, 0) - s.Update("/foo/_bar", "baz", Permanent) + s.Update("/foo/_bar", "baz", TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -902,7 +984,7 @@ func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) { func TestStoreWatchDeleteWithHiddenKey(t *testing.T) { s := newStore() var eidx uint64 = 2 - s.Create("/_foo", false, "bar", false, Permanent) + s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/_foo", false, false, 0) s.Delete("/_foo", false, false) e := nbselect(w.EventChan()) @@ -916,7 +998,7 @@ func TestStoreWatchDeleteWithHiddenKey(t *testing.T) { // Ensure that the store doesn't see hidden key deletes without an exact path match in recursive mode. func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) { s := newStore() - s.Create("/foo/_bar", false, "baz", false, Permanent) + s.Create("/foo/_bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) w, _ := s.Watch("/foo", true, false, 0) s.Delete("/foo/_bar", false, false) e := nbselect(w.EventChan()) @@ -929,8 +1011,8 @@ func TestStoreWatchExpireWithHiddenKey(t *testing.T) { fc := newFakeClock() s.clock = fc - s.Create("/_foo", false, "bar", false, fc.Now().Add(500*time.Millisecond)) - s.Create("/foofoo", false, "barbarbar", false, fc.Now().Add(1000*time.Millisecond)) + s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) + s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(1000 * time.Millisecond)}) w, _ := s.Watch("/", true, false, 0) c := w.EventChan() @@ -952,7 +1034,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) { s := newStore() var eidx uint64 = 1 w, _ := s.Watch("/_foo/bar", true, false, 0) - s.Create("/_foo/bar/baz", false, "baz", false, Permanent) + s.Create("/_foo/bar/baz", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) e := nbselect(w.EventChan()) assert.NotNil(t, e, "") @@ -970,10 +1052,10 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) { // to operate correctly. func TestStoreWatchSlowConsumer(t *testing.T) { s := newStore() - s.Watch("/foo", true, true, 0) // stream must be true - s.Set("/foo", false, "1", Permanent) // ok - s.Set("/foo", false, "2", Permanent) // ok - s.Set("/foo", false, "3", Permanent) // must not panic + s.Watch("/foo", true, true, 0) // stream must be true + s.Set("/foo", false, "1", TTLOptionSet{ExpireTime: Permanent}) // ok + s.Set("/foo", false, "2", TTLOptionSet{ExpireTime: Permanent}) // ok + s.Set("/foo", false, "3", TTLOptionSet{ExpireTime: Permanent}) // must not panic } // Performs a non-blocking select on an event channel.