Skip to content

Commit 5d78f1a

Browse files
[IOTDB-5968] Pipe: pipe task does not work properly after cluster reboot (apache#10046)
* fix: dead lock issue found in NodeManager.java * fix: dead lock issue found in SimpleProgressIndex.java * fix: logic error of 'pipe task start' found in PipeTaskAgent.java , which may cause data consistency issues * fix: NPE error from ClusterSyncInfoFetcher.java * improvement: reduce some exception log print * improvement: introduce some log print for progress index reporting and recovering
1 parent 48afbc5 commit 5d78f1a

13 files changed

Lines changed: 98 additions & 80 deletions

File tree

confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,10 @@ private void setCQConfig(ConfigurationResp dataSet) {
206206
}
207207

208208
private TRuntimeConfiguration getRuntimeConfiguration() {
209-
getPipeManager().getPipePluginCoordinator().lock();
209+
// getPipeTaskCoordinator.lock() should be called outside the getPipePluginCoordinator().lock()
210+
// to avoid deadlock
210211
getPipeManager().getPipeTaskCoordinator().lock();
212+
getPipeManager().getPipePluginCoordinator().lock();
211213
getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
212214
getUDFManager().getUdfInfo().acquireUDFTableLock();
213215

@@ -226,8 +228,11 @@ private TRuntimeConfiguration getRuntimeConfiguration() {
226228
} finally {
227229
getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
228230
getUDFManager().getUdfInfo().releaseUDFTableLock();
229-
getPipeManager().getPipeTaskCoordinator().unlock();
230231
getPipeManager().getPipePluginCoordinator().unlock();
232+
// getPipeTaskCoordinator.unlock() should be called outside the
233+
// getPipePluginCoordinator().unlock()
234+
// to avoid deadlock
235+
getPipeManager().getPipeTaskCoordinator().unlock();
231236
}
232237
}
233238

confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,22 +223,23 @@ public TSStatus handleLeaderChange(PipeHandleLeaderChangePlan plan) {
223223
dataRegionGroupId,
224224
new PipeTaskMeta(
225225
new MinimumProgressIndex(), newDataRegionLeader));
226-
} else {
227-
LOGGER.warn(
228-
"The pipe task meta does not contain the data region group {} or the data region group has already been removed",
229-
dataRegionGroupId);
230226
}
227+
// else:
228+
// "The pipe task meta does not contain the data region group {} or
229+
// the data region group has already been removed"
231230
}
232231
}));
233232
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
234233
}
235234

236235
public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
236+
LOGGER.info("Handling pipe meta changes ...");
237237
pipeMetaKeeper.clear();
238238
plan.getPipeMetaList()
239239
.forEach(
240240
pipeMeta -> {
241241
pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
242+
LOGGER.info("Recording pipe meta: {}", pipeMeta);
242243
});
243244
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
244245
}

confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,10 @@ protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
100100
}
101101

102102
@Override
103-
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
103+
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
104104
LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromHandleOnDataNodes");
105105

106-
pushPipeMetaToDataNodes(env);
106+
pushPipeMetaToDataNodesIgnoreException(env);
107107
}
108108

109109
@Override
@@ -142,10 +142,10 @@ protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
142142
}
143143

144144
@Override
145-
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
145+
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
146146
LOGGER.info("PipeHandleLeaderChangeProcedure: rollbackFromCreateOnDataNodes");
147147

148-
pushPipeMetaToDataNodes(env);
148+
pushPipeMetaToDataNodesIgnoreException(env);
149149
}
150150

151151
@Override

confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,20 @@ protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
138138
.getValue()
139139
.getProgressIndex()
140140
.isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
141-
runtimeMetaOnConfigNode
142-
.getValue()
143-
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
141+
LOGGER.info(
142+
"Updating progress index for (pipe name: {}, consensus group id: {}) ... Progress index on config node: {}, progress index from data node: {}",
143+
pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
144+
runtimeMetaOnConfigNode.getKey(),
145+
runtimeMetaOnConfigNode.getValue().getProgressIndex(),
146+
runtimeMetaFromDataNode.getProgressIndex());
147+
LOGGER.info(
148+
"Progress index for (pipe name: {}, consensus group id: {}) is updated to {}",
149+
pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
150+
runtimeMetaOnConfigNode.getKey(),
151+
runtimeMetaOnConfigNode
152+
.getValue()
153+
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
154+
144155
needWriteConsensusOnConfigNodes = true;
145156
}
146157

@@ -149,6 +160,7 @@ protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
149160
pipeTaskMetaOnConfigNode.clearExceptionMessages();
150161
for (final PipeRuntimeException exception :
151162
runtimeMetaFromDataNode.getExceptionMessages()) {
163+
152164
pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
153165

154166
if (exception instanceof PipeRuntimeCriticalException) {
@@ -159,6 +171,7 @@ protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
159171
.get()
160172
.equals(PipeStatus.STOPPED)) {
161173
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
174+
162175
needWriteConsensusOnConfigNodes = true;
163176
needPushPipeMetaToDataNodes = true;
164177

@@ -181,6 +194,7 @@ protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
181194
.forEach(
182195
status -> {
183196
status.set(PipeStatus.STOPPED);
197+
184198
needWriteConsensusOnConfigNodes = true;
185199
needPushPipeMetaToDataNodes = true;
186200

@@ -224,14 +238,14 @@ protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
224238
}
225239

226240
@Override
227-
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
241+
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
228242
LOGGER.info("PipeHandleMetaChangeProcedure: executeFromHandleOnDataNodes");
229243

230244
if (!needPushPipeMetaToDataNodes) {
231245
return;
232246
}
233247

234-
pushPipeMetaToDataNodes(env);
248+
pushPipeMetaToDataNodesIgnoreException(env);
235249
}
236250

237251
@Override

confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
6666
}
6767

6868
@Override
69-
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
69+
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
7070
LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
7171

72-
pushPipeMetaToDataNodes(env);
72+
pushPipeMetaToDataNodesIgnoreException(env);
7373
}
7474

7575
@Override
@@ -94,10 +94,10 @@ protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
9494
}
9595

9696
@Override
97-
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
97+
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
9898
LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
9999

100-
pushPipeMetaToDataNodes(env);
100+
// do nothing
101101
}
102102

103103
@Override

confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ protected void pushPipeMetaToDataNodes(ConfigNodeProcedureEnv env) throws IOExce
200200
}
201201
}
202202

203+
protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv env) {
204+
try {
205+
pushPipeMetaToDataNodes(env);
206+
} catch (Throwable throwable) {
207+
LOGGER.info("Failed to push pipe meta list to data nodes, will retry later.", throwable);
208+
}
209+
}
210+
203211
@Override
204212
public void serialize(DataOutputStream stream) throws IOException {
205213
super.serialize(stream);

node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex progressI
165165
// thatSimpleProgressIndex.memtableFlushOrderId
166166
return this;
167167
} finally {
168-
lock.writeLock().lock();
168+
lock.writeLock().unlock();
169169
}
170170
}
171171

node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ public ProgressIndex getProgressIndex() {
5252
return progressIndex.get();
5353
}
5454

55-
public void updateProgressIndex(ProgressIndex updateIndex) {
56-
progressIndex.updateAndGet(index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
55+
public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
56+
return progressIndex.updateAndGet(
57+
index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
5758
}
5859

5960
public int getLeaderDataNodeId() {

server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,19 @@ public static synchronized void launchPipeTaskAgent() {
163163
PipeAgent.task()
164164
.handlePipeMetaChanges(
165165
getAllPipeInfoResp.getAllPipeInfo().stream()
166-
.map(PipeMeta::deserialize)
166+
.map(
167+
byteBuffer -> {
168+
final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
169+
LOGGER.info(
170+
"Pulled pipe meta from config node: {}, recovering ...", pipeMeta);
171+
return pipeMeta;
172+
})
167173
.collect(Collectors.toList()));
168-
} catch (Throwable throwable) {
174+
} catch (Exception e) {
169175
LOGGER.info(
170176
"Failed to get pipe task meta from config node. Ignore the exception, "
171177
+ "because config node may not be ready yet, and meta will be pushed by config node later.",
172-
throwable);
178+
e);
173179
}
174180
}
175181
}

0 commit comments

Comments
 (0)