-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[Prism] Support AfterProcessingTime triggers - part 2 #36333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3b1db11
b3cd953
d623c6f
c6c0ee5
41f6fdf
2179b24
f869d53
d6ccb90
28b2caf
1059e0f
4179266
0d6f0a9
a3b986e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1215,7 +1215,9 @@ type stageKind interface { | |
| // buildEventTimeBundle handles building bundles for the stage per its kind. | ||
| buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], | ||
| holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, schedulable bool, pendingAdjustment int) | ||
|
|
||
| // buildProcessingTimeBundle handles building processing-time bundles for the stage per its kind. | ||
| buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], | ||
| holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, schedulable bool) | ||
| // getPaneOrDefault based on the stage state, element metadata, and bundle id. | ||
| getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo | ||
| } | ||
|
|
@@ -1327,17 +1329,54 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t | |
| ready := ss.strat.IsTriggerReady(triggerInput{ | ||
| newElementCount: 1, | ||
| endOfWindowReached: endOfWindowReached, | ||
| emNow: em.ProcessingTimeNow(), | ||
| }, &state) | ||
|
|
||
| if ready { | ||
| state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached) | ||
| } else { | ||
| if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil { | ||
| for _, t := range pts { | ||
| ts := (&state).getTriggerState(t) | ||
| if ts.extra == nil || t.shouldFire((&state)) { | ||
| // Skipping inserting a processing time timer if the firing time | ||
| // is not set or it already should fire. | ||
| // When the after processing time triggers should fire, there are | ||
| // two scenarios: | ||
| // (1) the entire trigger of this window is ready to fire. In this | ||
| // case, `ready` should be true and we won't reach here. | ||
| // (2) we are still waiting for other triggers (subtriggers) to | ||
| // fire (e.g. AfterAll). | ||
| continue | ||
| } | ||
| firingTime := ts.extra.(afterProcessingTimeState).firingTime | ||
| notYetHolds := map[mtime.Time]int{} | ||
| timer := element{ | ||
| window: window, | ||
| timestamp: firingTime, | ||
| holdTimestamp: window.MaxTimestamp(), | ||
| pane: typex.NoFiringPane(), | ||
| transform: ss.ID, // Use stage id to fake transform id | ||
| family: "AfterProcessingTime", | ||
| tag: "", | ||
| sequence: 1, | ||
| elmBytes: nil, | ||
| keyBytes: []byte(key), | ||
| } | ||
| // TODO: how to deal with watermark holds for this implicit processing time timer | ||
| // ss.watermarkHolds.Add(timer.holdTimestamp, 1) | ||
| ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds) | ||
| em.processTimeEvents.Schedule(firingTime, ss.ID) | ||
| em.wakeUpAt(firingTime) | ||
| } | ||
| } | ||
| } | ||
| // Store the state as triggers may have changed it. | ||
| ss.state[LinkID{}][window][key] = state | ||
|
|
||
| // If we're ready, it's time to fire! | ||
| if ready { | ||
| count += ss.buildTriggeredBundle(em, key, window) | ||
| count += ss.startTriggeredBundle(em, key, window) | ||
| } | ||
| return count | ||
| } | ||
|
|
@@ -1524,16 +1563,11 @@ func (ss *stageState) savePanes(bundID string, panesInBundle []bundlePane) { | |
| } | ||
| } | ||
|
|
||
| // buildTriggeredBundle must be called with the stage.mu lock held. | ||
| // When in discarding mode, returns 0. | ||
| // When in accumulating mode, returns the number of fired elements to maintain a correct pending count. | ||
| func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win typex.Window) int { | ||
| func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win typex.Window) ([]element, int) { | ||
| var toProcess []element | ||
| dnt := ss.pendingByKeys[key] | ||
| var notYet []element | ||
|
|
||
| rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input} | ||
|
|
||
| // Look at all elements for this key, and only for this window. | ||
| for dnt.elements.Len() > 0 { | ||
| e := heap.Pop(&dnt.elements).(element) | ||
|
|
@@ -1564,6 +1598,19 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t | |
| heap.Init(&dnt.elements) | ||
| } | ||
|
|
||
| return toProcess, accumulationDiff | ||
| } | ||
|
|
||
| // startTriggeredBundle must be called with the stage.mu lock held. | ||
| // Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count. | ||
| // When in discarding mode, returns 0, as the pending work already includes these elements. | ||
| // When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired. | ||
| func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win typex.Window) int { | ||
| toProcess, accumulationDiff := ss.buildTriggeredBundle(em, key, win) | ||
| if len(toProcess) == 0 { | ||
| return accumulationDiff | ||
| } | ||
|
|
||
| if ss.inprogressKeys == nil { | ||
| ss.inprogressKeys = set[string]{} | ||
| } | ||
|
|
@@ -1575,6 +1622,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t | |
| }, | ||
| } | ||
|
|
||
| rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input} | ||
| ss.makeInProgressBundle( | ||
| func() string { return rb.BundleID }, | ||
| toProcess, | ||
|
|
@@ -1585,9 +1633,11 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t | |
| ) | ||
| slog.Debug("started a triggered bundle", "stageID", ss.ID, "bundleID", rb.BundleID, "size", len(toProcess)) | ||
|
|
||
| ss.bundlesToInject = append(ss.bundlesToInject, rb) | ||
| // TODO: Use ss.bundlesToInject rather than em.injectedBundles | ||
| // ss.bundlesToInject = append(ss.bundlesToInject, rb) | ||
| // Bundle is marked in progress here to prevent a race condition. | ||
| em.refreshCond.L.Lock() | ||
| em.injectedBundles = append(em.injectedBundles, rb) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I use
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If using this mechanism, also add `em.inprogressBundles.insert(rb.BundleID)` to avoid "losing" the bundle. injectedBundles are considered "inprogress" even when they aren't yet executing. This is to avoid incorrect premature terminations.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very odd that there's a stuckness going on with
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, the line you mentioned is already there. |
||
| em.inprogressBundles.insert(rb.BundleID) | ||
| em.refreshCond.L.Unlock() | ||
| return accumulationDiff | ||
|
|
@@ -1927,6 +1977,20 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. | |
| ss.mu.Lock() | ||
| defer ss.mu.Unlock() | ||
|
|
||
| toProcess, minTs, newKeys, holdsInBundle, panesInBundle, stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow) | ||
|
|
||
| if len(toProcess) == 0 { | ||
| // If we have nothing | ||
| return "", false, stillSchedulable | ||
| } | ||
| bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle, panesInBundle) | ||
| slog.Debug("started a processing time bundle", "stageID", ss.ID, "bundleID", bundID, "size", len(toProcess), "emNow", emNow) | ||
| return bundID, true, stillSchedulable | ||
| } | ||
|
|
||
| // handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages. | ||
| func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow mtime.Time, | ||
| processTimerFn func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane)) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) { | ||
| // TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime | ||
| // Special Case for ProcessingTime handling. | ||
| // Eg. Always queue EventTime elements at minTime. | ||
|
|
@@ -1935,6 +1999,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. | |
| // Potentially puts too much work on the scheduling thread though. | ||
|
|
||
| var toProcess []element | ||
| var panesInBundle []bundlePane | ||
| minTs := mtime.MaxTimestamp | ||
| holdsInBundle := map[mtime.Time]int{} | ||
|
|
||
|
|
@@ -1968,10 +2033,8 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. | |
| if e.timestamp < minTs { | ||
| minTs = e.timestamp | ||
| } | ||
| holdsInBundle[e.holdTimestamp]++ | ||
|
|
||
| // We're going to process this timer! | ||
| toProcess = append(toProcess, e) | ||
| toProcess, panesInBundle = processTimerFn(e, toProcess, holdsInBundle, panesInBundle) | ||
| } | ||
|
|
||
| nextTime = ss.processingTimeTimers.Peek() | ||
|
|
@@ -1986,19 +2049,58 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. | |
| for _, v := range notYet { | ||
| ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds) | ||
| em.processTimeEvents.Schedule(v.firing, ss.ID) | ||
| em.wakeUpAt(v.firing) | ||
| } | ||
|
|
||
| // Add a refresh if there are still processing time events to process. | ||
| stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0) | ||
|
|
||
| if len(toProcess) == 0 { | ||
| // If we have nothing | ||
| return "", false, stillSchedulable | ||
| } | ||
| bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle, nil) | ||
| return toProcess, minTs, newKeys, holdsInBundle, panesInBundle, stillSchedulable | ||
| } | ||
|
|
||
| slog.Debug("started a processing time bundle", "stageID", ss.ID, "bundleID", bundID, "size", len(toProcess), "emNow", emNow) | ||
| return bundID, true, stillSchedulable | ||
| // buildProcessingTimeBundle for stateful stages prepares bundles for processing-time timers. | ||
| // It delegates to handleProcessingTimeTimer with a callback that queues every timer it is handed | ||
| // and counts that timer's hold timestamp in holdsInBundle. The callback always reports nil panes. | ||
| func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) { | ||
| collectTimer := func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane) { | ||
| // Record the hold carried by this timer, then queue the timer itself for processing. | ||
| holdsInBundle[e.holdTimestamp] += 1 | ||
| return append(toProcess, e), nil | ||
| } | ||
| return handleProcessingTimeTimer(ss, em, emNow, collectTimer) | ||
| } | ||
|
|
||
| // buildProcessingTimeBundle for aggregation stages prepares bundles for after-processing-time triggers. For each timer handed to the callback it re-evaluates the trigger for the timer's window and key (with newElementCount 0, at emNow); when the trigger is ready it advances the pane, adds the elements returned by buildTriggeredBundle to the bundle, writes the state back, and appends one bundlePane entry. | ||
| func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) { | ||
| return handleProcessingTimeTimer(ss, em, emNow, func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane) { | ||
| // Unlike `buildProcessingTimeBundle` for stateful stages, this callback never updates holdsInBundle: | ||
| // triggers don't hold back the watermark, so no holds are in the triggered bundle. NOTE(review): the state is only written back when the trigger is ready, whereas injectTriggeredBundlesIfReady stores it unconditionally; confirm this is intended. | ||
| state := ss.state[LinkID{}][e.window][string(e.keyBytes)] | ||
| endOfWindowReached := e.window.MaxTimestamp() < ss.input | ||
| ready := ss.strat.IsTriggerReady(triggerInput{ | ||
| newElementCount: 0, | ||
| endOfWindowReached: endOfWindowReached, | ||
| emNow: emNow, | ||
| }, &state) | ||
|
|
||
| if ready { | ||
| state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached) | ||
|
|
||
| // We're going to process this trigger! The accumulation diff returned by buildTriggeredBundle is discarded here. NOTE(review): confirm the pending count stays correct in accumulating mode. | ||
| elems, _ := ss.buildTriggeredBundle(em, string(e.keyBytes), e.window) | ||
| toProcess = append(toProcess, elems...) | ||
|
|
||
| ss.state[LinkID{}][e.window][string(e.keyBytes)] = state | ||
|
|
||
| panesInBundle = append(panesInBundle, bundlePane{}) | ||
| } | ||
|
|
||
| return toProcess, panesInBundle | ||
| }) | ||
| } | ||
|
|
||
| // buildProcessingTimeBundle for stateless stages is not supposed to be called currently | ||
| func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) { | ||
| slog.Error("ordinary stages can't have processing time elements") | ||
| return nil, mtime.MinTimestamp, nil, nil, nil, false | ||
| } | ||
|
|
||
| // makeInProgressBundle is common code to store a set of elements as a bundle in progress. | ||
|
|
@@ -2281,13 +2383,23 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T | |
| inputW := ss.input | ||
| _, upstreamW := ss.UpstreamWatermark() | ||
| previousInputW := ss.previousInput | ||
| if inputW == upstreamW && previousInputW == inputW { | ||
|
|
||
| _, isOrdinaryStage := ss.kind.(*ordinaryStageKind) | ||
| if isOrdinaryStage && len(ss.sides) == 0 { | ||
| // For ordinary stage with no side inputs, we use whether there are pending elements to determine | ||
| // whether a bundle is ready or not. | ||
| if len(ss.pending) == 0 { | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use the watermark to determine whether to create a bundle for aggregation and stateful stages, but not for stateless stages without side inputs.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady | ||
| } | ||
| } else if inputW == upstreamW && previousInputW == inputW { | ||
| // Otherwise, use the progression of watermark to determine the bundle readiness. | ||
| slog.Debug("bundleReady: unchanged upstream watermark", | ||
| slog.String("stage", ss.ID), | ||
| slog.Group("watermark", | ||
| slog.Any("upstream == input == previousInput", inputW))) | ||
| return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady | ||
| } | ||
|
|
||
| ready := true | ||
| for _, side := range ss.sides { | ||
| pID, ok := em.pcolParents[side.Global] | ||
|
|
@@ -2329,3 +2441,17 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { | |
| func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time { | ||
| // Distance between the scheduled time and the current mtime clock reading. | ||
| offset := scheduled - mtime.Now() | ||
| // Apply that same distance on top of localNow. | ||
| return localNow + offset | ||
| } | ||
|
|
||
| // wakeUpAt schedules a wakeup signal for the bundle processing loop. | ||
| // This is used for processing time timers to ensure the loop re-evaluates | ||
| // stages when a processing time timer is expected to fire. | ||
| // | ||
| // The wakeup is a broadcast on em.refreshCond once the wall-clock time t.ToTime() is reached. | ||
| // Nothing is scheduled when the pipeline has a TestStream handler or the real-time clock is disabled. | ||
| func (em *ElementManager) wakeUpAt(t mtime.Time) { | ||
| if em.testStreamHandler == nil && em.config.EnableRTC { | ||

Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed https://github.com/apache/beam/pull/36069/files#r2342213066.
|
||
| // Only arm the timer if we have real-time clock enabled and the pipeline does not have TestStream. | ||
| // time.AfterFunc returns immediately and runs its callback in its own goroutine, | ||
| // so no extra goroutine is needed to keep the caller from blocking. | ||
| time.AfterFunc(time.Until(t.ToTime()), func() { | ||
| em.refreshCond.Broadcast() | ||
| }) | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.