
Commit

[improve](move-memtable) enable move memtable in routine load (apache…
sollhui authored Jan 6, 2024
1 parent 85dd606 commit db17f5f
Showing 6 changed files with 25 additions and 1 deletion.
1 change: 1 addition & 0 deletions be/src/io/fs/multi_table_pipe.cpp
@@ -162,6 +162,7 @@ Status MultiTablePipe::request_and_exec_plans() {
    request.__set_loadId(_ctx->id.to_thrift());
    request.fileType = TFileType::FILE_STREAM;
    request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
    request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
    // no need to register new_load_stream_mgr because it is already done in routineload submit task

    // plan this load
3 changes: 3 additions & 0 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -213,6 +213,9 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
    if (task.__isset.is_multi_table && task.is_multi_table) {
        ctx->is_multi_table = true;
    }
    if (task.__isset.memtable_on_sink_node) {
        ctx->memtable_on_sink_node = task.memtable_on_sink_node;
    }

    // set execute plan params (only for non-single-stream-multi-table load)
    TStreamLoadPutResult put_result;
2 changes: 2 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.h
@@ -232,6 +232,8 @@ class StreamLoadContext {
    // for single-stream-multi-table, we have table list
    std::vector<std::string> table_list;

    bool memtable_on_sink_node = false;

public:
    ExecEnv* exec_env() { return _exec_env; }

@@ -108,6 +108,7 @@ public TRoutineLoadTask createRoutineLoadTask() throws UserException {
        } else {
            tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN);
        }
        tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode());
        return tRoutineLoadTask;
    }

@@ -202,6 +202,8 @@ public boolean isFinalState() {

    protected String sequenceCol;

    protected boolean memtableOnSinkNode = false;

    /**
     * RoutineLoad supports json data.
     * Require Params:
@@ -268,6 +270,9 @@ public void setTypeRead(boolean isTypeRead) {
    public RoutineLoadJob(long id, LoadDataSourceType type) {
        this.id = id;
        this.dataSourceType = type;
        if (ConnectContext.get() != null) {
            this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
        }
    }

    public RoutineLoadJob(Long id, String name,
@@ -283,6 +288,7 @@ public RoutineLoadJob(Long id, String name,
        if (ConnectContext.get() != null) {
            SessionVariable var = ConnectContext.get().getSessionVariable();
            sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
            this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
        } else {
            sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
        }
@@ -304,6 +310,7 @@ public RoutineLoadJob(Long id, String name,
        if (ConnectContext.get() != null) {
            SessionVariable var = ConnectContext.get().getSessionVariable();
            sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
            this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
        } else {
            sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
        }
@@ -686,6 +693,15 @@ public boolean hasSequenceCol() {
        return !Strings.isNullOrEmpty(sequenceCol);
    }

    @Override
    public boolean isMemtableOnSinkNode() {
        return memtableOnSinkNode;
    }

    public void setMemtableOnSinkNode(boolean memtableOnSinkNode) {
        this.memtableOnSinkNode = memtableOnSinkNode;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }
@@ -874,11 +890,11 @@ public void prepare() throws UserException {
    }

    private void initPlanner() throws UserException {
        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
        // For multi-table load jobs, the table name is dynamic; the table is set during task scheduling.
        if (isMultiTable) {
            return;
        }
        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
        planner = new StreamLoadPlanner(db,
                (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this);
    }
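Taken together, the FE-side changes capture the creator's enableMemtableOnSinkNode session setting once, when the RoutineLoadJob is constructed, and then stamp it onto every TRoutineLoadTask handed to a BE via createRoutineLoadTask(). The sketch below summarizes that flow; only the identifiers that appear in this diff (memtableOnSinkNode, enableMemtableOnSinkNode, isMemtableOnSinkNode, setMemtableOnSinkNode) are taken from the change, while the stripped-down wrapper class is purely illustrative and assumes the Doris FE classes are on the classpath.

    // Illustrative sketch only; not the actual FE classes.
    class RoutineLoadFlagFlow {
        private boolean memtableOnSinkNode = false;   // same default as the new RoutineLoadJob field

        RoutineLoadFlagFlow() {
            // Capture the session variable at job-creation time, if a session exists.
            if (ConnectContext.get() != null) {
                this.memtableOnSinkNode =
                        ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
            }
        }

        // Mirrors createRoutineLoadTask(): each per-task Thrift struct carries the flag to the BE.
        TRoutineLoadTask newTask() {
            TRoutineLoadTask task = new TRoutineLoadTask();
            task.setMemtableOnSinkNode(this.memtableOnSinkNode);
            return task;
        }
    }

On the BE side, routine_load_task_executor.cpp copies the flag from the task onto the StreamLoadContext, and multi_table_pipe.cpp forwards it on the stream load plan request for single-stream-multi-table loads.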
1 change: 1 addition & 0 deletions gensrc/thrift/BackendService.thrift
@@ -67,6 +67,7 @@ struct TRoutineLoadTask {
    14: optional PlanNodes.TFileFormatType format
    15: optional PaloInternalService.TPipelineFragmentParams pipeline_params
    16: optional bool is_multi_table
    17: optional bool memtable_on_sink_node;
}

struct TKafkaMetaProxyRequest {
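Because field 17 is optional, a task serialized by an older FE simply omits it, and the BE falls back to the default of false through the __isset check added in routine_load_task_executor.cpp. On the FE, the regenerated Java struct exposes the usual accessors for an optional bool field; the helper below is only a sketch of how calling code could distinguish "not sent" from "sent as false", with method names assumed from Thrift's standard Java code generation rather than taken from this commit.

    // Sketch; assumes the TRoutineLoadTask regenerated from BackendService.thrift.
    static boolean memtableOnSinkNodeRequested(TRoutineLoadTask task) {
        // isSetMemtableOnSinkNode() is false when an older FE never populated the field.
        return task.isSetMemtableOnSinkNode() && task.isMemtableOnSinkNode();
    }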
