Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TTLs to DynamoDB implementation #35

Merged
merged 3 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add TTL. minor adjustments to db patterns
  • Loading branch information
Sayan Samanta committed Mar 26, 2020
commit 2b10f8bba96956cee753e3aa8129d9877ffeb989
11 changes: 7 additions & 4 deletions dynamodb/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ func (b *bucket) Reset() time.Time {
func (b *bucket) Add(amount uint) (leakybucket.BucketState, error) {
b.mutex.Lock()
defer b.mutex.Unlock()
dbBucket, err := b.db.bucket(b.name)
// Storage.Create guarantees the DB Bucket with a configured TTL. For long running executions it
// is possible old buckets will get deleted, so we use `findOrCreate` rather than `bucket`
dbBucket, err := b.db.findOrCreateBucket(b.name, b.rate)
if err != nil {
return b.state(), err
}
if dbBucket.expired() {
dbBucket, err = b.db.resetBucket(*dbBucket, time.Now().Add(b.rate))
dbBucket, err = b.db.resetBucket(*dbBucket, b.rate)
if err != nil {
return b.state(), err
}
Expand Down Expand Up @@ -96,7 +98,7 @@ func (s *Storage) Create(name string, capacity uint, rate time.Duration) (leakyb
rate: rate,
db: s.db,
}
dbBucket, err := s.db.createOrFindBucket(newDDBBucket(name, bucket.reset))
dbBucket, err := s.db.findOrCreateBucket(name, rate)
if err != nil {
return nil, err
}
Expand All @@ -114,12 +116,13 @@ func (s *Storage) Create(name string, capacity uint, rate time.Duration) (leakyb
}

// New initializes the connection to dynamodb
func New(tableName string, s *session.Session) (*Storage, error) {
func New(tableName string, s *session.Session, itemTTL time.Duration) (*Storage, error) {
ddb := dynamodb.New(s)

db := bucketDB{
ddb: ddb,
tableName: tableName,
ttl: itemTTL,
}

// fail early if the table doesn't exist or we have any other issues with the DynamoDB API
Expand Down
39 changes: 25 additions & 14 deletions dynamodb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
type bucketDB struct {
ddb dynamodbiface.DynamoDBAPI
tableName string
ttl time.Duration
}

type ddbBucketStatePrimaryKey struct {
Expand Down Expand Up @@ -47,27 +48,32 @@ func (d ddbBucketStatePrimaryKey) KeySchema() []*dynamodb.KeySchemaElement {
// ddbBucket implements the db interface using dynamodb as the backend
type ddbBucket struct {
ddbBucketStatePrimaryKey
// Value is the sum of all increments in the current sliding window for the bucket
Value uint `dynamodbav:"value"`
// Expiration indicates when the current rate limit expires. We opt not to use DyanamoDB TTLs
// because they don't have strong deletion guarantees.
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
// "DynamoDB typically deletes expired items within 48 hours of expiration. The exact duration within
// which an item truly gets deleted after expiration is specific to the nature of the workload
// and the size of the table."
Expiration time.Time `dynamodbav:"expiration,unixtime"`
// Value is the sum of all increments in the current sliding window for the bucket
Value uint `dynamodbav:"value"`
// Version is an internal field used to control flushing/draining the Value field concurrently
Version uint `dynamodbav:"version"`
// TTL is an internal attribute to define how long the item will live in dynamodb prior to being
// set for removal. This TTL mechanism is only used for good hygiene to ensure we don't leave
// unused buckets in the database forever
TTL time.Time `dynamodbav:"_ttl,unixtime"`
}

func newDDBBucket(name string, expiration time.Time) ddbBucket {
func newDDBBucket(name string, expiresIn time.Duration, ttl time.Duration) ddbBucket {
return ddbBucket{
ddbBucketStatePrimaryKey: ddbBucketStatePrimaryKey{
Name: name,
},
Expiration: time.Now().Add(expiresIn),
Value: 0,
Expiration: expiration,
Version: 0,
TTL: time.Now().Add(ttl),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: my OCD gets triggered by these two calls to time.Now() and how they'll be slightly different. Can be replaced with one call + storing it in a local variable e.g. now := time.Now()

}
}

Expand Down Expand Up @@ -113,7 +119,16 @@ func (db bucketDB) bucket(name string) (*ddbBucket, error) {
return decodeBucket(res.Item)
}

func (db bucketDB) createOrFindBucket(bucket ddbBucket) (*ddbBucket, error) {
func (db bucketDB) findOrCreateBucket(name string, expiresIn time.Duration) (*ddbBucket, error) {
dbBucket, err := db.bucket(name)
if err == nil {
return dbBucket, nil
} else if err != errBucketNotFound {
return nil, err
}

// otherwise create the bucket
bucket := newDDBBucket(name, expiresIn, db.ttl)
data, err := encodeBucket(bucket)
if err != nil {
return nil, err
Expand All @@ -130,7 +145,8 @@ func (db bucketDB) createOrFindBucket(bucket ddbBucket) (*ddbBucket, error) {
if !awsErr(err, dynamodb.ErrCodeConditionalCheckFailedException) {
return nil, err
}
// for existing buckets simply fetch
// insane edge case because we know we can have multiple consumers
// for existing buckets simply re-fetch
return db.bucket(bucket.Name)
}

Expand Down Expand Up @@ -170,20 +186,15 @@ func (db bucketDB) incrementBucketValue(name string, amount, capacity uint) (*dd
}

// resetBucket will reset the bucket's value to 0 iff the versions match
func (db bucketDB) resetBucket(bucket ddbBucket, expiration time.Time) (*ddbBucket, error) {
func (db bucketDB) resetBucket(bucket ddbBucket, expiresIn time.Duration) (*ddbBucket, error) {
// dbMaxVersion is an arbitrary constant to prevent the version field from overflowing
var dbMaxVersion uint = 2 << 28
newVersion := bucket.Version + 1
if newVersion > dbMaxVersion {
newVersion = 0
}

updatedBucket := ddbBucket{
ddbBucketStatePrimaryKey: bucket.ddbBucketStatePrimaryKey,
Value: 0,
Expiration: expiration,
Version: newVersion,
}
updatedBucket := newDDBBucket(bucket.ddbBucketStatePrimaryKey.Name, expiresIn, db.ttl)
updatedBucket.Version = newVersion
data, err := encodeBucket(updatedBucket)
if err != nil {
return nil, err
Expand Down