From 509689491f0e9d3258131f6bc29fa86b375b9e9d Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Sat, 27 May 2023 22:42:10 +0800 Subject: [PATCH 1/7] [improvement](exec) Refactor the partition sort node to send data in pipeline mode (#20128) before: the node will wait to retrieve all data from child, then send data to parent. now: for data from child that does not require sorting, it can be sent to parent immediately. --- be/src/vec/exec/vpartition_sort_node.cpp | 44 ++++++++++++++++++------ be/src/vec/exec/vpartition_sort_node.h | 3 ++ 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index 8f3f50b9d4ebd23..cb3b1992857f0fd 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -180,9 +181,11 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl _value_places[0]->append_whole_block(input_block, child(0)->row_desc()); } else { //just simply use partition num to check - //TODO: here could set can read to true directly. 
need mutex if (_num_partition > 512 && child_input_rows < 10000 * _num_partition) { - _blocks_buffer.push(std::move(*input_block)); + { + std::lock_guard lock(_buffer_mutex); + _blocks_buffer.push(std::move(*input_block)); + } } else { RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size())); RETURN_IF_CANCELLED(state); @@ -219,6 +222,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl debug_profile(); } COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition)); + //so all data from child have sink completed _can_read = true; } return Status::OK(); @@ -257,17 +261,32 @@ Status VPartitionSortNode::alloc_resource(RuntimeState* state) { return Status::OK(); } +bool VPartitionSortNode::can_read() { + std::lock_guard lock(_buffer_mutex); + return !_blocks_buffer.empty() || _can_read; +} + Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { RETURN_IF_CANCELLED(state); output_block->clear_column_data(); - bool current_eos = false; - RETURN_IF_ERROR(get_sorted_block(state, output_block, ¤t_eos)); - if (_sort_idx >= _partition_sorts.size() && output_block->rows() == 0) { + { + std::lock_guard lock(_buffer_mutex); if (_blocks_buffer.empty() == false) { _blocks_buffer.front().swap(*output_block); _blocks_buffer.pop(); - } else { + return Status::OK(); + } + } + + if (_can_read) { + bool current_eos = false; + RETURN_IF_ERROR(get_sorted_block(state, output_block, ¤t_eos)); + } + { + std::lock_guard lock(_buffer_mutex); + if (_blocks_buffer.empty() && _sort_idx >= _partition_sorts.size()) { + _can_read = false; *eos = true; } } @@ -309,6 +328,9 @@ Status VPartitionSortNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } + if (state->enable_profile()) { + debug_profile(); + } return ExecNode::close(state); } @@ -435,20 +457,22 @@ void VPartitionSortNode::debug_profile() { fmt::format_to(partition_blocks_read, "["); for (auto place : 
_value_places) { fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows()); - fmt::format_to(partition_rows_read, "{}, ", place->blocks.size()); + fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size()); } fmt::format_to(partition_rows_read, "]"); fmt::format_to(partition_blocks_read, "]"); - runtime_profile()->add_info_string("PerPartitionBlocksRead", partition_blocks_read.data()); - runtime_profile()->add_info_string("PerPartitionRowsRead", partition_rows_read.data()); + runtime_profile()->add_info_string("PerPartitionBlocksRead", + fmt::to_string(partition_blocks_read)); + runtime_profile()->add_info_string("PerPartitionRowsRead", fmt::to_string(partition_rows_read)); fmt::memory_buffer partition_output_rows; fmt::format_to(partition_output_rows, "["); for (auto row : partition_profile_output_rows) { fmt::format_to(partition_output_rows, "{}, ", row); } fmt::format_to(partition_output_rows, "]"); - runtime_profile()->add_info_string("PerPartitionOutputRows", partition_output_rows.data()); + runtime_profile()->add_info_string("PerPartitionOutputRows", + fmt::to_string(partition_output_rows)); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index 4143b19dc9b110f..0b24ce83787372a 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -21,6 +21,7 @@ #include #include +#include #include "exec/exec_node.h" #include "vec/columns/column.h" @@ -329,6 +330,7 @@ class VPartitionSortNode : public ExecNode { Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; void debug_profile(); + bool can_read(); private: template @@ -370,6 +372,7 @@ class VPartitionSortNode : public ExecNode { std::unique_ptr _previous_row = nullptr; std::queue _blocks_buffer; int64_t child_input_rows = 0; + std::mutex _buffer_mutex; RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _emplace_key_timer; From 
637e083343711546b2ee816309106539c42c263a Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Sat, 27 May 2023 22:42:25 +0800 Subject: [PATCH 2/7] [regression](test) fix test case failed in pipeline mode (#20139) --- .../array_functions/test_array_map_function.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy index 4ba9b053bb087ea..9ab01a37f97d554 100644 --- a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy @@ -79,7 +79,7 @@ suite("test_array_map_function") { """ test { - sql"""select array_map((x,y)->x+y, c_array1, c_array2) from array_test2 where id > 10 order by id;""" + sql"""select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from array_test2 where id > 10 order by id;""" check{result, exception, startTime, endTime -> assertTrue(exception != null) logger.info(exception.message) From 8c00012e8f7e9a49c38636fab25878e2555c8e06 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sat, 27 May 2023 22:43:51 +0800 Subject: [PATCH 3/7] [improvement](community) simplify the pr template and modify pr labeler (#20127) --- .github/PULL_REQUEST_TEMPLATE.md | 14 ++------------ .github/workflows/labeler/scope-label-conf.yml | 14 ++------------ 2 files changed, 4 insertions(+), 24 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 4a3ddae5f14a64b..fed38e0cd58b94f 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,18 +1,8 @@ -# Proposed changes +## Proposed changes Issue Number: close #xxx -## Problem summary - -Describe your changes. 
- -## Checklist(Required) - -* [ ] Does it affect the original behavior -* [ ] Has unit tests been added -* [ ] Has document been added or modified -* [ ] Does it need to update dependencies -* [ ] Is this PR support rollback (If NO, please explain WHY) +<--Describe your changes.--> ## Further comments diff --git a/.github/workflows/labeler/scope-label-conf.yml b/.github/workflows/labeler/scope-label-conf.yml index 9b80b7e66fcc715..88965ed4b7fda96 100644 --- a/.github/workflows/labeler/scope-label-conf.yml +++ b/.github/workflows/labeler/scope-label-conf.yml @@ -31,9 +31,6 @@ kind/test: area/docker: - docker/runtime/* -area/vectorization: - - be/src/vec/**/* - area/nereids: - fe/fe-core/src/main/java/org/apache/doris/nereids/**/* - fe/fe-core/src/main/antlr4/org/apache/doris/nereids/**/* @@ -47,13 +44,6 @@ area/load: - fe/fe-core/src/main/java/org/apache/doris/load/* - fe/fe-core/src/main/java/org/apache/doris/load/**/* -area/routine load: - - fe/fe-core/src/main/java/org/apache/doris/load/routineload/* - -area/sql/function: - - fe/fe-core/src/main/java/org/apache/doris/rewrite/FEFunctions.java - - gensrc/script/doris_builtins_functions.py - - be/src/vec/functions/* +area/pipeline: + - be/src/pipeline/**/* -area/spark-load: - - fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoad*.java From 875e72b5ea06cb1441f5b04ace690a5c074684f2 Mon Sep 17 00:00:00 2001 From: caoliang-web <71004656+caoliang-web@users.noreply.github.com> Date: Sat, 27 May 2023 22:44:20 +0800 Subject: [PATCH 4/7] [typo](doc)spark load add task timeout parameter #20115 --- docs/en/docs/data-operate/import/import-way/spark-load-manual.md | 1 + .../docs/data-operate/import/import-way/spark-load-manual.md | 1 + 2 files changed, 2 insertions(+) diff --git a/docs/en/docs/data-operate/import/import-way/spark-load-manual.md b/docs/en/docs/data-operate/import/import-way/spark-load-manual.md index 5e0b7690fc49e9a..76e3f7ed9013183 100644 --- 
a/docs/en/docs/data-operate/import/import-way/spark-load-manual.md +++ b/docs/en/docs/data-operate/import/import-way/spark-load-manual.md @@ -186,6 +186,7 @@ REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name - `spark.master`: required, yarn is supported at present, `spark://host:port`. - `spark.submit.deployMode`: the deployment mode of Spark Program. It is required and supports cluster and client. - `spark.hadoop.fs.defaultfs`: required when master is yarn. + - `spark.submit.timeout`:spark task timeout, default 5 minutes - Other parameters are optional, refer to `http://spark.apache.org/docs/latest/configuration.html` - YARN RM related parameters are as follows: - If Spark is a single-point RM, you need to configure `spark.hadoop.yarn.resourcemanager.address`,address of the single point resource manager. diff --git a/docs/zh-CN/docs/data-operate/import/import-way/spark-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/spark-load-manual.md index b4ab87f4a70842f..557488eb76a0941 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/spark-load-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/spark-load-manual.md @@ -159,6 +159,7 @@ REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name - `spark.master`: 必填,目前支持 Yarn,Spark://host:port。 - `spark.submit.deployMode`: Spark 程序的部署模式,必填,支持 Cluster、Client 两种。 - `spark.hadoop.fs.defaultFS`: Master 为 Yarn 时必填。 + - `spark.submit.timeout`:spark任务超时时间,默认5分钟 - YARN RM 相关参数如下: - 如果 Spark 为单点 RM,则需要配置`spark.hadoop.yarn.resourcemanager.address`,表示单点 ResourceManager 地址。 - 如果 Spark 为 RM-HA,则需要配置(其中 hostname 和 address 任选一个配置): From 0434c6a7381eca16d460ae48e5dce60b5e6532bf Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Sat, 27 May 2023 22:47:45 +0800 Subject: [PATCH 5/7] [refactor-WIP](TaskWorkerPool) add specific classes for PUSH, PUBLIC_VERION, CLEAR_TRANSACTION tasks (#19822) --- be/src/agent/agent_server.cpp | 25 +- be/src/agent/agent_server.h | 4 +- 
be/src/agent/task_worker_pool.cpp | 561 +++++++++--------- be/src/agent/task_worker_pool.h | 33 +- .../olap/task/engine_publish_version_task.cpp | 6 +- .../olap/task/engine_publish_version_task.h | 2 +- 6 files changed, 325 insertions(+), 306 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 199770ba4a68c73..9bbe4d862ebdb98 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -82,6 +82,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) #define CREATE_AND_START_THREAD(type, pool_name) #endif // BE_TEST +#ifndef BE_TEST _create_tablet_workers.reset( new CreateTableTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS)); _create_tablet_workers->start(); @@ -89,11 +90,21 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) new DropTableTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS)); _drop_tablet_workers->start(); - // Both PUSH and REALTIME_PUSH type use _push_workers - CREATE_AND_START_POOL(PUSH, _push_workers); - CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers); - CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers); - CREATE_AND_START_POOL(DELETE, _delete_workers); + // Both PUSH and REALTIME_PUSH type use _push_load_workers + _push_load_workers.reset(new PushTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS, + PushTaskPool::PushWokerType::LOAD_V2)); + _push_load_workers->start(); + _publish_version_workers.reset( + new PublishVersionTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS)); + _publish_version_workers->start(); + _clear_transaction_task_workers.reset( + new ClearTransactionTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS)); + _clear_transaction_task_workers->start(); + _push_delete_workers.reset(new PushTaskPool(exec_env, + TaskWorkerPool::ThreadModel::MULTI_THREADS, + PushTaskPool::PushWokerType::DELETE)); + 
_push_delete_workers->start(); +#endif CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers); CREATE_AND_START_POOL(ALTER_INVERTED_INDEX, _alter_inverted_index_workers); CREATE_AND_START_POOL(CLONE, _clone_workers); @@ -185,9 +196,9 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, break; } if (task.push_req.push_type == TPushType::LOAD_V2) { - _push_workers->submit_task(task); + _push_load_workers->submit_task(task); } else if (task.push_req.push_type == TPushType::DELETE) { - _delete_workers->submit_task(task); + _push_delete_workers->submit_task(task); } else { ret_st = Status::InvalidArgument( "task(signature={}, type={}, push_type={}) has wrong push_type", signature, diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index 3aebc3d1f750ac3..3d98fd025dd65d6 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -60,10 +60,10 @@ class AgentServer { std::unique_ptr _create_tablet_workers; std::unique_ptr _drop_tablet_workers; - std::unique_ptr _push_workers; + std::unique_ptr _push_load_workers; std::unique_ptr _publish_version_workers; std::unique_ptr _clear_transaction_task_workers; - std::unique_ptr _delete_workers; + std::unique_ptr _push_delete_workers; std::unique_ptr _alter_tablet_workers; std::unique_ptr _alter_inverted_index_workers; std::unique_ptr _push_cooldown_conf_workers; diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index ea8cbc633a9e2dc..ff49f04ffebe641 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -144,22 +144,12 @@ void TaskWorkerPool::start() { break; case TaskWorkerType::PUSH: case TaskWorkerType::REALTIME_PUSH: - _worker_count = - config::push_worker_count_normal_priority + config::push_worker_count_high_priority; - _cb = std::bind(&TaskWorkerPool::_push_worker_thread_callback, this); break; case TaskWorkerType::PUBLISH_VERSION: - _worker_count = config::publish_version_worker_count; - _cb = 
std::bind(&TaskWorkerPool::_publish_version_worker_thread_callback, this); break; case TaskWorkerType::CLEAR_TRANSACTION_TASK: - _worker_count = config::clear_transaction_task_worker_count; - _cb = std::bind(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback, - this); break; case TaskWorkerType::DELETE: - _worker_count = config::delete_worker_count; - _cb = std::bind(&TaskWorkerPool::_push_worker_thread_callback, this); break; case TaskWorkerType::ALTER_TABLE: _worker_count = config::alter_tablet_worker_count; @@ -341,32 +331,6 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) TRACE("finish task"); } -uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count, - std::deque& tasks, - TPriority::type priority) { - int32_t index = -1; - std::deque::size_type task_count = tasks.size(); - for (uint32_t i = 0; i < task_count; ++i) { - TAgentTaskRequest task = tasks[i]; - if (priority == TPriority::HIGH) { - if (task.__isset.priority && task.priority == TPriority::HIGH) { - index = i; - break; - } - } - } - - if (index == -1) { - if (priority == TPriority::HIGH) { - return index; - } - - index = 0; - } - - return index; -} - void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; @@ -590,253 +554,6 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6 finish_task_request->__set_task_status(status.to_thrift()); } -void TaskWorkerPool::_push_worker_thread_callback() { - // gen high priority worker thread - TPriority::type priority = TPriority::NORMAL; - int32_t push_worker_count_high_priority = config::push_worker_count_high_priority; - static uint32_t s_worker_count = 0; - { - std::lock_guard worker_thread_lock(_worker_thread_lock); - if (s_worker_count < push_worker_count_high_priority) { - ++s_worker_count; - priority = TPriority::HIGH; - } - } - - while (_is_work) { - TAgentTaskRequest agent_task_req; - TPushReq 
push_req; - int32_t index = 0; - do { - std::unique_lock worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } - - index = _get_next_task_index(config::push_worker_count_normal_priority + - config::push_worker_count_high_priority, - _tasks, priority); - - if (index < 0) { - // there is no high priority task. notify other thread to handle normal task - _worker_thread_condition_variable.notify_all(); - break; - } - - agent_task_req = _tasks[index]; - push_req = agent_task_req.push_req; - _tasks.erase(_tasks.begin() + index); - } while (false); - - if (index < 0) { - // there is no high priority task in queue - sleep(1); - continue; - } - - LOG(INFO) << "get push task. signature=" << agent_task_req.signature - << ", priority=" << priority << " push_type=" << push_req.push_type; - std::vector tablet_infos; - - EngineBatchLoadTask engine_task(push_req, &tablet_infos); - auto status = _env->storage_engine()->execute_task(&engine_task); - - // Return result to fe - TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - if (push_req.push_type == TPushType::DELETE) { - finish_task_request.__set_request_version(push_req.version); - } - - if (status.ok()) { - LOG_INFO("successfully execute push task") - .tag("signature", agent_task_req.signature) - .tag("tablet_id", push_req.tablet_id) - .tag("push_type", push_req.push_type); - ++_s_report_version; - finish_task_request.__set_finish_tablet_infos(tablet_infos); - } else { - LOG_WARNING("failed to execute push task") - .tag("signature", agent_task_req.signature) - .tag("tablet_id", push_req.tablet_id) - .tag("push_type", push_req.push_type) - .error(status); - } - 
finish_task_request.__set_task_status(status.to_thrift()); - finish_task_request.__set_report_version(_s_report_version); - - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); - } -} - -void TaskWorkerPool::_publish_version_worker_thread_callback() { - while (_is_work) { - TAgentTaskRequest agent_task_req; - TPublishVersionRequest publish_version_req; - { - std::unique_lock worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } - - agent_task_req = _tasks.front(); - publish_version_req = agent_task_req.publish_version_req; - _tasks.pop_front(); - } - - DorisMetrics::instance()->publish_task_request_total->increment(1); - VLOG_NOTICE << "get publish version task. signature=" << agent_task_req.signature; - - std::vector error_tablet_ids; - std::vector succ_tablet_ids; - uint32_t retry_time = 0; - Status status; - bool is_task_timeout = false; - while (retry_time < PUBLISH_VERSION_MAX_RETRY) { - error_tablet_ids.clear(); - EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, - &succ_tablet_ids); - status = _env->storage_engine()->execute_task(&engine_task); - if (status.ok()) { - break; - } else if (status.is()) { - int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time; - if (time_elapsed > PUBLISH_TIMEOUT_SEC) { - LOG(INFO) << "task elapsed " << time_elapsed - << " seconds since it is inserted to queue, it is timeout"; - is_task_timeout = true; - } else { - // version not continuous, put to queue and wait pre version publish - // task execute - std::unique_lock worker_thread_lock(_worker_thread_lock); - _tasks.push_back(agent_task_req); - _worker_thread_condition_variable.notify_one(); - } - LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done" - << "transaction_id: " << publish_version_req.transaction_id; - break; - } 
else { - LOG_WARNING("failed to publish version") - .tag("transaction_id", publish_version_req.transaction_id) - .tag("error_tablets_num", error_tablet_ids.size()) - .tag("retry_time", retry_time) - .error(status); - ++retry_time; - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - } - if (status.is() && !is_task_timeout) { - continue; - } - - TFinishTaskRequest finish_task_request; - if (!status) { - DorisMetrics::instance()->publish_task_failed_total->increment(1); - // if publish failed, return failed, FE will ignore this error and - // check error tablet ids and FE will also republish this task - LOG_WARNING("failed to publish version") - .tag("signature", agent_task_req.signature) - .tag("transaction_id", publish_version_req.transaction_id) - .tag("error_tablets_num", error_tablet_ids.size()) - .error(status); - finish_task_request.__set_error_tablet_ids(error_tablet_ids); - } else { - if (!config::disable_auto_compaction) { - for (int i = 0; i < succ_tablet_ids.size(); i++) { - TabletSharedPtr tablet = - StorageEngine::instance()->tablet_manager()->get_tablet( - succ_tablet_ids[i]); - if (tablet != nullptr) { - tablet->publised_count++; - if (tablet->publised_count % 10 == 0) { - StorageEngine::instance()->submit_compaction_task( - tablet, CompactionType::CUMULATIVE_COMPACTION, true); - LOG(INFO) << "trigger compaction succ, tabletid:" << succ_tablet_ids[i] - << ", publised:" << tablet->publised_count; - } - } else { - LOG(WARNING) - << "trigger compaction failed, tabletid:" << succ_tablet_ids[i]; - } - } - } - LOG_INFO("successfully publish version") - .tag("signature", agent_task_req.signature) - .tag("transaction_id", publish_version_req.transaction_id) - .tag("tablets_num", succ_tablet_ids.size()); - } - - status.to_thrift(&finish_task_request.task_status); - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - 
finish_task_request.__set_signature(agent_task_req.signature); - finish_task_request.__set_report_version(_s_report_version); - finish_task_request.__set_error_tablet_ids(error_tablet_ids); - - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); - } -} - -void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() { - while (_is_work) { - TAgentTaskRequest agent_task_req; - TClearTransactionTaskRequest clear_transaction_task_req; - { - std::unique_lock worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } - - agent_task_req = _tasks.front(); - clear_transaction_task_req = agent_task_req.clear_transaction_task_req; - _tasks.pop_front(); - } - LOG(INFO) << "get clear transaction task. signature=" << agent_task_req.signature - << ", transaction_id=" << clear_transaction_task_req.transaction_id - << ", partition_id_size=" << clear_transaction_task_req.partition_id.size(); - - Status status; - - if (clear_transaction_task_req.transaction_id > 0) { - // transaction_id should be greater than zero. - // If it is not greater than zero, no need to execute - // the following clear_transaction_task() function. - if (!clear_transaction_task_req.partition_id.empty()) { - _env->storage_engine()->clear_transaction_task( - clear_transaction_task_req.transaction_id, - clear_transaction_task_req.partition_id); - } else { - _env->storage_engine()->clear_transaction_task( - clear_transaction_task_req.transaction_id); - } - LOG(INFO) << "finish to clear transaction task. signature=" << agent_task_req.signature - << ", transaction_id=" << clear_transaction_task_req.transaction_id; - } else { - LOG(WARNING) << "invalid transaction id " << clear_transaction_task_req.transaction_id - << ". 
signature= " << agent_task_req.signature; - } - - TFinishTaskRequest finish_task_request; - finish_task_request.__set_task_status(status.to_thrift()); - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); - } -} - void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; @@ -1788,7 +1505,6 @@ CreateTableTaskPool::CreateTableTaskPool(ExecEnv* env, ThreadModel thread_model) void CreateTableTaskPool::_create_tablet_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TCreateTabletReq create_tablet_req; { std::unique_lock worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1797,9 +1513,9 @@ void CreateTableTaskPool::_create_tablet_worker_thread_callback() { return; } agent_task_req = _tasks.front(); - create_tablet_req = agent_task_req.create_tablet_req; _tasks.pop_front(); } + const TCreateTabletReq& create_tablet_req = agent_task_req.create_tablet_req; scoped_refptr trace(new Trace); MonotonicStopWatch watch; watch.start(); @@ -1862,7 +1578,6 @@ DropTableTaskPool::DropTableTaskPool(ExecEnv* env, ThreadModel thread_model) void DropTableTaskPool::_drop_tablet_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TDropTabletReq drop_tablet_req; { std::unique_lock worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1872,10 +1587,9 @@ void DropTableTaskPool::_drop_tablet_worker_thread_callback() { } agent_task_req = _tasks.front(); - drop_tablet_req = agent_task_req.drop_tablet_req; _tasks.pop_front(); } - + TDropTabletReq drop_tablet_req = agent_task_req.drop_tablet_req; Status status; TabletSharedPtr dropped_tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet( drop_tablet_req.tablet_id, false); @@ -1912,4 +1626,275 @@ void DropTableTaskPool::_drop_tablet_worker_thread_callback() { } } +PushTaskPool::PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type) + : TaskWorkerPool( + type == PushWokerType::LOAD_V2 ? TaskWorkerType::PUSH : TaskWorkerType::DELETE, + env, *env->master_info(), thread_model), + _push_worker_type(type) { + if (_push_worker_type == PushWokerType::LOAD_V2) { + _worker_count = + config::push_worker_count_normal_priority + config::push_worker_count_high_priority; + + } else { + _worker_count = config::delete_worker_count; + } + _cb = [this]() { _push_worker_thread_callback(); }; +} + +void PushTaskPool::_push_worker_thread_callback() { + // gen high priority worker thread + TPriority::type priority = TPriority::NORMAL; + int32_t push_worker_count_high_priority = config::push_worker_count_high_priority; + if (_push_worker_type == PushWokerType::LOAD_V2) { + static uint32_t s_worker_count = 0; + std::lock_guard worker_thread_lock(_worker_thread_lock); + if (s_worker_count < push_worker_count_high_priority) { + ++s_worker_count; + priority = TPriority::HIGH; + } + } + + while (_is_work) { + TAgentTaskRequest agent_task_req; + { + std::unique_lock worker_thread_lock(_worker_thread_lock); + _worker_thread_condition_variable.wait( + worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); + if (!_is_work) { + return; + } + + if (priority == TPriority::HIGH) { + const auto it = std::find_if( + _tasks.cbegin(), _tasks.cend(), [](const TAgentTaskRequest& req) { + return req.__isset.priority && req.priority == TPriority::HIGH; + }); + + if (it == _tasks.cend()) { + // there is no high priority task. 
notify other thread to handle normal task + _worker_thread_condition_variable.notify_all(); + sleep(1); + continue; + } + agent_task_req = *it; + _tasks.erase(it); + } else { + agent_task_req = _tasks.front(); + _tasks.pop_front(); + } + } + TPushReq& push_req = agent_task_req.push_req; + + LOG(INFO) << "get push task. signature=" << agent_task_req.signature + << ", priority=" << priority << " push_type=" << push_req.push_type; + std::vector tablet_infos; + + EngineBatchLoadTask engine_task(push_req, &tablet_infos); + auto status = _env->storage_engine()->execute_task(&engine_task); + + // Return result to fe + TFinishTaskRequest finish_task_request; + finish_task_request.__set_backend(BackendOptions::get_local_backend()); + finish_task_request.__set_task_type(agent_task_req.task_type); + finish_task_request.__set_signature(agent_task_req.signature); + if (push_req.push_type == TPushType::DELETE) { + finish_task_request.__set_request_version(push_req.version); + } + + if (status.ok()) { + LOG_INFO("successfully execute push task") + .tag("signature", agent_task_req.signature) + .tag("tablet_id", push_req.tablet_id) + .tag("push_type", push_req.push_type); + ++_s_report_version; + finish_task_request.__set_finish_tablet_infos(tablet_infos); + } else { + LOG_WARNING("failed to execute push task") + .tag("signature", agent_task_req.signature) + .tag("tablet_id", push_req.tablet_id) + .tag("push_type", push_req.push_type) + .error(status); + } + finish_task_request.__set_task_status(status.to_thrift()); + finish_task_request.__set_report_version(_s_report_version); + + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); + } +} + +PublishVersionTaskPool::PublishVersionTaskPool(ExecEnv* env, ThreadModel thread_model) + : TaskWorkerPool(TaskWorkerType::PUBLISH_VERSION, env, *env->master_info(), thread_model) { + _worker_count = config::publish_version_worker_count; + _cb = [this]() { 
_publish_version_worker_thread_callback(); }; +} + +void PublishVersionTaskPool::_publish_version_worker_thread_callback() { + while (_is_work) { + TAgentTaskRequest agent_task_req; + { + std::unique_lock worker_thread_lock(_worker_thread_lock); + _worker_thread_condition_variable.wait( + worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); + if (!_is_work) { + return; + } + + agent_task_req = _tasks.front(); + _tasks.pop_front(); + } + const TPublishVersionRequest& publish_version_req = agent_task_req.publish_version_req; + DorisMetrics::instance()->publish_task_request_total->increment(1); + VLOG_NOTICE << "get publish version task. signature=" << agent_task_req.signature; + + std::vector error_tablet_ids; + std::vector succ_tablet_ids; + uint32_t retry_time = 0; + Status status; + bool is_task_timeout = false; + while (retry_time < PUBLISH_VERSION_MAX_RETRY) { + error_tablet_ids.clear(); + EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, + &succ_tablet_ids); + status = _env->storage_engine()->execute_task(&engine_task); + if (status.ok()) { + break; + } else if (status.is()) { + int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time; + if (time_elapsed > PUBLISH_TIMEOUT_SEC) { + LOG(INFO) << "task elapsed " << time_elapsed + << " seconds since it is inserted to queue, it is timeout"; + is_task_timeout = true; + } else { + // version not continuous, put to queue and wait pre version publish + // task execute + std::unique_lock worker_thread_lock(_worker_thread_lock); + _tasks.push_back(agent_task_req); + _worker_thread_condition_variable.notify_one(); + } + LOG(INFO) << "wait for previous publish version task to be done" + << "transaction_id: " << publish_version_req.transaction_id; + break; + } else { + LOG_WARNING("failed to publish version") + .tag("transaction_id", publish_version_req.transaction_id) + .tag("error_tablets_num", error_tablet_ids.size()) + .tag("retry_time", retry_time) + .error(status); + 
++retry_time; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } + if (status.is() && !is_task_timeout) { + continue; + } + + TFinishTaskRequest finish_task_request; + if (!status) { + DorisMetrics::instance()->publish_task_failed_total->increment(1); + // if publish failed, return failed, FE will ignore this error and + // check error tablet ids and FE will also republish this task + LOG_WARNING("failed to publish version") + .tag("signature", agent_task_req.signature) + .tag("transaction_id", publish_version_req.transaction_id) + .tag("error_tablets_num", error_tablet_ids.size()) + .error(status); + finish_task_request.__set_error_tablet_ids(error_tablet_ids); + } else { + if (!config::disable_auto_compaction) { + for (int i = 0; i < succ_tablet_ids.size(); i++) { + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet( + succ_tablet_ids[i]); + if (tablet != nullptr) { + tablet->publised_count++; + if (tablet->publised_count % 10 == 0) { + StorageEngine::instance()->submit_compaction_task( + tablet, CompactionType::CUMULATIVE_COMPACTION, true); + LOG(INFO) << "trigger compaction succ, tabletid:" << succ_tablet_ids[i] + << ", publised:" << tablet->publised_count; + } + } else { + LOG(WARNING) + << "trigger compaction failed, tabletid:" << succ_tablet_ids[i]; + } + } + } + LOG_INFO("successfully publish version") + .tag("signature", agent_task_req.signature) + .tag("transaction_id", publish_version_req.transaction_id) + .tag("tablets_num", succ_tablet_ids.size()); + } + + status.to_thrift(&finish_task_request.task_status); + finish_task_request.__set_backend(BackendOptions::get_local_backend()); + finish_task_request.__set_task_type(agent_task_req.task_type); + finish_task_request.__set_signature(agent_task_req.signature); + finish_task_request.__set_report_version(_s_report_version); + finish_task_request.__set_error_tablet_ids(error_tablet_ids); + + _finish_task(finish_task_request); + 
_remove_task_info(agent_task_req.task_type, agent_task_req.signature); + } +} + +ClearTransactionTaskPool::ClearTransactionTaskPool(ExecEnv* env, ThreadModel thread_model) + : TaskWorkerPool(TaskWorkerType::CLEAR_TRANSACTION_TASK, env, *env->master_info(), + thread_model) { + _worker_count = config::clear_transaction_task_worker_count; + _cb = [this]() { _clear_transaction_task_worker_thread_callback(); }; +} + +void ClearTransactionTaskPool::_clear_transaction_task_worker_thread_callback() { + while (_is_work) { + TAgentTaskRequest agent_task_req; + { + std::unique_lock worker_thread_lock(_worker_thread_lock); + _worker_thread_condition_variable.wait( + worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); + if (!_is_work) { + return; + } + + agent_task_req = _tasks.front(); + _tasks.pop_front(); + } + const TClearTransactionTaskRequest& clear_transaction_task_req = + agent_task_req.clear_transaction_task_req; + LOG(INFO) << "get clear transaction task. signature=" << agent_task_req.signature + << ", transaction_id=" << clear_transaction_task_req.transaction_id + << ", partition_id_size=" << clear_transaction_task_req.partition_id.size(); + + Status status; + + if (clear_transaction_task_req.transaction_id > 0) { + // transaction_id should be greater than zero. + // If it is not greater than zero, no need to execute + // the following clear_transaction_task() function. + if (!clear_transaction_task_req.partition_id.empty()) { + _env->storage_engine()->clear_transaction_task( + clear_transaction_task_req.transaction_id, + clear_transaction_task_req.partition_id); + } else { + _env->storage_engine()->clear_transaction_task( + clear_transaction_task_req.transaction_id); + } + LOG(INFO) << "finish to clear transaction task. signature=" << agent_task_req.signature + << ", transaction_id=" << clear_transaction_task_req.transaction_id; + } else { + LOG(WARNING) << "invalid transaction id " << clear_transaction_task_req.transaction_id + << ". 
signature= " << agent_task_req.signature; + } + + TFinishTaskRequest finish_task_request; + finish_task_request.__set_task_status(status.to_thrift()); + finish_task_request.__set_backend(BackendOptions::get_local_backend()); + finish_task_request.__set_task_type(agent_task_req.task_type); + finish_task_request.__set_signature(agent_task_req.signature); + + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); + } +} + } // namespace doris diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 3e4116f42080a45..7287da3f03c8141 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -182,12 +182,7 @@ class TaskWorkerPool { bool _register_task_info(const TTaskType::type task_type, int64_t signature); void _remove_task_info(const TTaskType::type task_type, int64_t signature); void _finish_task(const TFinishTaskRequest& finish_task_request); - uint32_t _get_next_task_index(int32_t thread_count, std::deque& tasks, - TPriority::type priority); - void _push_worker_thread_callback(); - void _publish_version_worker_thread_callback(); - void _clear_transaction_task_worker_thread_callback(); void _alter_tablet_worker_thread_callback(); void _alter_inverted_index_worker_thread_callback(); void _clone_worker_thread_callback(); @@ -279,4 +274,32 @@ class DropTableTaskPool : public TaskWorkerPool { DISALLOW_COPY_AND_ASSIGN(DropTableTaskPool); }; +class PushTaskPool : public TaskWorkerPool { +public: + enum class PushWokerType { LOAD_V2, DELETE }; + PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type); + void _push_worker_thread_callback(); + + DISALLOW_COPY_AND_ASSIGN(PushTaskPool); + +private: + PushWokerType _push_worker_type; +}; + +class PublishVersionTaskPool : public TaskWorkerPool { +public: + PublishVersionTaskPool(ExecEnv* env, ThreadModel thread_model); + void _publish_version_worker_thread_callback(); + + 
DISALLOW_COPY_AND_ASSIGN(PublishVersionTaskPool); +}; + +class ClearTransactionTaskPool : public TaskWorkerPool { +public: + ClearTransactionTaskPool(ExecEnv* env, ThreadModel thread_model); + void _clear_transaction_task_worker_thread_callback(); + + DISALLOW_COPY_AND_ASSIGN(ClearTransactionTaskPool); +}; + } // namespace doris diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 50952f559abe91f..d8fd5c1bbe82b28 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -44,9 +44,9 @@ using namespace ErrorCode; using std::map; -EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, - std::vector* error_tablet_ids, - std::vector* succ_tablet_ids) +EnginePublishVersionTask::EnginePublishVersionTask( + const TPublishVersionRequest& publish_version_req, std::vector* error_tablet_ids, + std::vector* succ_tablet_ids) : _total_task_num(0), _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids), diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index eb90c9d7d896035..7c163839bda734f 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -59,7 +59,7 @@ class TabletPublishTxnTask { class EnginePublishVersionTask : public EngineTask { public: - EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, + EnginePublishVersionTask(const TPublishVersionRequest& publish_version_req, vector* error_tablet_ids, std::vector* succ_tablet_ids = nullptr); ~EnginePublishVersionTask() {} From f21bf11cf59d67f410343f1a82cccd31b72cce0a Mon Sep 17 00:00:00 2001 From: luozenglin Date: Sat, 27 May 2023 23:51:32 +0800 Subject: [PATCH 6/7] [fix](ldap) fix ldap related errors (#19959) 1. fix ldap user show grants return null pointer exception; 2. 
fix ldap user show databases return no authority db; 3. ldap authentication supports catalog level; --- .../apache/doris/ldap/LdapAuthenticate.java | 10 +- .../org/apache/doris/ldap/LdapManager.java | 2 +- .../apache/doris/ldap/LdapPrivsChecker.java | 156 ++---------------- .../org/apache/doris/ldap/LdapUserInfo.java | 2 +- .../apache/doris/mysql/privilege/Auth.java | 129 +++++---------- .../doris/mysql/privilege/UserManager.java | 11 ++ .../doris/ldap/LdapAuthenticateTest.java | 4 +- .../doris/ldap/LdapPrivsCheckerTest.java | 67 +++----- 8 files changed, 102 insertions(+), 279 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java index 2874a2fd903ece6..e7644e07169c2ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java @@ -28,6 +28,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; + /** * This class is used for LDAP authentication login and LDAP group authorization. * This means that users can log in to Doris with a user name and LDAP password, @@ -64,11 +66,15 @@ public static boolean authenticate(ConnectContext context, String password, Stri String remoteIp = context.getMysqlChannel().getRemoteIp(); UserIdentity tempUserIdentity = UserIdentity.createAnalyzedUserIdentWithIp(qualifiedUser, remoteIp); // Search the user in doris. 
- UserIdentity userIdentity = Env.getCurrentEnv().getAuth().getCurrentUserIdentity(tempUserIdentity); - if (userIdentity == null) { + List userIdentities = Env.getCurrentEnv().getAuth() + .getUserIdentityForLdap(qualifiedUser, remoteIp); + UserIdentity userIdentity; + if (userIdentities.isEmpty()) { userIdentity = tempUserIdentity; LOG.debug("User:{} does not exists in doris, login as temporary users.", userName); context.setIsTempUser(true); + } else { + userIdentity = userIdentities.get(0); } context.setCurrentUserIdentity(userIdentity); diff --git a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapManager.java b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapManager.java index 76053a685493c1b..65e88da9d894987 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapManager.java @@ -115,7 +115,7 @@ public boolean checkUserPasswd(String fullName, String passwd) { public boolean checkUserPasswd(String fullName, String passwd, String remoteIp, List currentUser) { if (checkUserPasswd(fullName, passwd)) { - currentUser.add(UserIdentity.createAnalyzedUserIdentWithIp(fullName, remoteIp)); + currentUser.addAll(Env.getCurrentEnv().getAuth().getUserIdentityForLdap(fullName, remoteIp)); return true; } return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapPrivsChecker.java b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapPrivsChecker.java index f8da4dd7a114871..a27b26185f17767 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapPrivsChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapPrivsChecker.java @@ -26,12 +26,12 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.LdapConfig; import org.apache.doris.mysql.privilege.Auth; +import org.apache.doris.mysql.privilege.Auth.PrivLevel; import org.apache.doris.mysql.privilege.PrivBitSet; import org.apache.doris.mysql.privilege.PrivPredicate; import 
org.apache.doris.mysql.privilege.Privilege; import org.apache.doris.mysql.privilege.Role; -import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,159 +45,39 @@ public class LdapPrivsChecker { private static final Logger LOG = LogManager.getLogger(LdapPrivsChecker.class); public static boolean hasGlobalPrivFromLdap(UserIdentity currentUser, PrivPredicate wanted) { - return hasTblPatternPrivs(currentUser, wanted, null, null, Auth.PrivLevel.GLOBAL) - || hasResourcePatternPrivs(currentUser, wanted, null, Auth.PrivLevel.GLOBAL); + return hasLdapPrivs(currentUser) && getUserLdapPrivs(currentUser.getQualifiedUser()).checkGlobalPriv(wanted); } - public static boolean hasDbPrivFromLdap(UserIdentity currentUser, String db, PrivPredicate wanted) { - return hasTblPatternPrivs(currentUser, wanted, db, null, Auth.PrivLevel.DATABASE); + public static boolean hasCatalogPrivFromLdap(UserIdentity currentUser, String ctl, PrivPredicate wanted) { + return hasLdapPrivs(currentUser) && getUserLdapPrivs(currentUser.getQualifiedUser()).checkCtlPriv(ctl, wanted); } - // Any database has wanted priv return true. 
- public static boolean hasDbPrivFromLdap(UserIdentity currentUser, PrivPredicate wanted) { - return hasPrivs(currentUser, wanted, Auth.PrivLevel.DATABASE); + public static boolean hasDbPrivFromLdap(UserIdentity currentUser, String ctl, String db, PrivPredicate wanted) { + return hasLdapPrivs(currentUser) && getUserLdapPrivs(currentUser.getQualifiedUser()).checkDbPriv(ctl, db, + wanted); } - public static boolean hasTblPrivFromLdap(UserIdentity currentUser, String db, String tbl, PrivPredicate wanted) { - return hasTblPatternPrivs(currentUser, wanted, db, tbl, Auth.PrivLevel.TABLE); + public static boolean hasTblPrivFromLdap(UserIdentity currentUser, String ctl, String db, String tbl, + PrivPredicate wanted) { + return hasLdapPrivs(currentUser) && getUserLdapPrivs(currentUser.getQualifiedUser()).checkTblPriv(ctl, db, tbl, + wanted); } - // Any table has wanted priv return true. - public static boolean hasTblPrivFromLdap(UserIdentity currentUser, PrivPredicate wanted) { - return hasPrivs(currentUser, wanted, Auth.PrivLevel.TABLE); + public static boolean checkHasPriv(UserIdentity currentUser, PrivPredicate priv, PrivLevel[] levels) { + return hasLdapPrivs(currentUser) && getUserLdapPrivs(currentUser.getQualifiedUser()).checkHasPriv(priv, levels); } public static boolean hasResourcePrivFromLdap(UserIdentity currentUser, String resourceName, PrivPredicate wanted) { - return hasResourcePatternPrivs(currentUser, wanted, resourceName, Auth.PrivLevel.RESOURCE); - } - - private static boolean hasTblPatternPrivs(UserIdentity currentUser, PrivPredicate wanted, String db, String tbl, - Auth.PrivLevel level) { - PrivBitSet savedPrivs = PrivBitSet.of(); - getCurrentUserTblPrivs(currentUser, db, tbl, savedPrivs, level); - return Privilege.satisfy(savedPrivs, wanted); - } - - private static boolean hasResourcePatternPrivs(UserIdentity currentUser, PrivPredicate wanted, String resourceName, - Auth.PrivLevel level) { - PrivBitSet savedPrivs = PrivBitSet.of(); - 
getCurrentUserResourcePrivs(currentUser, resourceName, savedPrivs, level); - return Privilege.satisfy(savedPrivs, wanted); - } - - public static PrivBitSet getGlobalPrivFromLdap(UserIdentity currentUser) { - PrivBitSet savedPrivs = PrivBitSet.of(); - getCurrentUserTblPrivs(currentUser, null, null, savedPrivs, Auth.PrivLevel.GLOBAL); - getCurrentUserResourcePrivs(currentUser, null, savedPrivs, Auth.PrivLevel.GLOBAL); - return savedPrivs; - } - - public static PrivBitSet getDbPrivFromLdap(UserIdentity currentUser, String db) { - PrivBitSet savedPrivs = PrivBitSet.of(); - getCurrentUserTblPrivs(currentUser, db, null, savedPrivs, Auth.PrivLevel.DATABASE); - return savedPrivs; - } - - public static PrivBitSet getTblPrivFromLdap(UserIdentity currentUser, String db, String tbl) { - PrivBitSet savedPrivs = PrivBitSet.of(); - getCurrentUserTblPrivs(currentUser, db, tbl, savedPrivs, Auth.PrivLevel.TABLE); - return savedPrivs; + return hasLdapPrivs(currentUser) && getUserLdapPrivs(currentUser.getQualifiedUser()).checkResourcePriv( + resourceName, wanted); } public static PrivBitSet getResourcePrivFromLdap(UserIdentity currentUser, String resourceName) { PrivBitSet savedPrivs = PrivBitSet.of(); - getCurrentUserResourcePrivs(currentUser, resourceName, savedPrivs, Auth.PrivLevel.RESOURCE); - return savedPrivs; - } - - private static void getCurrentUserTblPrivs(UserIdentity currentUser, String db, String tbl, PrivBitSet savedPrivs, - Auth.PrivLevel level) { - if (!hasLdapPrivs(currentUser)) { - return; - } - Role currentUserLdapPrivs = getUserLdapPrivs(currentUser.getQualifiedUser()); - for (Map.Entry entry : currentUserLdapPrivs.getTblPatternToPrivs().entrySet()) { - switch (entry.getKey().getPrivLevel()) { - case GLOBAL: - if (level.equals(Auth.PrivLevel.GLOBAL)) { - savedPrivs.or(entry.getValue()); - return; - } - break; - case DATABASE: - if (level.equals(Auth.PrivLevel.DATABASE) && db != null - && entry.getKey().getQualifiedDb().equals(db)) { - 
savedPrivs.or(entry.getValue()); - return; - } - break; - case TABLE: - if (level.equals(Auth.PrivLevel.TABLE) && db != null && tbl != null - && entry.getKey().getQualifiedDb().equals(db) && entry.getKey().getTbl().equals(tbl)) { - savedPrivs.or(entry.getValue()); - return; - } - break; - default: - Preconditions.checkNotNull(null, entry.getKey().getPrivLevel()); - } - } - } - - private static void getCurrentUserResourcePrivs(UserIdentity currentUser, - String resourceName, PrivBitSet savedPrivs, Auth.PrivLevel level) { - if (!hasLdapPrivs(currentUser)) { - return; - } - Role currentUserLdapPrivs = getUserLdapPrivs(currentUser.getQualifiedUser()); - for (Map.Entry entry - : currentUserLdapPrivs.getResourcePatternToPrivs().entrySet()) { - switch (entry.getKey().getPrivLevel()) { - case GLOBAL: - if (level.equals(Auth.PrivLevel.GLOBAL)) { - savedPrivs.or(entry.getValue()); - return; - } - break; - case RESOURCE: - if (level.equals(Auth.PrivLevel.RESOURCE) && resourceName != null - && entry.getKey().getResourceName().equals(resourceName)) { - savedPrivs.or(entry.getValue()); - return; - } - break; - default: - Preconditions.checkNotNull(null, entry.getKey().getPrivLevel()); - } + if (hasLdapPrivs(currentUser)) { + getUserLdapPrivs(currentUser.getQualifiedUser()).getResourcePrivTable().getPrivs(resourceName, savedPrivs); } - } - - private static boolean hasPrivs(UserIdentity currentUser, PrivPredicate wanted, Auth.PrivLevel level) { - if (!hasLdapPrivs(currentUser)) { - return false; - } - Role currentUserLdapPrivs = getUserLdapPrivs(currentUser.getQualifiedUser()); - for (Map.Entry entry : currentUserLdapPrivs.getTblPatternToPrivs().entrySet()) { - if (entry.getKey().getPrivLevel().equals(level) && Privilege.satisfy(entry.getValue(), wanted)) { - return true; - } - } - return false; - } - - // Check if user has any privs of tables in this database. 
- public static boolean hasPrivsOfDb(UserIdentity currentUser, String db) { - if (!hasLdapPrivs(currentUser)) { - return false; - } - Role currentUserLdapPrivs = getUserLdapPrivs(currentUser.getQualifiedUser()); - for (Map.Entry entry : currentUserLdapPrivs.getTblPatternToPrivs().entrySet()) { - if (entry.getKey().getPrivLevel().equals(Auth.PrivLevel.TABLE) - && entry.getKey().getQualifiedDb().equals(db)) { - return true; - } - } - return false; + return savedPrivs; } public static boolean hasLdapPrivs(UserIdentity userIdent) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapUserInfo.java b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapUserInfo.java index 50c3d9ef532746b..1a8a665d20184bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapUserInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapUserInfo.java @@ -57,7 +57,7 @@ public String getUserName() { } // The password needs to be checked by LdapManager for updated cache, so it is visible in the package. 
- boolean isSetPasswd() { + public boolean isSetPasswd() { return isSetPasswd; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java index c5ea83e483d4658..9c3afdfd71f46a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java @@ -52,6 +52,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.ldap.LdapManager; import org.apache.doris.ldap.LdapPrivsChecker; +import org.apache.doris.ldap.LdapUserInfo; import org.apache.doris.load.DppConfig; import org.apache.doris.persist.AlterUserOperationLog; import org.apache.doris.persist.LdapInfo; @@ -65,7 +66,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -227,6 +227,10 @@ public void checkPlainPassword(String remoteUser, String remoteHost, String remo } } + public List getUserIdentityForLdap(String remoteUser, String remoteHost) { + return userManager.getUserIdentityUncheckPasswd(remoteUser, remoteHost); + } + // ==== Global ==== public boolean checkGlobalPriv(UserIdentity currentUser, PrivPredicate wanted) { if (isLdapAuthEnabled() && LdapPrivsChecker.hasGlobalPrivFromLdap(currentUser, wanted)) { @@ -248,6 +252,9 @@ public boolean checkGlobalPriv(UserIdentity currentUser, PrivPredicate wanted) { // ==== Catalog ==== public boolean checkCtlPriv(UserIdentity currentUser, String ctl, PrivPredicate wanted) { + if (isLdapAuthEnabled() && (LdapPrivsChecker.hasCatalogPrivFromLdap(currentUser, ctl, wanted))) { + return true; + } if (wanted.getPrivs().containsNodePriv()) { LOG.debug("should not check NODE priv in catalog level. 
user: {}, catalog: {}", currentUser, ctl); @@ -270,8 +277,7 @@ public boolean checkCtlPriv(UserIdentity currentUser, String ctl, PrivPredicate // ==== Database ==== public boolean checkDbPriv(UserIdentity currentUser, String ctl, String db, PrivPredicate wanted) { - if (isLdapAuthEnabled() && (LdapPrivsChecker.hasDbPrivFromLdap(currentUser, wanted) || LdapPrivsChecker - .hasPrivsOfDb(currentUser, db))) { + if (isLdapAuthEnabled() && LdapPrivsChecker.hasDbPrivFromLdap(currentUser, ctl, db, wanted)) { return true; } if (wanted.getPrivs().containsNodePriv()) { @@ -296,7 +302,7 @@ public boolean checkDbPriv(UserIdentity currentUser, String ctl, String db, Priv // ==== Table ==== public boolean checkTblPriv(UserIdentity currentUser, String ctl, String db, String tbl, PrivPredicate wanted) { - if (isLdapAuthEnabled() && LdapPrivsChecker.hasTblPrivFromLdap(currentUser, db, tbl, wanted)) { + if (isLdapAuthEnabled() && LdapPrivsChecker.hasTblPrivFromLdap(currentUser, ctl, db, tbl, wanted)) { return true; } if (wanted.getPrivs().containsNodePriv()) { @@ -355,7 +361,7 @@ public boolean checkResourcePriv(UserIdentity currentUser, String resourceName, * This method will check the given privilege levels */ public boolean checkHasPriv(ConnectContext ctx, PrivPredicate priv, PrivLevel... levels) { - if (isLdapAuthEnabled() && checkHasPrivLdap(ctx.getCurrentUserIdentity(), priv, levels)) { + if (isLdapAuthEnabled() && LdapPrivsChecker.checkHasPriv(ctx.getCurrentUserIdentity(), priv, levels)) { return true; } readLock(); @@ -372,31 +378,6 @@ public boolean checkHasPriv(ConnectContext ctx, PrivPredicate priv, PrivLevel... } } - private boolean checkHasPrivLdap(UserIdentity currentUser, PrivPredicate priv, PrivLevel... 
levels) { - for (PrivLevel privLevel : levels) { - switch (privLevel) { - case GLOBAL: - if (LdapPrivsChecker.hasGlobalPrivFromLdap(currentUser, priv)) { - return true; - } - break; - case DATABASE: - if (LdapPrivsChecker.hasDbPrivFromLdap(currentUser, priv)) { - return true; - } - break; - case TABLE: - if (LdapPrivsChecker.hasTblPrivFromLdap(currentUser, priv)) { - return true; - } - break; - default: - break; - } - } - return false; - } - // Check if LDAP authentication is enabled. private boolean isLdapAuthEnabled() { return LdapConfig.ldap_authentication_enabled; @@ -1014,17 +995,24 @@ public List> getAuthInfo(UserIdentity specifiedUserIdent) { private void getUserAuthInfo(List> userAuthInfos, UserIdentity userIdent) { // AuthProcDir.TITLE_NAMES List userAuthInfo = Lists.newArrayList(); - User user = userManager.getUserByUserIdentity(userIdent); // ================= UserIdentity ======================= userAuthInfo.add(userIdent.toString()); - // ============== Password ============== - userAuthInfo.add(user.hasPassword() ? "Yes" : "No"); - // ============== Roles ============== - userAuthInfo.add(Joiner.on(",").join(userRoleManager - .getRolesByUser(userIdent, ConnectContext.get().getSessionVariable().showUserDefaultRole))); + if (isLdapAuthEnabled() && ldapManager.doesUserExist(userIdent.getQualifiedUser())) { + LdapUserInfo ldapUserInfo = ldapManager.getUserInfo(userIdent.getQualifiedUser()); + // ============== Password ============== + userAuthInfo.add(ldapUserInfo.isSetPasswd() ? "Yes" : "No"); + // ============== Roles ============== + userAuthInfo.add(ldapUserInfo.getPaloRole().getRoleName()); + } else { + User user = userManager.getUserByUserIdentity(userIdent); + // ============== Password ============== + userAuthInfo.add(user.hasPassword() ? 
"Yes" : "No"); + // ============== Roles ============== + userAuthInfo.add(Joiner.on(",").join(userRoleManager + .getRolesByUser(userIdent, ConnectContext.get().getSessionVariable().showUserDefaultRole))); + } // ==============GlobalPrivs============== - PrivBitSet ldapGlobalPrivs = LdapPrivsChecker.getGlobalPrivFromLdap(userIdent); - PrivBitSet globalPrivs = ldapGlobalPrivs.copy(); + PrivBitSet globalPrivs = new PrivBitSet(); List globalEntries = getUserGlobalPrivTable(userIdent).entries; if (!CollectionUtils.isEmpty(globalEntries)) { globalPrivs.or(globalEntries.get(0).privSet); @@ -1033,7 +1021,7 @@ private void getUserAuthInfo(List> userAuthInfos, UserIdentity user // ============== CatalogPrivs ======================== String ctlPrivs = getUserCtlPrivTable(userIdent).entries.stream() .map(entry -> String.format("%s: %s", - ((CatalogPrivEntry) entry).getOrigCtl(), entry.privSet, user.isSetByDomainResolver())) + ((CatalogPrivEntry) entry).getOrigCtl(), entry.privSet)) .collect(Collectors.joining("; ")); if (Strings.isNullOrEmpty(ctlPrivs)) { ctlPrivs = FeConstants.null_string; @@ -1041,29 +1029,12 @@ private void getUserAuthInfo(List> userAuthInfos, UserIdentity user userAuthInfo.add(ctlPrivs); // ============== DatabasePrivs ============== List dbPrivs = Lists.newArrayList(); - Set addedDbs = Sets.newHashSet(); for (PrivEntry entry : getUserDbPrivTable(userIdent).entries) { DbPrivEntry dEntry = (DbPrivEntry) entry; - /** - * Doris and Ldap may have different privs on one database. - * Merge these privs and add. - */ PrivBitSet savedPrivs = dEntry.getPrivSet().copy(); - savedPrivs.or(LdapPrivsChecker.getDbPrivFromLdap(userIdent, dEntry.getOrigDb())); - addedDbs.add(dEntry.getOrigDb()); dbPrivs.add(String.format("%s.%s: %s", dEntry.getOrigCtl(), dEntry.getOrigDb(), savedPrivs)); } - // Add privs from ldap groups that have not been added in Doris. 
- if (LdapPrivsChecker.hasLdapPrivs(userIdent)) { - Map ldapDbPrivs = LdapPrivsChecker.getLdapAllDbPrivs(userIdent); - for (Entry entry : ldapDbPrivs.entrySet()) { - if (!addedDbs.contains(entry.getKey().getQualifiedDb())) { - dbPrivs.add(String.format("%s.%s: %s", entry.getKey().getQualifiedCtl(), - entry.getKey().getQualifiedDb(), entry.getValue())); - } - } - } if (dbPrivs.isEmpty()) { userAuthInfo.add(FeConstants.null_string); @@ -1073,28 +1044,12 @@ private void getUserAuthInfo(List> userAuthInfos, UserIdentity user // tbl List tblPrivs = Lists.newArrayList(); - Set addedtbls = Sets.newHashSet(); for (PrivEntry entry : getUserTblPrivTable(userIdent).entries) { TablePrivEntry tEntry = (TablePrivEntry) entry; - /** - * Doris and Ldap may have different privs on one table. - * Merge these privs and add. - */ PrivBitSet savedPrivs = tEntry.getPrivSet().copy(); - savedPrivs.or(LdapPrivsChecker.getTblPrivFromLdap(userIdent, tEntry.getOrigDb(), tEntry.getOrigTbl())); - addedtbls.add(tEntry.getOrigDb().concat(".").concat(tEntry.getOrigTbl())); tblPrivs.add(String.format("%s.%s.%s: %s", tEntry.getOrigCtl(), tEntry.getOrigDb(), tEntry.getOrigTbl(), savedPrivs)); } - // Add privs from ldap groups that have not been added in Doris. 
- if (LdapPrivsChecker.hasLdapPrivs(userIdent)) { - Map ldapTblPrivs = LdapPrivsChecker.getLdapAllTblPrivs(userIdent); - for (Entry entry : ldapTblPrivs.entrySet()) { - if (!addedtbls.contains(entry.getKey().getQualifiedDb().concat(".").concat(entry.getKey().getTbl()))) { - tblPrivs.add(String.format("%s: %s", entry.getKey(), entry.getValue())); - } - } - } if (tblPrivs.isEmpty()) { userAuthInfo.add(FeConstants.null_string); @@ -1104,27 +1059,12 @@ private void getUserAuthInfo(List> userAuthInfos, UserIdentity user // resource List resourcePrivs = Lists.newArrayList(); - Set addedResources = Sets.newHashSet(); for (PrivEntry entry : getUserResourcePrivTable(userIdent).entries) { ResourcePrivEntry rEntry = (ResourcePrivEntry) entry; - /** - * Doris and Ldap may have different privs on one resource. - * Merge these privs and add. - */ PrivBitSet savedPrivs = rEntry.getPrivSet().copy(); savedPrivs.or(LdapPrivsChecker.getResourcePrivFromLdap(userIdent, rEntry.getOrigResource())); - addedResources.add(rEntry.getOrigResource()); resourcePrivs.add(rEntry.getOrigResource() + ": " + savedPrivs.toString()); } - // Add privs from ldap groups that have not been added in Doris. 
- if (LdapPrivsChecker.hasLdapPrivs(userIdent)) { - Map ldapResourcePrivs = LdapPrivsChecker.getLdapAllResourcePrivs(userIdent); - for (Entry entry : ldapResourcePrivs.entrySet()) { - if (!addedResources.contains(entry.getKey().getResourceName())) { - tblPrivs.add(entry.getKey().getResourceName().concat(": ").concat(entry.getValue().toString())); - } - } - } if (resourcePrivs.isEmpty()) { userAuthInfo.add(FeConstants.null_string); @@ -1141,6 +1081,9 @@ private GlobalPrivTable getUserGlobalPrivTable(UserIdentity userIdentity) { for (String roleName : roles) { table.merge(roleManager.getRole(roleName).getGlobalPrivTable()); } + if (isLdapAuthEnabled() && ldapManager.doesUserExist(userIdentity.getQualifiedUser())) { + table.merge(ldapManager.getUserInfo(userIdentity.getQualifiedUser()).getPaloRole().getGlobalPrivTable()); + } return table; } @@ -1150,6 +1093,9 @@ private CatalogPrivTable getUserCtlPrivTable(UserIdentity userIdentity) { for (String roleName : roles) { table.merge(roleManager.getRole(roleName).getCatalogPrivTable()); } + if (isLdapAuthEnabled() && ldapManager.doesUserExist(userIdentity.getQualifiedUser())) { + table.merge(ldapManager.getUserInfo(userIdentity.getQualifiedUser()).getPaloRole().getCatalogPrivTable()); + } return table; } @@ -1159,6 +1105,9 @@ private DbPrivTable getUserDbPrivTable(UserIdentity userIdentity) { for (String roleName : roles) { table.merge(roleManager.getRole(roleName).getDbPrivTable()); } + if (isLdapAuthEnabled() && ldapManager.doesUserExist(userIdentity.getQualifiedUser())) { + table.merge(ldapManager.getUserInfo(userIdentity.getQualifiedUser()).getPaloRole().getDbPrivTable()); + } return table; } @@ -1168,6 +1117,9 @@ private TablePrivTable getUserTblPrivTable(UserIdentity userIdentity) { for (String roleName : roles) { table.merge(roleManager.getRole(roleName).getTablePrivTable()); } + if (isLdapAuthEnabled() && ldapManager.doesUserExist(userIdentity.getQualifiedUser())) { + 
table.merge(ldapManager.getUserInfo(userIdentity.getQualifiedUser()).getPaloRole().getTablePrivTable()); + } return table; } @@ -1177,6 +1129,9 @@ private ResourcePrivTable getUserResourcePrivTable(UserIdentity userIdentity) { for (String roleName : roles) { table.merge(roleManager.getRole(roleName).getResourcePrivTable()); } + if (isLdapAuthEnabled() && ldapManager.doesUserExist(userIdentity.getQualifiedUser())) { + table.merge(ldapManager.getUserInfo(userIdentity.getQualifiedUser()).getPaloRole().getResourcePrivTable()); + } return table; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java index 5dcd08080a4b4b9..68655775b313557 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java @@ -126,6 +126,17 @@ private void checkPasswordInternal(String remoteUser, String remoteHost, byte[] hasRemotePasswd(plain, remotePasswd)); } + public List getUserIdentityUncheckPasswd(String remoteUser, String remoteHost) { + List userIdentities = Lists.newArrayList(); + List users = nameToUsers.get(remoteUser); + for (User user : users) { + if (!user.getUserIdentity().isDomain() && (user.isAnyHost() || user.getHostPattern().match(remoteHost))) { + userIdentities.add(user.getUserIdentity()); + } + } + return userIdentities; + } + private String hasRemotePasswd(boolean plain, byte[] remotePasswd) { if (plain) { return "YES"; diff --git a/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapAuthenticateTest.java b/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapAuthenticateTest.java index f54045f4ab493b1..e6856950c74e109 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapAuthenticateTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapAuthenticateTest.java @@ -157,7 +157,7 @@ public void testAuthenticate() { 
setGetCurrentUserIdentity(true); String qualifiedUser = ClusterNamespace.getFullName(DEFAULT_CLUSTER, USER_NAME); Assert.assertTrue(LdapAuthenticate.authenticate(context, "123", qualifiedUser)); - Assert.assertFalse(context.getIsTempUser()); + Assert.assertTrue(context.getIsTempUser()); } @Test @@ -190,7 +190,7 @@ public void testAuthenticateGetGroupsNull() { setGetCurrentUserIdentity(true); String qualifiedUser = ClusterNamespace.getFullName(DEFAULT_CLUSTER, USER_NAME); Assert.assertTrue(LdapAuthenticate.authenticate(context, "123", qualifiedUser)); - Assert.assertFalse(context.getIsTempUser()); + Assert.assertTrue(context.getIsTempUser()); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapPrivsCheckerTest.java b/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapPrivsCheckerTest.java index a39ef49eac64ec2..eeb1ad976f9d967 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapPrivsCheckerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapPrivsCheckerTest.java @@ -32,6 +32,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; +import com.google.common.collect.Lists; import mockit.Expectations; import mockit.Mocked; import org.junit.Assert; @@ -89,10 +90,10 @@ public void setUp() { minTimes = 0; result = ldapManager; - Role role = new Role(""); + Role role = new Role("ldapRole"); Map tblPatternToPrivs = role.getTblPatternToPrivs(); - TablePattern global = new TablePattern("*", "*", "*"); + TablePattern global = new TablePattern("ctl1", "*", "*"); tblPatternToPrivs.put(global, PrivBitSet.of(Privilege.SELECT_PRIV, Privilege.CREATE_PRIV)); TablePattern db = new TablePattern(INTERNAL, DB, "*"); tblPatternToPrivs.put(db, PrivBitSet.of(Privilege.SELECT_PRIV, Privilege.LOAD_PRIV)); @@ -102,12 +103,11 @@ public void setUp() { tblPatternToPrivs.put(tbl2, PrivBitSet.of(Privilege.SELECT_PRIV, Privilege.DROP_PRIV)); Map resourcePatternToPrivs = role.getResourcePatternToPrivs(); - 
ResourcePattern globalResource = new ResourcePattern("*"); - resourcePatternToPrivs.put(globalResource, PrivBitSet.of(Privilege.USAGE_PRIV)); ResourcePattern resource1 = new ResourcePattern(RESOURCE1); resourcePatternToPrivs.put(resource1, PrivBitSet.of(Privilege.USAGE_PRIV)); ResourcePattern resource2 = new ResourcePattern(RESOURCE1); resourcePatternToPrivs.put(resource2, PrivBitSet.of(Privilege.USAGE_PRIV)); + Role ldapRole = new Role(role.getRoleName()); try { global.analyze(CLUSTER); db.analyze(CLUSTER); @@ -115,7 +115,8 @@ public void setUp() { tbl2.analyze(CLUSTER); resource1.analyze(); resource2.analyze(); - } catch (AnalysisException e) { + ldapRole.merge(role); + } catch (Exception e) { e.printStackTrace(); } @@ -123,7 +124,7 @@ public void setUp() { ldapManager.getUserInfo(userIdentity.getQualifiedUser()); minTimes = 0; - result = new LdapUserInfo(userIdentity.getQualifiedUser(), false, "", role); + result = new LdapUserInfo(userIdentity.getQualifiedUser(), false, "", ldapRole); ldapManager.doesUserExist(userIdentity.getQualifiedUser()); minTimes = 0; @@ -136,6 +137,10 @@ public void setUp() { context.getSessionVariable(); minTimes = 0; result = sessionVariable; + + auth.getUserIdentityForLdap(USER, IP); + minTimes = 0; + result = Lists.newArrayList(userIdentity); } }; // call the mocked method before replay @@ -143,33 +148,24 @@ public void setUp() { ConnectContext.get().getSessionVariable().isEnableUnicodeNameSupport(); } - @Test - public void testHasGlobalPrivFromLdap() { - Assert.assertTrue(LdapPrivsChecker.hasGlobalPrivFromLdap(userIdent, PrivPredicate.CREATE)); - Assert.assertTrue(LdapPrivsChecker.hasGlobalPrivFromLdap(userIdent, PrivPredicate.USAGE)); - Assert.assertFalse(LdapPrivsChecker.hasGlobalPrivFromLdap(userIdent, PrivPredicate.DROP)); - } - @Test public void testHasDbPrivFromLdap() { - Assert.assertTrue(LdapPrivsChecker.hasDbPrivFromLdap(userIdent, CLUSTER + ":" + DB, PrivPredicate.LOAD)); - 
Assert.assertFalse(LdapPrivsChecker.hasDbPrivFromLdap(userIdent, CLUSTER + ":" + DB, PrivPredicate.DROP)); - Assert.assertTrue(LdapPrivsChecker.hasDbPrivFromLdap(userIdent, PrivPredicate.LOAD)); - Assert.assertFalse(LdapPrivsChecker.hasDbPrivFromLdap(userIdent, PrivPredicate.DROP)); + Assert.assertTrue( + LdapPrivsChecker.hasDbPrivFromLdap(userIdent, INTERNAL, CLUSTER + ":" + DB, PrivPredicate.LOAD)); + Assert.assertFalse( + LdapPrivsChecker.hasDbPrivFromLdap(userIdent, INTERNAL, CLUSTER + ":" + DB, PrivPredicate.DROP)); } @Test public void testHasTblPrivFromLdap() { - Assert.assertTrue(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, CLUSTER + ":" + TABLE_DB, TABLE1, + Assert.assertTrue(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, INTERNAL, CLUSTER + ":" + TABLE_DB, TABLE1, PrivPredicate.ALTER)); - Assert.assertFalse(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, CLUSTER + ":" + TABLE_DB, TABLE1, + Assert.assertFalse(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, INTERNAL, CLUSTER + ":" + TABLE_DB, TABLE1, PrivPredicate.DROP)); - Assert.assertTrue(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, CLUSTER + ":" + TABLE_DB, TABLE2, + Assert.assertTrue(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, INTERNAL, CLUSTER + ":" + TABLE_DB, TABLE2, PrivPredicate.DROP)); - Assert.assertFalse(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, CLUSTER + ":" + TABLE_DB, TABLE2, + Assert.assertFalse(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, INTERNAL, CLUSTER + ":" + TABLE_DB, TABLE2, PrivPredicate.CREATE)); - Assert.assertTrue(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, PrivPredicate.ALTER)); - Assert.assertFalse(LdapPrivsChecker.hasTblPrivFromLdap(userIdent, PrivPredicate.LOAD)); } @Test @@ -179,37 +175,12 @@ public void testHasResourcePrivFromLdap() { PrivPredicate.USAGE)); } - @Test - public void testGetGlobalPrivFromLdap() { - Assert.assertEquals( - PrivBitSet.of(Privilege.SELECT_PRIV, Privilege.CREATE_PRIV, Privilege.USAGE_PRIV) - .toString(), - 
LdapPrivsChecker.getGlobalPrivFromLdap(userIdent).toString()); - } - - @Test - public void testGetDbPrivFromLdap() { - Assert.assertEquals(PrivBitSet.of(Privilege.SELECT_PRIV, Privilege.LOAD_PRIV).toString(), - LdapPrivsChecker.getDbPrivFromLdap(userIdent, CLUSTER + ":" + DB).toString()); - } - - @Test - public void testGetTblPrivFromLdap() { - Assert.assertEquals(PrivBitSet.of(Privilege.SELECT_PRIV, Privilege.ALTER_PRIV).toString(), - LdapPrivsChecker.getTblPrivFromLdap(userIdent, CLUSTER + ":" + TABLE_DB, TABLE1).toString()); - } - @Test public void testGetResourcePrivFromLdap() { Assert.assertEquals(PrivBitSet.of(Privilege.USAGE_PRIV).toString(), LdapPrivsChecker.getResourcePrivFromLdap(userIdent, RESOURCE1).toString()); } - @Test - public void testHasPrivsOfDb() { - Assert.assertTrue(LdapPrivsChecker.hasPrivsOfDb(userIdent, CLUSTER + ":" + TABLE_DB)); - } - @Test public void testGetLdapAllDbPrivs() throws AnalysisException { Map allDb = LdapPrivsChecker.getLdapAllDbPrivs(userIdent); From d8cedf3acef7bd2ddf7ff5af93880fb9f2485c55 Mon Sep 17 00:00:00 2001 From: ElvinWei Date: Sun, 28 May 2023 01:46:39 +0800 Subject: [PATCH 7/7] rebase --- docs/en/docs/query-acceleration/statistics.md | 47 ++++++++++++++++++- .../docs/query-acceleration/statistics.md | 45 +++++++++++++++++- .../org/apache/doris/statistics/README.md | 35 ++++++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) diff --git a/docs/en/docs/query-acceleration/statistics.md b/docs/en/docs/query-acceleration/statistics.md index ad0160630696a05..068745e6fbe7d4e 100644 --- a/docs/en/docs/query-acceleration/statistics.md +++ b/docs/en/docs/query-acceleration/statistics.md @@ -410,7 +410,52 @@ mysql> ANALYZE TABLE stats_test.example_tbl UPDATE HISTOGRAM WITH PERIOD 86400; #### Automatic collection -To be added. +Statistics can be "invalidated" when tables are changed, which can cause the optimizer to select the wrong execution plan. 
+
+Table statistics may become invalid due to the following causes:
+
+- New field: The new field has no statistics
+- Field change: Original statistics are unavailable
+- New partition: The new partition has no statistics
+- Partition change: The original statistics are invalid
+- Data changes (inserting data | deleting data | changing data): the statistics become inaccurate
+
+The main operations involved include:
+
+- update: updates the data
+- delete: deletes data
+- drop: deletes a partition
+- load: imports data and adds partitions
+- insert: inserts data and adds partitions
+- alter: field change, partition change, or new partition
+
+When a database, table, partition, or field is dropped, the corresponding invalid statistics are cleared automatically. Adjusting the column order or changing a column type has no effect.
+
+The system determines whether to collect statistics again based on the health of the table (as defined above). By setting a health threshold, the system re-collects statistics for a table when its health falls below a certain value. Simply put, for a table whose statistics have been collected, if the data of a partition grows or shrinks, or a partition is added or deleted, statistics may be collected again automatically. After re-collection, the table's statistics and health are updated.
+
+Currently, statistics are automatically collected only for tables that the user has configured for automatic collection; statistics are not collected automatically for other tables.
+ +Example: + +- Automatically analysis statistics for the 'example_tbl' table using the following syntax: + +```SQL +-- use with auto +mysql> ANALYZE TABLE stats_test.example_tbl WITH AUTO; ++--------+ +| job_id | ++--------+ +| 52539 | ++--------+ + +-- configure automatic +mysql> ANALYZE TABLE stats_test.example_tbl PROPERTIES("automatic" = "true"); ++--------+ +| job_id | ++--------+ +| 52565 | ++--------+ +``` ### Manage job diff --git a/docs/zh-CN/docs/query-acceleration/statistics.md b/docs/zh-CN/docs/query-acceleration/statistics.md index 0a8744c114196c2..471773d52174222 100644 --- a/docs/zh-CN/docs/query-acceleration/statistics.md +++ b/docs/zh-CN/docs/query-acceleration/statistics.md @@ -444,7 +444,50 @@ mysql> ANALYZE TABLE stats_test.example_tbl UPDATE HISTOGRAM WITH PERIOD 86400; #### 自动收集 -待补充。 +表发生变更时可能会导致统计信息“失效”,可能会导致优化器选择错误的执行计划。 + +导致表统计信息失效的原因包括: + +- 新增字段:新增字段无统计信息 +- 字段变更:原有统计信息不可用 +- 新增分区:新增分区无统计信息 +- 分区变更:原有统计信息失效 +- 数据变更(插入数据 | 删除数据 | 更改数据):统计信息有误差 + +主要涉及的操作包括: + +- update:更新数据 +- delete:删除数据 +- drop:删除分区 +- load:导入数据、新增分区 +- insert:插入数据、新增分区 +- alter:字段变更、分区变更、新增分区 + +其中库、表、分区、字段删除,内部会自动清除这些无效的统计信息。调整列顺序以及修改列类型不影响。 + +系统根据表的健康度(参考上文定义)来决定是否需要重新收集统计信息。我们通过设置健康度阈值,当健康度低于某个值时系统将重新收集表对应的统计信息。简单来讲就是对于收集过统计信息的表,如果某一个分区数据变多/变少、或者新增/删除分区,都有可能触发统计信息的自动收集,重新收集后更新表的统计信息和健康度。目前只会收集用户配置了自动收集统计信息的表,其他表不会自动收集统计信息。 + +示例: + +- 自动收集 `example_tbl` 表的统计信息,使用以下语法: + +```SQL +-- 使用with auto +mysql> ANALYZE TABLE stats_test.example_tbl WITH AUTO; ++--------+ +| job_id | ++--------+ +| 52539 | ++--------+ + +-- 配置automatic +mysql> ANALYZE TABLE stats_test.example_tbl PROPERTIES("automatic" = "true"); ++--------+ +| job_id | ++--------+ +| 52565 | ++--------+ +``` ### 管理任务 diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/README.md b/fe/fe-core/src/main/java/org/apache/doris/statistics/README.md index e3a577528aeaa81..ef9340b2817a3f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/README.md +++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/README.md @@ -116,6 +116,41 @@ end # Test +The regression tests now mainly cover the following. + +- Analyze stats: mainly to verify the `ANALYZE` statement and its related characteristics, because some functions are affected by other factors (such as system metadata reporting time), may show instability, so this part is placed in p1. +- Manage stats: mainly used to verify the injection, deletion, display and other related operations of statistical information. + +For more, see [statistics_p0](https://github.com/apache/doris/tree/master/regression-test/suites/statistics) [statistics_p1](https://github.com/apache/doris/tree/master/regression-test/suites/statistics_p1) + +## Analyze stats + +p0 tests: + +1. Universal analysis + +p1 tests: + +1. Universal analysis +2. Sampled analysis +3. Incremental analysis +4. Automatic analysis +5. Periodic analysis + +## Manage stats + +p0 tests: + +1. Alter table stats +2. Show table stats +3. Alter column stats +4. Show column stats +5. Show column histogram +6. Drop column stats +7. Drop expired stats + +For the modification of the statistics module, all the above cases should be guaranteed to pass! + # Feature note 20230508: