[QUESTION] How to create accumulated batch for processing? #17
Hey Ilia,

Since you ask about the Pathway-like approach: the "mental model" of Pathway computations is that of reactive computation. The best-known tool with a similar model is a spreadsheet (Excel), so I will use this analogy to answer. In Excel, you would probably paste the rows of your table into the spreadsheet, one under another, and Excel would recompute all the cells that depend on them as soon as it can. Pathway does the same in streaming mode, as soon as you connect your inputs to a connector.

From what I see, your question covers two separate elements: (1) how to trigger a recomputation once a threshold of new rows (e.g., every 1M) has arrived, and (2) how to hold back processing until that threshold is reached.
For point 1, there are two separate topics: what we would call in Excel AGGREGATES (like a sum over rows) and VOLATILE functions (like API calls to a financial/currency API that changes its answers each time you call it). I'm guessing your question is more about volatiles, but let's cover both.

Starting with AGGREGATES: Pathway recomputes aggregates whenever fresh data arrives. As in SQL, the mechanism is based on "groupby" followed by "reduce", the equivalent of what you would write in Excel as summing column A.

Now, moving on to VOLATILE functions. How to force the recompute of a VOLATILE function when a trigger event happens (e.g., every 1M new rows)? This one is a bit magical to handle, as it is in Excel! You will need to quantify the precise trigger that you want to activate the recompute, as a sort of global variable. You will then pass this variable as a parameter to your transformers to make sure results are recomputed each time this variable changes.

For example, consider a table of incoming rows and perform a count aggregation on it. This creates a single-row table holding the total row count. Next, divide this value by 1M, rounding down, to get the trigger value. Finally, join this trigger value back in with the original table.
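The Pathway snippets from this comment were lost in extraction, but the trigger arithmetic itself is simple enough to sketch in plain Python (make_trigger is an illustrative name, not Pathway API):

```python
def make_trigger(total_rows: int, batch_size: int = 1_000_000) -> int:
    # Floor-divide the running row count by the batch size:
    # the result changes only when a full new batch has arrived.
    return total_rows // batch_size

# The trigger value is constant within a batch ...
assert make_trigger(0) == 0
assert make_trigger(999_999) == 0
# ... and increments exactly when the next 1M rows are complete,
# which is what forces downstream recomputation at that point.
assert make_trigger(1_000_000) == 1
assert make_trigger(2_500_000) == 2
```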
The value of this trigger changes only once per 1M rows, so any transformer that takes it as a parameter is recomputed exactly at those points.

Then, for point 2 above: if you really need it (there are not so many use cases where you actually do need it; it's an artificial slow-down), there is a way to achieve this with a queue using tumbling-window mechanisms. If you do need this, I'll ask someone from the team to contribute the cleanest answer. In that case, please specify whether you need the computation to be done every 1M rows, precisely and always, or whether you can tolerate skipping over a batch of 1M and bundling it together with the next one if your system is not catching up (this may happen, e.g., if you have 1M rows arriving every second).

Happy to clarify further if needed! |
Hey @dxtrous, thank you very much for such a detailed answer! This is incredibly helpful. I have spent more than half of the week reading the entire Pathway documentation and doing POC implementations to see what works best.

One thing I do not fully understand is how we force AsyncTransformer to recompute things once 2M rows are available (and we have already processed the first 1M rows downstream). I also need to do this for 2M, 3M, 4M rows, and so on. Do I define 10 different AsyncTransformers in this case? Is it possible to do it all with one AsyncTransformer? So there is essentially a need to accumulate the data for all years and then run the computation. I also do not see a clean way to do this. Or is it possible to take the entire data available at that point in one go?

To add more context: I actually need to compute things per year, which is not too different from the count of rows (I will just rewrite the logic so that as soon as a new year appears in the rows, the calculation runs). So this particular operation I am trying to achieve is more of a batch pattern, but I will need streaming per row everywhere else in the system, so I am looking for a way to combine the two. Perhaps the second point is more applicable here? I do not think I will actually be getting 1M rows a second, and if I do, it's OK to wait for the system to catch up in that case.

Thank you very much in advance |
Hey @ilyanoskov, thanks for the additional context! Please disregard the discussion of "volatile" functions and triggers from my previous reply in that case.

Please note that AsyncTransformer operates on one row at a time. So, to pass multiple (say 1M) rows in one go into AsyncTransformer, you will first need to reduce these rows to one. If you have a date or some other form of timestamp as a column in your data and want to group data by year, that's brilliant; it is likely to simplify the logic a lot.

With this in mind, my understanding of what you need is the following (listing keywords): a tumbling window over the year column, a reduce per window, then an AsyncTransformer on the reduced rows. Hopefully it will be 1 longish line of code :-)
Would this make sense? By the way, if you are looking for an introduction to Tumbling Windows as a concept, you can also take a look at this tutorial https://pathway.com/developers/showcases/suspicious_activity_tumbling_window/ to see if it resonates. |
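The pipeline sketched in the previous comment (window by year, reduce each window to a single row, then one async call per reduced row) can be modeled in plain Python. All names below are illustrative assumptions, not Pathway API:

```python
from collections import defaultdict

def group_rows_by_year(rows):
    # rows: iterable of (year, payload) pairs.
    # Mimics "tumbling window by year + reduce": collapse all rows of
    # a year into one aggregate row, which a downstream
    # AsyncTransformer-like step would then process in a single call.
    groups = defaultdict(list)
    for year, payload in rows:
        groups[year].append(payload)
    # One reduced row per year: (row_count, payloads)
    return {year: (len(ps), ps) for year, ps in sorted(groups.items())}

batches = group_rows_by_year([(2023, "a"), (2024, "b"), (2023, "c")])
assert batches[2023] == (2, ["a", "c"])
assert batches[2024] == (1, ["b"])
```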
@dxtrous thank you very much! I will try just that and report back :) |
@ilyanoskov Yes do keep us posted! |
I was able to do this with the windowing approach. @dxtrous, do you by any chance know how to prevent the sliding window from sliding beyond the available data? I set the window duration to 10 years, and for 2017 the window is looking into 2027 (and creating more rows). How do I stop at 2024? |
This is a great question which applies to finite (bounded) data streams. @ilyanoskov an (ugly) workaround is to filter out results larger than some given year (or max - 9 years) in post-processing of the windowing results. |
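The post-filtering workaround can be sketched as follows; representing windows as (start, end) year pairs and the max_year parameter are assumptions for illustration, not Pathway's output format:

```python
def drop_overhanging_windows(windows, max_year):
    # windows: list of (window_start, window_end) year pairs produced
    # by a sliding window. Drop any window that extends past the last
    # year actually present in the data (e.g. 2024), so a 10-year
    # window anchored at 2017 does not produce rows reaching 2027.
    return [(s, e) for (s, e) in windows if e <= max_year]

windows = [(2015, 2024), (2017, 2026), (2020, 2029)]
assert drop_overhanging_windows(windows, 2024) == [(2015, 2024)]
```

The windows are still materialized before being filtered, which is why this is post-processing rather than a true bound on window production.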
I'd say the workaround is not that ugly. If the windows that are filtered out are not a significant majority of all the windows, the performance overhead should be small. Syntax-wise, I think an additional parameter for bounding window production could be considered. |
It makes things more complicated as entries need to be aware of the maximal entry in the stream. If they're not aware of it, we can trigger window production later (or never) but its state will be initialized anyway so performance-wise it'll be similar to filtering out unwanted windows. |
Hello,
I have a use-case where I need to process 10 million rows. First, I want to process 1M rows when they arrive, then I want to process 2M rows (1M previous + 1M new), then 3M rows, and so on in that order. How can I do it with Pathway?
I was able to do something like this with AsyncTransformer, accumulating the rows inside of it, but I find this solution clunky; perhaps there is a more Pathway-like approach I could take here?
Thank you very much in advance for your help
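The cumulative pattern asked about here (process the first 1M rows, then the first 2M, then 3M, and so on) can be modeled in plain Python as a minimal sketch, independent of any Pathway API:

```python
def cumulative_batches(stream, batch_size):
    # Re-emit the entire accumulated prefix each time another full
    # batch of rows has arrived: first batch_size rows, then
    # 2 * batch_size, and so on.
    acc = []
    for row in stream:
        acc.append(row)
        if len(acc) % batch_size == 0:
            yield list(acc)

batches = list(cumulative_batches(range(6), batch_size=2))
assert batches == [[0, 1], [0, 1, 2, 3], [0, 1, 2, 3, 4, 5]]
```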