Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ sudo: required
language: go
go: "1.10"
services: redis-server
env:
- REDIS_SOCKET_TYPE=tcp REDIS_URL="localhost:6379"
install: make bootstrap
before_script: redis-server --port 6380 &
script: make check_format tests
45 changes: 40 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
- [Request Fields](#request-fields)
- [Statistics](#statistics)
- [Debug Port](#debug-port)
- [Redis](#redis)
- [One Redis Instance](#one-redis-instance)
- [Two Redis Instances](#two-redis-instances)
- [Contact](#contact)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->
Expand All @@ -28,7 +31,7 @@

The rate limit service is a Go/gRPC service designed to enable generic rate limit scenarios from different types of
applications. Applications request a rate limit decision based on a domain and a set of descriptors. The service
reads the configuration from disk via [runtime](https://github.com/lyft/goruntime), composes a cache key, and talks to the redis cache. A
reads the configuration from disk via [runtime](https://github.com/lyft/goruntime), composes a cache key, and talks to the Redis cache. A
decision is then returned to the caller.

# Deprecation of Legacy Ratelimit Proto
Expand All @@ -55,13 +58,13 @@ to give time to community members running ratelimit off of `master`.

# Building and Testing

* Install redis-server.
* Install Redis-server.
* Make sure go is setup correctly and checkout rate limit service into your go path. More information about installing
go [here](https://golang.org/doc/install).
* In order to run the integration tests using a local default redis install you will also need these environment variables set:
* In order to run the integration tests using a local Redis server please run two Redis-server instances: one on port `6379` and another on port `6380`
```bash
export REDIS_SOCKET_TYPE=tcp
export REDIS_URL=localhost:6379
Redis-server --port 6379 &
Redis-server --port 6380 &
```
* To setup for the first time (only done once):
```bash
Expand Down Expand Up @@ -352,6 +355,38 @@ $ curl 0:6070/

You can specify the debug port with the `DEBUG_PORT` environment variable. It defaults to `6070`.

# Redis

Ratelimit uses Redis as its caching layer. Ratelimit supports two operation modes:

1. One Redis server for all limits.
1. Two Redis instances: one for per second limits and another one for all other limits.

## One Redis Instance

To configure one Redis instance use the following environment variables:

1. `REDIS_SOCKET_TYPE`
1. `REDIS_URL`
1. `REDIS_POOL_SIZE`

This setup will use the same Redis server for all limits.

## Two Redis Instances

To configure two Redis instances use the following environment variables:

1. `REDIS_SOCKET_TYPE`
1. `REDIS_URL`
1. `REDIS_POOL_SIZE`
1. `REDIS_PERSECOND`: set this to `"true"`.
1. `REDIS_PERSECOND_SOCKET_TYPE`
1. `REDIS_PERSECOND_URL`
1. `REDIS_PERSECOND_POOL_SIZE`

This setup will use the Redis server configured with the `_PERSECOND_` vars for
per second limits, and the other Redis server for all other limits.

# Contact

* [#lyft-envoy](http://webchat.freenode.net/?channels=lyft-envoy): IRC
Expand Down
119 changes: 99 additions & 20 deletions src/redis/cache_impl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis

import (
"bytes"
"math"
"math/rand"
"strconv"
Expand All @@ -16,10 +17,17 @@ import (
)

type rateLimitCacheImpl struct {
pool Pool
pool Pool
// Optional Pool for a dedicated cache of per second limits.
// If this pool is nil, then the Cache will use the pool for all
// limits regardless of unit. If this pool is not nil, then it
// is used for limits that have a SECOND unit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i were to set a 3600 second limit it would end up in the per second pool. is that correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you meant a limit of 3600 with a SECOND unit, or a limit that has an expiration of 3600 seconds?

I think you mean the latter. You are right that the current code would need to be modified to handle cases like that, but the nice thing is that we currently don't support that type of unit semantics. So you can't specify a limit with an expiration of 3600 seconds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right.

perSecondPool Pool
timeSource TimeSource
jitterRand *rand.Rand
expirationJitterMaxSeconds int64
// bytes.Buffer pool used to efficiently generate cache keys.
bufferPool sync.Pool
}

// Convert a rate limit into a time divider.
Expand All @@ -45,23 +53,41 @@ func unitToDivider(unit pb.RateLimitResponse_RateLimit_Unit) int64 {
// @param descriptor supplies the descriptor to generate the key for.
// @param limit supplies the rate limit to generate the key for (may be nil).
// @param now supplies the current unix time.
// @return the cache key.
// @return cacheKey struct.
func (this *rateLimitCacheImpl) generateCacheKey(
domain string, descriptor *pb_struct.RateLimitDescriptor, limit *config.RateLimit, now int64) string {
domain string, descriptor *pb_struct.RateLimitDescriptor, limit *config.RateLimit, now int64) cacheKey {

if limit == nil {
return ""
return cacheKey{
key: "",
perSecond: false,
}
}

var cacheKey string = domain + "_"
b := this.bufferPool.Get().(*bytes.Buffer)
defer this.bufferPool.Put(b)
b.Reset()

b.WriteString(domain)
b.WriteByte('_')

for _, entry := range descriptor.Entries {
cacheKey += entry.Key + "_"
cacheKey += entry.Value + "_"
b.WriteString(entry.Key)
b.WriteByte('_')
b.WriteString(entry.Value)
b.WriteByte('_')
}

divider := unitToDivider(limit.Limit.Unit)
cacheKey += strconv.FormatInt((now/divider)*divider, 10)
return cacheKey
b.WriteString(strconv.FormatInt((now/divider)*divider, 10))

return cacheKey{
key: b.String(),
perSecond: isPerSecondLimit(limit.Limit.Unit)}
}

func isPerSecondLimit(unit pb.RateLimitResponse_RateLimit_Unit) bool {
return unit == pb.RateLimitResponse_RateLimit_SECOND
}

func max(a uint32, b uint32) uint32 {
Expand All @@ -71,22 +97,50 @@ func max(a uint32, b uint32) uint32 {
return b
}

type cacheKey struct {
key string
// True if the key corresponds to a limit with a SECOND unit. False otherwise.
perSecond bool
}

func pipelineAppend(conn Connection, key string, hitsAddend uint32, expirationSeconds int64) {
conn.PipeAppend("INCRBY", key, hitsAddend)
conn.PipeAppend("EXPIRE", key, expirationSeconds)
}

func pipelineFetch(conn Connection) uint32 {
ret := uint32(conn.PipeResponse().Int())
// Pop off EXPIRE response and check for error.
conn.PipeResponse()
return ret
}

func (this *rateLimitCacheImpl) DoLimit(
ctx context.Context,
request *pb.RateLimitRequest,
limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus {

logger.Debugf("starting cache lookup")

conn := this.pool.Get()
defer this.pool.Put(conn)

// Optional connection for per second limits. If the cache has a perSecondPool setup,
// then use a connection from the pool for per second limits.
var perSecondConn Connection = nil
if this.perSecondPool != nil {
perSecondConn = this.perSecondPool.Get()
defer this.perSecondPool.Put(perSecondConn)
}

// request.HitsAddend could be 0 (default value) if not specified by the caller in the Ratelimit request.
hitsAddend := max(1, request.HitsAddend)

// First build a list of all cache keys that we are actually going to hit. generateCacheKey()
// returns "" if there is no limit so that we can keep the arrays all the same size.
// returns an empty string in the key if there is no limit so that we can keep the arrays
// all the same size.
assert.Assert(len(request.Descriptors) == len(limits))
cacheKeys := make([]string, len(request.Descriptors))
cacheKeys := make([]cacheKey, len(request.Descriptors))
now := this.timeSource.UnixNow()
for i := 0; i < len(request.Descriptors); i++ {
cacheKeys[i] = this.generateCacheKey(request.Domain, request.Descriptors[i], limits[i], now)
Expand All @@ -99,7 +153,7 @@ func (this *rateLimitCacheImpl) DoLimit(

// Now, actually setup the pipeline, skipping empty cache keys.
for i, cacheKey := range cacheKeys {
if cacheKey == "" {
if cacheKey.key == "" {
continue
}
logger.Debugf("looking up cache key: %s", cacheKey)
Expand All @@ -109,15 +163,19 @@ func (this *rateLimitCacheImpl) DoLimit(
expirationSeconds += this.jitterRand.Int63n(this.expirationJitterMaxSeconds)
}

conn.PipeAppend("INCRBY", cacheKey, hitsAddend)
conn.PipeAppend("EXPIRE", cacheKey, expirationSeconds)
// Use the perSecondConn if it is not nil and the cacheKey represents a per second Limit.
if perSecondConn != nil && cacheKey.perSecond {
pipelineAppend(perSecondConn, cacheKey.key, hitsAddend, expirationSeconds)
} else {
pipelineAppend(conn, cacheKey.key, hitsAddend, expirationSeconds)
}
}

// Now fetch the pipeline.
responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus,
len(request.Descriptors))
for i, cacheKey := range cacheKeys {
if cacheKey == "" {
if cacheKey.key == "" {
responseDescriptorStatuses[i] =
&pb.RateLimitResponse_DescriptorStatus{
Code: pb.RateLimitResponse_OK,
Expand All @@ -126,16 +184,22 @@ func (this *rateLimitCacheImpl) DoLimit(
}
continue
}
limitAfterIncrease := uint32(conn.PipeResponse().Int())
conn.PipeResponse() // Pop off EXPIRE response and check for error.

var limitAfterIncrease uint32
// Use the perSecondConn if it is not nil and the cacheKey represents a per second Limit.
if this.perSecondPool != nil && cacheKey.perSecond {
limitAfterIncrease = pipelineFetch(perSecondConn)
} else {
limitAfterIncrease = pipelineFetch(conn)
}

limitBeforeIncrease := limitAfterIncrease - hitsAddend
overLimitThreshold := limits[i].Limit.RequestsPerUnit
// The nearLimitThreshold is the number of requests that can be made before hitting the NearLimitRatio.
// We need to know it in both the OK and OVER_LIMIT scenarios.
nearLimitThreshold := uint32(math.Floor(float64(float32(overLimitThreshold) * config.NearLimitRatio)))

logger.Debugf("cache key: %s current: %d", cacheKey, limitAfterIncrease)
logger.Debugf("cache key: %s current: %d", cacheKey.key, limitAfterIncrease)
if limitAfterIncrease > overLimitThreshold {
responseDescriptorStatuses[i] =
&pb.RateLimitResponse_DescriptorStatus{
Expand Down Expand Up @@ -184,8 +248,23 @@ func (this *rateLimitCacheImpl) DoLimit(
return responseDescriptorStatuses
}

func NewRateLimitCacheImpl(pool Pool, timeSource TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64) RateLimitCache {
return &rateLimitCacheImpl{pool, timeSource, jitterRand, expirationJitterMaxSeconds}
func NewRateLimitCacheImpl(pool Pool, perSecondPool Pool, timeSource TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64) RateLimitCache {
return &rateLimitCacheImpl{
pool: pool,
perSecondPool: perSecondPool,
timeSource: timeSource,
jitterRand: jitterRand,
expirationJitterMaxSeconds: expirationJitterMaxSeconds,
bufferPool: newBufferPool(),
}
}

func newBufferPool() sync.Pool {
return sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
}

type timeSourceImpl struct{}
Expand Down
13 changes: 6 additions & 7 deletions src/redis/driver_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package redis
import (
"github.com/lyft/gostats"
"github.com/lyft/ratelimit/src/assert"
"github.com/lyft/ratelimit/src/settings"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
logger "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -65,13 +64,13 @@ func (this *poolImpl) Put(c Connection) {
}
}

func NewPoolImpl(scope stats.Scope) Pool {
s := settings.NewSettings()

logger.Warnf("connecting to redis on %s %s with pool size %d", s.RedisSocketType, s.RedisUrl, s.RedisPoolSize)
pool, err := pool.New(s.RedisSocketType, s.RedisUrl, s.RedisPoolSize)
func NewPoolImpl(scope stats.Scope, socketType string, url string, poolSize int) Pool {
logger.Warnf("connecting to redis on %s %s with pool size %d", socketType, url, poolSize)
pool, err := pool.New(socketType, url, poolSize)
checkError(err)
return &poolImpl{pool, newPoolStats(scope)}
return &poolImpl{
pool: pool,
stats: newPoolStats(scope)}
}

func (this *connectionImpl) PipeAppend(cmd string, args ...interface{}) {
Expand Down
10 changes: 9 additions & 1 deletion src/service_cmd/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@ func Run() {
srv := server.NewServer("ratelimit", settings.GrpcUnaryInterceptor(nil))

s := settings.NewSettings()

var perSecondPool redis.Pool
if s.RedisPerSecond {
perSecondPool = redis.NewPoolImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondSocketType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize)

}

service := ratelimit.NewService(
srv.Runtime(),
redis.NewRateLimitCacheImpl(
redis.NewPoolImpl(srv.Scope().Scope("redis_pool")),
redis.NewPoolImpl(srv.Scope().Scope("redis_pool"), s.RedisSocketType, s.RedisUrl, s.RedisPoolSize),
perSecondPool,
redis.NewTimeSourceImpl(),
rand.New(redis.NewLockedSource(time.Now().Unix())),
s.ExpirationJitterMaxSeconds),
Expand Down
4 changes: 4 additions & 0 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type Settings struct {
RedisSocketType string `envconfig:"REDIS_SOCKET_TYPE" default:"unix"`
RedisUrl string `envconfig:"REDIS_URL" default:"/var/run/nutcracker/ratelimit.sock"`
RedisPoolSize int `envconfig:"REDIS_POOL_SIZE" default:"10"`
RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"`
RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"`
RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"`
RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"`
ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"`
}

Expand Down
Loading