Skip to content

Commit ef30dd1

Browse files
starmilkxinni-ze
authored andcommitted
add watermark in IdleWindowScaner
1 parent ef51008 commit ef30dd1

File tree

4 files changed

+43
-36
lines changed

4 files changed

+43
-36
lines changed

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void process(Object data) throws Throwable {
121121
throw new IllegalStateException(format);
122122
}
123123

124-
store(key, data, time, streamType);
124+
store(key, data, time, watermark, streamType);
125125

126126
List<WindowKey> fire = this.joinWindowFire.fire(this.name, watermark, streamType);
127127
for (WindowKey windowKey : fire) {
@@ -130,7 +130,7 @@ public void process(Object data) throws Throwable {
130130
}
131131

132132

133-
private void store(Object key, Object data, long time, StreamType streamType) throws Throwable {
133+
private void store(Object key, Object data, long time, long watermark, StreamType streamType) throws Throwable {
134134
String name = Utils.buildKey(this.name, streamType.name());
135135
List<Window> windows = super.calculateWindow(windowInfo, time);
136136
for (Window window : windows) {
@@ -142,12 +142,12 @@ private void store(Object key, Object data, long time, StreamType streamType) th
142142
case LEFT_STREAM:
143143
WindowState<K, V1> leftState = new WindowState<>((K) key, (V1) data, time);
144144
this.leftWindowStore.put(stateTopicMessageQueue, windowKey, leftState);
145-
this.idleWindowScaner.putJoinWindowCallback(windowKey, joinWindowFire);
145+
this.idleWindowScaner.putJoinWindowCallback(windowKey, watermark, joinWindowFire);
146146
break;
147147
case RIGHT_STREAM:
148148
WindowState<K, V2> rightState = new WindowState<>((K) key, (V2) data, time);
149149
this.rightWindowStore.put(stateTopicMessageQueue, windowKey, rightState);
150-
this.idleWindowScaner.putJoinWindowCallback(windowKey, joinWindowFire);
150+
this.idleWindowScaner.putJoinWindowCallback(windowKey, watermark, joinWindowFire);
151151
break;
152152
}
153153
}

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void process(V data) throws Throwable {
151151
//f(Window + key, newValue, store)
152152
WindowState<K, Accumulator<R, OV>> state = new WindowState<>(key, storeAccumulator, time);
153153
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
154-
this.idleWindowScaner.putAccumulatorWindowCallback(windowKey, this.accumulatorWindowFire);
154+
this.idleWindowScaner.putAccumulatorWindowCallback(windowKey, watermark, this.accumulatorWindowFire);
155155
}
156156

157157
try {
@@ -188,7 +188,6 @@ public void preProcess(StreamContext<V> context) throws RecoverStateStoreThrowab
188188
WindowState::windowState2Byte);
189189

190190
this.idleWindowScaner = context.getDefaultWindowScaner();
191-
this.idleWindowScaner.initSessionTimeOut(windowInfo.getSessionTimeout().toMilliseconds());
192191

193192
String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
194193
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
@@ -227,7 +226,7 @@ public void process(V data) throws Throwable {
227226
logger.info("new session window, with key={}, valueTime={}, sessionBegin=[{}], sessionEnd=[{}]", key, Utils.format(time),
228227
Utils.format(newSessionWindowTime.getKey()), Utils.format(newSessionWindowTime.getValue()));
229228
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
230-
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, this.accumulatorSessionWindowFire);
229+
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, watermark, this.accumulatorSessionWindowFire);
231230
}
232231
}
233232

@@ -324,7 +323,7 @@ public void process(V data) throws Throwable {
324323

325324
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
326325

327-
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, this.accumulatorSessionWindowFire);
326+
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, watermark, this.accumulatorSessionWindowFire);
328327
this.idleWindowScaner.removeOldAccumulatorSession(needToDelete);
329328

330329
this.windowStore.deleteByKey(needToDelete);

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public void process(V data) throws Throwable {
153153
//f(Window + key, newValue, store)
154154
WindowState<K, OV> state = new WindowState<>(key, newValue, time);
155155
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
156-
this.idleWindowScaner.putAggregateWindowCallback(windowKey, this.aggregateWindowFire);
156+
this.idleWindowScaner.putAggregateWindowCallback(windowKey, watermark, this.aggregateWindowFire);
157157
}
158158

159159
try {
@@ -192,7 +192,6 @@ public void preProcess(StreamContext<V> context) throws RecoverStateStoreThrowab
192192
WindowState::windowState2Byte);
193193

194194
this.idleWindowScaner = context.getDefaultWindowScaner();
195-
this.idleWindowScaner.initSessionTimeOut(windowInfo.getSessionTimeout().toMilliseconds());
196195

197196
String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
198197
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
@@ -231,7 +230,7 @@ public void process(V data) throws Throwable {
231230
logger.info("new session window, with key={}, valueTime={}, sessionBegin=[{}], sessionEnd=[{}]", key, Utils.format(time),
232231
Utils.format(newSessionWindowTime.getKey()), Utils.format(newSessionWindowTime.getValue()));
233232
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
234-
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, this.aggregateSessionWindowFire);
233+
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, watermark, this.aggregateSessionWindowFire);
235234
}
236235
}
237236

@@ -323,7 +322,7 @@ public void process(V data) throws Throwable {
323322

324323
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
325324

326-
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, this.aggregateSessionWindowFire);
325+
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, watermark, this.aggregateSessionWindowFire);
327326
this.idleWindowScaner.removeOldAggregateSession(needToDelete);
328327

329328
this.windowStore.deleteByKey(needToDelete);

core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java

+33-24
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Iterator;
2626
import java.util.Map;
2727
import java.util.concurrent.ConcurrentHashMap;
28-
import java.util.concurrent.Executors;
2928
import java.util.concurrent.ScheduledExecutorService;
3029
import java.util.concurrent.TimeUnit;
3130

@@ -34,7 +33,6 @@ public class IdleWindowScaner implements AutoCloseable {
3433
private static final Logger logger = LoggerFactory.getLogger(IdleWindowScaner.class.getName());
3534

3635
private final Integer maxIdleTime;
37-
private long sessionTimeOut = 0;
3836
private final ScheduledExecutorService executor;
3937

4038
private final ConcurrentHashMap<WindowKey, TimeType> lastUpdateTime2WindowKey = new ConcurrentHashMap<>(16);
@@ -59,65 +57,66 @@ public IdleWindowScaner(Integer maxIdleTime, ScheduledExecutorService executor)
5957
}, 0, 1000, TimeUnit.MILLISECONDS);
6058
}
6159

62-
public void initSessionTimeOut(long sessionTimeOut) {
63-
this.sessionTimeOut = sessionTimeOut;
64-
}
65-
66-
public void putAccumulatorWindowCallback(WindowKey windowKey, AccumulatorWindowFire<?, ?, ?, ?> function) {
60+
public void putAccumulatorWindowCallback(WindowKey windowKey, long watermark, AccumulatorWindowFire<?, ?, ?, ?> function) {
6761
this.fireWindowCallBack.putIfAbsent(windowKey, function);
6862
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
6963
if (timeType == null) {
70-
timeType = new TimeType(Type.AccumulatorWindow, System.currentTimeMillis());
64+
timeType = new TimeType(Type.AccumulatorWindow, System.currentTimeMillis(), watermark);
7165
} else {
7266
timeType.setUpdateTime(System.currentTimeMillis());
67+
timeType.setWatermark(watermark);
7368
}
7469
return timeType;
7570
});
7671
}
7772

78-
public void putAccumulatorSessionWindowCallback(WindowKey windowKey, AccumulatorSessionWindowFire<?, ?, ?, ?> function) {
73+
public void putAccumulatorSessionWindowCallback(WindowKey windowKey, long watermark, AccumulatorSessionWindowFire<?, ?, ?, ?> function) {
7974
this.fireSessionWindowCallback.putIfAbsent(windowKey, function);
8075
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
8176
if (timeType == null) {
82-
timeType = new TimeType(Type.AccumulatorSessionWindow, System.currentTimeMillis());
77+
timeType = new TimeType(Type.AccumulatorSessionWindow, System.currentTimeMillis(), watermark);
8378
} else {
8479
timeType.setUpdateTime(System.currentTimeMillis());
80+
timeType.setWatermark(watermark);
8581
}
8682
return timeType;
8783
});
8884
}
8985

90-
public void putAggregateWindowCallback(WindowKey windowKey, AggregateWindowFire<?, ?, ?> function) {
86+
public void putAggregateWindowCallback(WindowKey windowKey, long watermark, AggregateWindowFire<?, ?, ?> function) {
9187
this.windowKeyAggregate.putIfAbsent(windowKey, function);
9288
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
9389
if (timeType == null) {
94-
timeType = new TimeType(Type.AggregateWindow, System.currentTimeMillis());
90+
timeType = new TimeType(Type.AggregateWindow, System.currentTimeMillis(), watermark);
9591
} else {
9692
timeType.setUpdateTime(System.currentTimeMillis());
93+
timeType.setWatermark(watermark);
9794
}
9895
return timeType;
9996
});
10097
}
10198

102-
public void putAggregateSessionWindowCallback(WindowKey windowKey, AggregateSessionWindowFire<?, ?, ?> function) {
99+
public void putAggregateSessionWindowCallback(WindowKey windowKey, long watermark, AggregateSessionWindowFire<?, ?, ?> function) {
103100
this.windowKeyAggregateSession.putIfAbsent(windowKey, function);
104101
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
105102
if (timeType == null) {
106-
timeType = new TimeType(Type.AggregateSessionWindow, System.currentTimeMillis());
103+
timeType = new TimeType(Type.AggregateSessionWindow, System.currentTimeMillis(), watermark);
107104
} else {
108105
timeType.setUpdateTime(System.currentTimeMillis());
106+
timeType.setWatermark(watermark);
109107
}
110108
return timeType;
111109
});
112110
}
113111

114-
public void putJoinWindowCallback(WindowKey windowKey, JoinWindowFire<?, ?, ?, ?> function) {
112+
public void putJoinWindowCallback(WindowKey windowKey, long watermark, JoinWindowFire<?, ?, ?, ?> function) {
115113
this.fireJoinWindowCallback.putIfAbsent(windowKey, function);
116114
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
117115
if (timeType == null) {
118-
timeType = new TimeType(Type.JoinWindow, System.currentTimeMillis());
116+
timeType = new TimeType(Type.JoinWindow, System.currentTimeMillis(), watermark);
119117
} else {
120118
timeType.setUpdateTime(System.currentTimeMillis());
119+
timeType.setWatermark(watermark);
121120
}
122121
return timeType;
123122
});
@@ -169,9 +168,10 @@ private void scanAndFireWindow() throws Throwable {
169168
switch (type) {
170169
case AggregateSessionWindow:
171170
case AccumulatorSessionWindow: {
172-
if (idleTime >= sessionTimeOut) {
171+
long watermark = timeType.getWatermark() + idleTime;
172+
if (watermark > windowKey.getWindowEnd()) {
173173
try {
174-
doFire(windowKey, type);
174+
doFire(windowKey, type, watermark);
175175
} finally {
176176
iterator.remove();
177177
}
@@ -181,10 +181,10 @@ private void scanAndFireWindow() throws Throwable {
181181
case AccumulatorWindow:
182182
case JoinWindow:
183183
case AggregateWindow: {
184-
long windowSize = windowKey.getWindowEnd() - windowKey.getWindowStart();
185-
if (idleTime > this.maxIdleTime && idleTime > windowSize) {
184+
long watermark = timeType.getWatermark() + idleTime;
185+
if (idleTime > this.maxIdleTime && watermark > windowKey.getWindowEnd()) {
186186
try {
187-
doFire(windowKey, type);
187+
doFire(windowKey, type, watermark);
188188
} finally {
189189
iterator.remove();
190190
}
@@ -197,8 +197,7 @@ private void scanAndFireWindow() throws Throwable {
197197
}
198198
}
199199

200-
private void doFire(WindowKey windowKey, Type type) throws Throwable {
201-
long watermark = windowKey.getWindowEnd() + 1;
200+
private void doFire(WindowKey windowKey, Type type, long watermark) throws Throwable {
202201
String operatorName = windowKey.getOperatorName();
203202

204203
switch (type) {
@@ -258,10 +257,12 @@ public void close() throws Exception {
258257
static class TimeType {
259258
private Type type;
260259
private long updateTime;
260+
private long watermark;
261261

262-
public TimeType(Type type, long updateTime) {
262+
public TimeType(Type type, long updateTime, long watermark) {
263263
this.type = type;
264264
this.updateTime = updateTime;
265+
this.watermark = watermark;
265266
}
266267

267268
public Type getType() {
@@ -279,6 +280,14 @@ public long getUpdateTime() {
279280
public void setUpdateTime(long updateTime) {
280281
this.updateTime = updateTime;
281282
}
283+
284+
public long getWatermark() {
285+
return watermark;
286+
}
287+
288+
public void setWatermark(long watermark) {
289+
this.watermark = watermark;
290+
}
282291
}
283292

284293
enum Type {

0 commit comments

Comments
 (0)