Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle',
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
Expand All @@ -116,10 +113,6 @@ def sickbayTests = [
// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',

// Prism not firing sessions correctly (seems to be merging inapppropriately)
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine',
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext',

// Java side dying during execution.
// https://github.com/apache/beam/issues/32930
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
Expand Down Expand Up @@ -161,14 +154,6 @@ def sickbayTests = [
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',


// These tests fail once Late Data was being precisely dropped.
// They set a single element to be late data, and expect it (correctly) to be preserved.
// Since presently, these are treated as No-ops, the fix is to disable the
// dropping behavior when a stage's input is a Reshuffle/Redistribute transform.
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',

// Prism isn't handling Java's side input views properly.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
Expand Down
44 changes: 21 additions & 23 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,10 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side

// StageAggregates marks the given stage as an aggregation, which
// means elements will only be processed based on windowing strategies.
func (em *ElementManager) StageAggregates(ID string) {
em.stages[ID].aggregate = true
func (em *ElementManager) StageAggregates(ID string, strat WinStrat) {
ss := em.stages[ID]
ss.aggregate = true
ss.strat = strat
}

// StageStateful marks the given stage as stateful, which means elements are
Expand Down Expand Up @@ -1095,7 +1097,7 @@ type stageState struct {
// Special handling bits
stateful bool // whether this stage uses state or timers, and needs keyed processing.
aggregate bool // whether this stage needs to block for aggregation.
strat winStrat // Windowing Strategy for aggregation fireings.
strat WinStrat // Windowing Strategy for aggregation fireings.
processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.

// onWindowExpiration management
Expand Down Expand Up @@ -1154,7 +1156,6 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
ID: ID,
outputIDs: outputIDs,
sides: sides,
strat: defaultStrat{},
state: map[LinkID]map[typex.Window]map[string]StateData{},
watermarkHolds: newHoldTracker(),

Expand All @@ -1179,18 +1180,21 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
func (ss *stageState) AddPending(newPending []element) int {
ss.mu.Lock()
defer ss.mu.Unlock()
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Data that arrives after the *output* watermark is late.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if e.window.MaxTimestamp() < threshold {
continue
if ss.aggregate {
// Late Data is data that has arrived after that window has expired.
// We only need to drop late data before aggregations.
// TODO - handle for side inputs too.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if ss.strat.EarliestCompletion(e.window) < threshold {
// TODO: figure out Pane and trigger firings.
continue
}
origPending = append(origPending, e)
}
origPending = append(origPending, e)
newPending = origPending
}
newPending = origPending
if ss.stateful {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
Expand Down Expand Up @@ -1626,10 +1630,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
if ss.strat.EarliestCompletion(win) < newOut {
// If the expiry is in progress, skip this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
Expand All @@ -1640,9 +1642,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
}
for _, wins := range ss.state {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() < newOut {
if ss.strat.EarliestCompletion(win) < newOut {
// If the expiry is in progress, skip collecting this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
Expand Down Expand Up @@ -1685,9 +1685,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele
var preventDownstreamUpdate bool
for win, keys := range ss.keysToExpireByWindow {
// Check if the window has expired.
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() >= newOut {
if ss.strat.EarliestCompletion(win) >= newOut {
continue
}
// We can't advance the output watermark if there's garbage to collect.
Expand Down
28 changes: 8 additions & 20 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,16 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

type winStrat interface {
EarliestCompletion(typex.Window) mtime.Time
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
}

type defaultStrat struct{}

func (ws defaultStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp()
}

func (defaultStrat) String() string {
return "default"
}

type sessionStrat struct {
GapSize time.Duration
}

func (ws sessionStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.GapSize)
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}

func (ws sessionStrat) String() string {
return fmt.Sprintf("session[GapSize:%v]", ws.GapSize)
func (ws WinStrat) String() string {
return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
}
13 changes: 7 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ import (

func TestEarliestCompletion(t *testing.T) {
tests := []struct {
strat winStrat
strat WinStrat
input typex.Window
want mtime.Time
}{
{defaultStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
{defaultStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{defaultStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
{sessionStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{sessionStrat{GapSize: 3 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 6},
{WinStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
{WinStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{WinStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
{WinStrat{AllowedLateness: 5 * time.Second}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime.Add(5 * time.Second)},
{WinStrat{AllowedLateness: 5 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 8},
{WinStrat{AllowedLateness: 5 * time.Second}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp.Add(5 * time.Second)},
}

for _, test := range tests {
Expand Down
9 changes: 6 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
KeyDec: kd,
}
}
em.StageAggregates(stage.ID)
ws := windowingStrategy(comps, tid)
em.StageAggregates(stage.ID, engine.WinStrat{
AllowedLateness: time.Duration(ws.GetAllowedLateness()) * time.Millisecond,
})
case urns.TransformImpulse:
impulses = append(impulses, stage.ID)
em.AddStage(stage.ID, nil, []string{getOnlyValue(t.GetOutputs())}, nil)
Expand Down Expand Up @@ -266,11 +269,11 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
case *pipepb.TestStreamPayload_Event_ElementEvent:
var elms []engine.TestStreamElement
for _, e := range ev.ElementEvent.GetElements() {
elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.Time(e.GetTimestamp())})
elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.FromMilliseconds(e.GetTimestamp())})
}
tsb.AddElementEvent(ev.ElementEvent.GetTag(), elms)
case *pipepb.TestStreamPayload_Event_WatermarkEvent:
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.Time(ev.WatermarkEvent.GetNewWatermark()))
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.FromMilliseconds(ev.WatermarkEvent.GetNewWatermark()))
case *pipepb.TestStreamPayload_Event_ProcessingTimeEvent:
if ev.ProcessingTimeEvent.GetAdvanceDuration() == int64(mtime.MaxTimestamp) {
// TODO: Determine the SDK common formalism for setting processing time to infinity.
Expand Down
Loading
Loading