Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ def sickbayTests = [
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', // Uses processing time trigger for early firings.

// A regression introduced when we use number of pending elements rather than watermark to determine
// the bundle readiness of a stateless stage.
// Currently, Prism processes a bundle of [100, ..., 1000] when watermark is set to 100,
// and then a second bundle of [1, ... 99] when the watermark is set to +inf.
// As a result, it yields an output of [-999, 1, 1...], where -999 comes from the difference between 1000 and 1.
// According to https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html,
// the stateful dofn with `RequiresTimeSortedInput` annotation should buffer an element until the element's timestamp + allowed_lateness.
// This stateful dofn feature is not yet supported in Prism.
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',

// Triggered Side Inputs not yet implemented in Prism.
// https://github.com/apache/beam/issues/31438
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
Expand Down
166 changes: 146 additions & 20 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,9 @@ type stageKind interface {
// buildEventTimeBundle handles building bundles for the stage per it's kind.
buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string],
holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, schedulable bool, pendingAdjustment int)

// buildProcessingTimeBundle handles building processing-time bundles for the stage per it's kind.
buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string],
holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, schedulable bool)
// getPaneOrDefault based on the stage state, element metadata, and bundle id.
getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo
}
Expand Down Expand Up @@ -1327,17 +1329,54 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
ready := ss.strat.IsTriggerReady(triggerInput{
newElementCount: 1,
endOfWindowReached: endOfWindowReached,
emNow: em.ProcessingTimeNow(),
}, &state)

if ready {
state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached)
} else {
if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil {
for _, t := range pts {
ts := (&state).getTriggerState(t)
if ts.extra == nil || t.shouldFire((&state)) {
// Skipping inserting a processing time timer if the firing time
// is not set or it already should fire.
// When the after processing time triggers should fire, there are
// two scenarios:
// (1) the entire trigger of this window is ready to fire. In this
// case, `ready` should be true and we won't reach here.
// (2) we are still waiting for other triggers (subtriggers) to
// fire (e.g. AfterAll).
continue
}
firingTime := ts.extra.(afterProcessingTimeState).firingTime
notYetHolds := map[mtime.Time]int{}
timer := element{
window: window,
timestamp: firingTime,
holdTimestamp: window.MaxTimestamp(),
pane: typex.NoFiringPane(),
transform: ss.ID, // Use stage id to fake transform id
family: "AfterProcessingTime",
tag: "",
sequence: 1,
elmBytes: nil,
keyBytes: []byte(key),
}
// TODO: how to deal with watermark holds for this implicit processing time timer
// ss.watermarkHolds.Add(timer.holdTimestamp, 1)
ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds)
em.processTimeEvents.Schedule(firingTime, ss.ID)
em.wakeUpAt(firingTime)
}
}
}
// Store the state as triggers may have changed it.
ss.state[LinkID{}][window][key] = state

// If we're ready, it's time to fire!
if ready {
count += ss.buildTriggeredBundle(em, key, window)
count += ss.startTriggeredBundle(em, key, window)
}
return count
}
Expand Down Expand Up @@ -1524,16 +1563,11 @@ func (ss *stageState) savePanes(bundID string, panesInBundle []bundlePane) {
}
}

// buildTriggeredBundle must be called with the stage.mu lock held.
// When in discarding mode, returns 0.
// When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win typex.Window) int {
func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win typex.Window) ([]element, int) {
var toProcess []element
dnt := ss.pendingByKeys[key]
var notYet []element

rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input}

// Look at all elements for this key, and only for this window.
for dnt.elements.Len() > 0 {
e := heap.Pop(&dnt.elements).(element)
Expand Down Expand Up @@ -1564,6 +1598,19 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
heap.Init(&dnt.elements)
}

return toProcess, accumulationDiff
}

// startTriggeredBundle must be called with the stage.mu lock held.
// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
// When in discarding mode, returns 0, as the pending work already includes these elements.
// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.
func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win typex.Window) int {
toProcess, accumulationDiff := ss.buildTriggeredBundle(em, key, win)
if len(toProcess) == 0 {
return accumulationDiff
}

if ss.inprogressKeys == nil {
ss.inprogressKeys = set[string]{}
}
Expand All @@ -1575,6 +1622,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
},
}

rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input}
ss.makeInProgressBundle(
func() string { return rb.BundleID },
toProcess,
Expand All @@ -1585,9 +1633,11 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
)
slog.Debug("started a triggered bundle", "stageID", ss.ID, "bundleID", rb.BundleID, "size", len(toProcess))

ss.bundlesToInject = append(ss.bundlesToInject, rb)
// TODO: Use ss.bundlesToInject rather than em.injectedBundles
// ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
em.refreshCond.L.Lock()
em.injectedBundles = append(em.injectedBundles, rb)
Copy link
Collaborator Author

@shunping shunping Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use em.injectedBundles here, because the stage-wise bundlesToInject is causing the pipeline stuck. This needs to be consolidated in the future PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If using this mechanism also add "em.inprogressBundles.insert(rb.BundleID)" to avoid "losing" the bundle. injectedBundles are considered "inprogress" even when they aren't yet executing. This is to avoid incorrect premature terminations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very odd that there's a stuckness going on with bundlesToInject. Not sure why that would be offhand.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If using this mechanism also add "em.inprogressBundles.insert(rb.BundleID)" to avoid "losing" the bundle. injectedBundles are considered "inprogress" even when they aren't yet executing. This is to avoid incorrect premature terminations.

Yes, the line you mentioned is already there.

em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()
return accumulationDiff
Expand Down Expand Up @@ -1927,6 +1977,20 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
ss.mu.Lock()
defer ss.mu.Unlock()

toProcess, minTs, newKeys, holdsInBundle, panesInBundle, stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow)

if len(toProcess) == 0 {
// If we have nothing
return "", false, stillSchedulable
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle, panesInBundle)
slog.Debug("started a processing time bundle", "stageID", ss.ID, "bundleID", bundID, "size", len(toProcess), "emNow", emNow)
return bundID, true, stillSchedulable
}

// handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages.
func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow mtime.Time,
processTimerFn func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane)) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) {
// TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime
// Special Case for ProcessingTime handling.
// Eg. Always queue EventTime elements at minTime.
Expand All @@ -1935,6 +1999,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
// Potentially puts too much work on the scheduling thread though.

var toProcess []element
var panesInBundle []bundlePane
minTs := mtime.MaxTimestamp
holdsInBundle := map[mtime.Time]int{}

Expand Down Expand Up @@ -1968,10 +2033,8 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
if e.timestamp < minTs {
minTs = e.timestamp
}
holdsInBundle[e.holdTimestamp]++

// We're going to process this timer!
toProcess = append(toProcess, e)
toProcess, panesInBundle = processTimerFn(e, toProcess, holdsInBundle, panesInBundle)
}

nextTime = ss.processingTimeTimers.Peek()
Expand All @@ -1986,19 +2049,58 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
for _, v := range notYet {
ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
em.processTimeEvents.Schedule(v.firing, ss.ID)
em.wakeUpAt(v.firing)
}

// Add a refresh if there are still processing time events to process.
stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0)

if len(toProcess) == 0 {
// If we have nothing
return "", false, stillSchedulable
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle, nil)
return toProcess, minTs, newKeys, holdsInBundle, panesInBundle, stillSchedulable
}

slog.Debug("started a processing time bundle", "stageID", ss.ID, "bundleID", bundID, "size", len(toProcess), "emNow", emNow)
return bundID, true, stillSchedulable
// buildProcessingTimeBundle for stateful stages prepares bundles for processing-time timers
func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) {
return handleProcessingTimeTimer(ss, em, emNow, func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane) {
holdsInBundle[e.holdTimestamp]++
// We're going to process this timer!
toProcess = append(toProcess, e)
return toProcess, nil
})
}

// buildProcessingTimeBundle for aggregation stages prepares bundles for after-processing-time triggers
func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) {
return handleProcessingTimeTimer(ss, em, emNow, func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane) {
// Different from `buildProcessingTimeBundle` for stateful stage,
// triggers don't hold back the watermark, so no holds are in the triggered bundle.
state := ss.state[LinkID{}][e.window][string(e.keyBytes)]
endOfWindowReached := e.window.MaxTimestamp() < ss.input
ready := ss.strat.IsTriggerReady(triggerInput{
newElementCount: 0,
endOfWindowReached: endOfWindowReached,
emNow: emNow,
}, &state)

if ready {
state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached)

// We're going to process this trigger!
elems, _ := ss.buildTriggeredBundle(em, string(e.keyBytes), e.window)
toProcess = append(toProcess, elems...)

ss.state[LinkID{}][e.window][string(e.keyBytes)] = state

panesInBundle = append(panesInBundle, bundlePane{})
}

return toProcess, panesInBundle
})
}

// buildProcessingTimeBundle for stateless stages is not supposed to be called currently
func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) {
slog.Error("ordinary stages can't have processing time elements")
return nil, mtime.MinTimestamp, nil, nil, nil, false
}

// makeInProgressBundle is common code to store a set of elements as a bundle in progress.
Expand Down Expand Up @@ -2281,13 +2383,23 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
inputW := ss.input
_, upstreamW := ss.UpstreamWatermark()
previousInputW := ss.previousInput
if inputW == upstreamW && previousInputW == inputW {

_, isOrdinaryStage := ss.kind.(*ordinaryStageKind)
if isOrdinaryStage && len(ss.sides) == 0 {
// For ordinary stage with no side inputs, we use whether there are pending elements to determine
// whether a bundle is ready or not.
if len(ss.pending) == 0 {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use watermark to determine whether to create a bundle for aggregation and stateful stages, but not stateless stages without sideinputs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady
}
} else if inputW == upstreamW && previousInputW == inputW {
// Otherwise, use the progression of watermark to determine the bundle readiness.
slog.Debug("bundleReady: unchanged upstream watermark",
slog.String("stage", ss.ID),
slog.Group("watermark",
slog.Any("upstream == input == previousInput", inputW)))
return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady
}

ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side.Global]
Expand Down Expand Up @@ -2329,3 +2441,17 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) {
func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time {
return localNow + (scheduled - mtime.Now())
}

// wakeUpAt schedules a wakeup signal for the bundle processing loop.
// This is used for processing time timers to ensure the loop re-evaluates
// stages when a processing time timer is expected to fire.
func (em *ElementManager) wakeUpAt(t mtime.Time) {
if em.testStreamHandler == nil && em.config.EnableRTC {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed https://github.com/apache/beam/pull/36069/files#r2342213066.

We should also only be creating this Goroutine when the Real Time Clock is being used, and testStreamHandler is nil. Otherwise behavior will be additionally wonky.

// only create this goroutine if we have real-time clock enabled and the pipeline does not have TestStream.
go func(fireAt time.Time) {
time.AfterFunc(time.Until(fireAt), func() {
em.refreshCond.Broadcast()
})
}(t.ToTime())
}
}
43 changes: 43 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,49 @@ func (ws WinStrat) IsNeverTrigger() bool {
return ok
}

func getAfterProcessingTimeTriggers(t Trigger) []*TriggerAfterProcessingTime {
if t == nil {
return nil
}
var triggers []*TriggerAfterProcessingTime
switch at := t.(type) {
case *TriggerAfterProcessingTime:
return []*TriggerAfterProcessingTime{at}
case *TriggerAfterAll:
for _, st := range at.SubTriggers {
triggers = append(triggers, getAfterProcessingTimeTriggers(st)...)
}
return triggers
case *TriggerAfterAny:
for _, st := range at.SubTriggers {
triggers = append(triggers, getAfterProcessingTimeTriggers(st)...)
}
return triggers
case *TriggerAfterEach:
for _, st := range at.SubTriggers {
triggers = append(triggers, getAfterProcessingTimeTriggers(st)...)
}
return triggers
case *TriggerAfterEndOfWindow:
triggers = append(triggers, getAfterProcessingTimeTriggers(at.Early)...)
triggers = append(triggers, getAfterProcessingTimeTriggers(at.Late)...)
return triggers
case *TriggerOrFinally:
triggers = append(triggers, getAfterProcessingTimeTriggers(at.Main)...)
triggers = append(triggers, getAfterProcessingTimeTriggers(at.Finally)...)
return triggers
case *TriggerRepeatedly:
return getAfterProcessingTimeTriggers(at.Repeated)
default:
return nil
}
}

// GetAfterProcessingTimeTriggers returns all AfterProcessingTime triggers within the trigger.
func (ws WinStrat) GetAfterProcessingTimeTriggers() []*TriggerAfterProcessingTime {
return getAfterProcessingTimeTriggers(ws.Trigger)
}

func (ws WinStrat) String() string {
return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]", ws.AllowedLateness, ws.Trigger)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool {
unsupported := false
switch at := tpb.GetTrigger().(type) {
case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_:
case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
return true
case *pipepb.Trigger_AfterAll_:
for _, st := range at.AfterAll.GetSubtriggers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func TestUnimplemented(t *testing.T) {
// Currently unimplemented triggers.
// https://github.com/apache/beam/issues/31438
{pipeline: primitives.TriggerAfterSynchronizedProcessingTime},
{pipeline: primitives.TriggerAfterProcessingTime},
}

for _, test := range tests {
Expand Down Expand Up @@ -93,6 +92,8 @@ func TestImplemented(t *testing.T) {
{pipeline: primitives.TriggerAfterEach},
{pipeline: primitives.TriggerAfterEndOfWindow},
{pipeline: primitives.TriggerRepeat},
{pipeline: primitives.TriggerAfterProcessingTime},
{pipeline: primitives.TriggerAfterProcessingTimeNotTriggered},
}

for _, test := range tests {
Expand Down
1 change: 1 addition & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ var flinkFilters = []string{
"TestBigQueryIO.*",
"TestBigtableIO.*",
"TestSpannerIO.*",
"TestTriggerAfterProcessingTime",
// The number of produced outputs in AfterSynchronizedProcessingTime varies in different runs.
"TestTriggerAfterSynchronizedProcessingTime",
// The flink runner does not support pipeline drain for SDF.
Expand Down
Loading
Loading