Doc/test update: Recommend FileLockSQLiteBucket for all usage with SQLite + multiprocessing #95

Merged 1 commit on Feb 4, 2023
6 changes: 2 additions & 4 deletions README.md
@@ -267,11 +267,9 @@ limiter = Limiter(
 ```
 
 #### Concurrency
-This backend is thread-safe, and may also be used with multiple child processes that share the same
-`Limiter` object, e.g. if created with `ProcessPoolExecutor` or `multiprocessing.Process`.
+This backend is thread-safe.
 
-If you want to use SQLite with multiple processes with no shared state, for example if created by
-running multiple scripts or by an external process, some additional protections are needed. For
+If you want to use SQLite with multiprocessing, some additional protections are needed. For
 these cases, a separate `FileLockSQLiteBucket` class is available. This requires installing the
 [py-filelock](https://py-filelock.readthedocs.io) library.
 ```python
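The README's code example is truncated in this view, but usage of `FileLockSQLiteBucket` would look roughly like the following. A minimal sketch, assuming the `Limiter`/`RequestRate` constructor pattern used elsewhere in this PR; the import path, rate values, and bucket identity are illustrative:

```python
from pyrate_limiter import Duration, Limiter, RequestRate
from pyrate_limiter.sqlite_bucket import FileLockSQLiteBucket

# FileLockSQLiteBucket coordinates processes via a lock file, so no
# shared in-memory state between the processes is required
limiter = Limiter(
    RequestRate(5, Duration.SECOND),  # illustrative rate: 5 requests/second
    bucket_class=FileLockSQLiteBucket,
)

# Each process that constructs an equivalent Limiter contends on the same lock
limiter.try_acquire("my_bucket")  # "my_bucket" is an illustrative identity
```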
5 changes: 1 addition & 4 deletions pyrate_limiter/sqlite_bucket.py
@@ -23,10 +23,7 @@ class SQLiteBucket(AbstractBucket):
     Notes on concurrency:
 
     * Thread-safe
-    * Safe for use with multiple child processes with a shared initial state and using the same
-      :py:class:`.Limiter` object, e.g. if created with :py:class:`.ProcessPoolExecutor` or
-      :py:class:`multiprocessing.Process`.
-    * For other usage with multiple processes, see :py:class:`.FileLockSQLiteBucket`.
+    * For usage with multiprocessing, see :py:class:`.FileLockSQLiteBucket`.
     * Transactions are locked at the bucket level, but not at the connection or database level.
     * The default isolation level is used (autocommit).
     * Multiple buckets may be used in parallel, but a given bucket will only be used by one
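The docstring now points all multiprocess users at `FileLockSQLiteBucket`. The idea behind that class (not its actual implementation, which lives in this same module) is to wrap each database operation in an OS-level lock file via the py-filelock package, so unrelated processes serialize their writes. A rough sketch; the lock path, table schema, and helper name are hypothetical:

```python
import sqlite3

from filelock import FileLock  # provided by the py-filelock package

# One lock file shared by every process that touches the database
lock = FileLock("/tmp/pyrate_limiter.lock")  # hypothetical lock path

def locked_insert(db_path: str, bucket: str, timestamp: float) -> None:
    """Hypothetical helper: perform one write under the cross-process lock."""
    with lock:  # blocks until no other process holds the lock file
        with sqlite3.connect(db_path) as connection:
            # 'ratelimits' is an illustrative schema, not the real table
            connection.execute(
                "INSERT INTO ratelimits (bucket, timestamp) VALUES (?, ?)",
                (bucket, timestamp),
            )
```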
15 changes: 6 additions & 9 deletions tests/test_concurrency.py
@@ -1,5 +1,4 @@
 """Multithreaded and multiprocess stress tests"""
-from concurrent.futures import ProcessPoolExecutor
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 from logging import getLogger
@@ -30,16 +29,15 @@
 # TODO: This test could potentially be run for all bucket classes
 # @pytest.mark.parametrize("bucket_class", [SQLiteBucket, MemoryListBucket, MemoryQueueBucket, RedisBucket])
 @pytest.mark.parametrize("bucket_class", [SQLiteBucket])
-@pytest.mark.parametrize("executor_class", [ThreadPoolExecutor, ProcessPoolExecutor])
-def test_concurrency(executor_class, bucket_class):
+def test_concurrency(bucket_class):
     """Make a fixed number of concurrent requests using a shared Limiter, and check the total time
     they take to run
     """
-    logger.info(f"Testing {bucket_class.__name__} with {executor_class.__name__}")
+    logger.info(f"Testing {bucket_class.__name__}")
 
     # Set up limiter
     bucket_kwargs = {
-        "path": join(gettempdir(), f"test_{executor_class.__name__}.sqlite"),
+        "path": join(gettempdir(), "test_concurrency.sqlite"),
     }
     limiter = Limiter(
         RequestRate(LIMIT_REQUESTS_PER_SECOND, Duration.SECOND),
@@ -48,20 +46,19 @@ def test_concurrency(executor_class, bucket_class):
     )
 
     # Set up request function
-    bucket_ids = [f"{executor_class.__name__}_bucket_{i}" for i in range(N_BUCKETS)]
+    bucket_ids = [f"bucket_{i}" for i in range(N_BUCKETS)]
     start_time = perf_counter()
     request_func = partial(_send_request, limiter, bucket_ids, start_time)
 
     # Distribute requests across workers
-    with executor_class(max_workers=N_WORKERS) as executor:
+    with ThreadPoolExecutor(max_workers=N_WORKERS) as executor:
         list(executor.map(request_func, range(N_REQUESTS), timeout=300))
 
     # Check total time, with debug logging
     elapsed = perf_counter() - start_time
     expected_min_time = (N_REQUESTS - 1) / LIMIT_REQUESTS_PER_SECOND
-    worker_type = "threads" if executor_class is ThreadPoolExecutor else "processes"
     logger.info(
-        f"Ran {N_REQUESTS} requests with {N_WORKERS} {worker_type} in {elapsed:.2f} seconds\n"
+        f"Ran {N_REQUESTS} requests with {N_WORKERS} threads in {elapsed:.2f} seconds\n"
         f"With a rate limit of {LIMIT_REQUESTS_PER_SECOND}/second, expected at least "
         f"{expected_min_time} seconds"
     )
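Since this change drops `ProcessPoolExecutor` from the shared test, multiprocess coverage would presumably return later as a separate test built on `FileLockSQLiteBucket`. A hypothetical sketch reusing this module's existing helpers and constants (`_send_request`, `N_BUCKETS`, `N_WORKERS`, `N_REQUESTS`, `LIMIT_REQUESTS_PER_SECOND`); whether the `Limiter` pickles cleanly into worker processes is an assumption:

```python
from concurrent.futures import ProcessPoolExecutor

from pyrate_limiter.sqlite_bucket import FileLockSQLiteBucket


def test_multiprocess_concurrency():
    """Hypothetical companion test: concurrent requests from worker
    processes, coordinated by a file-locked SQLite bucket."""
    limiter = Limiter(
        RequestRate(LIMIT_REQUESTS_PER_SECOND, Duration.SECOND),
        bucket_class=FileLockSQLiteBucket,
        bucket_kwargs={"path": join(gettempdir(), "test_multiprocess.sqlite")},
    )
    bucket_ids = [f"mp_bucket_{i}" for i in range(N_BUCKETS)]
    start_time = perf_counter()
    request_func = partial(_send_request, limiter, bucket_ids, start_time)

    # Worker processes each get a copy of the limiter; the file lock
    # (not shared memory) keeps their view of each bucket consistent
    with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:
        list(executor.map(request_func, range(N_REQUESTS), timeout=300))

    # Requests should still be throttled to the configured rate
    elapsed = perf_counter() - start_time
    assert elapsed >= (N_REQUESTS - 1) / LIMIT_REQUESTS_PER_SECOND
```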