Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions docs/source/guides/ProcessorsBase.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ The `ezmsg.baseproc` module contains the base classes for message processors. Th

### Generic TypeVars

| Idx | Class | Description |
|-----|-----------------------|----------------------------------------------------------------------------|
| 1 | `MessageInType` (Mi) | for messages passed to a consumer, processor, or transformer |
| 2 | `MessageOutType` (Mo) | for messages returned by a producer, processor, or transformer |
| 3 | `SettingsType` | bound to ez.Settings |
| 4 | `StateType` (St) | bound to ProcessorState which is simply ez.State with a `hash: int` field. |
| Idx | Class | Description |
|-----|----------------------------|----------------------------------------------------------------------------|
| 1 | `MessageInType` (Mi) | for messages passed to a consumer, processor, or transformer |
| 2 | `MessageOutType` (Mo) | for messages returned by a producer, processor, or transformer |
| 3 | `SettingsType` | bound to ez.Settings |
| 4 | `StateType` (St) | bound to ProcessorState which is simply ez.State with a `hash: int` field. |
| 5 | `ClockDrivenSettingsType` | bound to `ClockDrivenSettings` (provides `fs` and `n_time`) |


### Protocols
Expand Down Expand Up @@ -47,6 +48,7 @@ Note: `__call__` and `partial_fit` both have asynchronous alternatives: `__acall
| 10 | `BaseAsyncTransformer` | 8 | 8 | `__acall__` wraps abstract `_aprocess`; `__call__` runs `__acall__`. |
| 11 | `CompositeProcessor` | 1 | 5 | Methods iterate over sequence of processors created in `_initialize_processors`. |
| 12 | `CompositeProducer` | 2 | 6 | Similar to `CompositeProcessor`, but first processor must be a producer. |
| 13 | `BaseClockDrivenProducer` | 5 | 8 | Clock-driven data generator. Implements `_produce(n_samples, time_axis)`. |

NOTES:
1. Producers do not inherit from `BaseProcessor`, so concrete implementations should subclass `BaseProducer` or `BaseStatefulProducer`.
Expand All @@ -60,25 +62,27 @@ do not inherit from `BaseStatefulProcessor` and `BaseStatefulProducer`. They acc

### Generic TypeVars for ezmsg Units

| Idx | Class | Description |
|-----|---------------------------|------------------------------------------------------------------------------------------------------------------|
| 5 | `ProducerType` | bound to `BaseProducer` (hence, also `BaseStatefulProducer`, `CompositeProducer`) |
| 6 | `ConsumerType` | bound to `BaseConsumer`, `BaseStatefulConsumer` |
| 7 | `TransformerType` | bound to `BaseTransformer`, `BaseStatefulTransformer`, `CompositeProcessor` (hence, also `BaseAsyncTransformer`) |
| 8 | `AdaptiveTransformerType` | bound to `BaseAdaptiveTransformer` |
| Idx | Class | Description |
|-----|----------------------------|------------------------------------------------------------------------------------------------------------------|
| 5 | `ProducerType` | bound to `BaseProducer` (hence, also `BaseStatefulProducer`, `CompositeProducer`) |
| 6 | `ConsumerType` | bound to `BaseConsumer`, `BaseStatefulConsumer` |
| 7 | `TransformerType` | bound to `BaseTransformer`, `BaseStatefulTransformer`, `CompositeProcessor` (hence, also `BaseAsyncTransformer`) |
| 8 | `AdaptiveTransformerType` | bound to `BaseAdaptiveTransformer` |
| 9 | `ClockDrivenProducerType` | bound to `BaseClockDrivenProducer` |


### Abstract implementations (Base Classes) for ezmsg Units using processors:

| Idx | Class | Parents | Expected TypeVars |
|-----|-------------------------------|---------|---------------------------|
| 1 | `BaseProcessorUnit` | - | - |
| 2 | `BaseProducerUnit` | - | `ProducerType` |
| 3 | `BaseConsumerUnit` | 1 | `ConsumerType` |
| 4 | `BaseTransformerUnit` | 1 | `TransformerType` |
| 5 | `BaseAdaptiveTransformerUnit` | 1 | `AdaptiveTransformerType` |
| Idx | Class | Parents | Expected TypeVars |
|-----|--------------------------------|---------|----------------------------|
| 1 | `BaseProcessorUnit` | - | - |
| 2 | `BaseProducerUnit` | - | `ProducerType` |
| 3 | `BaseConsumerUnit` | 1 | `ConsumerType` |
| 4 | `BaseTransformerUnit` | 1 | `TransformerType` |
| 5 | `BaseAdaptiveTransformerUnit` | 1 | `AdaptiveTransformerType` |
| 6 | `BaseClockDrivenProducerUnit` | 1 | `ClockDrivenProducerType` |

Note, it is strongly recommended to use `BaseConsumerUnit`, `BaseTransformerUnit`, or `BaseAdaptiveTransformerUnit` for implementing concrete subclasses rather than `BaseProcessorUnit`.
Note, it is strongly recommended to use `BaseConsumerUnit`, `BaseTransformerUnit`, `BaseAdaptiveTransformerUnit`, or `BaseClockDrivenProducerUnit` for implementing concrete subclasses rather than `BaseProcessorUnit`.


## Implementing a custom standalone processor
Expand Down Expand Up @@ -125,6 +129,7 @@ flowchart TD
* For stateful processors that need to respond to a change in the incoming data, implement `_hash_message`.
* For adaptive transformers, implement `partial_fit`.
* For chains of processors (`CompositeProcessor`/ `CompositeProducer`), need to implement `_initialize_processors`.
* For clock-driven producers (`BaseClockDrivenProducer`), implement `_reset_state(time_axis)` and `_produce(n_samples, time_axis)`. See the [clock-driven how-to guide](how-tos/processors/clockdriven.rst).
* See processors in `ezmsg.sigproc` for signal processing examples, or `ezmsg.learn` for machine learning examples.
5. Override non-abstract methods if you need special behaviour.

Expand Down
224 changes: 224 additions & 0 deletions docs/source/guides/how-tos/processors/clockdriven.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
How to implement a clock-driven producer?
#########################################

Clock-driven producers generate data synchronized to clock ticks. They are useful
for signal generators, simulators, and other components that need to produce
timed data streams.

The ``BaseClockDrivenProducer`` base class simplifies this pattern by handling
all the timing and sample counting logic internally. You only need to implement
the data generation.

When to use BaseClockDrivenProducer
===================================

Use ``BaseClockDrivenProducer`` when you need to:

- Generate synthetic signals (sine waves, noise, test patterns)
- Simulate sensor data at a specific sample rate
- Produce timed data streams driven by a ``Clock``

This base class eliminates the need for the ``Clock → Counter → Generator``
pattern by combining the counter functionality into the generator.

Basic Structure
===============

A clock-driven producer consists of three parts:

1. **Settings** - Extends ``ClockDrivenSettings`` (which provides ``fs`` and ``n_time``)
2. **State** - Extends ``ClockDrivenState`` (which provides ``counter`` and ``fractional_samples``)
3. **Producer** - Extends ``BaseClockDrivenProducer`` and implements ``_reset_state`` and ``_produce``

Example: Sine Wave Generator
============================

Here's a complete example of a sine wave generator:

.. code-block:: python

import numpy as np
from ezmsg.util.messages.axisarray import AxisArray, LinearAxis

from ezmsg.baseproc import (
BaseClockDrivenProducer,
BaseClockDrivenProducerUnit,
ClockDrivenSettings,
ClockDrivenState,
processor_state,
)


class SinGeneratorSettings(ClockDrivenSettings):
"""
Settings for the sine wave generator.

Inherits from ClockDrivenSettings which provides:
- fs: Output sampling rate in Hz
- n_time: Samples per block (optional, derived from clock if None)
"""

freq: float = 1.0
"""Frequency of the sine wave in Hz."""

amp: float = 1.0
"""Amplitude of the sine wave."""

phase: float = 0.0
"""Initial phase in radians."""


@processor_state
class SinGeneratorState(ClockDrivenState):
"""
State for the sine wave generator.

Inherits from ClockDrivenState which provides:
- counter: Current sample counter (total samples produced)
- fractional_samples: For accumulating sub-sample timing
"""

ang_freq: float = 0.0
"""Pre-computed angular frequency (2 * pi * freq)."""


class SinGenerator(
BaseClockDrivenProducer[SinGeneratorSettings, SinGeneratorState]
):
"""
Generates sine wave data synchronized to clock ticks.
"""

def _reset_state(self, time_axis: LinearAxis) -> None:
"""
Initialize state. Called once before first production.

Use this to pre-compute values that don't change between chunks.
"""
self._state.ang_freq = 2 * np.pi * self.settings.freq

def _produce(self, n_samples: int, time_axis: LinearAxis) -> AxisArray:
"""
Generate sine wave data for this chunk.

Args:
n_samples: Number of samples to generate
time_axis: LinearAxis with correct offset and gain (1/fs)

Returns:
AxisArray containing the sine wave data
"""
# Calculate time values using the internal counter
t = (np.arange(n_samples) + self._state.counter) * time_axis.gain

# Generate sine wave
data = self.settings.amp * np.sin(
self._state.ang_freq * t + self.settings.phase
)

return AxisArray(
data=data,
dims=["time"],
axes={"time": time_axis},
)


class SinGeneratorUnit(
BaseClockDrivenProducerUnit[SinGeneratorSettings, SinGenerator]
):
"""
ezmsg Unit wrapper for SinGenerator.

Receives clock ticks on INPUT_CLOCK and outputs AxisArray on OUTPUT_SIGNAL.
"""

SETTINGS = SinGeneratorSettings


Key Points
==========

**Settings inheritance**: Your settings class should extend ``ClockDrivenSettings``,
which provides:

- ``fs``: The output sampling rate in Hz
- ``n_time``: Optional fixed chunk size. If ``None``, chunk size is derived from
the clock's gain (``fs * clock.gain``)

**State inheritance**: Your state class should extend ``ClockDrivenState``,
which provides:

- ``counter``: Tracks total samples produced (use this for continuous signals)
- ``fractional_samples``: Accumulates sub-sample timing for accurate chunk sizes

**The _produce method**: This is where you generate data. You receive:

- ``n_samples``: How many samples to generate this chunk
- ``time_axis``: A ``LinearAxis`` with the correct ``offset`` and ``gain`` (1/fs)

The base class automatically:

- Computes ``n_samples`` from clock timing or settings
- Manages the sample counter (incremented after ``_produce`` returns)
- Handles fractional sample accumulation for non-integer chunk sizes
- Supports both fixed ``n_time`` and variable chunk modes

Using Standalone (Outside ezmsg)
================================

Clock-driven producers can be used standalone for testing or offline processing:

.. code-block:: python

from ezmsg.util.messages.axisarray import AxisArray

# Create the producer
producer = SinGenerator(SinGeneratorSettings(
fs=1000.0, # 1000 Hz sample rate
n_time=100, # 100 samples per chunk
freq=10.0, # 10 Hz sine wave
amp=1.0,
))

# Simulate clock ticks (LinearAxis with gain=1/dispatch_rate, offset=timestamp)
clock_tick = AxisArray.LinearAxis(gain=0.1, offset=0.0) # 10 Hz dispatch

# Generate data
result = producer(clock_tick)
print(f"Shape: {result.data.shape}") # (100,)
print(f"Sample rate: {1/result.axes['time'].gain} Hz") # 1000.0 Hz


Using with ezmsg
================

In an ezmsg pipeline, connect a ``Clock`` to your generator's ``INPUT_CLOCK``:

.. code-block:: python

import ezmsg.core as ez
from ezmsg.baseproc import Clock, ClockSettings


class SinPipeline(ez.Collection):
SETTINGS = SinGeneratorSettings

CLOCK = Clock()
GENERATOR = SinGeneratorUnit()

def configure(self) -> None:
self.CLOCK.apply_settings(ClockSettings(dispatch_rate=10.0))
self.GENERATOR.apply_settings(self.SETTINGS)

def network(self) -> ez.NetworkDefinition:
return (
(self.CLOCK.OUTPUT_SIGNAL, self.GENERATOR.INPUT_CLOCK),
)


See Also
========

- :doc:`API Reference for clockdriven module <../../../api/generated/ezmsg.baseproc.clockdriven>`
- :doc:`stateful` - For general stateful processor patterns
- :doc:`unit` - For converting processors to ezmsg Units
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ Processor HOW TOs
adaptive
composite
unit
clockdriven
checkpoint
19 changes: 19 additions & 0 deletions src/ezmsg/baseproc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
ClockState,
)

# Clock-driven producers
from .clockdriven import (
BaseClockDrivenProducer,
ClockDrivenSettings,
ClockDrivenSettingsType,
ClockDrivenState,
)

# Composite processor classes
from .composite import (
CompositeProcessor,
Expand Down Expand Up @@ -74,15 +82,18 @@
from .units import (
AdaptiveTransformerType,
BaseAdaptiveTransformerUnit,
BaseClockDrivenProducerUnit,
BaseConsumerUnit,
BaseProcessorUnit,
BaseProducerUnit,
BaseTransformerUnit,
ClockDrivenProducerType,
ConsumerType,
GenAxisArray,
ProducerType,
TransformerType,
get_base_adaptive_transformer_type,
get_base_clockdriven_producer_type,
get_base_consumer_type,
get_base_producer_type,
get_base_transformer_type,
Expand Down Expand Up @@ -116,6 +127,7 @@
"ConsumerType",
"TransformerType",
"AdaptiveTransformerType",
"ClockDrivenProducerType",
# Decorators
"processor_state",
# Base processor classes
Expand All @@ -131,6 +143,11 @@
"BaseStatefulTransformer",
"BaseAdaptiveTransformer",
"BaseAsyncTransformer",
# Clock-driven producers
"BaseClockDrivenProducer",
"ClockDrivenSettings",
"ClockDrivenSettingsType",
"ClockDrivenState",
# Composite classes
"CompositeStateful",
"CompositeProcessor",
Expand All @@ -141,12 +158,14 @@
"BaseConsumerUnit",
"BaseTransformerUnit",
"BaseAdaptiveTransformerUnit",
"BaseClockDrivenProducerUnit",
"GenAxisArray",
# Type resolution helpers
"get_base_producer_type",
"get_base_consumer_type",
"get_base_transformer_type",
"get_base_adaptive_transformer_type",
"get_base_clockdriven_producer_type",
"_get_base_processor_settings_type",
"_get_base_processor_message_in_type",
"_get_base_processor_message_out_type",
Expand Down
Loading