Feature Request: CollectPlugin for Data Collection and Analysis #21

@genro

Overview

Add a dedicated CollectPlugin for collecting handler execution data and providing analysis capabilities. This separates data collection concerns from output/logging (handled by LoggingPlugin in issue #20).

Motivation

Currently, LoggingPlugin mixes two concerns:

  1. Output (showing info in real-time)
  2. Collection (storing data for later analysis)

These should be separate plugins so users can:

  • Use LoggingPlugin alone for debugging/tutorials
  • Use CollectPlugin alone for performance monitoring
  • Use both together when needed

Proposed Features

1. Data Collection

Collect detailed execution data for each handler call:

  • Handler name
  • Arguments (serialized)
  • Return value (serialized)
  • Exception (if raised)
  • Execution time
  • Timestamp
  • Context information (switcher name, etc.)

2. Storage

  • In-memory storage with configurable max size
  • Automatic serialization of complex objects
  • Circular buffer (oldest entries discarded when full)
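
A minimal sketch of the circular-buffer behaviour, using the collections.deque approach suggested under Implementation Notes below (maxlen=3 is just for the demo):

from collections import deque

history = deque(maxlen=3)                 # keep at most 3 entries
for i in range(5):
    history.append({'handler': 'process', 'call': i})

print(list(history))                      # calls 0 and 1 were discarded oldest-first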

3. Analysis API

Provide rich query methods:

# Get recent calls
history = collect.history(last=100)
history = collect.history(first=50)
history = collect.history(handler='process')

# Performance analysis
slow_calls = collect.slowest(n=10)
fast_calls = collect.fastest(n=10)
slow_calls = collect.slower_than(threshold=0.1)  # > 100ms

# Error analysis
errors = collect.errors()
errors_by_handler = collect.errors(handler='process')

# Statistics
stats = collect.stats()  # Per-handler aggregates
stats = collect.stats(handler='process')  # Single handler
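
As a rough sketch, the timing queries could be implemented over the stored entries like this (assuming each entry carries the 'elapsed' field from the Entry Format below; attribute names are illustrative, not settled API):

def slowest(self, n=10):
    """Return the n entries with the largest elapsed time."""
    return sorted(self._history, key=lambda e: e['elapsed'], reverse=True)[:n]

def fastest(self, n=10):
    """Return the n entries with the smallest elapsed time."""
    return sorted(self._history, key=lambda e: e['elapsed'])[:n]

def slower_than(self, threshold):
    """Return every entry slower than `threshold` seconds."""
    return [e for e in self._history if e['elapsed'] > threshold]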

4. Export

Export collected data for external analysis:

# Export to file
collect.export('analysis.json')
collect.export('analysis.jsonl', format='jsonl')

# Export to dict
data = collect.to_dict()

# Clear history
collect.clear()
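
A possible sketch of the file export, assuming stored entries are already JSON-serializable dicts; 'jsonl' writes one entry per line, anything else falls back to a single JSON document:

import json

def export(self, path, format='json'):
    """Write collected entries to `path` as JSON or JSON Lines."""
    entries = list(self._history)
    with open(path, 'w', encoding='utf-8') as fh:
        if format == 'jsonl':
            for entry in entries:
                fh.write(json.dumps(entry) + '\n')
        else:
            json.dump(entries, fh, indent=2)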

Example Usage

Basic Collection (no output)

from smartswitch import Switcher
from smartswitch.plugins import CollectPlugin

sw = Switcher(plugins=[CollectPlugin(max_history=5000)])

@sw
def process(data):
    return f"Processed: {data}"

# Make many calls...
for i in range(1000):
    sw('process')(f"item-{i}")

# Analyze later
slow_calls = sw.plugins['collect'].slowest(10)
error_calls = sw.plugins['collect'].errors()
stats = sw.plugins['collect'].stats()

Combined with LoggingPlugin

sw = Switcher(plugins=[
    LoggingPlugin(mode='print,after'),  # Real-time output
    CollectPlugin(max_history=10000)     # Background collection
])

# LoggingPlugin shows output as it happens
# CollectPlugin silently collects for later analysis

Performance Monitoring

collect = CollectPlugin(max_history=100000)
api = Switcher(plugins=[collect])

# ... run production workload ...

# Find performance bottlenecks
slow_handlers = sorted(collect.stats().items(),
                       key=lambda item: item[1]['avg_time'], reverse=True)
name, info = slow_handlers[0]
print(f"Slowest handler: {name} - {info['avg_time']:.4f}s")

# Export for detailed analysis
collect.export('performance_report.json')

API Design

Constructor

CollectPlugin(
    max_history: int = 1000,
    serialize: bool = True,  # Serialize args/results
    include_context: bool = True  # Include switcher name, timestamp, etc.
)
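
A minimal class skeleton matching this constructor. The smartswitch plugin base class and hook names are not pinned down in this issue, so this only shows the state the plugin would hold; the name = 'collect' lookup key is an assumption based on sw.plugins['collect'] in the examples above:

import threading
from collections import deque

class CollectPlugin:
    name = 'collect'  # assumed lookup key (cf. sw.plugins['collect'] above)

    def __init__(self, max_history=1000, serialize=True, include_context=True):
        self.max_history = max_history
        self.serialize = serialize
        self.include_context = include_context
        self._history = deque(maxlen=max_history)  # circular buffer
        self._lock = threading.Lock()              # guards _history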

Query Methods

history(
    last: int | None = None,
    first: int | None = None,
    handler: str | None = None,
    errors: bool | None = None  # True=errors only, False=successes only
) -> list[dict]

slowest(n: int = 10) -> list[dict]
fastest(n: int = 10) -> list[dict]
slower_than(threshold: float) -> list[dict]

errors(handler: str | None = None) -> list[dict]

stats(handler: str | None = None) -> dict | list[dict]
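
A sketch of how history() could apply these filters, assuming entries carry an 'exception' key only when the call raised (filters apply in the order handler, error status, then first/last slicing):

def history(self, last=None, first=None, handler=None, errors=None):
    entries = list(self._history)
    if handler is not None:
        entries = [e for e in entries if e['handler'] == handler]
    if errors is not None:
        entries = [e for e in entries if ('exception' in e) == errors]
    if first is not None:
        return entries[:first]
    if last is not None:
        return entries[-last:]
    return entries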

Entry Format

{
    'handler': str,
    'switcher': str,
    'timestamp': float,
    'args': tuple,  # Serialized if serialize=True
    'kwargs': dict,
    'result': Any,  # If success, serialized if serialize=True
    'exception': {   # If error
        'type': str,
        'message': str,
        'traceback': str
    },
    'elapsed': float  # Seconds
}
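
One way the wrapper could build such an entry around a handler call. This is a sketch only: _record_call is a hypothetical helper, the switcher name lookup is an assumption, and time.perf_counter() / traceback.format_exc() are suggested rather than settled choices:

import time
import traceback

def _record_call(self, func, name, switcher, args, kwargs):
    entry = {
        'handler': name,
        'switcher': getattr(switcher, 'name', str(switcher)),  # assumed attribute
        'timestamp': time.time(),
        'args': self._serialize(args) if self.serialize else args,
        'kwargs': self._serialize(kwargs) if self.serialize else kwargs,
    }
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
        entry['result'] = self._serialize(result) if self.serialize else result
        return result
    except Exception as exc:
        entry['exception'] = {
            'type': type(exc).__name__,
            'message': str(exc),
            'traceback': traceback.format_exc(),
        }
        raise
    finally:
        entry['elapsed'] = time.perf_counter() - start
        with self._lock:
            self._history.append(entry)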

Stats Format

{
    'handler_name': {
        'calls': int,
        'errors': int,
        'avg_time': float,
        'min_time': float,
        'max_time': float,
        'total_time': float,
        'last_call': float  # timestamp
    }
}
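
A sketch of the aggregation behind stats(), producing the per-handler mapping above. It assumes the Entry Format fields, flags errors by the presence of an 'exception' key, and returns the same mapping shape whether or not a handler filter is given:

def stats(self, handler=None):
    per_handler = {}
    with self._lock:
        entries = list(self._history)
    for e in entries:
        if handler is not None and e['handler'] != handler:
            continue
        s = per_handler.setdefault(e['handler'], {
            'calls': 0, 'errors': 0, 'total_time': 0.0,
            'min_time': float('inf'), 'max_time': 0.0, 'last_call': 0.0,
        })
        s['calls'] += 1
        s['errors'] += 1 if 'exception' in e else 0
        s['total_time'] += e['elapsed']
        s['min_time'] = min(s['min_time'], e['elapsed'])
        s['max_time'] = max(s['max_time'], e['elapsed'])
        s['last_call'] = max(s['last_call'], e['timestamp'])
    for s in per_handler.values():
        s['avg_time'] = s['total_time'] / s['calls']
    return per_handler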

Implementation Notes

Serialization

For complex objects, provide basic serialization:

def _serialize(self, obj):
    """Serialize object for storage."""
    if isinstance(obj, (str, int, float, bool, type(None))):
        return obj
    if isinstance(obj, (list, tuple)):
        return [self._serialize(x) for x in obj]
    if isinstance(obj, dict):
        return {k: self._serialize(v) for k, v in obj.items()}
    # Complex objects -> string representation
    return repr(obj)

Memory Management

Use collections.deque with maxlen for automatic circular buffer:

self._history = deque(maxlen=self.max_history)

Thread Safety

Collection should be thread-safe:

self._lock = threading.Lock()

def wrap_handler(self, func, name, switcher):
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        entry = ...  # build the entry dict (see Entry Format above)
        with self._lock:  # only the shared history needs the lock
            self._history.append(entry)
        return result
    return wrapper

Benefits

  • Separation of concerns - Collection separate from logging
  • Performance monitoring - Find slow handlers and bottlenecks
  • Error tracking - Collect and analyze failures
  • Production-ready - Memory-bounded, thread-safe
  • Flexible - Use alone or with LoggingPlugin

Testing

Add tests for:

  • Data collection accuracy
  • Serialization of various types
  • Query methods (history, slowest, errors, stats)
  • Memory limits (max_history enforcement)
  • Thread safety (concurrent collection)
  • Export formats
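
Two illustrative pytest cases, assuming the internal _history deque and the _serialize helper sketched above; they poke at internals to keep the examples short:

from smartswitch.plugins import CollectPlugin

def test_max_history_is_enforced():
    plugin = CollectPlugin(max_history=3)
    for i in range(10):
        plugin._history.append({'handler': 'h', 'timestamp': float(i), 'elapsed': 0.0})
    assert len(plugin.history()) == 3        # oldest entries were discarded

def test_serialize_nested_structures():
    plugin = CollectPlugin()
    out = plugin._serialize({'nums': (1, 2.5), 'obj': object(), 'flag': True})
    assert out['nums'] == [1, 2.5]           # tuples come back as lists
    assert isinstance(out['obj'], str)       # complex objects fall back to repr()
    assert out['flag'] is True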

Future Enhancements

  • Sampling (collect only X% of calls)
  • Custom serialization hooks
  • Integration with monitoring systems (Prometheus, StatsD)
  • Persistence to disk/database

Priority: Low-Medium (nice to have, not blocking)
Dependencies: None (standalone plugin)
Breaking: No (new feature)
Version: Target 0.11.0

Metadata
Labels

  • deferred - Feature deferred for future consideration
  • enhancement - New feature or request
