From db17f5fe791af648060cdcad673a30ce7283b39c Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Sat, 6 Jan 2024 18:22:01 +0800 Subject: [PATCH] [improve](move-memtable) enable move memtable in routine load (#28974) --- be/src/io/fs/multi_table_pipe.cpp | 1 + .../routine_load_task_executor.cpp | 3 +++ .../runtime/stream_load/stream_load_context.h | 2 ++ .../doris/load/routineload/KafkaTaskInfo.java | 1 + .../doris/load/routineload/RoutineLoadJob.java | 18 +++++++++++++++++- gensrc/thrift/BackendService.thrift | 1 + 6 files changed, 25 insertions(+), 1 deletion(-) diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index a11d6412df26e0..da46645fd4ff34 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -162,6 +162,7 @@ Status MultiTablePipe::request_and_exec_plans() { request.__set_loadId(_ctx->id.to_thrift()); request.fileType = TFileType::FILE_STREAM; request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node); // no need to register new_load_stream_mgr coz it is already done in routineload submit task // plan this load diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index d14b2999c14395..dc6b855cd5a409 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -213,6 +213,9 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { if (task.__isset.is_multi_table && task.is_multi_table) { ctx->is_multi_table = true; } + if (task.__isset.memtable_on_sink_node) { + ctx->memtable_on_sink_node = task.memtable_on_sink_node; + } // set execute plan params (only for non-single-stream-multi-table load) TStreamLoadPutResult put_result; diff --git 
a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index e57996af9e12fb..3f1f6b92431af7 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -232,6 +232,8 @@ class StreamLoadContext { // for single-stream-multi-table, we have table list std::vector table_list; + bool memtable_on_sink_node = false; + public: ExecEnv* exec_env() { return _exec_env; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 56e931993e86bd..2075e5548e5e43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -108,6 +108,7 @@ public TRoutineLoadTask createRoutineLoadTask() throws UserException { } else { tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN); } + tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode()); return tRoutineLoadTask; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 8bea553c792293..1ce3c1e8c0cb58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -202,6 +202,8 @@ public boolean isFinalState() { protected String sequenceCol; + protected boolean memtableOnSinkNode = false; + /** * RoutineLoad support json data. 
* Require Params: @@ -268,6 +270,9 @@ public void setTypeRead(boolean isTypeRead) { public RoutineLoadJob(long id, LoadDataSourceType type) { this.id = id; this.dataSourceType = type; + if (ConnectContext.get() != null) { + this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; + } } public RoutineLoadJob(Long id, String name, @@ -283,6 +288,7 @@ public RoutineLoadJob(Long id, String name, if (ConnectContext.get() != null) { SessionVariable var = ConnectContext.get().getSessionVariable(); sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); + this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; } else { sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); } @@ -304,6 +310,7 @@ public RoutineLoadJob(Long id, String name, if (ConnectContext.get() != null) { SessionVariable var = ConnectContext.get().getSessionVariable(); sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); + this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; } else { sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); } @@ -686,6 +693,15 @@ public boolean hasSequenceCol() { return !Strings.isNullOrEmpty(sequenceCol); } + @Override + public boolean isMemtableOnSinkNode() { + return memtableOnSinkNode; + } + + public void setMemtableOnSinkNode(boolean memtableOnSinkNode) { + this.memtableOnSinkNode = memtableOnSinkNode; + } + public void setComment(String comment) { this.comment = comment; } @@ -874,11 +890,11 @@ public void prepare() throws UserException { } private void initPlanner() throws UserException { - Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); // for multi table load job, the table name is dynamic,we will set table when task scheduling. 
if (isMultiTable) { return; } + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); planner = new StreamLoadPlanner(db, (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this); } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 2c5b199fc1749c..dab0b8606771d7 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -67,6 +67,7 @@ struct TRoutineLoadTask { 14: optional PlanNodes.TFileFormatType format 15: optional PaloInternalService.TPipelineFragmentParams pipeline_params 16: optional bool is_multi_table + 17: optional bool memtable_on_sink_node; } struct TKafkaMetaProxyRequest {