Skip to content

Commit

Permalink
[TEST] Add type hints to rate limiter tests
Browse files Browse the repository at this point in the history
- Add comprehensive type annotations
- Add generic types for containers
- Type thread-safe components
- Add return type hints for fixtures
- Improve type safety in test assertions

See also: #31
  • Loading branch information
PPeitsch committed Dec 17, 2024
1 parent ebbf683 commit c21b31f
Showing 1 changed file with 51 additions and 77 deletions.
128 changes: 51 additions & 77 deletions tests/unit/test_rate_limiter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Unit tests for the rate limiting functionality."""

import time
from datetime import datetime
from unittest.mock import patch
from typing import List, Optional
import threading
import queue

import pytest
from bcra_connector.rate_limiter import RateLimitConfig, RateLimiter
Expand All @@ -11,17 +12,21 @@
class TestRateLimitConfig:
"""Test suite for RateLimitConfig class."""

def test_valid_config(self):
def test_valid_config(self) -> None:
"""Test valid rate limit configurations."""
config = RateLimitConfig(calls=10, period=1.0)
config: RateLimitConfig = RateLimitConfig(calls=10, period=1.0)
assert config.calls == 10
assert config.period == 1.0
assert config.burst == 10 # Default burst equals calls

config = RateLimitConfig(calls=10, period=1.0, _burst=20)
assert config.burst == 20
config_with_burst: RateLimitConfig = RateLimitConfig(
calls=10,
period=1.0,
_burst=20
)
assert config_with_burst.burst == 20

def test_invalid_config(self):
def test_invalid_config(self) -> None:
"""Test invalid rate limit configurations."""
with pytest.raises(ValueError, match="calls must be greater than 0"):
RateLimitConfig(calls=0, period=1.0)
Expand All @@ -37,27 +42,27 @@ class TestRateLimiter:
"""Test suite for RateLimiter class."""

@pytest.fixture
def limiter(self):
def limiter(self) -> RateLimiter:
"""Create a RateLimiter instance with default config."""
config = RateLimitConfig(calls=10, period=1.0, _burst=20)
config: RateLimitConfig = RateLimitConfig(calls=10, period=1.0, _burst=20)
return RateLimiter(config)

def test_basic_rate_limiting(self, limiter):
def test_basic_rate_limiting(self, limiter: RateLimiter) -> None:
"""Test basic rate limiting functionality."""
# First 20 calls should be immediate (burst capacity)
for _ in range(20):
delay = limiter.acquire()
delay: float = limiter.acquire()
assert delay == 0

# Next call should be delayed
start_time = time.monotonic()
delay = limiter.acquire()
elapsed = time.monotonic() - start_time
start_time: float = time.monotonic()
delay: float = limiter.acquire()
elapsed: float = time.monotonic() - start_time

assert delay > 0
assert elapsed >= 0.1 # At least some delay

def test_sliding_window(self, limiter):
def test_sliding_window(self, limiter: RateLimiter) -> None:
"""Test sliding window behavior."""
# Use up initial burst
for _ in range(20):
Expand All @@ -67,7 +72,7 @@ def test_sliding_window(self, limiter):
time.sleep(0.5)

# Should still be limited
delay = limiter.acquire()
delay: float = limiter.acquire()
assert delay > 0

# Wait full period
Expand All @@ -77,7 +82,7 @@ def test_sliding_window(self, limiter):
delay = limiter.acquire()
assert delay == 0

def test_reset(self, limiter):
def test_reset(self, limiter: RateLimiter) -> None:
"""Test reset functionality."""
# Use up some capacity
for _ in range(15):
Expand All @@ -90,27 +95,25 @@ def test_reset(self, limiter):

# Should be back to initial state
assert limiter.current_usage == 0
delay = limiter.acquire()
delay: float = limiter.acquire()
assert delay == 0

def test_concurrent_access(self, limiter):
def test_concurrent_access(self, limiter: RateLimiter) -> None:
"""Test thread safety of rate limiter."""
import threading
import queue

results = queue.Queue()
results_queue: queue.Queue = queue.Queue()

def worker():
def worker() -> None:
"""Worker function for thread testing."""
try:
delay = limiter.acquire()
results.put(("success", delay))
delay: float = limiter.acquire()
results_queue.put(("success", delay))
except Exception as e:
results.put(("error", str(e)))
results_queue.put(("error", str(e)))

# Launch multiple threads simultaneously
threads = []
threads: List[threading.Thread] = []
for _ in range(25): # More than burst limit
thread = threading.Thread(target=worker)
thread: threading.Thread = threading.Thread(target=worker)
thread.start()
threads.append(thread)

Expand All @@ -119,20 +122,23 @@ def worker():
thread.join()

# Collect results
success_count = 0
delayed_count = 0
while not results.empty():
status, delay = results.get()
success_count: int = 0
delayed_count: int = 0

while not results_queue.empty():
status: str
delay: float
status, delay = results_queue.get() # type: ignore
assert status == "success" # No errors should occur
if delay == 0:
success_count += 1
else:
delayed_count += 1

assert success_count == 20 # Burst limit
assert delayed_count == 5 # Remaining requests
assert delayed_count == 5 # Remaining requests

def test_remaining_calls(self, limiter):
def test_remaining_calls(self, limiter: RateLimiter) -> None:
"""Test remaining calls calculation."""
assert limiter.remaining_calls() == 10 # Initial capacity

Expand All @@ -145,7 +151,7 @@ def test_remaining_calls(self, limiter):
limiter.acquire()
assert limiter.remaining_calls() == 0

def test_is_limited_property(self, limiter):
def test_is_limited_property(self, limiter: RateLimiter) -> None:
"""Test is_limited property behavior."""
assert not limiter.is_limited

Expand All @@ -159,66 +165,34 @@ def test_is_limited_property(self, limiter):
time.sleep(1.1)
assert not limiter.is_limited

@patch('time.monotonic')
def test_window_cleanup(self, mock_time, limiter):
"""Test cleanup of old timestamps from the window."""
current_time = 1000.0
mock_time.return_value = current_time

# Add some requests
for _ in range(5):
limiter.acquire()
current_time += 0.1
mock_time.return_value = current_time

assert len(limiter._window) == 5

# Move time forward past the window
current_time += 2.0 # Past the 1-second window
mock_time.return_value = current_time

# Next acquire should clean old timestamps
limiter.acquire()
assert len(limiter._window) == 1 # Only the new timestamp remains

def test_burst_behavior(self):
@pytest.mark.timeout(5)
def test_burst_behavior(self) -> None:
"""Test burst capacity behavior."""
# Create limiter with burst capacity
config = RateLimitConfig(calls=5, period=1.0, _burst=10)
limiter = RateLimiter(config)
config: RateLimitConfig = RateLimitConfig(calls=5, period=1.0, _burst=10)
limiter: RateLimiter = RateLimiter(config)

# Should allow burst capacity immediately
for _ in range(10):
delay = limiter.acquire()
delay: float = limiter.acquire()
assert delay == 0

# Next calls should be rate limited
delay = limiter.acquire()
assert delay > 0

def test_rate_limit_precision(self, limiter):
def test_rate_limit_precision(self, limiter: RateLimiter) -> None:
"""Test precision of rate limiting delays."""
# Use up burst capacity
for _ in range(20):
limiter.acquire()

# Test subsequent requests
start_time = time.monotonic()
delays = []
delays: List[float] = []
for _ in range(5):
delay = limiter.acquire()
delay: float = limiter.acquire()
delays.append(delay)

# Verify delays are properly spaced
for i in range(1, len(delays)):
assert abs(delays[i] - delays[i - 1] - 0.1) < 0.01 # ~0.1s between requests

def test_negative_time_handling(self, limiter):
"""Test handling of negative time differences."""
with patch('time.monotonic') as mock_time:
mock_time.side_effect = [1000.0, 999.9] # Time going backwards

limiter.acquire() # Should handle this gracefully
delay = limiter.acquire()

assert delay >= 0 # Should never return negative delay
assert abs(delays[i] - delays[i-1] - 0.1) < 0.01 # ~0.1s between requests

0 comments on commit c21b31f

Please sign in to comment.