ENH: add interval to window/rolling #246

Open
martindurant opened this issue May 2, 2019 · 6 comments

@martindurant
Member

Currently we keep a buffer of a certain length and emit on every new item. It might be useful to only emit on every Nth item, as Spark Streaming does: https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#window-operations

This can already be achieved with a normal window aggregate followed by the slice operator from #241, but a built-in interval would avoid the unnecessary computations for windows that are never emitted.
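
To make the idea concrete, here is a minimal plain-Python sketch of a rolling window that only emits on every Nth item. It does not use streamz at all; the function name `rolling_with_interval` and its `size` and `interval` parameters are made up for illustration and are not an existing or proposed streamz API.

```python
from collections import deque


def rolling_with_interval(items, size, interval):
    """Yield the latest `size` items as a tuple, but only on every `interval`-th item.

    Hypothetical helper for illustration; not part of streamz.
    """
    window = deque(maxlen=size)
    for count, item in enumerate(items, start=1):
        window.append(item)
        # Emit only once the window is full and the item count hits the interval,
        # so no work is done for the windows in between.
        if len(window) == size and count % interval == 0:
            yield tuple(window)


windows = list(rolling_with_interval(range(1, 11), size=3, interval=2))
print(windows)  # [(2, 3, 4), (4, 5, 6), (6, 7, 8), (8, 9, 10)]
```

With `interval=1` this degenerates to emitting on every item once the buffer is full, which is the behaviour described above as the current one.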

@ea42gh

ea42gh commented Sep 25, 2019

One pattern I need frequently is
to skip a certain buffer length initially,
followed by repeating
take n, skip m.
E.g., given a stream of integers 1, 2, 3, ...,
skip 2, take 3, skip 1 would give
(3,4,5)
(7,8,9)
(11,12,13)
...
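
As a reference for the pattern above, here is a small generator sketch in plain Python. The name `skip_then_take_skip` and its parameters are hypothetical and only serve to pin down the intended semantics; an incomplete trailing block is assumed to be dropped.

```python
from itertools import count, islice


def skip_then_take_skip(items, initial_skip, take, skip):
    """Drop `initial_skip` items once, then repeatedly yield `take` items and drop `skip`.

    Hypothetical helper for illustration; not part of streamz.
    """
    if take < 1:
        raise ValueError("take must be at least 1")
    iterator = iter(items)
    # Discard the initial elements once.
    for _ in islice(iterator, initial_skip):
        pass
    while True:
        block = tuple(islice(iterator, take))
        if len(block) < take:
            return  # input exhausted; an incomplete block is not emitted
        yield block
        # Discard the elements between two blocks.
        for _ in islice(iterator, skip):
            pass


# skip 2, take 3, skip 1 on the integers 1, 2, 3, ...
blocks = list(islice(skip_then_take_skip(count(1), 2, 3, 1), 3))
print(blocks)  # [(3, 4, 5), (7, 8, 9), (11, 12, 13)]
```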

@martindurant
Member Author

This is a bit like the "range" functionality

@ea42gh
Copy link

ea42gh commented Sep 26, 2019

This is the kludge I came up with. There may be a better way:

import streamz

def kill_grab_skip(source, kill, grab, skip, element=0):
    '''Drop an initial number of elements, then repeat a fixed grab/skip pattern.

    Args:
        source  : stream node to append to
        kill    : number of initial elements to ignore
        grab    : number of consecutive elements to return as one tuple
        skip    : number of elements to ignore before each subsequent grab
        element : dummy value emitted into source as padding when kill < skip

    Returns:
        the last node of the streamz node chain created
    '''
    def skip_first(x, skip):
        return x[skip:]

    if skip < kill:
        # kill is greater than skip: drop the first (kill - skip) elements, then
        # partition into chunks of (skip + grab) and strip the leading skip elements
        p = source.slice(kill - skip, None, None).partition(skip + grab)
        if skip > 0:
            p = p.map(lambda x: skip_first(x, skip))
    elif skip == kill:
        p = source.partition(skip + grab).map(lambda x: skip_first(x, skip))
    else:
        # kill is less than skip: emit startup elements to get to the kill == skip case
        p = source.partition(skip + grab).map(lambda x: skip_first(x, skip))
        for i in range(skip - kill):
            source.emit(element)
    return p

print('3,8,0 pattern')
source = streamz.Stream()
p = kill_grab_skip(source, 3, 8, 0)
p.sink(print)
for i in range(1, 30):
    source.emit(i)

@martindurant
Member Author

Are you sure slice does not do the same thing as this?

In [2]: s = streamz.Stream()
In [3]: s2 = s.slice(3, None, 8)
In [4]: s2.sink(print)
Out[4]: <sink: print>
In [6]: for i in range(1, 30):
   ...:     s.emit(i)
   ...:
9
17
25

https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L771

@ea42gh

ea42gh commented Sep 27, 2019

The call to slice picks every 8th element.
I need to pick a contiguous set of more than 1 element, e.g., [8,9,10], [16,17,18], ...

So yes, slice comes close, and I did use it in the function I posted above,
followed by partition() to make sure I obtain the wanted data set as soon as it is available.
I really dislike the emit() calls in the function though.
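
One possible way to avoid the padding emit() calls is to count positions instead of pre-filling a partition. The sketch below is plain Python with a push-style `update()` method; the class name `GrabSkip` and its `callback` argument are hypothetical, it assumes `grab >= 1`, and wiring it into an actual streamz node is not shown.

```python
class GrabSkip:
    """Push-style selector: feed items one at a time with update().

    Hypothetical class for illustration; not part of streamz.
    Assumes grab >= 1 so that the pattern length (grab + skip) is nonzero.
    """

    def __init__(self, kill, grab, skip, callback):
        self.kill, self.grab, self.skip = kill, grab, skip
        self.callback = callback  # called with each completed tuple
        self.position = 0         # number of items seen so far
        self.buffer = []

    def update(self, item):
        offset = self.position - self.kill
        self.position += 1
        if offset < 0:
            return  # still inside the initial "kill" region
        # Within each (grab + skip) cycle, the first `grab` positions are kept.
        if offset % (self.grab + self.skip) < self.grab:
            self.buffer.append(item)
            if len(self.buffer) == self.grab:
                # Emit as soon as the block is complete.
                self.callback(tuple(self.buffer))
                self.buffer = []


results = []
node = GrabSkip(kill=1, grab=3, skip=5, callback=results.append)
for i in range(1, 20):
    node.update(i)
print(results)  # [(2, 3, 4), (10, 11, 12)]
```

This covers the kill < skip case without injecting dummy elements into the source, so other consumers of the same source would not see any padding.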

Not sure GitHub issues is a good place to discuss this, though?

@martindurant
Member Author

OK, understood. In that case, feel free to add this as a PR with relevant tests.
