@@ -769,11 +769,12 @@ def process_clear_timer(self):
769
769
expected = [('fired' , ts ) for ts in (20 , 200 )]
770
770
assert_that (actual , equal_to (expected ))
771
771
772
- def _run_pardo_timer_test (
772
+ def _run_pardo_et_timer_test (
773
773
self , n , timer_delay , reset_count = True , clear_timer = True , expected = None ):
774
- class AnotherTimerDoFn (beam .DoFn ):
774
+ class EventTimeTimerDoFn (beam .DoFn ):
775
775
COUNT = userstate .ReadModifyWriteStateSpec (
776
776
'count' , coders .VarInt32Coder ())
777
+ # event-time timer
777
778
TIMER = userstate .TimerSpec ('timer' , userstate .TimeDomain .WATERMARK )
778
779
779
780
def __init__ (self ):
@@ -790,21 +791,28 @@ def process(
790
791
timer = beam .DoFn .TimerParam (TIMER )):
791
792
local_count = count .read () or 0
792
793
local_count += 1
794
+
795
+ _LOGGER .info (f"get element { element_pair [1 ]} , count={ local_count } " )
793
796
if local_count == 1 :
797
+ _LOGGER .info (f"set timer to { t + self ._timer_delay } " )
794
798
timer .set (t + self ._timer_delay )
795
799
796
800
if local_count == self ._n :
797
801
if self ._reset_count :
802
+ _LOGGER .info ("reset count" )
798
803
local_count = 0
799
804
800
805
# don't need the timer now
801
806
if self ._clear_timer :
807
+ _LOGGER .info ("clear timer" )
802
808
timer .clear ()
803
809
804
810
count .write (local_count )
805
811
806
812
@userstate .on_timer (TIMER )
807
813
def timer_callback (self , t = beam .DoFn .TimestampParam ):
814
+ _LOGGER .error ("Timer should not fire here" )
815
+ _LOGGER .info (f"timer callback start (timestamp={ t } )" )
808
816
yield "fired"
809
817
810
818
with self .create_pipeline () as p :
@@ -814,28 +822,49 @@ def timer_callback(self, t=beam.DoFn.TimestampParam):
814
822
stop_timestamp = timestamp .Timestamp .now () + 14 ,
815
823
fire_interval = 1 )
816
824
| beam .WithKeys (0 )
817
- | beam .ParDo (AnotherTimerDoFn ()))
825
+ | beam .ParDo (EventTimeTimerDoFn ()))
818
826
assert_that (actual , equal_to (expected ))
819
827
820
- def test_pardo_timer_with_no_firing (self ):
828
+ def test_pardo_et_timer_with_no_firing (self ):
829
+ if type (self ) in [FnApiRunnerTest ,
830
+ FnApiRunnerTestWithGrpc ,
831
+ FnApiRunnerTestWithGrpcAndMultiWorkers ,
832
+ FnApiRunnerTestWithDisabledCaching ,
833
+ FnApiRunnerTestWithMultiWorkers ,
834
+ FnApiRunnerTestWithBundleRepeat ,
835
+ FnApiRunnerTestWithBundleRepeatAndMultiWorkers ,
836
+ "PortableRunnerTest" ]:
837
+ raise unittest .SkipTest ("https://github.com/apache/beam/issues/35168" )
838
+
821
839
# The timer will not fire. It is initially set to T + 10, but then it is
822
840
# cleared at T + 4 (count == 5), and reset to T + 5 + 10
823
841
# (count is reset every 5 seconds).
824
- self ._run_pardo_timer_test (5 , 10 , True , True , [])
842
+ self ._run_pardo_et_timer_test (5 , 10 , True , True , [])
843
+ raise ValueError ("ok" )
825
844
826
- def test_pardo_timer_with_early_firing (self ):
845
+ def test_pardo_et_timer_with_early_firing (self ):
827
846
# The timer will fire at T + 2, T + 7, T + 12.
828
- self ._run_pardo_timer_test (5 , 2 , True , True , ["fired" , "fired" , "fired" ])
847
+ self ._run_pardo_et_timer_test (5 , 2 , True , True , ["fired" , "fired" , "fired" ])
848
+
849
+ def test_pardo_et_timer_with_no_reset (self ):
850
+ if type (self ) in [FnApiRunnerTest ,
851
+ FnApiRunnerTestWithGrpc ,
852
+ FnApiRunnerTestWithGrpcAndMultiWorkers ,
853
+ FnApiRunnerTestWithDisabledCaching ,
854
+ FnApiRunnerTestWithMultiWorkers ,
855
+ FnApiRunnerTestWithBundleRepeat ,
856
+ FnApiRunnerTestWithBundleRepeatAndMultiWorkers ,
857
+ "PortableRunnerTest" ]:
858
+ raise unittest .SkipTest ("https://github.com/apache/beam/issues/35168" )
829
859
830
- def test_pardo_timer_with_no_reset (self ):
831
860
# The timer will not fire. It is initially set to T + 10, and then it is
832
861
# cleared at T + 4 and never set again (count is not reset).
833
- self ._run_pardo_timer_test (5 , 10 , False , True , [])
862
+ self ._run_pardo_et_timer_test (5 , 10 , False , True , [])
834
863
835
- def test_pardo_timer_with_no_reset_and_no_clear (self ):
864
+ def test_pardo_et_timer_with_no_reset_and_no_clear (self ):
836
865
# The timer will fire at T + 10. After the timer is set, it is never
837
866
# cleared or set again.
838
- self ._run_pardo_timer_test (5 , 10 , False , False , ["fired" ])
867
+ self ._run_pardo_et_timer_test (5 , 10 , False , False , ["fired" ])
839
868
840
869
def test_pardo_state_timers (self ):
841
870
self ._run_pardo_state_timers (windowed = False )
0 commit comments