
Stream2py design notes


See also creek design notes

Misc

Multireader context management

from typing import Iterator, Callable

CursorFunc = Callable
# example of a CursorFunc: iter([1,2,3]).__next__

src = Source(**s)
reader1 = src.mk_reader()
reader2 = src.mk_reader()

assert isinstance(reader1, Iterator)
assert isinstance(reader2, Iterator)
assert isinstance(reader1, CursorFunc)
assert isinstance(reader2, CursorFunc)

assert not src.is_started  # or like io: .closed

with reader1:
    assert src.is_started
    reader1()  # it's a cursor func
    next(reader1)  # it's an iterator
    reader1[1:4]  # it's sliceable! (default is the "absolute" buffer index; see creek: Buffer)
    reader2.enter()
    next(reader2)

assert src.is_started
reader2.exit()
assert not src.is_started
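
Here is a minimal toy sketch of how that reference-counted start/stop could work (the Source and reader classes below are illustrative stand-ins, not the actual stream2py API; in practice entering a reader would start a background acquisition thread rather than just bump a counter):

class _Reader:
    """A reader that is an iterator, a cursor func, and a context manager."""
    def __init__(self, source):
        self._source = source
        self._it = iter(source._items)  # stand-in for "read from the source's buffer"

    # context management: entering/exiting a reader opens/releases the source
    def enter(self):
        self._source._n_active += 1
        return self

    def exit(self):
        self._source._n_active -= 1

    __enter__ = enter

    def __exit__(self, *exc):
        self.exit()

    # iterator and cursor-func interfaces
    def __iter__(self):
        return self

    def __next__(self):
        return next(self._it)

    __call__ = __next__


class Source:
    """Toy source whose is_started reflects whether any reader is active."""
    def __init__(self, items=()):
        self._items = list(items)
        self._n_active = 0

    def mk_reader(self):
        return _Reader(self)

    @property
    def is_started(self):
        return self._n_active > 0


src = Source([1, 2, 3, 4])
reader1, reader2 = src.mk_reader(), src.mk_reader()
assert not src.is_started
with reader1:
    assert src.is_started
    assert next(reader1) == 1   # iterator interface
    assert reader1() == 2       # cursor-func interface
    reader2.enter()
assert src.is_started           # reader2 still holds the source open
reader2.exit()
assert not src.is_started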

Indexing infinite sequences

This has to do with how to implement a buffer reader's range (slicing) function that takes care of "interval queries" on the data.

buff[bt:tt] -> items_for_bt_tt_interval

Currently, the source_reader defines the key function that determines the index (order and reference), and this key is used by the buffer reader. We propose to move the concern of the key to the buffer reader instead.

The proposed architecture for this is

  • Buffer writes are coupled with incrementing a number_of_items_inserted counter
  • Buffer reads use, by default, absolute indices which are mapped to the buffer's index via number_of_items_inserted
  • Any other indexing that is needed by (buffer) readers is achieved by converting the desired external index to the internal buffer's (absolute) index

This architecture was implemented in the IndexedBuffer class of creek.infinite_sequence.
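
A simplified stand-in (not the actual IndexedBuffer code) showing the number_of_items_inserted bookkeeping and absolute-index slicing:

from collections import deque

class AbsIndexedBuffer:
    """Fixed-size buffer addressed by absolute item index.

    Writes increment a number_of_items_inserted counter; reads with
    absolute indices are mapped to positions in the underlying deque.
    """
    def __init__(self, maxlen=100):
        self._buf = deque(maxlen=maxlen)
        self.number_of_items_inserted = 0

    def append(self, item):
        self._buf.append(item)
        self.number_of_items_inserted += 1

    def __getitem__(self, k):  # k: a slice of absolute indices
        # absolute index of the oldest item still held in the buffer
        offset = self.number_of_items_inserted - len(self._buf)
        abs_stop = self.number_of_items_inserted if k.stop is None else k.stop
        start = max((k.start or 0) - offset, 0)
        stop = min(abs_stop - offset, len(self._buf))
        return [self._buf[i] for i in range(start, max(stop, start))]

buf = AbsIndexedBuffer(maxlen=3)
for x in 'abcde':
    buf.append(x)
# 'a' and 'b' have been pushed out; absolute indices 2..4 remain
assert buf[2:5] == ['c', 'd', 'e']
assert buf[0:3] == ['c']  # indices 0 and 1 are gone; only index 2 is still present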

A BufferReader object would have a key_trans attribute that specifies how to transform the reader's key to the actual absolute base key, which is then translated to the physical buffer's key to actually get the data.

By default key_trans would be the identity, but any function could be specified (though it should be monotonic to get non-degenerate behavior).

This key_trans could be formulaic (such as a linear/affine transformation), but may need to be more complex in some other situations.
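
As a concrete illustration (hypothetical numbers): an affine key_trans taking a query time in seconds to the absolute item index of a fixed-rate stream:

sample_rate = 44100      # items per second (assumed fixed rate)
stream_start_s = 12.5    # time at which absolute index 0 was written

def key_trans(t_seconds):
    """Map a time in seconds to the buffer's absolute item index."""
    return int((t_seconds - stream_start_s) * sample_rate)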

For example, consider the case where the data itself contains the index. No formula can solve our index mapping in this case. It needs to be explicit.

Since data in the buffer is ordered, one fallback method for explicit index mapping would be to use a bisect search over the sorted keys to find items matching a bt:tt query. This achieves O(log(n)) performance per query, which may be sufficient in many cases.
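
A sketch of that fallback, assuming the buffer holds (ts, data) pairs sorted by ts:

from bisect import bisect_left

def slice_by_timestamp(items, bt, tt):
    """Return the items whose timestamp ts satisfies bt <= ts < tt.

    items: a list of (ts, data) pairs, sorted by ts.
    Uses bisect on the sorted timestamps, so each query is O(log n)
    plus the size of the result.
    """
    timestamps = [ts for ts, _ in items]  # in practice, maintain this list incrementally
    lo = bisect_left(timestamps, bt)
    hi = bisect_left(timestamps, tt)
    return items[lo:hi]

items = [(2, 'two'), (3, 'three'), (5, 'five'), (8, 'eight')]
assert slice_by_timestamp(items, 3, 8) == [(3, 'three'), (5, 'five')]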

One can achieve O(1) query time performance by maintaining an explicit mapping. This mapping would be coupled with the key_trans function and used (possibly shared, through the sharing of the key_trans function) by any readers that need to work in that index.

To test

  • Reads are consistent when the buffer advances due to new writes (i.e. the same query key applied to a buffer that has changed)
  • Indexes transformed by a monotonic function
  • Ledger-based index mapping (maintained and applied)

Multi-stream slicing

The problem in a nutshell: Slicing intervals

Problem A: Given a stream of (dbt, d) items ordered by dbt, s = Sliceable(stream) is such that s[qbt:qtt] gives us the items that intersect with the (qbt, qtt) interval. Intersection is a configurable concept.

Problem B: Like Problem A, but we have ((dbt, dtt), d) items. Intersection is a bit clearer here, but still configurable.

Problem A reduces to Problem B: compute ((dbt, dtt), d) from (dbt, d), then solve Problem B.

More details

Often the items of a stream relate to an interval of time. Even if we have a single timestamp, the data that is timestamped implicitly or explicitly relates to an interval around that timestamp. No phenomenon is really instantaneous.

In the face of that, when we say "give me the data that happened between bt and tt", what do we mean? Often we'll take the timestamp at face value (the instantaneous assumption) and pick items up if their timestamps ts are such that bt <= ts < tt, for instance. In many situations this may be good enough. In some, it may not. In all cases, you are making an assumption (whether you know it or not) about the underlying interval that ts represents.

Maybe ts represents the beginning of some event. In that case, if the duration of the event is large enough, it may make more sense to define containment as (bt <= ts) & (ts - duration < tt). Perhaps ts represents the end of some event (you'll note that more often than not it does, since events are often timestamped when the data is read, therefore once the event has effectively ended), in which case you should use (bt <= ts - duration) & (ts < tt) instead.
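
Making that assumption explicit, the containment test can be a pluggable predicate; here are the three conventions above spelled out as functions (names are illustrative):

def instantaneous(ts, duration, bt, tt):
    # take ts at face value: a point event
    return bt <= ts < tt

def ts_is_event_start(ts, duration, bt, tt):
    # ts marks the beginning of an event lasting `duration`
    return (bt <= ts) and (ts - duration < tt)

def ts_is_event_end(ts, duration, bt, tt):
    # ts marks the end of the event (common when data is timestamped at read time)
    return (bt <= ts - duration) and (ts < tt)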

In other cases, the stream items will be explicitly timestamped with (bt, tt) intervals.

In other words, if we want to manage interval queries and multi-stream time alignment correctly, we had better have a setup that accommodates interval-timestamped stream items.

Solution design idea

source -> timestamped_segments -> intersect_with_slice_query -> aggregate -> output

timestamped_segments are streams of (bt, tt, segment) items (or (bt, segment), or just segment, with a way to get to the normalized form from there). We assume that the stream items are already index-sorted.
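
A sketch of the normalization and intersection steps, with the simplest "intervals overlap" notion of intersection as the (configurable) default (function names are hypothetical):

def normalize(stream, duration=1):
    """Yield (bt, tt, segment) items from (bt, segment) items (Problem A -> Problem B)."""
    for bt, segment in stream:
        yield bt, bt + duration, segment

def intersect_with_slice_query(timestamped_segments, qbt, qtt,
                               intersects=lambda bt, tt, qbt, qtt: bt < qtt and qbt < tt):
    """Yield the (bt, tt, segment) items whose interval intersects (qbt, qtt).

    The notion of intersection is configurable via the `intersects` predicate.
    """
    for bt, tt, segment in timestamped_segments:
        if intersects(bt, tt, qbt, qtt):
            yield bt, tt, segment

segments = normalize([(0, 'a'), (2, 'b'), (4, 'c')], duration=2)
assert list(intersect_with_slice_query(segments, 1, 4)) == [(0, 2, 'a'), (2, 4, 'b')]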

(See: interval query -- ordered, non-overlapping)

Other problems

A common index

One detail to settle on is which index will be used. There are multiple indices (one per stream) but only one query index. Therefore we need the key_trans of each stream reader to get us from the query index to the index of that stream's buffer.
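
For example (hypothetical setup): if the common query index is seconds and each stream's buffer is indexed by absolute item count, each stream reader gets its own key_trans from query seconds to its buffer's index:

# One query index (seconds), several streams with different rates and offsets.
stream_specs = {
    'audio': {'rate': 44100, 'start_s': 10.0},
    'accel': {'rate': 100, 'start_s': 10.5},
}

# Each stream reader's key_trans maps a query time (seconds) to that buffer's index.
key_trans_per_stream = {
    name: (lambda t, rate=spec['rate'], start=spec['start_s']: int((t - start) * rate))
    for name, spec in stream_specs.items()
}

# A query over [12.0, 12.5) seconds, translated to each buffer's own index:
qbt, qtt = 12.0, 12.5
per_stream_slices = {
    name: slice(trans(qbt), trans(qtt)) for name, trans in key_trans_per_stream.items()
}
assert per_stream_slices['audio'] == slice(88200, 110250)
assert per_stream_slices['accel'] == slice(150, 200)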

Flattening

In many situations all we require is to accommodate a sorted stream of interval queries. For example, we're "slicing time" into intervals of 2s and asking for the data for each one of these intervals.

For this, MergedStreams will be of use. It creates an iterable of (stream_id, stream_item) pairs from a stream Mapping, that is, {stream_id: stream, ...}.

    >>> streams_map = {
    ...     'hello': [(2, 'two'), (3, 'three'), (5, 'five')],
    ...     'world': [(0, 'zero'), (1, 'one'), (3, 'three'), (6, 'six')]
    ... }
    >>> streams_items = MergedStreams(streams_map)
    >>> it = iter(streams_items)
    >>> list(it)  # doctest: +NORMALIZE_WHITESPACE
    [('world', (0, 'zero')),
     ('world', (1, 'one')),
     ('hello', (2, 'two')),
     ('hello', (3, 'three')),
     ('world', (3, 'three')),
     ('hello', (5, 'five')),
     ('world', (6, 'six'))]
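
One way such a merge could be implemented (a sketch, not necessarily how MergedStreams actually does it) is with heapq.merge, keyed on the item's timestamp:

from heapq import merge

def _tag(stream_id, stream):
    # attach the stream_id to each item of that stream
    for item in stream:
        yield stream_id, item

def merged_streams(streams_map):
    """Yield (stream_id, stream_item) pairs in stream-item (timestamp) order.

    Assumes each stream's items are (ts, data) pairs already sorted by ts.
    """
    tagged = [_tag(stream_id, stream) for stream_id, stream in streams_map.items()]
    return merge(*tagged, key=lambda pair: pair[1][0])

streams_map = {
    'hello': [(2, 'two'), (3, 'three'), (5, 'five')],
    'world': [(0, 'zero'), (1, 'one'), (3, 'three'), (6, 'six')],
}
assert list(merged_streams(streams_map))[:3] == [
    ('world', (0, 'zero')), ('world', (1, 'one')), ('hello', (2, 'two'))
]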

Interface options in making a stream reader

A few proposals

straight code

source_reader = SourceReader(**s)
stream_buffer = StreamBuffer(source_reader, **b)
buffer_reader = BufferReader(stream_buffer, **r)

fluent interface

buffer_reader = SourceReader(**s).stream_buffer(**b).buffer_reader(**r)

nested aggregator

buffer_reader = Aggreg(
    source_reader=SourceReader(**s),
    stream_buffer=StreamBuffer(**b),
    buffer_reader=BufferReader(**r)
)

flat aggregator

buffer_reader = Aggreg(**s, **b, **r)

Discussion

As a general rule, I'm for breaking things into small objects like the three objects we're talking about here, then combining them to get interfaces that might be more appropriate for some contexts.

This means that the two aggregators above don't preclude the other two proposals. In fact, I would argue that they should be built using the fluent interface or straight code methods.

The nested aggregator nicely separates the three roles and their parameters; the same parameter name (with different values) can be used in more than one role. The flat aggregator, on the other hand, allows one to share and align parameters and, since only a few of the params are actually required, allows for a sparse, simple interface in most cases.
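
One way the flat aggregator could route a single flat kwargs dict to the three constructors is by inspecting their signatures (a sketch; route_kwargs and flat_aggreg are hypothetical names, not existing stream2py functions):

from inspect import signature

def route_kwargs(func, kwargs):
    """Pick from kwargs only the names that func accepts."""
    params = signature(func).parameters
    return {name: kwargs[name] for name in params if name in kwargs}

def flat_aggreg(source_cls, buffer_cls, reader_cls, **kwargs):
    """Build source -> buffer -> reader from one flat kwargs dict.

    A parameter shared by several constructors is passed to each of them,
    which is the "share and align parameters" behavior described above.
    """
    source_reader = source_cls(**route_kwargs(source_cls, kwargs))
    stream_buffer = buffer_cls(source_reader, **route_kwargs(buffer_cls, kwargs))
    return reader_cls(stream_buffer, **route_kwargs(reader_cls, kwargs))

# e.g. buffer_reader = flat_aggreg(SourceReader, StreamBuffer, BufferReader, **s, **b, **r)

The flip side, as noted above, is that a parameter name can then no longer carry different values for different roles.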

More proposals

Let's take this as the point of departure:

stream_reader = SourceReader(**s).stream_buffer(**b).mk_reader(**r)

We want to make a StreamReader that gives us the chain automatically with default (smart or not) b and r params, but with a means to specify these if and when needed.

We could do the flat version:

stream_reader = StreamReader(**s, **b, **r)

Or more like this:

dflt_stream_reader = StreamReader(**s)  # .stream_buffer(**b).mk_reader(**r) done automatically with defaults
stream_reader_with_custom_reader = StreamReader(**s).mk_reader(**r)  # .stream_buffer(**b) done automatically with defaults
stream_reader_with_custom_buffer = StreamReader(**s).stream_buffer(**b)  # .mk_reader(**r) done automatically with defaults
stream_reader_with_all_customs = StreamReader(**s).stream_buffer(**b).mk_reader(**r)

A problem to keep an eye on with the above is that we want multiple readers to share the same source and buffer.

Misc thoughts

source = PyAudioSource(**s).buffer_cfg(**b)
source.start()
# source can be used as reader
# source can create another reader
reader = source.fork()
source.stop()