Skip to content

Commit

Permalink
[Improve] Add a jobId to the doris label to distinguish between tasks (
Browse files Browse the repository at this point in the history
…apache#4839)

Co-authored-by: zhouyao <yao.zhou@marketingforce.com>
  • Loading branch information
2 people authored and EricJoy2048 committed Jul 10, 2023
1 parent f146888 commit 768f0a6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.Serializer;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class DorisSink

private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
private String jobId;

@Override
public String getPluginName() {
Expand All @@ -78,6 +80,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
}
}

@Override
public void setJobContext(JobContext jobContext) {
this.jobId = jobContext.getJobId();
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
Expand All @@ -93,7 +100,7 @@ public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> createWriter(
SinkWriter.Context context) throws IOException {
DorisSinkWriter dorisSinkWriter =
new DorisSinkWriter(
context, Collections.emptyList(), seaTunnelRowType, pluginConfig);
context, Collections.emptyList(), seaTunnelRowType, pluginConfig, jobId);
dorisSinkWriter.initializeLoad(Collections.emptyList());
return dorisSinkWriter;
}
Expand All @@ -102,7 +109,7 @@ public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> createWriter(
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter(
SinkWriter.Context context, List<DorisSinkState> states) throws IOException {
DorisSinkWriter dorisWriter =
new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig);
new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig, jobId);
dorisWriter.initializeLoad(states);
return dorisWriter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ public DorisSinkWriter(
SinkWriter.Context context,
List<DorisSinkState> state,
SeaTunnelRowType seaTunnelRowType,
Config pluginConfig) {
Config pluginConfig,
String jobId) {
this.dorisConfig = DorisConfig.loadConfig(pluginConfig);
this.lastCheckpointId = state.size() != 0 ? state.get(0).getCheckpointId() : 0;
log.info("restore checkpointId {}", lastCheckpointId);
log.info("labelPrefix " + dorisConfig.getLabelPrefix());
this.dorisSinkState = new DorisSinkState(dorisConfig.getLabelPrefix(), lastCheckpointId);
this.labelPrefix = dorisConfig.getLabelPrefix() + "_" + context.getIndexOfSubtask();
this.labelPrefix =
dorisConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask();
this.labelGenerator = new LabelGenerator(labelPrefix, dorisConfig.getEnable2PC());
this.scheduledExecutorService =
new ScheduledThreadPoolExecutor(
Expand Down

0 comments on commit 768f0a6

Please sign in to comment.