Skip to content

Commit 008f089

Browse files
committed
fix the watermark
1 parent 42e9610 commit 008f089

File tree

5 files changed

+16
-24
lines changed

5 files changed

+16
-24
lines changed

flink-learning-common/src/main/java/com/zhisheng/common/watermarks/MetricWatermark.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ public class MetricWatermark implements AssignerWithPeriodicWatermarks<MetricEve
1616

1717
@Override
1818
public long extractTimestamp(MetricEvent metricEvent, long previousElementTimestamp) {
19-
if (metricEvent.getTimestamp() > currentTimestamp) {
20-
this.currentTimestamp = metricEvent.getTimestamp();
21-
}
22-
return currentTimestamp;
19+
long timestamp = metricEvent.getTimestamp();
20+
currentTimestamp = Math.max(timestamp, currentTimestamp);
21+
return timestamp;
2322
}
2423

2524
@Nullable

flink-learning-examples/src/main/java/com/zhisheng/examples/streaming/watermark/WordPeriodicWatermark.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,13 @@ public class WordPeriodicWatermark implements AssignerWithPeriodicWatermarks<Wor
2020

2121
@Override
2222
public long extractTimestamp(Word word, long previousElementTimestamp) {
23-
this.currentTimestamp = word.getTimestamp();
23+
long timestamp = word.getTimestamp();
24+
currentTimestamp = Math.max(timestamp, currentTimestamp);
2425
log.info("event timestamp = {}, {}, CurrentWatermark = {}, {}", word.getTimestamp(),
2526
DateUtil.format(word.getTimestamp(), YYYY_MM_DD_HH_MM_SS),
2627
getCurrentWatermark().getTimestamp(),
2728
DateUtil.format(getCurrentWatermark().getTimestamp(), YYYY_MM_DD_HH_MM_SS));
2829
return word.getTimestamp();
29-
30-
// if (word.getTimestamp() > currentTimestamp) {
31-
// this.currentTimestamp = word.getTimestamp();
32-
// }
33-
// return currentTimestamp;
3430
}
3531

3632
@Nullable

flink-learning-monitor/flink-learning-monitor-alert/src/main/java/com/zhisheng/alert/schema/OutageMetricWaterMark.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ public Watermark getCurrentWatermark() {
2222

2323
@Override
2424
public long extractTimestamp(OutageMetricEvent outageMetricEvent, long l) {
25-
if (outageMetricEvent.getTimestamp() > currentTimestamp) {
26-
this.currentTimestamp = outageMetricEvent.getTimestamp();
27-
}
28-
return currentTimestamp;
25+
long timestamp = outageMetricEvent.getTimestamp();
26+
currentTimestamp = Math.max(timestamp, currentTimestamp);
27+
return timestamp;
2928
}
3029
}

flink-learning-window/src/main/java/com/zhisheng/window/CustomTriggerMain.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,14 @@ public static void main(String[] args) throws Exception {
3636
@Nullable
3737
@Override
3838
public Watermark getCurrentWatermark() {
39-
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
39+
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag);
4040
}
4141

4242
@Override
4343
public long extractTimestamp(WordEvent element, long previousElementTimestamp) {
44-
if (element.getTimestamp() > currentTimestamp) {
45-
this.currentTimestamp = element.getTimestamp();
46-
}
47-
return currentTimestamp;
44+
long timestamp = element.getTimestamp();
45+
currentTimestamp = Math.max(timestamp, currentTimestamp);
46+
return timestamp;
4847
}
4948
});
5049

flink-learning-window/src/main/java/com/zhisheng/window/Main2.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@ public static void main(String[] args) throws Exception {
3737
@Nullable
3838
@Override
3939
public Watermark getCurrentWatermark() {
40-
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
40+
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag);
4141
}
4242

4343
@Override
4444
public long extractTimestamp(WordEvent element, long previousElementTimestamp) {
45-
if (element.getTimestamp() > currentTimestamp) {
46-
this.currentTimestamp = element.getTimestamp();
47-
}
48-
return currentTimestamp;
45+
long timestamp = element.getTimestamp();
46+
currentTimestamp = Math.max(timestamp, currentTimestamp);
47+
return timestamp;
4948
}
5049
});
5150

0 commit comments

Comments
 (0)