Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ func (ws WinStrat) String() string {

// triggerInput represents a Key + window + stage's trigger conditions.
type triggerInput struct {
newElementCount int // The number of new elements since the last check.
endOfWindowReached bool // Whether or not the end of the window has been reached.
newElementCount int // The number of new elements since the last check.
endOfWindowReached bool // Whether or not the end of the window has been reached.
emNow mtime.Time // The current processing time in the runner.
}

// Trigger represents a trigger for a windowing strategy. A trigger determines when
Expand Down Expand Up @@ -573,4 +574,105 @@ func (t *TriggerDefault) String() string {
return "Default"
}

// TODO https://github.com/apache/beam/issues/31438 Handle TriggerAfterProcessingTime
// TimestampTransform is the engine's representation of a processing time transform.
type TimestampTransform struct {
Delay time.Duration
AlignToPeriod time.Duration
AlignToOffset time.Duration
}

// TriggerAfterProcessingTime fires once after a specified amount of processing time
// has passed since an element was first seen.
// Uses the extra state field to track the processing time of the first element.
type TriggerAfterProcessingTime struct {
Transforms []TimestampTransform
}

type afterProcessingTimeState struct {
emNow mtime.Time
firingTime mtime.Time
endOfWindowReached bool
Copy link
Collaborator Author

@shunping shunping Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we can pass triggerInput as an argument in shouldFire() and reset(). Then we don't need to put emNow and endOfWindowReached inside ts.extra.

I am fine with both approaches, but would also want to see if you have any opinion on that. @lostluck

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's handy to have emNow in state, since it helps track the change happening. Feels consistent. That's partly why there isn't the triggerInput on those methods presently: it's harder to reason about the state machine, if the state isn't in the machine.

}

func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateData) {
ts := state.getTriggerState(t)
if ts.finished {
return
}

if ts.extra == nil {
ts.extra = afterProcessingTimeState{
emNow: input.emNow,
firingTime: t.applyTimestampTransforms(input.emNow),
endOfWindowReached: input.endOfWindowReached,
}
} else {
s, _ := ts.extra.(afterProcessingTimeState)
s.emNow = input.emNow
s.endOfWindowReached = input.endOfWindowReached
ts.extra = s
}

state.setTriggerState(t, ts)
}

func (t *TriggerAfterProcessingTime) applyTimestampTransforms(start mtime.Time) mtime.Time {
ret := start
for _, transform := range t.Transforms {
ret = ret + mtime.Time(transform.Delay/time.Millisecond)
if transform.AlignToPeriod > 0 {
// timestamp - (timestamp % period) + period
// And with an offset, we adjust before and after.
tsMs := ret
periodMs := mtime.Time(transform.AlignToPeriod / time.Millisecond)
offsetMs := mtime.Time(transform.AlignToOffset / time.Millisecond)

adjustedMs := tsMs - offsetMs
alignedMs := adjustedMs - (adjustedMs % periodMs) + periodMs + offsetMs
ret = alignedMs
}
}
return ret
}

func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool {
ts := state.getTriggerState(t)
if ts.extra == nil || ts.finished {
return false
}
s := ts.extra.(afterProcessingTimeState)
return s.emNow >= s.firingTime
}

func (t *TriggerAfterProcessingTime) onFire(state *StateData) {
ts := state.getTriggerState(t)
if ts.finished {
return
}

// We don't reset the state here, only mark it as finished
ts.finished = true
state.setTriggerState(t, ts)
}

func (t *TriggerAfterProcessingTime) reset(state *StateData) {
ts := state.getTriggerState(t)
if ts.extra != nil {
if ts.extra.(afterProcessingTimeState).endOfWindowReached {
delete(state.Trigger, t)
return
}
}

// Not reaching the end of window yet.
// We keep the state (especially the next possible firing time) in case the trigger is called again
ts.finished = false
s := ts.extra.(afterProcessingTimeState)
s.firingTime = t.applyTimestampTransforms(s.emNow) // compute next possible firing time
ts.extra = s
state.setTriggerState(t, ts)
}

func (t *TriggerAfterProcessingTime) String() string {
return fmt.Sprintf("AfterProcessingTime[%v]", t.Transforms)
}
129 changes: 129 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,135 @@ func TestTriggers_isReady(t *testing.T) {
{triggerInput{newElementCount: 1, endOfWindowReached: true}, false},
{triggerInput{newElementCount: 1, endOfWindowReached: true}, true}, // Late
},
}, {
name: "afterProcessingTime_Delay_Exact",
trig: &TriggerAfterProcessingTime{
Transforms: []TimestampTransform{
{Delay: 3 * time.Second},
},
},
inputs: []io{
{triggerInput{emNow: 0}, false}, // the trigger is set to fire at 3s after 0
{triggerInput{emNow: 1000}, false},
{triggerInput{emNow: 2000}, false},
{triggerInput{emNow: 3000}, true}, // fire
{triggerInput{emNow: 4000}, false},
{triggerInput{emNow: 5000}, false},
{triggerInput{emNow: 6000}, false},
{triggerInput{emNow: 7000}, false},
},
}, {
name: "afterProcessingTime_Delay_Late",
trig: &TriggerAfterProcessingTime{
Transforms: []TimestampTransform{
{Delay: 3 * time.Second},
},
},
inputs: []io{
{triggerInput{emNow: 0}, false}, // the trigger is set to fire at 3s after 0
{triggerInput{emNow: 1000}, false},
{triggerInput{emNow: 2000}, false},
{triggerInput{emNow: 3001}, true}, // fire a little after the preset time
{triggerInput{emNow: 4000}, false},
},
}, {
name: "afterProcessingTime_AlignToPeriodOnly",
trig: &TriggerAfterProcessingTime{
Transforms: []TimestampTransform{
{AlignToPeriod: 5 * time.Second},
},
},
inputs: []io{
{triggerInput{emNow: 1500}, false}, // align 1.5s to 5s
{triggerInput{emNow: 2000}, false},
{triggerInput{emNow: 4999}, false},
{triggerInput{emNow: 5000}, true}, // fire at 5
{triggerInput{emNow: 5001}, false},
},
}, {
name: "afterProcessingTime_AlignToPeriodAndOffset",
trig: &TriggerAfterProcessingTime{
Transforms: []TimestampTransform{
{AlignToPeriod: 5 * time.Second, AlignToOffset: 200 * time.Millisecond},
},
},
inputs: []io{
{triggerInput{emNow: 1500}, false}, // align 1.5s to 5s plus an 0.2 offset
{triggerInput{emNow: 2000}, false},
{triggerInput{emNow: 5119}, false},
{triggerInput{emNow: 5200}, true}, // fire at 5.2s
{triggerInput{emNow: 5201}, false},
},
}, {
name: "afterProcessingTime_TwoTransforms",
trig: &TriggerAfterProcessingTime{
Transforms: []TimestampTransform{
{AlignToPeriod: 5 * time.Second, AlignToOffset: 200 * time.Millisecond},
{Delay: 1 * time.Second},
},
},
inputs: []io{
{triggerInput{emNow: 1500}, false}, // align 1.5s to 5s plus an 0.2 offset and a 1s delay
{triggerInput{emNow: 2000}, false},
{triggerInput{emNow: 5119}, false},
{triggerInput{emNow: 5200}, false},
{triggerInput{emNow: 5201}, false},
{triggerInput{emNow: 6119}, false},
{triggerInput{emNow: 6200}, true}, // fire
{triggerInput{emNow: 6201}, false},
},
}, {
name: "afterProcessingTime_Repeated", trig: &TriggerRepeatedly{
&TriggerAfterProcessingTime{
Transforms: []TimestampTransform{
{Delay: 3 * time.Second},
}}},
inputs: []io{
{triggerInput{emNow: 0}, false},
{triggerInput{emNow: 1000}, false},
{triggerInput{emNow: 2000}, false},
{triggerInput{emNow: 3000}, true}, // firing the first time, trigger set again
{triggerInput{emNow: 4000}, false},
{triggerInput{emNow: 5000}, false},
{triggerInput{emNow: 6000}, true}, // firing the second time
},
}, {
name: "afterProcessingTime_Repeated_AcrossWindows", trig: &TriggerRepeatedly{
&TriggerAfterProcessingTime{
Transforms: []TimestampTransform{
{Delay: 3 * time.Second},
}}},
inputs: []io{
{triggerInput{emNow: 0}, false},
{triggerInput{emNow: 1000}, false},
{triggerInput{emNow: 2000}, false},
{triggerInput{emNow: 3000}, true}, // fire the first time, trigger is set again
{triggerInput{emNow: 4000}, false},
{triggerInput{emNow: 5000}, false},
{triggerInput{emNow: 6000,
endOfWindowReached: true}, true}, // fire the second time, reach end of window and start over
{triggerInput{emNow: 7000}, false}, // trigger firing time is set to 7s + 3s = 10s
{triggerInput{emNow: 8000}, false},
{triggerInput{emNow: 9000}, false},
{triggerInput{emNow: 10000}, true}, // fire in the new window
},
}, {
name: "afterProcessingTime_Repeated_Composite", trig: &TriggerRepeatedly{
&TriggerAfterAny{SubTriggers: []Trigger{
&TriggerAfterProcessingTime{
Transforms: []TimestampTransform{
{Delay: 3 * time.Second},
},
},
&TriggerElementCount{ElementCount: 2},
}}},
inputs: []io{
{triggerInput{emNow: 0, newElementCount: 1}, false}, // ElmCount = 1, set AfterProcessingTime trigger firing time to 3s
{triggerInput{emNow: 1000, newElementCount: 1}, true}, // ElmCount = 2, fire ElmCount trigger and reset ElmCount and AfterProcessingTime firing time (4s)
{triggerInput{emNow: 4000, newElementCount: 1}, true}, // ElmCount = 1, fire AfterProcessingTime trigger and reset ElmCount and AfterProcessingTime firing time (7s)
{triggerInput{emNow: 5000, newElementCount: 1}, false}, // ElmCount = 1
{triggerInput{emNow: 5500, newElementCount: 1}, true}, // ElmCount = 2, fire ElmCount trigger
},
}, {
name: "default",
trig: &TriggerDefault{},
Expand Down
22 changes: 21 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,27 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger {
}
case *pipepb.Trigger_Repeat_:
return &engine.TriggerRepeatedly{Repeated: buildTrigger(at.Repeat.GetSubtrigger())}
case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_:
case *pipepb.Trigger_AfterProcessingTime_:
var transforms []engine.TimestampTransform
for _, ts := range at.AfterProcessingTime.GetTimestampTransforms() {
var delay, period, offset time.Duration
if d := ts.GetDelay(); d != nil {
delay = time.Duration(d.GetDelayMillis()) * time.Millisecond
}
if align := ts.GetAlignTo(); align != nil {
period = time.Duration(align.GetPeriod()) * time.Millisecond
offset = time.Duration(align.GetOffset()) * time.Millisecond
}
transforms = append(transforms, engine.TimestampTransform{
Delay: delay,
AlignToPeriod: period,
AlignToOffset: offset,
})
}
return &engine.TriggerAfterProcessingTime{
Transforms: transforms,
}
case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
panic(fmt.Sprintf("unsupported trigger: %v", prototext.Format(tpb)))
default:
return &engine.TriggerDefault{}
Expand Down
Loading