Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ type link struct {
// account, but all serialization boundaries remain since the pcollections
// would continue to get serialized.
type stage struct {
ID string
transforms []string
primaryInput string // PCollection used as the parallel input.
outputs []link // PCollections that must escape this stage.
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
stateful bool
hasTimers []string
ID string
transforms []string
primaryInput string // PCollection used as the parallel input.
outputs []link // PCollections that must escape this stage.
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
stateful bool
// hasTimers indicates the transform+timerfamily pairs that need to be waited on for
// the stage to be considered complete.
hasTimers []struct{ Transform, TimerFamily string }
processingTimeTimers map[string]bool

exe transformExecuter
Expand Down Expand Up @@ -396,7 +398,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
}
}
for timerID, v := range pardo.GetTimerFamilySpecs() {
stg.hasTimers = append(stg.hasTimers, tid)
stg.hasTimers = append(stg.hasTimers, struct{ Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID})
if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME {
if stg.processingTimeTimers == nil {
stg.processingTimeTimers = map[string]bool{}
Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type B struct {
InputTransformID string
Input []*engine.Block // Data and Timers for this bundle.
EstimatedInputElements int
HasTimers []string
HasTimers []struct{ Transform, TimerFamily string } // Timer streams to terminate.

// IterableSideInputData is a map from transformID + inputID, to window, to data.
IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
Expand Down Expand Up @@ -175,7 +175,8 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
for _, tid := range b.HasTimers {
timers = append(timers, &fnpb.Elements_Timers{
InstructionId: b.InstID,
TransformId: tid,
TransformId: tid.Transform,
TimerFamilyId: tid.TimerFamily,
IsLast: true,
})
}
Expand Down
32 changes: 0 additions & 32 deletions sdks/python/apache_beam/runners/portability/prism_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from apache_beam.runners.portability import portable_runner_test
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import userstate
from apache_beam.transforms import window
from apache_beam.utils import timestamp

Expand Down Expand Up @@ -195,37 +194,6 @@ def test_windowing(self):
assert_that(
res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])]))

# The fn_runner_test.py version of this test doesn't execute the process
# method for some reason. Overridden here to validate that the cleared
# timer won't re-fire.
def test_pardo_timers_clear(self):
timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

class TimerDoFn(beam.DoFn):
def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
unused_key, ts = element
timer.set(ts)
timer.set(2 * ts)

@userstate.on_timer(timer_spec)
def process_timer(
self,
ts=beam.DoFn.TimestampParam,
timer=beam.DoFn.TimerParam(timer_spec)):
timer.set(timestamp.Timestamp(micros=2 * ts.micros))
timer.clear() # Shouldn't fire again
yield 'fired'

with self.create_pipeline() as p:
actual = (
p
| beam.Create([('k1', 10), ('k2', 100)])
| beam.ParDo(TimerDoFn())
| beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))

expected = [('fired', ts) for ts in (20, 200)]
assert_that(actual, equal_to(expected))

# Can't read host files from within docker, read a "local" file there.
def test_read(self):
print('name:', __name__)
Expand Down