[Proposal] Streaming execution support roadmap #4285

Open
metesynnada opened this issue Nov 18, 2022 · 12 comments
Labels
enhancement New feature or request

Comments

@metesynnada
Contributor

[Proposal] Streaming execution support roadmap

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Adding streaming support to DataFusion and executing queries continuously on unbounded datasets is a frequent topic of discussion. Streaming is also an item on the roadmap, discussed in Ballista #30, and a part of the general desiderata.

In the recent past, there have been some attempts and PoC implementations to explore how this could be done. Some examples are:

We would like to use this issue to coordinate a fresh re-think and a disciplined push toward achieving the streaming support goals and making progress on the roadmap.

Describe the solution you'd like
We have a proposal-stage roadmap that details how streaming support can be achieved as a sequence/collection of multiple tasks. You can find our proposal here

Within this proposal, you can find design discussions, code snippets, and individual task/issue descriptions paving the way for full support.

We have been experimenting with many different candidate approaches and worked on a few PoC implementations as we went through the design process. Still, this is a huge topic and we are sure there are certain subtleties, perspectives, and challenges we might have missed.

Looking forward to hearing the community’s thoughts on this proposal. Thanks!

Describe alternatives you've considered
We studied @hntd187's valuable contributions in #1544 for a Kafka provider.

Additional context
If this proposal is found to be a sensible path forward, we are happy to turn it into an epic GitHub issue and start tracking the progress through that. We are also happy to take on the implementation work of a significant number of the steps/tasks in this proposal.

@metesynnada metesynnada added the enhancement New feature or request label Nov 18, 2022
@ozankabak
Contributor

ozankabak commented Nov 18, 2022

Thanks @metesynnada, I am looking forward to hearing how it is received.

@alamb and @andygrove: I am especially keen on hearing your thoughts and feedback. We are willing to put serious work into making progress on this front, and getting the design right is of utmost importance before starting the implementation work. Please also tag others who may help enrich the discussion and improve the proposal/design.

@alamb
Contributor

alamb commented Nov 18, 2022

Thanks @ozankabak and @metesynnada -- I will review this proposal carefully over the next few days

@alamb
Contributor

alamb commented Nov 19, 2022

Thank you for https://synnada.notion.site/synnada/EPIC-Long-running-stateful-execution-support-for-unbounded-data-with-mini-batches-a416b29ae9a5438492663723dbeca805. I read it carefully this morning. It was well written -- thank you.

In summary, I think you are on the right track. DataFusion already supports a large chunk of what is needed to make a streaming query processing system. The open question in my mind is "what else belongs in DataFusion (the repository) and what belongs in a streaming system built on top of DataFusion?"

I think it would be possible to implement Streaming operators (e.g. StreamJoin) outside the DataFusion codebase (more details below), perhaps adding some supporting APIs in DataFusion (e.g. ExecutionPlan::flush()). Implementing the streaming system outside the core of DataFusion would be my preference because:

  1. It would help refine and enforce API boundaries in DataFusion between what is specific to streaming and what is more generally useful
  2. DataFusion is already quite large, so adding more features may limit our scaling (people and process)

More thoughts

I already think of DataFusion's Execution plans as "streaming" in some sense.

The execute() method returns a SendableRecordBatchStream, which is a Rust async stream. This means that the operators can, and do, produce record batches of approximately batch_size rows and then yield control (await). In this way, table sources could produce an unbounded stream even with the existing signature.
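
A minimal sketch of the idea in the paragraph above, using only the Rust standard library. A plain `Iterator` stands in for the async `SendableRecordBatchStream`, and `Batch`, `UnboundedSource`, and `first_batches` are hypothetical names for illustration, not DataFusion APIs. It suggests how a source that produces batches lazily can be unbounded, as long as the consumer pulls only what it needs:

```rust
/// Hypothetical stand-in for a record batch: one column of i64 values.
#[derive(Debug, PartialEq)]
struct Batch(Vec<i64>);

/// Hypothetical unbounded source: yields consecutive integers forever,
/// one batch of `batch_size` rows per call to `next()`.
struct UnboundedSource {
    next_value: i64,
    batch_size: usize,
}

impl Iterator for UnboundedSource {
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        let start = self.next_value;
        let end = start + self.batch_size as i64;
        self.next_value = end;
        // Never returns None: the stream has no end.
        Some(Batch((start..end).collect()))
    }
}

/// Pull only the first `n` batches; the source itself never terminates.
fn first_batches(batch_size: usize, n: usize) -> Vec<Batch> {
    UnboundedSource { next_value: 0, batch_size }
        .take(n)
        .collect()
}

fn main() {
    for batch in first_batches(3, 2) {
        println!("{:?}", batch);
    }
}
```

In a real async stream the "yield control" step would be an `await` point between batches; the iterator here only models the batch-at-a-time, pull-based shape.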

As you have noted, some ExecutionPlan nodes such as FilterExec, and ProjectionExec already "stream" in the sense that they get one batch in, process it, and produce one batch out. This is also called pipelined execution in some literature. There are a few (important) operators, GroupByHash, Sort, and HashJoin that are so-called "pipeline breakers" that must read their entire input before they will produce any output rows.
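
The distinction between pipelined operators and pipeline breakers can be sketched with standard-library iterators. This is an illustration under simplifying assumptions: `Batch`, `filter_even`, and `sort_all` are hypothetical names, and synchronous iterators stand in for DataFusion's async streams.

```rust
/// Hypothetical stand-in for a record batch: one column of i64 values.
type Batch = Vec<i64>;

/// Pipelined operator: handles one batch at a time, so it can emit output
/// before its input has ended (or even if the input never ends).
fn filter_even<I>(input: I) -> impl Iterator<Item = Batch>
where
    I: Iterator<Item = Batch>,
{
    input.map(|batch| batch.into_iter().filter(|v| v % 2 == 0).collect())
}

/// Pipeline breaker: must consume the entire input before it can emit
/// the first row. On an unbounded input this call would never return.
fn sort_all<I>(input: I) -> Batch
where
    I: Iterator<Item = Batch>,
{
    let mut rows: Vec<i64> = input.flatten().collect();
    rows.sort();
    rows
}

fn main() {
    // Unbounded input: batch k holds the two values [2k, 2k + 1].
    let unbounded = (0i64..).map(|k| vec![2 * k, 2 * k + 1]);
    // The filter can still produce its first two output batches.
    let first: Vec<Batch> = filter_even(unbounded).take(2).collect();
    println!("{:?}", first);

    // The sort is only safe on a bounded input.
    let bounded = vec![vec![3, 1], vec![2]].into_iter();
    println!("{:?}", sort_all(bounded));
}
```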

It is possible (we do it in InfluxDB IOx) to implement and use operators outside the DataFusion repository by implementing the ExecutionPlan trait.

I think it is worth considering building a streaming system outside of DataFusion (like Ballista or IOx or CeresDB). These systems add specialized processing, and the core of DataFusion is applicable across most/all of the use cases.

"Another point that has to be considered is optimization rules. The optimization rules can be different for bounded and unbounded data sources."

This is a great point. It is already possible to add / modify the optimization passes in both the Logical and Physical plans. For example, perhaps you would write a pass for a LogicalPlan that transformed a normal LogicalPlan::Join into LogicalPlan::Extension(SpecializedStreamingJoin) and did whatever other transformations were required.
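
As a rough sketch of such a rewrite pass, the toy plan below mirrors the shape described above. All types and functions here (`Plan`, `StreamingJoin`, `use_streaming_joins`, `scan`) are hypothetical and deliberately simplified; they are not DataFusion's `LogicalPlan` or optimizer-rule API.

```rust
/// Toy logical plan, loosely modeled on the shape described above.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String },
    Filter { input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
    /// Extension node carrying a user-defined operator.
    Extension(StreamingJoin),
}

/// Hypothetical specialized join that would live outside the core engine.
#[derive(Debug, Clone, PartialEq)]
struct StreamingJoin {
    left: Box<Plan>,
    right: Box<Plan>,
}

/// Rewrite pass: recursively replace every `Join` with the extension node.
fn use_streaming_joins(plan: Plan) -> Plan {
    match plan {
        Plan::Scan { table } => Plan::Scan { table },
        Plan::Filter { input } => Plan::Filter {
            input: Box::new(use_streaming_joins(*input)),
        },
        Plan::Join { left, right } => Plan::Extension(StreamingJoin {
            left: Box::new(use_streaming_joins(*left)),
            right: Box::new(use_streaming_joins(*right)),
        }),
        // Already rewritten: still descend into the children.
        Plan::Extension(join) => Plan::Extension(StreamingJoin {
            left: Box::new(use_streaming_joins(*join.left)),
            right: Box::new(use_streaming_joins(*join.right)),
        }),
    }
}

/// Convenience constructor for a table scan.
fn scan(name: &str) -> Plan {
    Plan::Scan { table: name.to_string() }
}

fn main() {
    let plan = Plan::Filter {
        input: Box::new(Plan::Join {
            left: Box::new(scan("a")),
            right: Box::new(scan("b")),
        }),
    };
    println!("{:#?}", use_streaming_joins(plan));
}
```

A real pass would presumably apply the rewrite only when an input is unbounded; the unconditional version keeps the sketch short.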

All in all, it is a great writeup -- thanks again

@ozankabak
Contributor

Thank you for sharing your thoughts! We will digest your points and especially think about how to decide what could/should belong to DataFusion and what should not. Hopefully we (as the community) will converge on the next steps soon and get the real work started 🚀

@ozankabak
Contributor

Hello everybody! During the last few days, we have been re-analyzing the proposal through the lens of "what belongs where" and what to implement first. We also did some prototyping and experimentation for this purpose. It seems there are three main categories of improvements here:

  1. Core functionality that seems the best fit for DataFusion proper, such as:
    a. Variants of certain fundamental operators (e.g. windowing, repartitioning) that support pipelined execution: These drastically reduce memory requirements (and possibly runtimes) across a very wide range of use cases, so adding them to DataFusion proper sounds very reasonable. In some extreme cases, such operators may even retire their old, pipeline-breaking precursors if the new variant dominates across all use cases.
    b. Small infrastructural modifications to support special-purpose externally-implemented operators: These are things like column annotations for signaling monotonicity and basic mechanisms for differentiating between bounded and unbounded streams for planning/optimization purposes.
  2. Core functionality that seems the best fit for Ballista, which we can again break down into two analogous sub-categories: (a) New distribution-related operators that improve performance and/or user experience over a wide range of use cases, and (b) small infrastructural modifications to support externally-implemented distribution-related operators.
  3. Functionality that is not general enough and best left to new/special-purpose projects: My impression is that as a community, we are not sure how much of "streaming" belongs to this category, and I am sure we will keep discussing and discovering over time and reach clarity in the future.
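
To make item (1.b) more concrete, here is one way a "bounded vs. unbounded" distinction could feed into planning. This is only a sketch under assumed names (`Op`, `check`); it is not how DataFusion represents or validates plans.

```rust
/// Hypothetical physical operators for illustration; not DataFusion types.
#[derive(Debug)]
enum Op {
    /// A source, flagged as bounded (e.g. a file) or unbounded (e.g. a queue).
    Source { unbounded: bool },
    /// Pipelined: emits output batch by batch.
    Filter(Box<Op>),
    /// Pipeline breaker: needs its whole input before emitting anything.
    Sort(Box<Op>),
}

/// Returns whether the operator's output is unbounded, or an error if a
/// pipeline breaker sits on top of an unbounded input.
fn check(op: &Op) -> Result<bool, String> {
    match op {
        Op::Source { unbounded } => Ok(*unbounded),
        // A pipelined operator simply inherits its input's boundedness.
        Op::Filter(input) => check(input),
        Op::Sort(input) => {
            if check(input)? {
                Err("Sort over an unbounded input would never produce output".to_string())
            } else {
                Ok(false)
            }
        }
    }
}

fn main() {
    let streaming = Op::Filter(Box::new(Op::Source { unbounded: true }));
    println!("{:?}", check(&streaming));

    let invalid = Op::Sort(Box::new(Op::Source { unbounded: true }));
    println!("{:?}", check(&invalid));
}
```

A planner with this information could reject such a plan early, or swap in a pipelined variant of the operator where one exists.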

In this light, we decided to start with things that seem to obviously fall under (1.a) and (1.b). Particularly, we chose windowing as our first area to work on, given that we (as the Synnada team) have already done significant work in this area and know the details of it well. @mustafasrepo has already linked our in-progress work to this PR. We aim to complete it, add accompanying benchmarks/analysis, go through our internal review process, and turn it into a full-fledged PR to this repository some time next week.

As we make progress towards implementing the functionality outlined in this proposal, the community may decide that some of the functionality in question falls under category (3). If that case arises, we (as the Synnada team) would be happy to organize it under a new project and help get it off the ground with the participation of all interested stakeholders.

Thanks again for all the interest and constructive comments. I am sincerely excited about what lies ahead and all the cool tech we are building (and will build) together!

@alamb
Contributor

alamb commented Nov 27, 2022

Thank you very much for the clear, cohesive and constructive writeup @ozankabak -- I think the plan sounds well thought out and aligns well to the overall DataFusion vision and structure. Thank you

@alamb
Contributor

alamb commented Sep 5, 2023

I wonder if this ticket is tracking anything useful anymore or if we should close it now?

@alamb
Contributor

alamb commented Sep 5, 2023

Or perhaps we should add some of the text / context of this ticket into the users guide 🤔

@ozankabak
Contributor

It still tracks a few things we are working on, but it is close to completion. The user guide definitely needs some updating; we will be happy to help with that once we are done with the critical features.

@alamb
Contributor

alamb commented Nov 11, 2023

I believe a major related proposal is on #7994

@mwaaas

mwaaas commented Aug 7, 2024

What's the progress on this?

@alamb
Contributor

alamb commented Aug 7, 2024

I think #11404 might be a good plan to discuss -- there are many streaming features already in DataFusion, but I am not sure how well documented they are or whether we have any good examples
