
Non-windowed updating aggregates using datafusion. #588

Merged

merged 1 commit into master on Apr 22, 2024

Conversation

jacksonrnewhouse
Contributor

@jacksonrnewhouse commented Apr 12, 2024

The main functionality this provides is the ability to run aggregates without windows, emitting update and retract messages that can be written to a Debezium sink.

The logic for calculating the aggregate lives in UpdatingAggregatingFunc. This operator has three versions of the aggregate exec, one for each of three modes (a toy sketch of the three-stage flow follows the list):
Partial: Takes input and emits partial aggregate representations.
CombinePartial: Merges multiple partials into a single partial. This mode was added in https://github.com/ArroyoSystems/arrow-datafusion/pull/1/files.
Final: The final aggregation that finishes any aggregates, expecting partials as input.
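As a toy illustration (not the DataFusion API, and with names invented for the example), the relationship between the three modes looks like this for a simple average:

```rust
// Toy model of the three modes for AVG: Partial turns raw rows into a partial
// state, CombinePartial merges partial states, and Final produces the result.
#[derive(Clone, Copy, Default)]
struct AvgPartial {
    sum: f64,
    count: u64,
}

// Partial: takes input and emits a partial aggregate representation.
fn partial(batch: &[f64]) -> AvgPartial {
    AvgPartial { sum: batch.iter().sum(), count: batch.len() as u64 }
}

// CombinePartial: merges multiple partials into a single partial.
fn combine_partial(a: AvgPartial, b: AvgPartial) -> AvgPartial {
    AvgPartial { sum: a.sum + b.sum, count: a.count + b.count }
}

// Final: finishes the aggregate, expecting partials as input.
fn final_value(p: AvgPartial) -> Option<f64> {
    (p.count > 0).then(|| p.sum / p.count as f64)
}
```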

These are combined with the new LastKeyValueView. This is a simple key-value map that uses the _timestamp field as the expiration time. For any group-by tuple there will be at most one live entry in the map. Writes to state include a _generation field in the parquet files, which is used to ensure we restore the newest value; these semantics are sketched below.
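A minimal sketch of those semantics, assuming a plain in-memory map (the type, fields, and methods are invented for illustration, not the actual Arroyo implementation):

```rust
use std::collections::HashMap;
use std::hash::Hash;

struct Entry<V> {
    value: V,
    timestamp: u64,  // expiration time, taken from the row's _timestamp
    generation: u64, // monotonically increasing, persisted with each write
}

struct LastKeyValueSketch<K, V> {
    map: HashMap<K, Entry<V>>,
    next_generation: u64,
}

impl<K: Hash + Eq, V> LastKeyValueSketch<K, V> {
    // At most one live entry per group-by tuple: inserts overwrite.
    fn insert(&mut self, key: K, value: V, timestamp: u64) {
        let generation = self.next_generation;
        self.next_generation += 1;
        self.map.insert(key, Entry { value, timestamp, generation });
    }

    // On restore, the generation ensures only the newest write survives.
    fn restore(&mut self, key: K, entry: Entry<V>) {
        match self.map.get(&key) {
            Some(existing) if existing.generation >= entry.generation => {}
            _ => {
                self.next_generation = self.next_generation.max(entry.generation + 1);
                self.map.insert(key, entry);
            }
        }
    }

    // Entries whose _timestamp is at or before the watermark expire.
    fn expire(&mut self, watermark: u64) {
        self.map.retain(|_, e| e.timestamp > watermark);
    }
}
```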

In the operator, data is fed into the partial exec until it is time to flush, which happens under any of the following conditions (see the sketch after the list):

  • A 1 second tick has passed.
  • A checkpoint is received.
  • The watermark has advanced such that there is data in the backing tables that is ready to be expired (this is necessary because there may be fresh data that should keep that key alive).
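A minimal sketch of that trigger, with hypothetical state passed in as arguments:

```rust
use std::time::{Duration, Instant};

// Flush on the one-second tick, on a checkpoint, or once the watermark has
// passed the earliest expiration timestamp held in the backing tables.
// All parameter names here are assumptions for the sketch.
fn should_flush(
    last_flush: Instant,
    checkpoint_received: bool,
    watermark: Option<u64>,
    earliest_expiration_in_state: Option<u64>,
) -> bool {
    last_flush.elapsed() >= Duration::from_secs(1)
        || checkpoint_received
        || matches!(
            (watermark, earliest_expiration_in_state),
            (Some(w), Some(e)) if e <= w
        )
}
```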

Flushing proceeds through the following steps (a toy walkthrough in code follows the list):

  1. Close the active sender for computing partials. If there isn't one, there's no work to be done, so just exit.
  2. Compute the new partial data that has been received since the last flush.
  3. Look in the store of partial data for entries with the same key set as the new partials. If there aren't any, skip to step 5.
  4. Feed the data from steps 2 and 3 to the combine exec, then spool out its output, writing it to state and keeping it as the input to the final step.
  5. For the final result, first check whether the final table ("f") has any matches; these become retracts. Then write the new data to the final table. The retracts are emitted before the appends.
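A toy, synchronous walkthrough of those steps, with the DataFusion execs replaced by a simple count aggregate and the state tables stubbed as in-memory maps (every name is hypothetical; this only illustrates the data flow, not the real operator):

```rust
use std::collections::HashMap;

// Returns (retracts, appends); retracts are emitted before appends.
fn flush(
    // Steps 1–2: the partials drained from the partial exec since the last
    // flush. An empty map plays the role of "no active sender": no work.
    new_partials: HashMap<String, i64>,
    // Backing state tables, stubbed as in-memory maps.
    partial_table: &mut HashMap<String, i64>,
    final_table: &mut HashMap<String, i64>,
) -> (Vec<(String, i64)>, Vec<(String, i64)>) {
    let mut retracts = Vec::new();
    let mut appends = Vec::new();
    for (key, new_partial) in new_partials {
        // Steps 3–4: combine with any stored partial for the same key
        // (skipped when there is none) and write the result back to state.
        let combined = match partial_table.get(&key) {
            Some(stored) => stored + new_partial,
            None => new_partial,
        };
        partial_table.insert(key.clone(), combined);
        // Step 5: an existing row in the final table becomes a retract,
        // then the new value is written and emitted as an append.
        if let Some(old) = final_table.get(&key) {
            retracts.push((key.clone(), *old));
        }
        final_table.insert(key.clone(), combined);
        appends.push((key, combined));
    }
    (retracts, appends)
}
```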

To make progress between flushes, the partial exec is advanced. We panic in handle_future_result() because the input will never have been closed on that exec, so its future completing is unexpected.

Some other things that were changed in this PR:

  • Reworked how we create the sort projections for Joins. The previous approach ran into corner cases that surfaced while updating the smoketests.
  • Removed the `single_distinct_aggregation_to_group_by` optimizer rule, which causes COUNT(*) to become a nested aggregate.

@jacksonrnewhouse jacksonrnewhouse changed the base branch from df_debezium to master April 22, 2024 17:32
@jacksonrnewhouse jacksonrnewhouse enabled auto-merge (squash) April 22, 2024 17:32
@jacksonrnewhouse jacksonrnewhouse merged commit 74d0097 into master Apr 22, 2024
6 checks passed