Skip to content

Commit bb29004

Browse files
committed
Update cache on CAS for DDB
1 parent a5fa68d commit bb29004

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

pkg/ring/kv/dynamodb/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,12 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
189189
}
190190

191191
if len(putRequests) > 0 || len(deleteRequests) > 0 {
192-
return c.kv.Batch(ctx, putRequests, deleteRequests)
192+
err = c.kv.Batch(ctx, putRequests, deleteRequests)
193+
if err != nil {
194+
return err
195+
}
196+
c.updateStaleData(key, r, time.Now().UTC())
197+
return nil
193198
}
194199

195200
if len(putRequests) == 0 && len(deleteRequests) == 0 {

pkg/ring/kv/dynamodb/client_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,40 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
228228
})
229229
}
230230

231+
func Test_CAS_UpdateStale(t *testing.T) {
232+
ddbMock := NewDynamodbClientMock()
233+
codecMock := &CodecMock{}
234+
descMock := &DescMock{}
235+
descMockResult := &DescMock{id: "result"}
236+
startTime := time.Now().UTC().Add(-time.Millisecond)
237+
238+
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
239+
expectedUpdatedKeys := []string{"t1", "t2"}
240+
expectedUpdated := map[string][]byte{
241+
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
242+
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
243+
}
244+
expectedBatch := map[dynamodbKey][]byte{
245+
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
246+
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
247+
}
248+
249+
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
250+
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
251+
descMock.On("Clone").Return(descMock).Once()
252+
descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once()
253+
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
254+
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
255+
256+
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
257+
return descMockResult, true, nil
258+
})
259+
260+
require.NoError(t, err)
261+
require.Equal(t, descMockResult, c.staleData[key].data)
262+
require.True(t, startTime.Before(c.staleData[key].timestamp))
263+
}
264+
231265
func Test_WatchPrefix(t *testing.T) {
232266
ddbMock := NewDynamodbClientMock()
233267
codecMock := &CodecMock{}
@@ -365,6 +399,7 @@ func (m *CodecMock) DecodeMultiKey(map[string][]byte) (interface{}, error) {
365399
}
366400

367401
type DescMock struct {
402+
id string
368403
mock.Mock
369404
}
370405

0 commit comments

Comments
 (0)