@@ -171,6 +171,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
171171 /** A mode to control the behaviour of the {@link #emitNext(DataOutput)} method. */
172172 private OperatingMode operatingMode ;
173173
174+ /** The timestamp when {#operatingMode} was last changed. */
175+ private long operatingModeChangeTs ;
176+
174177 private final CompletableFuture <Void > finished = new CompletableFuture <>();
175178 private final SourceOperatorAvailabilityHelper availabilityHelper =
176179 new SourceOperatorAvailabilityHelper ();
@@ -256,7 +259,7 @@ public SourceOperator(
256259 this .configuration = checkNotNull (configuration );
257260 this .localHostname = checkNotNull (localHostname );
258261 this .emitProgressiveWatermarks = emitProgressiveWatermarks ;
259- this . operatingMode = OperatingMode .OUTPUT_NOT_INITIALIZED ;
262+ setOperatingMode ( OperatingMode .OUTPUT_NOT_INITIALIZED ) ;
260263 this .watermarkAlignmentParams = watermarkStrategy .getAlignmentParameters ();
261264 this .allowUnalignedSourceSplits = configuration .get (ALLOW_UNALIGNED_SOURCE_SPLITS );
262265 this .canEmitBatchOfRecords = checkNotNull (canEmitBatchOfRecords );
@@ -480,10 +483,10 @@ public CompletableFuture<Void> stop(StopMode mode) {
480483 case WAITING_FOR_ALIGNMENT :
481484 case OUTPUT_NOT_INITIALIZED :
482485 case READING :
483- this . operatingMode =
486+ setOperatingMode (
484487 mode == StopMode .DRAIN
485488 ? OperatingMode .SOURCE_DRAINED
486- : OperatingMode .SOURCE_STOPPED ;
489+ : OperatingMode .SOURCE_STOPPED ) ;
487490 availabilityHelper .forceStop ();
488491 if (this .operatingMode == OperatingMode .SOURCE_STOPPED ) {
489492 stopInternalServices ();
@@ -554,11 +557,11 @@ private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Except
554557 initializeMainOutput (output );
555558 return convertToInternalStatus (sourceReader .pollNext (currentMainOutput ));
556559 case SOURCE_STOPPED :
557- this . operatingMode = OperatingMode .DATA_FINISHED ;
560+ setOperatingMode ( OperatingMode .DATA_FINISHED ) ;
558561 sourceMetricGroup .idlingStarted ();
559562 return DataInputStatus .STOPPED ;
560563 case SOURCE_DRAINED :
561- this . operatingMode = OperatingMode .DATA_FINISHED ;
564+ setOperatingMode ( OperatingMode .DATA_FINISHED ) ;
562565 sourceMetricGroup .idlingStarted ();
563566 return DataInputStatus .END_OF_DATA ;
564567 case DATA_FINISHED :
@@ -589,7 +592,7 @@ private void initializeMainOutput(DataOutput<OUT> output) {
589592 lastInvokedOutput = output ;
590593 // Create per-split output for pending splits added before main output is initialized
591594 createOutputForSplits (splitsToInitializeOutput );
592- this . operatingMode = OperatingMode .READING ;
595+ setOperatingMode ( OperatingMode .READING ) ;
593596 }
594597
595598 private void initializeLatencyMarkerEmitter (DataOutput <OUT > output ) {
@@ -621,7 +624,7 @@ private DataInputStatus convertToInternalStatus(InputStatus inputStatus) {
621624 sourceMetricGroup .idlingStarted ();
622625 return DataInputStatus .NOTHING_AVAILABLE ;
623626 case END_OF_INPUT :
624- this . operatingMode = OperatingMode .DATA_FINISHED ;
627+ setOperatingMode ( OperatingMode .DATA_FINISHED ) ;
625628 sourceMetricGroup .idlingStarted ();
626629 return DataInputStatus .END_OF_DATA ;
627630 default :
@@ -898,14 +901,14 @@ private void checkWatermarkAlignment() {
898901 if (operatingMode == OperatingMode .READING ) {
899902 checkState (waitingForAlignmentFuture .isDone ());
900903 if (shouldWaitForAlignment ()) {
901- operatingMode = OperatingMode .WAITING_FOR_ALIGNMENT ;
904+ setOperatingMode ( OperatingMode .WAITING_FOR_ALIGNMENT ) ;
902905 waitingForAlignmentFuture = new CompletableFuture <>();
903906 mainInputActivityClock .pause ();
904907 }
905908 } else if (operatingMode == OperatingMode .WAITING_FOR_ALIGNMENT ) {
906909 checkState (!waitingForAlignmentFuture .isDone ());
907910 if (!shouldWaitForAlignment ()) {
908- operatingMode = OperatingMode .READING ;
911+ setOperatingMode ( OperatingMode .READING ) ;
909912 waitingForAlignmentFuture .complete (null );
910913 mainInputActivityClock .unPause ();
911914 }
@@ -961,4 +964,18 @@ public void forceStop() {
961964 forcedStopFuture .complete (null );
962965 }
963966 }
967+
968+ private void setOperatingMode (OperatingMode newMode ) {
969+ final long now = System .currentTimeMillis ();
970+ LOG .info (
971+ "Switch mode from {} to {} after {} ms, currentMaxDesiredWatermark={}, latestWatermark={}, oldestWatermark={}" ,
972+ operatingMode ,
973+ newMode ,
974+ now - operatingModeChangeTs ,
975+ currentMaxDesiredWatermark ,
976+ sampledLatestWatermark .getLatest (),
977+ sampledLatestWatermark .getOldestSample ());
978+ operatingMode = newMode ;
979+ operatingModeChangeTs = now ;
980+ }
964981}
0 commit comments