Skip to content

Commit 14ba8f7

Browse files
authored
Merge pull request #293 from ni-ze/develop
[ISSUE #294]Discard data because of delay
2 parents a69d341 + 900b260 commit 14ba8f7

File tree

7 files changed

+13
-22
lines changed

7 files changed

+13
-22
lines changed

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,7 @@ public void process(Object data) throws Throwable {
111111
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
112112

113113
if (time < watermark) {
114-
//已经触发,丢弃数据
115-
logger.warn("discard data:[{}], window has been fired. maxFiredWindowEnd:{}, time of data:{}, watermark:{}",
116-
data, watermark, watermark, time);
114+
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
117115
return;
118116
}
119117
WindowInfo.JoinStream stream = (WindowInfo.JoinStream) header.get(Constant.STREAM_TAG);

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,8 @@ public void process(V data) throws Throwable {
122122

123123
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
124124
if (time < watermark) {
125-
//已经触发,丢弃数据
126-
logger.warn("discard data:[{}], window has been fired. time of data:{}, watermark:{}",
127-
data, time, watermark);
125+
//delay data.
126+
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
128127
return;
129128
}
130129

@@ -207,9 +206,7 @@ public void process(V data) throws Throwable {
207206

208207
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
209208
if (time < watermark) {
210-
//已经触发,丢弃数据
211-
logger.warn("discard data:[{}], window has been fired. time of data:{}, watermark:{}",
212-
data, time, watermark);
209+
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
213210
return;
214211
}
215212
//本地存储里面搜索下

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,7 @@ public void process(V data) throws Throwable {
122122

123123
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
124124
if (time < watermark) {
125-
//已经触发,丢弃数据
126-
logger.warn("discard data:[{}], window has been fired. time of data:{}, watermark:{}",
127-
data, time, watermark);
125+
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
128126
return;
129127
}
130128

@@ -212,9 +210,7 @@ public void process(V data) throws Throwable {
212210

213211
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
214212
if (time < watermark) {
215-
//已经触发,丢弃数据
216-
logger.warn("discard data:[{}], window has been fired. time of data:{}, watermark:{}",
217-
data, time, watermark);
213+
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
218214
return;
219215
}
220216

core/src/main/java/org/apache/rocketmq/streams/core/metadata/StreamConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class StreamConfig {
2222
public static final String IDLE_TIME_TO_FIRE_WINDOW = "idleTimeToFireWindow";
2323
public static final String ROCKETMQ_STREAMS_CONSUMER_GROUP = "__source_shuffle_group";
2424
public static final String ROCKETMQ_STREAMS_STATE_CONSUMER_GROUP = "__state_group";
25+
public static final String COMMIT_STATE_INTERNAL_MS = "commitStateIntervalMillisecond";
2526

2627
public static Integer STREAMS_PARALLEL_THREAD_NUM = 1;
2728
public static Integer SHUFFLE_TOPIC_QUEUE_NUM = 8;

core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,6 @@ class PlanetaryEngine<K, V> {
125125
private final IdleWindowScaner idleWindowScaner;
126126
private volatile boolean stop = false;
127127

128-
private long lastCommit = 0;
129-
private int commitInterval = 10 * 1000;
130128
private final HashSet<MessageQueue> mq2Commit = new HashSet<>();
131129

132130

@@ -147,14 +145,15 @@ public PlanetaryEngine(DefaultLitePullConsumer unionConsumer, DefaultMQProducer
147145
}
148146
});
149147
Integer idleTime = (Integer) WorkerThread.this.properties.getOrDefault(StreamConfig.IDLE_TIME_TO_FIRE_WINDOW, 2000);
148+
int commitInterval = (Integer) WorkerThread.this.properties.getOrDefault(StreamConfig.COMMIT_STATE_INTERNAL_MS, 2 * 1000);
150149
this.idleWindowScaner = new IdleWindowScaner(idleTime, executor);
151150
WorkerThread.this.executor.scheduleAtFixedRate(() -> {
152151
try {
153152
doCommit(mq2Commit);
154153
} catch (Throwable t) {
155154
logger.error("commit offset and state error.", t);
156155
}
157-
}, 10, 10, TimeUnit.SECONDS);
156+
}, 1000, commitInterval, TimeUnit.MILLISECONDS);
158157
}
159158

160159

@@ -227,15 +226,15 @@ void runInLoop() throws Throwable {
227226
}
228227

229228
void doCommit(HashSet<MessageQueue> set) throws Throwable {
230-
if ((System.currentTimeMillis() - lastCommit > commitInterval) && set.size() != 0) {
229+
if (set != null && set.size() != 0) {
231230

232231
this.stateStore.persist(set);
233232
this.unionConsumer.commit(set, true);
234233

235234
for (MessageQueue messageQueue : set) {
236235
logger.debug("committed messageQueue: [{}]", messageQueue);
237236
}
238-
lastCommit = System.currentTimeMillis();
237+
239238
set.clear();
240239
}
241240
}

core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public void persist(Set<MessageQueue> messageQueues) throws Throwable {
181181
Set<byte[]> keySet = super.getInCalculating(stateTopicQueueKey);
182182

183183
if (keySet == null || keySet.size() == 0) {
184-
return;
184+
continue;
185185
}
186186

187187
String stateTopic = stateTopicQueue.getTopic();

example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public static void main(String[] args) {
3737
return new Pair<>(null, value);
3838
})
3939
.flatMap((ValueMapperAction<String, List<String>>) value -> {
40-
String[] splits = value.toLowerCase().split("\\W+");
40+
String[] splits = value.toLowerCase().split(",");
4141
return Arrays.asList(splits);
4242
})
4343
.keyBy(value -> value)

0 commit comments

Comments
 (0)