How easy it could be to re-use a pipeline for online and offline processing:
# the main processor takes raw data and turns it into features
processor = Pipeline([Windower(), Filter(), FeatureExtractor()])

# stream an entire recording into the processor in segments and
# concatenate the results
offline_pipeline = Streamer(processor, segment_length=10)
features = offline_pipeline.process(recording)

# the realtime pipeline uses the same processing but tacks on an estimator
realtime_pipeline = Pipeline([processor, Estimator()])
realtime_pipeline.named_blocks('Estimator').fit(features)

def device_callback(raw_data):
    prediction = realtime_pipeline.process(raw_data)
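To pin down what the blocks above would have to do, here is a minimal sketch of `Pipeline` and `Streamer` (a hypothetical implementation, not the real API; `named_blocks` looking blocks up by class name is an assumption):

```python
class Pipeline:
    """Chain of blocks; each block exposes a process() method."""

    def __init__(self, blocks):
        self.blocks = blocks

    def process(self, data):
        # run the data through each block in sequence
        for block in self.blocks:
            data = block.process(data)
        return data

    def named_blocks(self, name):
        # assumption: look a block up by its class name
        for block in self.blocks:
            if type(block).__name__ == name:
                return block
        raise KeyError(name)


class Streamer:
    """Feed a recording through a pipeline in fixed-length segments."""

    def __init__(self, pipeline, segment_length):
        self.pipeline = pipeline
        self.segment_length = segment_length

    def process(self, recording):
        n = self.segment_length
        # process each segment independently and collect the results
        return [self.pipeline.process(recording[i:i + n])
                for i in range(0, len(recording), n)]
```

Since a `Pipeline` and a `Streamer` expose the same `process()` interface, either can stand in wherever a block is expected, which is what makes the reuse above possible.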
One issue: gesture classification fits well into this because you have one label per recording, but cursor position regression doesn't really, because you have one label per segment. In the former case, the labeling can be independent of the streamer; in the latter, the two need to be integrated.
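To make the contrast concrete, here's a toy illustration (made-up data and shapes) of the two labeling schemes:

```python
# Gesture classification: one label per recording.
recordings = [[0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8]]
gesture_labels = ['fist', 'rest']  # len(gesture_labels) == len(recordings)

# Cursor position regression: one target per segment. With
# segment_length=2, each 4-sample recording yields 2 segments.
cursor_targets = [
    [(0.0, 0.1), (0.2, 0.3)],  # targets for recording 1's segments
    [(0.4, 0.5), (0.6, 0.7)],  # targets for recording 2's segments
]
```

In the first case the label can be attached after streaming is done; in the second, the label source has to advance in lockstep with the streamer.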
Another issue: once you have this, you'll naturally want to zoom out once more and have a streamer that handles iterating over multiple files. This could be handled with nested streamers, since a streamer is itself a pipeline.
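One way to sketch the nesting (hypothetical; here `segment_length=None` means "iterate items, e.g. one file per step, rather than slicing"):

```python
class Streamer:
    """Run a block over chunks of its input. Because a Streamer exposes
    the same process() interface as any other block, one Streamer can
    wrap another."""

    def __init__(self, block, segment_length=None):
        self.block = block
        self.segment_length = segment_length

    def process(self, data):
        if self.segment_length is None:
            # outer streamer: hand each item (e.g. each file's
            # recording) to the inner block one at a time
            return [self.block.process(item) for item in data]
        n = self.segment_length
        # inner streamer: slice one recording into fixed-length segments
        return [self.block.process(data[i:i + n])
                for i in range(0, len(data), n)]
```

An outer `Streamer(inner_streamer)` then iterates recordings while the inner one handles segmentation within each recording.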
The solution to both of these is probably to make the streamer a non-functional block that you have to implement yourself as an iterator. For example:
def custom_iter(array_source, label_source):
    for array, labels in zip(array_source, label_source):
        # pair each segment of the array with its own label;
        # segment() stands in for whatever windowing scheme is used
        yield from zip(segment(array), labels)
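A self-contained version of that iterator, with a toy `segment()` helper (the helper and the data are assumptions, just to show the behavior):

```python
def segment(array, n=2):
    # toy helper: split an array into fixed-length windows of n samples
    return [array[i:i + n] for i in range(0, len(array), n)]


def custom_iter(array_source, label_source):
    # one recording and its per-segment labels at a time
    for array, labels in zip(array_source, label_source):
        # pair each segment with its own label (the regression case)
        yield from zip(segment(array), labels)


pairs = list(custom_iter([[1, 2, 3, 4]], [['a', 'b']]))
# each segment comes out paired with its label
```

The downstream pipeline can then consume `(segment, label)` pairs without knowing whether labeling is per-recording or per-segment.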