Add a SQLite bucket #59

Merged: 3 commits merged on Dec 8, 2021
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [2.6.0] - TBD
### Added
* SQLite bucket backend

## [2.5.0] - 2021-12-08
### Added
* Custom time source

## [2.4.6] - 2021-09-30
* Add `RedisClusterBucket` to support using `PyrateLimiter` with `redis-py-cluster`
* Update README, add Table of Content
69 changes: 63 additions & 6 deletions README.md
@@ -14,6 +14,11 @@ The request rate limiter using Leaky-bucket algorithm
- [PyrateLimiter](#pyratelimiter)
* [Introduction](#introduction)
* [Available modules](#available-modules)
* [Bucket backends](#bucket-backends)
+ [Memory](#memory)
+ [SQLite](#sqlite)
+ [Redis](#redis)
+ [Custom backends](#custom-backends)
* [Strategies](#strategies)
+ [Subscription strategies](#subscription-strategies)
+ [BucketFullException](#bucketfullexception)
@@ -28,13 +33,8 @@ The request rate limiter using Leaky-bucket algorithm
* [Development](#development)
* [Notes](#notes)

<small><i><a href='http://ecotrust-canada.github.io/markdown-toc/'>Table of contents generated with markdown-toc</a></i></small>

## Introduction
This module can be used to apply rate limits to API requests. The user defines a window duration and a limit on the number of calls within that interval.
To hold the bucket state, you can use MemoryListBucket/MemoryQueueBucket as the internal bucket.
To use PyrateLimiter with Redis, redis-py must be installed.
It is also possible to use your own bucket implementation, by extending AbstractBucket from pyrate_limiter.core

## Available modules
@@ -45,9 +45,66 @@
```python
from pyrate_limiter import (
    Limiter,
    MemoryListBucket,
    MemoryQueueBucket,
    SQLiteBucket,
    RedisBucket,
    RedisClusterBucket,
)
```

## Bucket backends
A few different bucket backends are available, which can be selected using the `bucket_class`
argument for `Limiter`. Any additional backend-specific arguments can be passed
via `bucket_kwargs`.

### Memory
The default bucket is stored in memory, backed by a `queue.Queue`. A list implementation is also available:
```python
from pyrate_limiter import Limiter, MemoryListBucket

limiter = Limiter(bucket_class=MemoryListBucket)
```

### SQLite
If you need to persist the bucket state, a SQLite backend is available.

By default, the state is stored in the system temp directory; use the `path` argument to choose a different location:
```python
from pyrate_limiter import Limiter, SQLiteBucket

limiter = Limiter(
    bucket_class=SQLiteBucket,
    bucket_kwargs={'path': '/tmp/pyrate_limiter.sqlite'},
)
```
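Under the hood, the SQLite backend added in this PR stores items in a single table keyed by an autoincrementing integer, so FIFO ordering falls out of sorting by the primary key. A standalone sketch of that idea using only the standard library (the table and column names here are illustrative, not the bucket's actual schema):

```python
import sqlite3

# In-memory database for illustration; the real bucket defaults to a file
# in the system temp directory
conn = sqlite3.connect(":memory:", isolation_level=None)  # None -> autocommit
conn.execute("CREATE TABLE bucket (idx INTEGER PRIMARY KEY AUTOINCREMENT, value REAL)")

for value in [1.0, 2.0, 3.0]:
    conn.execute("INSERT INTO bucket (value) VALUES (?)", (value,))

# The oldest item is the one with the smallest autoincrement key
oldest = conn.execute("SELECT idx, value FROM bucket ORDER BY idx LIMIT 1").fetchone()
conn.execute("DELETE FROM bucket WHERE idx = ?", (oldest[0],))

remaining = [row[0] for row in conn.execute("SELECT value FROM bucket ORDER BY idx")]
print(remaining)  # [2.0, 3.0]
```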

### Redis
If you have a larger, distributed application, Redis is an ideal backend. This
option requires [redis-py](https://github.com/andymccurdy/redis-py).

You can use the `redis_pool` argument to pass any connection settings:
```python
from pyrate_limiter import Limiter, RedisBucket
from redis import ConnectionPool

redis_pool = ConnectionPool(host='localhost', port=6379, db=0)
limiter = Limiter(
    bucket_class=RedisBucket,
    bucket_kwargs={'redis_pool': redis_pool},
)
```

Redis clusters are also supported; this requires [redis-py-cluster](https://github.com/Grokzen/redis-py-cluster):
```python
from pyrate_limiter import Limiter, RedisClusterBucket

limiter = Limiter(bucket_class=RedisClusterBucket)
```

### Custom backends
If these don't suit your needs, you can also create your own bucket backend by extending `pyrate_limiter.bucket.AbstractBucket`.
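As a rough standalone sketch of the interface a custom backend needs — mirroring the methods implemented by the `SQLiteBucket` in this PR (`size`, `put`, `get`, `all_items`). The class name is hypothetical, and a real backend would subclass `AbstractBucket` and be passed via `bucket_class`:

```python
from typing import List


class ListBucketSketch:
    """Illustrative list-backed bucket. A real implementation would extend
    pyrate_limiter.bucket.AbstractBucket and be used as Limiter(bucket_class=...)."""

    def __init__(self, maxsize: int = 0, **kwargs):
        self._maxsize = maxsize
        self._items: List[float] = []

    def size(self) -> int:
        return len(self._items)

    def put(self, item: float) -> int:
        """Return 1 if the item was stored, or 0 if the bucket is full"""
        if self.size() < self._maxsize:
            self._items.append(item)
            return 1
        return 0

    def get(self, number: int = 1) -> int:
        """Remove up to `number` items in FIFO order; return how many were removed"""
        removed = self._items[:number]
        self._items = self._items[number:]
        return len(removed)

    def all_items(self) -> List[float]:
        return list(self._items)
```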

## Strategies

### Subscription strategies
@@ -109,7 +166,7 @@ limiter.try_acquire(item)
```

### BucketFullException
If the Bucket is full, an exception *BucketFullException* will be raised, with meta-info about the identity it received, the rate that has raised, and the remaining time until the next request can be processed.
If the bucket is full, a `BucketFullException` will be raised, with meta-info about the identity it received, the rate that was exceeded, and the remaining time until the next request can be processed.

```python
rate = RequestRate(3, 5 * Duration.SECOND)
```
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyrate-limiter"
version = "2.5.0"
version = "2.6.0"
description = "Python Rate-Limiter using Leaky-Bucket Algorithm Family"
authors = ["vutr <me@vutr.io>"]
license = "MIT"
1 change: 1 addition & 0 deletions pyrate_limiter/__init__.py
@@ -8,3 +8,4 @@
from .exceptions import *
from .limiter import *
from .request_rate import *
from .sqlite_bucket import *
84 changes: 84 additions & 0 deletions pyrate_limiter/sqlite_bucket.py
@@ -0,0 +1,84 @@
import sqlite3
from hashlib import sha1
from os.path import join
from tempfile import gettempdir
from typing import List, Optional

from pyrate_limiter.bucket import AbstractBucket


class SQLiteBucket(AbstractBucket):
    """Bucket backed by a SQLite database

    Args:
        maxsize: Maximum number of items in the bucket
        identity: Bucket identity, used as the table name
        path: Path to the SQLite database file; defaults to a temp file in the system temp directory
        kwargs: Additional keyword arguments for :py:func:`sqlite3.connect`
    """

    def __init__(self, maxsize: int = 0, identity: Optional[str] = None, path: Optional[str] = None, **kwargs):
        super().__init__(maxsize=maxsize)
        self._connection = None
        self.connection_kwargs = kwargs
        self.path = path or join(gettempdir(), "pyrate_limiter.sqlite")

        if not identity:
            raise ValueError("Bucket identity is required")

        # Hash the identity to use as a table name, to avoid potential issues with user-provided values
        self.table = f"ratelimit_{sha1(identity.encode()).hexdigest()}"

    @property
    def connection(self) -> sqlite3.Connection:
        """Create a database connection and initialize the table, if this hasn't already been done.
        This is safe to leave open, but may be manually closed with :py:meth:`.close`, if needed.
        """
        if not self._connection:
            self.connection_kwargs.setdefault("isolation_level", None)  # Use autocommit by default
            self._connection = sqlite3.connect(self.path, **self.connection_kwargs)
            self._connection.execute(
                f"CREATE TABLE IF NOT EXISTS {self.table} ("
                "idx INTEGER PRIMARY KEY AUTOINCREMENT, "
                "value REAL)"
            )
        return self._connection

    def close(self):
        """Close the database connection"""
        if self._connection:
            self._connection.close()

    def size(self) -> int:
        return self.connection.execute(f"SELECT COUNT(*) FROM {self.table}").fetchone()[0]

    def put(self, item: float) -> int:
        """Put an item in the bucket.
        Return 1 if successful, else 0.
        """
        if self.size() < self.maxsize():
            self.connection.execute(f"INSERT INTO {self.table} (value) VALUES (?)", (item,))
            return 1
        return 0

    def get(self, number: int = 1) -> int:
        """Get items and remove them from the bucket in FIFO fashion.
        Return the number of items that have been removed.
        """
        keys = [str(key) for key in self._get_keys(number)]

        placeholders = ",".join("?" * len(keys))
        self.connection.execute(f"DELETE FROM {self.table} WHERE idx IN ({placeholders})", keys)

        return len(keys)

    def _get_keys(self, number: int = 1) -> List[int]:
        rows = self.connection.execute(
            f"SELECT idx FROM {self.table} ORDER BY idx LIMIT ?", (number,)
        ).fetchall()
        return [row[0] for row in rows]

    def all_items(self) -> List[float]:
        """Return a list of copies of all items in the bucket"""
        rows = self.connection.execute(f"SELECT value FROM {self.table} ORDER BY idx").fetchall()
        return [row[0] for row in rows]
87 changes: 87 additions & 0 deletions tests/test_sqlite_bucket.py
@@ -0,0 +1,87 @@
# TODO: This could be turned into either a test class or parametrized tests
# to run for all bucket classes
from sqlite3 import ProgrammingError
from tempfile import gettempdir
from time import time

import pytest

from pyrate_limiter.sqlite_bucket import SQLiteBucket


def get_test_bucket():
    return SQLiteBucket(identity="id", path=":memory:", maxsize=1000)


def test_init():
    # The system temp directory should be used unless a path is provided
    assert SQLiteBucket(identity="id").path.startswith(gettempdir())
    assert SQLiteBucket(identity="id", path="bucket.db").path == "bucket.db"

    # The table name should be hashed
    assert SQLiteBucket(identity="id").table == "ratelimit_87ea5dfc8b8e384d848979496e706390b497e547"

    # An identity is required, since it's used as the table name
    with pytest.raises(ValueError):
        SQLiteBucket()


def test_close():
    bucket = get_test_bucket()
    connection = bucket.connection
    bucket.close()
    with pytest.raises(ProgrammingError):
        connection.execute("SELECT * FROM default_bucket")

    # Closing an unopened connection should have no effect
    bucket = get_test_bucket()
    bucket.close()
    assert bucket._connection is None


def test_put_get_size():
    """Test put, get, and size methods"""
    bucket = get_test_bucket()
    for i in range(1000):
        bucket.put(i)
    assert bucket.size() == 1000

    # Items should be retrieved in FIFO order, optionally multiple items at a time
    for _ in range(10):
        assert bucket.get() == 1
    assert bucket.get(10) == 10

    next_key = bucket._get_keys()[0]
    assert next_key == 21
    assert bucket.size() == 980


def test_maxsize():
    """Test that no more items can be added when the bucket is full"""
    bucket = get_test_bucket()
    for i in range(1000):
        assert bucket.put(i) == 1
    assert bucket.put(1) == 0
    assert bucket.size() == 1000


def test_all_items():
    bucket = get_test_bucket()
    for i in range(10):
        bucket.put(i + 0.1)
    assert bucket.all_items() == [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1]


def test_inspect_expired_items():
    bucket = get_test_bucket()
    current_time = time()

    # Add 30 items at 1-second intervals
    for i in range(30):
        seconds_ago = 29 - i
        bucket.put(current_time - seconds_ago)

    # Expect 10 expired items within a time window starting 10 seconds ago
    item_count, remaining_time = bucket.inspect_expired_items(current_time - 10)
    assert item_count == 10
    assert remaining_time == 1.0