## Overview
Add a dedicated CollectPlugin for collecting handler execution data and providing analysis capabilities. This separates data collection concerns from output/logging (handled by LoggingPlugin in issue #20).
## Motivation
Currently, LoggingPlugin mixes two concerns:
- Output (showing info in real-time)
- Collection (storing data for later analysis)
These should be separate plugins so users can:
- Use LoggingPlugin alone for debugging/tutorials
- Use CollectPlugin alone for performance monitoring
- Use both together when needed
## Proposed Features
### 1. Data Collection
Collect detailed execution data for each handler call:
- Handler name
- Arguments (serialized)
- Return value (serialized)
- Exception (if raised)
- Execution time
- Timestamp
- Context information (switcher name, etc.)
### 2. Storage
- In-memory storage with configurable max size
- Automatic serialization of complex objects
- Circular buffer (oldest entries discarded when full; see the snippet below)
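The circular-buffer behavior falls out of `collections.deque` directly (see also Implementation Notes below); a quick illustration of the discard behavior:

```python
from collections import deque

buf = deque(maxlen=3)   # configurable max size
for i in range(5):
    buf.append(i)
print(list(buf))        # [2, 3, 4] - the two oldest entries were discarded
```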
### 3. Analysis API
Provide rich query methods:
```python
# Get recent calls
history = collect.history(last=100)
history = collect.history(first=50)
history = collect.history(handler='process')

# Performance analysis
slow_calls = collect.slowest(n=10)
fast_calls = collect.fastest(n=10)
slow_calls = collect.slower_than(threshold=0.1)  # > 100 ms

# Error analysis
errors = collect.errors()
errors_by_handler = collect.errors(handler='process')

# Statistics
stats = collect.stats()                   # Per-handler aggregates
stats = collect.stats(handler='process')  # Single handler
```
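For illustration, `slowest()` could be a straightforward sort over the collected entries; a sketch against the Entry Format defined below, not final code (`fastest()` and `slower_than()` would follow the same pattern):

```python
def slowest(self, n=10):
    """Return the n entries with the largest 'elapsed' time."""
    with self._lock:
        entries = list(self._history)   # snapshot under the lock
    return sorted(entries, key=lambda e: e['elapsed'], reverse=True)[:n]
```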
### 4. Export

Export collected data for external analysis:
```python
# Export to file
collect.export('analysis.json')
collect.export('analysis.jsonl', format='jsonl')

# Export to dict
data = collect.to_dict()

# Clear history
collect.clear()
```
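A minimal sketch of how `export` might write both formats, assuming entries are already JSON-serializable (see Serialization under Implementation Notes):

```python
import json

def export(self, path, format='json'):
    """Write collected history to a file as JSON or JSON Lines."""
    with self._lock:
        entries = list(self._history)
    with open(path, 'w', encoding='utf-8') as f:
        if format == 'jsonl':
            for entry in entries:            # one JSON object per line
                f.write(json.dumps(entry) + '\n')
        else:
            json.dump(entries, f, indent=2)
```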
## Example Usage

### Basic Collection (no output)
```python
from smartswitch import Switcher
from smartswitch.plugins import CollectPlugin

sw = Switcher(plugins=[CollectPlugin(max_history=5000)])

@sw
def process(data):
    return f"Processed: {data}"

# Make many calls...
for i in range(1000):
    sw('process')(f"item-{i}")

# Analyze later
slow_calls = sw.plugins['collect'].slowest(10)
error_calls = sw.plugins['collect'].errors()
stats = sw.plugins['collect'].stats()
```

### Combined with LoggingPlugin
```python
sw = Switcher(plugins=[
    LoggingPlugin(mode='print,after'),  # Real-time output
    CollectPlugin(max_history=10000)    # Background collection
])

# LoggingPlugin shows output as it happens
# CollectPlugin silently collects for later analysis
```

### Performance Monitoring
```python
collect = CollectPlugin(max_history=100000)
api = Switcher(plugins=[collect])

# ... run production workload ...

# Find performance bottlenecks: rank per-handler stats by average time
slow_handlers = sorted(collect.stats().items(),
                       key=lambda kv: kv[1]['avg_time'], reverse=True)
name, top = slow_handlers[0]
print(f"Slowest handler: {name} - {top['avg_time']:.4f}s")

# Export for detailed analysis
collect.export('performance_report.json')
```

## API Design
### Constructor
```python
CollectPlugin(
    max_history: int = 1000,
    serialize: bool = True,        # Serialize args/results
    include_context: bool = True   # Include switcher name, timestamp, etc.
)
```
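A possible skeleton tying the constructor to the storage and locking notes below; the `name` attribute used for the `sw.plugins['collect']` lookup is an assumption about the plugin protocol:

```python
from collections import deque
import threading

class CollectPlugin:
    name = 'collect'   # assumed registry key for sw.plugins['collect']

    def __init__(self, max_history=1000, serialize=True, include_context=True):
        self.max_history = max_history
        self.serialize = serialize
        self.include_context = include_context
        self._history = deque(maxlen=max_history)   # circular buffer
        self._lock = threading.Lock()               # see Thread Safety below
```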
### Query Methods

```python
history(
    last: int | None = None,
    first: int | None = None,
    handler: str | None = None,
    errors: bool | None = None   # True = errors only, False = successes only
) -> list[dict]

slowest(n: int = 10) -> list[dict]
fastest(n: int = 10) -> list[dict]
slower_than(threshold: float) -> list[dict]
errors(handler: str | None = None) -> list[dict]
stats(handler: str | None = None) -> dict | list[dict]
```
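For illustration, `history()` could apply its filters over a snapshot of the buffer; a sketch, not final code:

```python
def history(self, last=None, first=None, handler=None, errors=None):
    with self._lock:
        entries = list(self._history)
    if handler is not None:
        entries = [e for e in entries if e['handler'] == handler]
    if errors is True:       # errors only
        entries = [e for e in entries if 'exception' in e]
    elif errors is False:    # successes only
        entries = [e for e in entries if 'exception' not in e]
    if first is not None:
        entries = entries[:first]
    elif last is not None:
        entries = entries[-last:]
    return entries
```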
### Entry Format

```python
{
    'handler': str,
    'switcher': str,
    'timestamp': float,
    'args': tuple,      # Serialized if serialize=True
    'kwargs': dict,
    'result': Any,      # On success; serialized if serialize=True
    'exception': {      # On error
        'type': str,
        'message': str,
        'traceback': str
    },
    'elapsed': float    # Seconds
}
```
### Stats Format

```python
{
    'handler_name': {
        'calls': int,
        'errors': int,
        'avg_time': float,
        'min_time': float,
        'max_time': float,
        'total_time': float,
        'last_call': float   # Timestamp of most recent call
    }
}
```
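For illustration, these aggregates could be computed from collected entries like so (a sketch, not the final implementation):

```python
def stats(self, handler=None):
    agg = {}
    with self._lock:
        entries = list(self._history)
    for e in entries:
        s = agg.setdefault(e['handler'], {
            'calls': 0, 'errors': 0, 'total_time': 0.0,
            'min_time': float('inf'), 'max_time': 0.0, 'last_call': 0.0,
        })
        s['calls'] += 1
        if 'exception' in e:
            s['errors'] += 1
        s['total_time'] += e['elapsed']
        s['min_time'] = min(s['min_time'], e['elapsed'])
        s['max_time'] = max(s['max_time'], e['elapsed'])
        s['last_call'] = max(s['last_call'], e['timestamp'])
    for s in agg.values():
        s['avg_time'] = s['total_time'] / s['calls']
    return agg.get(handler, {}) if handler is not None else agg
```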
## Implementation Notes

### Serialization
For complex objects, provide basic serialization:
```python
def _serialize(self, obj):
    """Serialize an object for storage."""
    if isinstance(obj, (str, int, float, bool, type(None))):
        return obj
    if isinstance(obj, (list, tuple)):
        return [self._serialize(x) for x in obj]
    if isinstance(obj, dict):
        return {k: self._serialize(v) for k, v in obj.items()}
    # Complex objects -> string representation
    return repr(obj)
```

### Memory Management
Use collections.deque with maxlen for an automatic circular buffer:

```python
self._history = deque(maxlen=self.max_history)
```

### Thread Safety
Collection should be thread-safe:

```python
self._lock = threading.Lock()

def wrap_handler(self, func, name, switcher):
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)  # call the wrapped handler
        entry = ...                     # build the Entry Format dict
        with self._lock:                # guard the shared deque
            self._history.append(entry)
        return result
    return wrapper
```

## Benefits
✅ Separation of concerns - Collection separate from logging
✅ Performance monitoring - Find slow handlers and bottlenecks
✅ Error tracking - Collect and analyze failures
✅ Production-ready - Memory-bounded, thread-safe
✅ Flexible - Use alone or with LoggingPlugin
## Testing
Add tests for:
- Data collection accuracy
- Serialization of various types
- Query methods (history, slowest, errors, stats)
- Memory limits (max_history enforcement; a sample test follows this list)
- Thread safety (concurrent collection)
- Export formats
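A sample test for the max_history bound, written against the proposed API and the usage examples above:

```python
from smartswitch import Switcher
from smartswitch.plugins import CollectPlugin

def test_max_history_enforced():
    collect = CollectPlugin(max_history=10)
    sw = Switcher(plugins=[collect])

    @sw
    def echo(x):
        return x

    for i in range(100):
        sw('echo')(i)

    # The circular buffer keeps only the 10 newest entries
    assert len(collect.history()) == 10
```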
## Future Enhancements
- Sampling (collect only X% of calls; see the sketch after this list)
- Custom serialization hooks
- Integration with monitoring systems (Prometheus, StatsD)
- Persistence to disk/database
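The sampling idea could hook into the wrapper from Thread Safety above; a sketch using a hypothetical `sample_rate` constructor parameter in [0.0, 1.0]:

```python
import random

# Inside wrapper(): still call the handler, but skip collection
# for a fraction of calls. sample_rate is hypothetical.
if random.random() >= self.sample_rate:
    return func(*args, **kwargs)
```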
Priority: Low-Medium (nice to have, not blocking)
Dependencies: None (standalone plugin)
Breaking: No (new feature)
Version: Target 0.11.0