Stream2py design notes
See also creek design notes
```python
from typing import Iterator, Callable

CursorFunc = Callable
# example of a CursorFunc: iter([1, 2, 3]).__next__

src = Source(**s)
reader1 = src.mk_reader()
reader2 = src.mk_reader()
assert isinstance(reader1, Iterator)
assert isinstance(reader2, Iterator)
assert isinstance(reader1, CursorFunc)
assert isinstance(reader2, CursorFunc)
assert not src.is_started  # or, like io: .closed

with reader1:
    assert src.is_started
    reader1()  # it's a cursor func
    next(reader1)  # it's an iterator
    reader1[1:4]  # it's sliceable! (default is "absolute" buffer index; see creek's Buffer)

reader2.enter()
next(reader2)
assert src.is_started
reader2.exit()
assert not src.is_started
```
This concerns how to implement a buffer reader's range (slicing) function so that it takes care of "interval queries" on the data:

```python
buff[bt:tt]  # -> items_for_bt_tt_interval
```
Currently, the `source_reader` defines the `key` function that determines the index (order and reference), and this key is used by the buffer reader. We propose to move the concern of the key to the buffer reader instead.
The proposed architecture for this is:

- Buffer writes are coupled with incrementing a `number_of_items_inserted` counter
- Buffer reads use, by default, absolute indices, which are mapped to the buffer's physical index via `number_of_items_inserted`
- Any other indexing needed by (buffer) readers is achieved by converting the desired external index to the internal buffer's (absolute) index
This architecture was implemented in the `IndexedBuffer` class of `creek.infinite_sequence`.
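To make this concrete, here is a minimal sketch of the idea (a hypothetical `AbsoluteIndexBuffer`, not creek's actual `IndexedBuffer`):

```python
from collections import deque

class AbsoluteIndexBuffer:
    """Sketch of the proposed architecture: a bounded buffer read with
    absolute indices, mapped to physical slots via number_of_items_inserted."""

    def __init__(self, maxlen=100):
        self._buf = deque(maxlen=maxlen)
        self.number_of_items_inserted = 0

    def append(self, item):
        # writes are coupled with incrementing the counter
        self._buf.append(item)
        self.number_of_items_inserted += 1

    def __getitem__(self, k):
        # k: a slice of absolute indices (both bounds assumed given)
        oldest = self.number_of_items_inserted - len(self._buf)
        return list(self._buf)[max(k.start - oldest, 0):max(k.stop - oldest, 0)]

buf = AbsoluteIndexBuffer(maxlen=3)
for x in 'abcde':
    buf.append(x)
assert buf[2:4] == ['c', 'd']  # indices 0 and 1 were evicted; 2:4 still resolves
```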
A `BufferReader` object would have a `key_trans` attribute that specifies how to transform the reader's key to the actual absolute base key, which is then translated to the physical buffer's key to actually get the data. By default `key_trans` would be the identity, but any function could be specified (though it should be monotonic to get non-degenerate behavior). This `key_trans` could be formulaic (such as a linear/affine transformation), but may need to be more complex in some situations.
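For instance, for a stream whose items arrive at a fixed rate, an affine `key_trans` from time (in seconds) to absolute item index would suffice. A sketch, with illustrative (hypothetical) names:

```python
def affine_key_trans(offset_s, items_per_s):
    """Return a key_trans mapping a time in seconds to an absolute item
    index, assuming items arrive at a fixed rate starting at offset_s."""
    def key_trans(t):
        return int((t - offset_s) * items_per_s)
    return key_trans

key_trans = affine_key_trans(offset_s=10.0, items_per_s=4)
assert key_trans(10.0) == 0
assert key_trans(12.5) == 10  # 2.5 s later, at 4 items/s
```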
By contrast, consider the case where the data itself contains the index: no formula can solve our index mapping in this case; it needs to be explicit.
Since data in the buffer is ordered, one fallback method for explicit index mapping would be to use a bisect search on the sorted items to find those matching a `bt:tt` query. This achieves `O(log(n))` performance, which may be sufficient in many cases.
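A minimal sketch of that fallback, using the standard-library `bisect` module on `(ts, data)` items sorted by `ts`:

```python
from bisect import bisect_left

def interval_query(keys, items, bt, tt):
    """keys is a sorted list of each item's index (maintained alongside the
    buffer); return the items whose key ts satisfies bt <= ts < tt.
    Locating the bounds is O(log n) thanks to the sort order."""
    return items[bisect_left(keys, bt):bisect_left(keys, tt)]

items = [(0, 'a'), (2, 'b'), (3, 'c'), (7, 'd')]
keys = [ts for ts, _ in items]
assert interval_query(keys, items, bt=2, tt=7) == [(2, 'b'), (3, 'c')]
```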
One can achieve `O(1)` query-time performance by maintaining an explicit mapping. This mapping would be coupled with the `key_trans` function and used (possibly shared, through the sharing of the `key_trans` function) by any readers that need to work in that index.
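A hypothetical sketch of such a mapping, recorded at write time and usable as a `key_trans`:

```python
class ExplicitIndexMap:
    """Map each item's own key (e.g. a timestamp carried in the data) to its
    absolute buffer index. Maintained at write time; usable as a key_trans
    and shareable among readers. Lookup is O(1)."""

    def __init__(self):
        self._key_to_abs_idx = {}

    def record_write(self, item_key, abs_idx):
        self._key_to_abs_idx[item_key] = abs_idx

    def __call__(self, item_key):
        return self._key_to_abs_idx[item_key]

key_trans = ExplicitIndexMap()
key_trans.record_write(item_key='2023-01-01T00:00:07', abs_idx=42)
assert key_trans('2023-01-01T00:00:07') == 42
```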
Note: the same query key, applied to a buffer whose contents have changed, may yield different items.
Problem A: Given a stream of `(dbt, d)` items ordered by `dbt`, `s = Sliceable(stream)` is such that `s[qbt:qtt]` gives us the items that intersect with the `(qbt, qtt)` interval. Intersection is a configurable concept.

Problem B: Like Problem A, but we have `((dbt, dtt), d)` items. Intersection is a bit clearer here, but still configurable.

Problem A reduces to Problem B by computing `((dbt, dtt), d)` from `(dbt, d)`, then solving Problem B.
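A sketch of this reduction, assuming (hypothetically) a fixed item duration to synthesize `dtt`, and overlap as the intersection concept:

```python
def with_intervals(stream, duration=1):
    """Reduce Problem A to Problem B: (dbt, d) -> ((dbt, dbt + duration), d),
    assuming each item covers a fixed duration (intersection semantics
    remain configurable)."""
    return (((dbt, dbt + duration), d) for dbt, d in stream)

def sliced(items, qbt, qtt):
    """Problem B: items whose (dbt, dtt) interval overlaps (qbt, qtt)."""
    return [((dbt, dtt), d) for (dbt, dtt), d in items
            if dbt < qtt and qbt < dtt]

stream = [(0, 'a'), (2, 'b'), (5, 'c')]
assert sliced(with_intervals(stream, duration=2), qbt=1, qtt=4) == [
    ((0, 2), 'a'), ((2, 4), 'b')
]
```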
Often the items of a stream relate to an interval of time. Even if we have a single timestamp, the data that is timestamped implicitly or explicitly relates to an interval around that timestamp. There is no truly instantaneous phenomenon.
In the face of that, when we say "give me the data that happened between `bt` and `tt`", what do we mean? Often we'll take the timestamp at face value (the instantaneous assumption) and pick items up if their timestamps `ts` are such that `bt <= ts < tt`, for instance. In many situations this may be good enough.
In some, it may not. In all cases, you are making an assumption (whether you know it or not) about the underlying interval that `ts` represents. Maybe `ts` represents the beginning of some event. In that case, if the duration of the event is large enough, it may make more sense to define containment as `(bt <= ts) & (ts + duration < tt)`. Perhaps `ts` represents the end of some event (you'll note that more often than not it does, since events are often timestamped when the data is read, therefore once the event has effectively ended), in which case you should use `(bt <= ts - duration) & (ts < tt)` instead.
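As a sketch, the two conventions side by side (assuming `duration` is known):

```python
def contained_when_ts_is_beginning(ts, duration, bt, tt):
    # event spans (ts, ts + duration)
    return (bt <= ts) and (ts + duration < tt)

def contained_when_ts_is_end(ts, duration, bt, tt):
    # event spans (ts - duration, ts)
    return (bt <= ts - duration) and (ts < tt)

# an event of duration 2 ending at ts=3 is contained in (0, 4)...
assert contained_when_ts_is_end(ts=3, duration=2, bt=0, tt=4)
# ...but reading ts as a beginning, the event would spill past tt=4
assert not contained_when_ts_is_beginning(ts=3, duration=2, bt=0, tt=4)
```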
In other cases, the stream items will be explicitly timestamped with `(bt, tt)` intervals. In other words, if we want to manage interval queries and multi-stream time alignment correctly, we had better have a setup that accommodates interval-timestamped stream items.
```
source -> timestamped_segments -> intersect_with_slice_query -> aggregate -> output
```
Here `timestamped_segments` are streams of `(bt, tt, segment)` (or `(bt, segment)`, or just `segment`, with a way to get to the normalized form from there). We assume the stream items are index-sorted already.
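A sketch of one such normalization, assuming a fixed segment `duration` when only `bt` is given (names are illustrative):

```python
def normalized(item, duration=1):
    """Normalize a stream item to (bt, tt, segment) form. Assumes a known
    fixed duration when tt is missing; a bare segment would additionally
    need an external counter or clock to synthesize bt."""
    if isinstance(item, tuple) and len(item) == 3:  # already (bt, tt, segment)
        return item
    if isinstance(item, tuple) and len(item) == 2:  # (bt, segment)
        bt, segment = item
        return (bt, bt + duration, segment)
    raise ValueError('bare segments need an external index source')

assert normalized((3, 5, 'seg')) == (3, 5, 'seg')
assert normalized((3, 'seg'), duration=2) == (3, 5, 'seg')
```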

One detail to settle is which index will be used. There are multiple indices (one per stream) but only one query index. Therefore we need the `key_trans` of each stream reader to get us from the query index to the index of that stream's buffer.
In many situations all we require is to accommodate a sorted stream of interval queries. For example, we're "slicing time" into intervals of 2s and asking for the data for each one of these intervals.
For this, `MergedStreams` will be of use. It creates an iterable of `(stream_id, stream_item)` pairs from a streams `Mapping`, that is, `{stream_id: stream, ...}`.
```python
>>> streams_map = {
...     'hello': [(2, 'two'), (3, 'three'), (5, 'five')],
...     'world': [(0, 'zero'), (1, 'one'), (3, 'three'), (6, 'six')]
... }
>>> streams_items = MergedStreams(streams_map)
>>> it = iter(streams_items)
>>> list(it)  # doctest: +NORMALIZE_WHITESPACE
[('world', (0, 'zero')),
 ('world', (1, 'one')),
 ('hello', (2, 'two')),
 ('hello', (3, 'three')),
 ('world', (3, 'three')),
 ('hello', (5, 'five')),
 ('world', (6, 'six'))]
```
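One way such behavior could be realized (a sketch, not necessarily stream2py's actual implementation) is with the standard library's `heapq.merge`, which lazily merges sorted iterables and is stable:

```python
from heapq import merge

class MergedStreams:
    """Merge a {stream_id: stream} mapping into (stream_id, item) pairs,
    ordered by each item's index (item[0]). Assumes each stream is already
    sorted by that index; heapq.merge is stable, so ties keep the mapping's
    order (hence ('hello', (3, ...)) before ('world', (3, ...)) above)."""

    def __init__(self, streams_map):
        self.streams_map = streams_map

    def __iter__(self):
        tagged = (
            ((stream_id, item) for item in stream)
            for stream_id, stream in self.streams_map.items()
        )
        yield from merge(*tagged, key=lambda pair: pair[1][0])
```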
Several construction styles are on the table:

```python
# straight code
source_reader = SourceReader(**s)
stream_buffer = StreamBuffer(source_reader, **b)
buffer_reader = BufferReader(stream_buffer, **r)

# fluent interface
buffer_reader = SourceReader(**s).stream_buffer(**b).buffer_reader(**r)

# nested aggregator
buffer_reader = Aggreg(
    source_reader=SourceReader(**s),
    stream_buffer=StreamBuffer(**b),
    buffer_reader=BufferReader(**r),
)

# flat aggregator
buffer_reader = Aggreg(**s, **b, **r)
```
As a general rule, I'm for breaking things into small objects like the three objects we're talking about here, then combining them to get interfaces that might be more appropriate for some contexts.
This means that the two aggregators above don't preclude the other two proposals. In fact, I would argue that they should be built using the fluent interface or straight code methods.
The nested aggregator nicely separates the three roles and their parameters; the same parameter name (with different values) can be used in more than one role. The flat aggregator, on the other hand, allows one to share and align parameters, and since only a few of the params are actually required, it allows for a sparse, simple interface in most cases.
Let's take this as the point of departure:

```python
stream_reader = SourceReader(**s).stream_buffer(**b).mk_reader(**r)
```
We want to make a `StreamReader` that gives us the chain automatically with default (smart or not) `b` and `r` params, but with a means to specify these if and when needed.
We could do the flat version:

```python
stream_reader = StreamReader(**s, **b, **r)
```

Or, more like this:

```python
dflt_stream_reader = StreamReader(**s)  # .stream_buffer(**b).mk_reader(**r) done automatically with defaults
stream_reader_with_custom_reader = StreamReader(**s).mk_reader(**r)  # .stream_buffer(**b) done automatically with defaults
stream_reader_with_custom_buffer = StreamReader(**s).stream_buffer(**b)  # .mk_reader(**r) done automatically with defaults
stream_reader_with_all_customs = StreamReader(**s).stream_buffer(**b).mk_reader(**r)
```
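A hypothetical sketch of that behavior, assuming the `SourceReader.stream_buffer` and buffer `mk_reader` methods from the fluent proposal above, and assumed default param dicts `DFLT_B`/`DFLT_R`:

```python
class StreamReader:
    """Sketch: build the SourceReader -> stream_buffer -> reader chain,
    filling in defaults for any stage that isn't explicitly customized.
    SourceReader is assumed from the proposals above."""

    DFLT_B, DFLT_R = {}, {}  # assumed default buffer/reader params

    def __init__(self, **s):
        self._source = SourceReader(**s)
        self._buffer = None

    def stream_buffer(self, **b):
        self._buffer = self._source.stream_buffer(**{**self.DFLT_B, **b})
        return self  # fluent: .mk_reader(...) can follow

    def mk_reader(self, **r):
        if self._buffer is None:  # no custom buffer: build one with defaults
            self.stream_buffer()
        return self._buffer.mk_reader(**{**self.DFLT_R, **r})
```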
A problem to keep an eye on with the above is that we want multiple readers to share the same source and buffer.
```python
source = PyAudioSource(**s).buffer_cfg(**b)
source.start()
# source can be used as a reader itself
# source can also create (fork) other readers
reader = source.fork()
source.stop()
```