Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flushing limiter #71

Merged
merged 6 commits into from
Apr 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,6 @@ except BucketFullException as err:
# {'error': 'Bucket for vutran with Rate 3/5 is already full', 'identity': 'tranvu', 'rate': '5/5', 'remaining_time': 2}
```

- [ ] *RequestRate may be required to `reset` on a fixed schedule, eg: every first-day of a month

### Decorator
Rate-limiting is also available in decorator form, using `Limiter.ratelimit`. Example:
```python
Expand Down Expand Up @@ -330,9 +328,6 @@ When the number of incoming requets go beyond the limit, we can either do..
### More complex scenario
https://www.keycdn.com/support/rate-limiting#types-of-rate-limits

- [ ] *Sometimes, we may need to apply specific rate-limiting strategies based on schedules/region or some other metrics. It
requires the capability to `switch` the strategies instantly without re-deploying the whole service.

## Development

### Setup & Commands
Expand Down
14 changes: 13 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyrate-limiter"
version = "2.7.0"
version = "2.8.0"
description = "Python Rate-Limiter using Leaky-Bucket Algorimth Family"
authors = ["vutr <me@vutr.io>"]
license = "MIT"
Expand Down Expand Up @@ -43,6 +43,7 @@ pytest-asyncio = "^0.12"
pytest-cov = "^3.0"
pytest-xdist = "^2.5.0"
PyYAML = "^5.4.1"
schedule = "^1.1.0"

[tool.black]
line-length = 120
Expand Down
15 changes: 15 additions & 0 deletions pyrate_limiter/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ def get(self, number: int) -> float:
def all_items(self) -> List[float]:
"""Return a list as copies of all items in the bucket"""

@abstractmethod
def flush(self) -> None:
"""Flush/reset bucket"""

def inspect_expired_items(self, time: float) -> Tuple[int, float]:
"""Find how many items in bucket that have slipped out of the time-window

Expand Down Expand Up @@ -95,6 +99,10 @@ def get(self, number):
def all_items(self):
return list(self._q.queue)

def flush(self):
while not self._q.empty():
self._q.get()


class MemoryListBucket(AbstractBucket):
"""A bucket that resides in memory
Expand Down Expand Up @@ -128,6 +136,9 @@ def get(self, number):
def all_items(self):
return self._q.copy()

def flush(self):
self._q = list()


class RedisBucket(AbstractBucket):
"""A bucket with Redis
Expand Down Expand Up @@ -193,6 +204,10 @@ def all_items(self):
items = conn.lrange(self._bucket_name, 0, -1)
return sorted([float(i.decode("utf-8")) for i in items])

def flush(self):
conn = self.get_connection()
conn.delete(self._bucket_name)


class RedisClusterBucket(RedisBucket):
"""A bucket with RedisCluster"""
Expand Down
12 changes: 10 additions & 2 deletions pyrate_limiter/limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Any
from typing import Callable
from typing import Dict
from typing import Type
from typing import Union

from .bucket import AbstractBucket
Expand All @@ -22,7 +21,7 @@ class Limiter:
def __init__(
self,
*rates: RequestRate,
bucket_class: Type[AbstractBucket] = MemoryQueueBucket,
bucket_class=MemoryQueueBucket,
bucket_kwargs=None,
time_function: Callable[[], float] = None,
):
Expand Down Expand Up @@ -124,3 +123,12 @@ def get_current_volume(self, identity) -> int:
"""Get current bucket volume for a specific identity"""
bucket = self.bucket_group[identity]
return bucket.size()

def flush_all(self) -> int:
cnt = 0

for _, bucket in self.bucket_group.items():
bucket.flush()
cnt += 1

return cnt
4 changes: 4 additions & 0 deletions pyrate_limiter/sqlite_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ def all_items(self) -> List[float]:
rows = self.connection.execute(f"SELECT value FROM {self.table} ORDER BY idx").fetchall()
return [row[0] for row in rows]

def flush(self):
self.connection.execute(f"DELETE FROM {self.table}")
self.connection.commit()


# Create file lock in module scope to reuse across buckets
try:
Expand Down
6 changes: 6 additions & 0 deletions tests/test_01.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def test_simple_02():
# Exceed Rate-2 again
limiter2.try_acquire(item)

assert limiter2.flush_all() == 1
assert limiter2.get_current_volume(item) == 0


def test_simple_03():
"""Single-rate Limiter with MemoryListBucket"""
Expand Down Expand Up @@ -232,6 +235,9 @@ def test_simple_04():
# Exceed Rate-2 again
limiter2.try_acquire(item)

assert limiter2.flush_all() == 1
assert limiter2.get_current_volume(item) == 0


def test_remaining_time(time_function):
"""The remaining_time metadata returned from a BucketFullException should take into account
Expand Down
24 changes: 24 additions & 0 deletions tests/test_02.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,30 @@ def test_simple_02(time_function):
limiter4.try_acquire(item)


def test_flushing():
"""Multi-rates Limiter with RedisBucket"""
rate_1 = RequestRate(5, 5 * Duration.SECOND)
limiter = Limiter(
rate_1,
bucket_class=RedisBucket,
bucket_kwargs={
"redis_pool": pool,
"bucket_name": "Flushing-Bucket",
},
)
item = "redis-test-item"

for _ in range(3):
limiter.try_acquire(item)

size = limiter.get_current_volume(item)
assert size == 3
assert limiter.flush_all() == 1

size = limiter.get_current_volume(item)
assert size == 0


def test_redis_cluster():
"""Testing RedisClusterBucket initialization"""
rate = RequestRate(3, 5 * Duration.SECOND)
Expand Down
6 changes: 6 additions & 0 deletions tests/test_sqlite_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ def test_inspect_expired_items():
assert item_count == 10
# Expect 1 second until the next item expires
assert remaining_time == 1.0


def test_flushing_bucket():
bucket = get_test_bucket()
bucket.flush()
assert bucket.size() == 0