diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 46609023337769a..8198c38e45e277d 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -29,8 +29,8 @@
 #include "agent/task_worker_pool.h"
 #include "agent/topic_subscriber.h"
-#include "agent/user_resource_listener.h"
 #include "agent/utils.h"
+#include "agent/workload_group_listener.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
@@ -39,10 +39,6 @@
 #include "olap/snapshot_manager.h"
 #include "runtime/exec_env.h"
-namespace doris {
-class TopicListener;
-} // namespace doris
-
 using std::string;
 using std::vector;
@@ -134,9 +130,10 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
 #if !defined(BE_TEST) && !defined(__APPLE__)
     // Add subscriber here and register listeners
-    TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
-    LOG(INFO) << "Register user resource listener";
-    _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
+    std::unique_ptr<TopicListener> wg_listener = std::make_unique<WorkloadGroupListener>(exec_env);
+    LOG(INFO) << "Register workload group listener";
+    _topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP,
+                                         std::move(wg_listener));
 #endif
 }
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index daa1823b07e568e..a3e5cbd745eb238 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -23,10 +23,11 @@
 #include
 #include
+#include "agent/topic_subscriber.h"
+
 namespace doris {
 class TaskWorkerPool;
-class TopicSubscriber;
 class ExecEnv;
 class TAgentPublishRequest;
 class TAgentResult;
@@ -52,6 +53,8 @@ class AgentServer {
     // [[deprecated]]
     void publish_cluster_state(TAgentResult& agent_result, const TAgentPublishRequest& request);
+    TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }
+
 private:
     DISALLOW_COPY_AND_ASSIGN(AgentServer);
diff --git a/be/src/agent/topic_listener.h b/be/src/agent/topic_listener.h
index 0ef9c597f19987b..af99a78545b80e3 100644
--- a/be/src/agent/topic_listener.h
+++ b/be/src/agent/topic_listener.h
@@ -17,19 +17,14 @@
 #pragma once
-#include
+#include
 namespace doris {
 class TopicListener {
 public:
     virtual ~TopicListener() {}
-    // Deal with a single update
-    //
-    // Input parameters:
-    // protocol version: the version for the protocol, listeners should deal with the msg according to the protocol
-    // topic_update: single update
-    virtual void handle_update(const TAgentServiceVersion::type& protocol_version,
-                               const TTopicUpdate& topic_update) = 0;
+
+    virtual void handle_topic_info(const TPublishTopicRequest& topic_request) = 0;
 };
 } // namespace doris
diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index c5c4a324ebe9115..c3bcc29c623b09c 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -17,7 +17,7 @@
 #include "agent/topic_subscriber.h"
-#include
+#include
 #include
 #include
@@ -28,38 +28,22 @@ namespace doris {
 TopicSubscriber::TopicSubscriber() {}
-TopicSubscriber::~TopicSubscriber() {
-    // Delete all listeners in the register
-    std::map<TTopicType::type, std::vector<TopicListener*>>::iterator it =
-            _registered_listeners.begin();
-    for (; it != _registered_listeners.end(); ++it) {
-        std::vector<TopicListener*>& listeners = it->second;
-        std::vector<TopicListener*>::iterator listener_it = listeners.begin();
-        for (; listener_it != listeners.end(); ++listener_it) {
-            delete *listener_it;
-        }
-    }
-}
-
-void TopicSubscriber::register_listener(TTopicType::type topic_type, TopicListener* listener) {
+void TopicSubscriber::register_listener(TTopicInfoType::type topic_type,
+                                        std::unique_ptr<TopicListener> topic_listener) {
     // Unique lock here to prevent access to listeners
     std::lock_guard lock(_listener_mtx);
-    this->_registered_listeners[topic_type].push_back(listener);
+    this->_registered_listeners.emplace(topic_type, std::move(topic_listener));
 }
-void TopicSubscriber::handle_updates(const TAgentPublishRequest& agent_publish_request) {
-    // Shared lock here in order to avoid updates in listeners' map
+void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_request) {
+    // NOTE(wb): if we find handle_topic_info becomes a bottleneck via the LOG(INFO) timings,
+    // e.g. updating workload info delays other listeners, then we need to add a thread here
+    // to handle topic info asynchronously
     std::shared_lock lock(_listener_mtx);
-    // Currently, not deal with protocol version, the listener should deal with protocol version
-    const std::vector<TTopicUpdate>& topic_updates = agent_publish_request.updates;
-    std::vector<TTopicUpdate>::const_iterator topic_update_it = topic_updates.begin();
-    for (; topic_update_it != topic_updates.end(); ++topic_update_it) {
-        std::vector<TopicListener*>& listeners = this->_registered_listeners[topic_update_it->type];
-        std::vector<TopicListener*>::iterator listener_it = listeners.begin();
-        // Send the update to all listeners with protocol version.
-        for (; listener_it != listeners.end(); ++listener_it) {
-            (*listener_it)->handle_update(agent_publish_request.protocol_version, *topic_update_it);
-        }
+    LOG(INFO) << "begin handle topic info";
+    for (auto& listener_pair : _registered_listeners) {
+        listener_pair.second->handle_topic_info(topic_request);
+        LOG(INFO) << "handle topic " << listener_pair.first << " succ";
     }
 }
 } // namespace doris
diff --git a/be/src/agent/topic_subscriber.h b/be/src/agent/topic_subscriber.h
index 490bd35d2b07f7c..7adcd0ea372935a 100644
--- a/be/src/agent/topic_subscriber.h
+++ b/be/src/agent/topic_subscriber.h
@@ -17,7 +17,8 @@
 #pragma once
-#include
+#include
+#include
 #include
 #include
@@ -29,14 +30,15 @@ class TopicListener;
 class TopicSubscriber {
 public:
     TopicSubscriber();
-    ~TopicSubscriber();
-    // Put the topic type and listener to the map
-    void register_listener(TTopicType::type topic_type, TopicListener* listener);
-    // Handle all updates in the request
-    void handle_updates(const TAgentPublishRequest& agent_publish_request);
+    ~TopicSubscriber() = default;
+
+    void register_listener(TTopicInfoType::type topic_type,
+                           std::unique_ptr<TopicListener> topic_listener);
+
+    void handle_topic_info(const TPublishTopicRequest& topic_request);
 private:
-    std::map<TTopicType::type, std::vector<TopicListener*>> _registered_listeners;
+    std::map<TTopicInfoType::type, std::unique_ptr<TopicListener>> _registered_listeners;
     std::shared_mutex _listener_mtx;
 };
 } // namespace doris
diff --git a/be/src/agent/user_resource_listener.cpp b/be/src/agent/user_resource_listener.cpp
deleted file mode 100644
index 6a73edf6dbc7499..000000000000000
--- a/be/src/agent/user_resource_listener.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.
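For context, the reworked subscriber API keeps one listener per topic type, owned by the subscriber through a unique_ptr, and fans the whole TPublishTopicRequest out to every listener. The sketch below is illustrative only: DummyLogListener and register_and_publish_example are hypothetical names, and it assumes the Thrift-generated C++ setters for the new BackendService.thrift types (TopicInfo, TPublishTopicRequest, TTopicInfoType) are visible via the headers shown in this patch.

```cpp
// Illustrative sketch only; not part of this patch.
#include <memory>

#include "agent/topic_listener.h"   // assumed to pull in the generated BackendService types
#include "agent/topic_subscriber.h"
#include "common/logging.h"

namespace doris {

// A do-nothing listener; WorkloadGroupListener in this patch is the real example.
class DummyLogListener : public TopicListener {
public:
    ~DummyLogListener() override = default;
    void handle_topic_info(const TPublishTopicRequest& topic_request) override {
        LOG(INFO) << "received " << topic_request.topic_list.size() << " topic infos";
    }
};

void register_and_publish_example(TopicSubscriber* subscriber) {
    // One listener per topic type; the subscriber takes ownership.
    subscriber->register_listener(TTopicInfoType::type::WORKLOAD_GROUP,
                                  std::make_unique<DummyLogListener>());

    // Build the same shape of request the FE sends through publish_topic_info.
    TopicInfo info;
    info.__set_topic_type(TTopicInfoType::type::WORKLOAD_GROUP);
    info.__set_topic_key("normal");
    info.__set_info_map({{"id", "10001"}});

    TPublishTopicRequest request;
    request.__set_topic_list({info});

    // Every registered listener receives the whole request and filters by topic_type.
    subscriber->handle_topic_info(request);
}

} // namespace doris
```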
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "agent/user_resource_listener.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include "common/config.h" -#include "common/status.h" -#include "runtime/client_cache.h" -#include "runtime/exec_env.h" - -namespace doris { - -using std::string; -using apache::thrift::TException; -using apache::thrift::transport::TTransportException; - -// Initialize the resource to cgroups file mapping -// TRESOURCE_IOPS not mapped - -UserResourceListener::UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info) - : _master_info(master_info), _exec_env(exec_env) {} - -UserResourceListener::~UserResourceListener() {} - -void UserResourceListener::handle_update(const TAgentServiceVersion::type& protocol_version, - const TTopicUpdate& topic_update) { - std::vector updates = topic_update.updates; - if (updates.size() > 0) { - int64_t new_version = updates[0].int_value; - // Async call to update users resource method - auto res = std::async(std::launch::async, &UserResourceListener::update_users_resource, - this, new_version); - res.get(); - } -} - -void UserResourceListener::update_users_resource(int64_t new_version) { - // Call fe to get latest user resource - Status master_status; - // using 500ms as default timeout value - FrontendServiceConnection client(_exec_env->frontend_client_cache(), - _master_info.network_address, config::thrift_rpc_timeout_ms, - &master_status); - TFetchResourceResult new_fetched_resource; - if (!master_status.ok()) { - LOG(ERROR) << "Get frontend client failed, with address:" - << _master_info.network_address.hostname << ":" - << _master_info.network_address.port; - return; - } - try { - try { - client->fetchResource(new_fetched_resource); - } catch (TTransportException& e) { - // reopen the client and set timeout to 500ms - master_status = client.reopen(config::thrift_rpc_timeout_ms); - - if (!master_status.ok()) { - LOG(WARNING) << "Reopen to get frontend client failed, with address:" - << _master_info.network_address.hostname << ":" - << _master_info.network_address.port << ", reason=" << e.what(); - return; - } - LOG(WARNING) << "fetchResource from frontend failed" - << ", reason=" << e.what(); - client->fetchResource(new_fetched_resource); - } - } catch (TException& e) { - // Already try twice, log here - static_cast(client.reopen(config::thrift_rpc_timeout_ms)); - LOG(WARNING) << "retry to fetchResource from " << _master_info.network_address.hostname - << ":" << _master_info.network_address.port << " failed:\n" - << e.what(); - return; - } -} -} // namespace doris diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp new file mode 100644 index 000000000000000..bf27861c284a79f --- /dev/null +++ b/be/src/agent/workload_group_listener.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "agent/workload_group_listener.h" + +#include "runtime/task_group/task_group.h" +#include "runtime/task_group/task_group_manager.h" +#include "util/mem_info.h" +#include "util/parse_util.h" + +namespace doris { + +void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& topic_request) { + std::set current_wg_ids; + for (const TopicInfo& topic_info : topic_request.topic_list) { + if (topic_info.topic_type != doris::TTopicInfoType::type::WORKLOAD_GROUP) { + continue; + } + + int wg_id = 0; + auto iter2 = topic_info.info_map.find("id"); + std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(), wg_id); + + current_wg_ids.insert(wg_id); + } + + _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids); +} +} // namespace doris \ No newline at end of file diff --git a/be/src/agent/user_resource_listener.h b/be/src/agent/workload_group_listener.h similarity index 54% rename from be/src/agent/user_resource_listener.h rename to be/src/agent/workload_group_listener.h index 6fd25bef6716ab2..d31b1c4ef6531b4 100644 --- a/be/src/agent/user_resource_listener.h +++ b/be/src/agent/workload_group_listener.h @@ -17,31 +17,21 @@ #pragma once -#include -#include +#include #include "agent/topic_listener.h" +#include "runtime/exec_env.h" namespace doris { -class ExecEnv; -class TMasterInfo; - -class UserResourceListener : public TopicListener { +class WorkloadGroupListener : public TopicListener { public: - ~UserResourceListener(); - // Input parameters: - // root_cgroups_path: root cgroups allocated by admin to doris - UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info); - // This method should be async - virtual void handle_update(const TAgentServiceVersion::type& protocol_version, - const TTopicUpdate& topic_update); + ~WorkloadGroupListener() {} + WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {} + + void handle_topic_info(const TPublishTopicRequest& topic_request) override; private: - const TMasterInfo& _master_info; ExecEnv* _exec_env; - // Call cgroups mgr to update user's cgroups resource share - // Also refresh local user resource's cache - void update_users_resource(int64_t new_version); }; -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 4ef5def6f1d6b0d..cdd934d5c7d3533 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -189,6 +189,7 @@ void BlockedTaskScheduler::_make_task_run(std::list& local_tasks, TaskScheduler::~TaskScheduler() { stop(); + LOG(INFO) << "Task scheduler " << _name << " shutdown"; } Status TaskScheduler::start() { @@ -363,7 +364,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, void TaskScheduler::stop() { if (!this->_shutdown.load()) { this->_shutdown.store(true); - _blocked_task_scheduler->shutdown(); if 
(_task_queue) { _task_queue->close(); } diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index e6ed60148e04bee..b3c24fa96e7c58b 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -122,6 +122,56 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri return Status::OK(); } +void TaskGroupManager::delete_task_group_by_ids(std::set id_set) { + { + std::lock_guard w_lock(_group_mutex); + for (auto iter = _task_groups.begin(); iter != _task_groups.end();) { + uint64_t tg_id = iter->first; + if (id_set.find(tg_id) == id_set.end()) { + iter = _task_groups.erase(iter); + } else { + iter++; + } + } + } + + // stop task sche may cost some time, so it should not be locked + // task scheduler is stoped in task scheduler's destructor + std::set> task_sche_to_del; + std::set> scan_task_sche_to_del; + { + std::lock_guard lock(_task_scheduler_lock); + for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();) { + uint64_t tg_id = iter->first; + if (id_set.find(tg_id) == id_set.end()) { + task_sche_to_del.insert(std::move(_tg_sche_map[tg_id])); + iter = _tg_sche_map.erase(iter); + } else { + iter++; + } + } + + for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end();) { + uint64_t tg_id = iter->first; + if (id_set.find(tg_id) == id_set.end()) { + scan_task_sche_to_del.insert(std::move(_tg_scan_sche_map[tg_id])); + iter = _tg_scan_sche_map.erase(iter); + } else { + iter++; + } + } + + for (auto iter = _cgroup_ctl_map.begin(); iter != _cgroup_ctl_map.end();) { + uint64_t tg_id = iter->first; + if (id_set.find(tg_id) == id_set.end()) { + iter = _cgroup_ctl_map.erase(iter); + } else { + iter++; + } + } + } +} + void TaskGroupManager::stop() { for (auto& task_sche : _tg_sche_map) { task_sche.second->stop(); diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index e45cdeca7ea3ca5..ae501e93f3e7812 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -54,6 +54,8 @@ class TaskGroupManager { Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, int cpu_hard_limit, ExecEnv* exec_env, QueryContext* query_ctx_ptr); + void delete_task_group_by_ids(std::set id_set); + void stop(); private: diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index e98dd65a8c7a05c..8ad55e43e6e74b8 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -90,6 +90,11 @@ class BackendService : public BackendServiceIf { _agent_server->publish_cluster_state(result, request); } + void publish_topic_info(TPublishTopicResult& result, + const TPublishTopicRequest& topic_request) override { + _agent_server->get_topic_subscriber()->handle_topic_info(topic_request); + } + // DorisServer service void exec_plan_fragment(TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& params) override; diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index ad6f86c4f186f08..32048458ed9e264 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -144,8 +144,14 @@ class SimplifiedScanScheduler { _wg_name = wg_name; } + ~SimplifiedScanScheduler() { + stop(); + LOG(INFO) << "Scanner sche " << _wg_name << " shutdown"; + } + void stop() { _is_stop.store(true); + 
_scan_task_queue->shutdown(); _scan_thread_pool->shutdown(); _scan_thread_pool->wait(); } @@ -169,8 +175,9 @@ class SimplifiedScanScheduler { void _work() { while (!_is_stop.load()) { SimplifiedScanTask scan_task; - _scan_task_queue->blocking_get(&scan_task); - scan_task.scan_func(); + if (_scan_task_queue->blocking_get(&scan_task)) { + scan_task.scan_func(); + }; } } diff --git a/docs/en/docs/data-operate/update-delete/partial-update.md b/docs/en/docs/data-operate/update-delete/partial-update.md index fe38e7e30fba3ad..cd64601789e4846 100644 --- a/docs/en/docs/data-operate/update-delete/partial-update.md +++ b/docs/en/docs/data-operate/update-delete/partial-update.md @@ -116,6 +116,16 @@ partial_columns:true Also, specify the columns to be loaded in the `columns` header (it must include all key columns, or else updates cannot be performed). +### Flink Connector + +If you are using the Flink Connector, you need to add the following configuration: + +``` +'sink.properties.partial_columns' = 'true', +``` + +Also, specify the columns to be loaded in `sink.properties.column` (it must include all key columns, or else updates cannot be performed). + #### INSERT INTO In all data models, by default, when you use `INSERT INTO` with a given set of columns, the default behavior is to insert the entire row. To enable partial column updates in the Merge-on-Write implementation, you need to set the following session variable: diff --git a/docs/zh-CN/docs/data-operate/update-delete/partial-update.md b/docs/zh-CN/docs/data-operate/update-delete/partial-update.md index 935adddfb5c33bb..78dab08c201de13 100644 --- a/docs/zh-CN/docs/data-operate/update-delete/partial-update.md +++ b/docs/zh-CN/docs/data-operate/update-delete/partial-update.md @@ -116,6 +116,13 @@ partial_columns:true 同时在`columns`中指定要导入的列(必须包含所有key列,不然无法更新) +#### Flink Connector +如果使用Flink Connector, 需要添加如下配置: +``` +'sink.properties.partial_columns' = 'true', +``` +同时在`sink.properties.column`中指定要导入的列(必须包含所有key列,不然无法更新) + #### INSERT INTO 在所有的数据模型中,`INSERT INTO` 给定一部分列时默认行为都是整行写入,为了防止误用,在Merge-on-Write实现中,`INSERT INTO`默认仍然保持整行UPSERT的语意,如果需要开启部分列更新的语意,需要设置如下 session variable diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c50acaa087347a4..c9b75cb84f58c9d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2268,4 +2268,6 @@ public class Config extends ConfigBase { }) public static int sync_image_timeout_second = 300; + @ConfField(mutable = true, masterOnly = true) + public static int publish_topic_info_interval_ms = 30000; // 30s } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 8e46daaf936b95b..9a6d13552d6e400 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -113,6 +113,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.io.Text; +import org.apache.doris.common.publish.TopicPublisher; +import org.apache.doris.common.publish.TopicPublisherThread; +import org.apache.doris.common.publish.WorkloadGroupPublisher; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.HttpURLUtil; @@ -492,6 +495,8 @@ public 
class Env { private HiveTransactionMgr hiveTransactionMgr; + private TopicPublisherThread topicPublisherThread; + public List getFrontendInfos() { List res = new ArrayList<>(); @@ -726,6 +731,8 @@ private Env(boolean isCheckpointCatalog) { this.binlogGcer = new BinlogGcer(); this.columnIdFlusher = new ColumnIdFlushDaemon(); this.queryCancelWorker = new QueryCancelWorker(systemInfo); + this.topicPublisherThread = new TopicPublisherThread( + "TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo); } public static void destroyCheckpoint() { @@ -970,6 +977,10 @@ public void initialize(String[] args) throws Exception { } queryCancelWorker.start(); + + TopicPublisher wgPublisher = new WorkloadGroupPublisher(this); + topicPublisherThread.addToTopicPublisherList(wgPublisher); + topicPublisherThread.start(); } // wait until FE is ready. diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java index b95469d6910c6d7..f9d15a1ae5af57c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/AckResponseHandler.java @@ -30,15 +30,24 @@ public AckResponseHandler(Collection nodes, Listener listener) { this.listener = listener; } + public AckResponseHandler(Collection nodes) { + super(nodes); + this.listener = null; + } + @Override public void onResponse(Backend node) { super.onResponse(node); - listener.onResponse(node); + if (listener != null) { + listener.onResponse(node); + } } @Override public void onFailure(Backend node, Throwable t) { super.onFailure(node, t); - listener.onFailure(node, t); + if (listener != null) { + listener.onFailure(node, t); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java new file mode 100644 index 000000000000000..24086cb0e7f3dbd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisher.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.publish; + +import org.apache.doris.thrift.TPublishTopicRequest; + +public interface TopicPublisher { + public void getTopicInfo(TPublishTopicRequest req); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java new file mode 100644 index 000000000000000..616c8a30b5668a6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.publish; + +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPublishTopicRequest; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; + +public class TopicPublisherThread extends MasterDaemon { + + private static final Logger LOG = LogManager.getLogger(TopicPublisherThread.class); + + private SystemInfoService clusterInfoService; + + private ExecutorService executor = ThreadPoolManager + .newDaemonFixedThreadPool(6, 256, "topic-publish-thread", true); + + public TopicPublisherThread(String name, long intervalMs, + SystemInfoService clusterInfoService) { + super(name, intervalMs); + this.clusterInfoService = clusterInfoService; + } + + private List topicPublisherList = new ArrayList<>(); + + public void addToTopicPublisherList(TopicPublisher topicPublisher) { + this.topicPublisherList.add(topicPublisher); + } + + @Override + protected void runAfterCatalogReady() { + if (!Config.enable_workload_group) { + return; + } + LOG.info("begin publish topic info"); + // step 1: get all publish topic info + TPublishTopicRequest request = new TPublishTopicRequest(); + for (TopicPublisher topicPublisher : topicPublisherList) { + topicPublisher.getTopicInfo(request); + } + + // step 2: publish topic info to all be + Collection nodesToPublish = clusterInfoService.getIdToBackend().values(); + AckResponseHandler handler = new AckResponseHandler(nodesToPublish); + for (Backend be : nodesToPublish) { + executor.submit(new TopicPublishWorker(request, be, handler)); + } + try { + int timeoutMs = Config.publish_topic_info_interval_ms / 3 * 2; + if (!handler.awaitAllInMs(timeoutMs)) { + Backend[] 
backends = handler.pendingNodes(); + if (backends.length > 0) { + LOG.warn("timed out waiting for all nodes to publish. (pending nodes: {})", + Arrays.toString(backends)); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public class TopicPublishWorker implements Runnable { + private TPublishTopicRequest request; + private Backend be; + private ResponseHandler handler; + + public TopicPublishWorker(TPublishTopicRequest request, Backend node, ResponseHandler handler) { + this.request = request; + this.be = node; + this.handler = handler; + } + + @Override + public void run() { + long beginTime = System.currentTimeMillis(); + try { + TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort()); + BackendService.Client client = ClientPool.backendPool.borrowObject(addr); + client.publishTopicInfo(request); + LOG.info("publish topic info to be {} success, time cost={} ms", + be.getHost(), (System.currentTimeMillis() - beginTime)); + } catch (Exception e) { + LOG.warn("publish topic info to be {} error happens: , time cost={} ms", + be.getHost(), (System.currentTimeMillis() - beginTime), e); + } finally { + handler.onResponse(be); + } + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java new file mode 100644 index 000000000000000..2330700ce7bb78e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
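One caveat worth noting: the BE-side WorkloadGroupListener earlier in this patch dereferences `info_map.find("id")` without checking that the key exists or that the value parses, while the FE-side WorkloadGroupPublisher that follows is what writes that "id" entry. A more defensive lookup could look like the sketch below; parse_workload_group_id is a hypothetical helper, not part of the patch.

```cpp
// Illustrative sketch only; not part of this patch.
#include <charconv>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

std::optional<uint64_t> parse_workload_group_id(
        const std::map<std::string, std::string>& info_map) {
    auto iter = info_map.find("id");
    if (iter == info_map.end()) {
        return std::nullopt; // FE did not publish an id for this topic entry
    }
    uint64_t wg_id = 0;
    const std::string& value = iter->second;
    auto [ptr, ec] = std::from_chars(value.data(), value.data() + value.size(), wg_id);
    if (ec != std::errc() || ptr != value.data() + value.size()) {
        return std::nullopt; // not a valid unsigned integer
    }
    return wg_id;
}
```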
+ +package org.apache.doris.common.publish; + +import org.apache.doris.catalog.Env; +import org.apache.doris.thrift.TPublishTopicRequest; +import org.apache.doris.thrift.TopicInfo; + +public class WorkloadGroupPublisher implements TopicPublisher { + + private Env env; + + public WorkloadGroupPublisher(Env env) { + this.env = env; + } + + @Override + public void getTopicInfo(TPublishTopicRequest req) { + for (TopicInfo topicInfo : env.getWorkloadGroupMgr() + .getPublishTopicInfo()) { + req.addToTopicList(topicInfo); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 5e4097bbfec4722..dfb7f2fab9d9b44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -623,12 +623,11 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla TableRef ref = new TableRef(tableName, null, null); BaseTableRef tableRef = new BaseTableRef(ref, olapTable, tableName); tupleDescriptor.setRef(tableRef); - olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds()); olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds()); + olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds()); if (olapScan.getTableSample().isPresent()) { olapScanNode.setTableSample(new TableSample(olapScan.getTableSample().get().isPercent, olapScan.getTableSample().get().sampleValue, olapScan.getTableSample().get().seek)); - olapScanNode.computeSampleTabletIds(); } // TODO: remove this switch? diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java index 5c2d6c9aff1bb03..b2b4c30b9342fcb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java @@ -273,13 +273,17 @@ protected boolean condition(Rule rule, Plan plan) { group_by_key is bound on t1.a */ duplicatedSlotNames.forEach(childOutputsToExpr::remove); - output.stream() - .filter(ne -> ne instanceof Alias) - .map(Alias.class::cast) - // agg function cannot be bound with group_by_key - .filter(alias -> !alias.child() - .anyMatch(expr -> expr instanceof AggregateFunction)) - .forEach(alias -> childOutputsToExpr.putIfAbsent(alias.getName(), alias.child())); + for (int i = 0; i < output.size(); i++) { + if (!(output.get(i) instanceof Alias)) { + continue; + } + Alias alias = (Alias) output.get(i); + if (alias.child().anyMatch(expr -> expr instanceof AggregateFunction)) { + continue; + } + // NOTICE: must use unbound expressions, because we will bind them in binding group by expr. 
+ childOutputsToExpr.putIfAbsent(alias.getName(), agg.getOutputExpressions().get(i).child(0)); + } List replacedGroupBy = agg.getGroupByExpressions().stream() .map(groupBy -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index cc00cfd87709cf8..a63a55bc901b83f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -98,6 +98,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -220,7 +221,7 @@ public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { public void setIsPreAggregation(boolean isPreAggregation, String reason) { this.isPreAggregation = isPreAggregation; this.reasonOfPreAggregation = this.reasonOfPreAggregation == null ? reason : - this.reasonOfPreAggregation + " " + reason; + this.reasonOfPreAggregation + " " + reason; } @@ -402,7 +403,8 @@ public void updateScanRangeInfoByNewMVSelector(long selectedIndexId, String scanRangeInfo = stringBuilder.toString(); String situation; boolean update; - CHECK: { // CHECKSTYLE IGNORE THIS LINE + CHECK: + { // CHECKSTYLE IGNORE THIS LINE if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite())) { situation = "The key type of table is duplicate, or unique key with merge-on-write."; @@ -545,7 +547,6 @@ public void init(Analyzer analyzer) throws UserException { computePartitionInfo(); } computeTupleState(analyzer); - computeSampleTabletIds(); /** * Compute InAccurate cardinality before mv selector and tablet pruning. @@ -778,7 +779,7 @@ private void addScanRangeLocations(Partition partition, // but it means we will do 3 S3 IO to get the data which will bring 3 slow query if (-1L != coolDownReplicaId) { final Optional replicaOptional = replicas.stream() - .filter(r -> r.getId() == coolDownReplicaId).findAny(); + .filter(r -> r.getId() == coolDownReplicaId).findAny(); replicaOptional.ifPresent( r -> { Backend backend = Env.getCurrentSystemInfo() @@ -930,75 +931,84 @@ public void setOutputColumnUniqueIds(Set outputColumnUniqueIds) { } /** - * First, determine how many rows to sample from each partition according to the number of partitions. - * Then determine the number of Tablets to be selected for each partition according to the average number - * of rows of Tablet, - * If seek is not specified, the specified number of Tablets are pseudo-randomly selected from each partition. - * If seek is specified, it will be selected sequentially from the seek tablet of the partition. - * And add the manually specified Tablet id to the selected Tablet. - * simpleTabletNums = simpleRows / partitionNums / (partitionRows / partitionTabletNums) + * Sample some tablets in the selected partition. + * If Seek is specified, the tablets sampled each time are the same. */ public void computeSampleTabletIds() { if (tableSample == null) { return; } OlapTable olapTable = (OlapTable) desc.getTable(); - long sampleRows; // The total number of sample rows - long hitRows = 1; // The total number of rows hit by the tablet - long totalRows = 0; // The total number of partition rows hit - long totalTablet = 0; // The total number of tablets in the hit partition + + // 1. 
Calculate the total number of rows in the selected partition, and sort partition list. + long selectedRows = 0; + long totalSampleRows = 0; + List selectedPartitionList = new ArrayList<>(); + for (Long partitionId : selectedPartitionIds) { + final Partition partition = olapTable.getPartition(partitionId); + final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId); + selectedRows += selectedTable.getRowCount(); + selectedPartitionList.add(partitionId); + } + selectedPartitionList.sort(Comparator.naturalOrder()); + + // 2.Sampling is not required in some cases, will not take effect after clear sampleTabletIds. if (tableSample.isPercent()) { - sampleRows = (long) Math.max(olapTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1); + if (tableSample.getSampleValue() == 100) { + sampleTabletIds.clear(); + return; + } + totalSampleRows = (long) Math.max(selectedRows * (tableSample.getSampleValue() / 100.0), 1); } else { - sampleRows = Math.max(tableSample.getSampleValue(), 1); + if (tableSample.getSampleValue() > selectedRows) { + sampleTabletIds.clear(); + return; + } + totalSampleRows = tableSample.getSampleValue(); } - // calculate the number of tablets by each partition - long avgRowsPerPartition = sampleRows / Math.max(olapTable.getPartitions().size(), 1); - - for (Partition p : olapTable.getPartitions()) { - List ids = p.getBaseIndex().getTabletIdsInOrder(); - - if (ids.isEmpty()) { + // 3. Sampling partition. If Seek is specified, the partition will be the same for each sampling. + long hitRows = 0; // The number of rows hit by the tablet + long partitionSeek = tableSample.getSeek() != -1 + ? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * selectedPartitionIds.size()); + for (int i = 0; i < selectedPartitionList.size(); i++) { + int seekPid = (int) ((i + partitionSeek) % selectedPartitionList.size()); + final Partition partition = olapTable.getPartition(selectedPartitionList.get(seekPid)); + final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId); + List tablets = selectedTable.getTablets(); + if (tablets.isEmpty()) { continue; } - // Skip partitions with row count < row count / 2 expected to be sampled per partition. - // It can be expected to sample a smaller number of partitions to avoid uneven distribution - // of sampling results. - if (p.getBaseIndex().getRowCount() < (avgRowsPerPartition / 2)) { - continue; + // 4. Calculate the number of rows that need to be sampled in the current partition. + long sampleRows = 0; // The number of sample rows in partition + if (tableSample.isPercent()) { + sampleRows = (long) Math.max(selectedTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1); + } else { + sampleRows = (long) Math.max( + tableSample.getSampleValue() * (selectedTable.getRowCount() / selectedRows), 1); } - // It is assumed here that all tablets row count is uniformly distributed - // TODO Use `p.getBaseIndex().getTablet(n).getRowCount()` to get each tablet row count to compute sample. - long avgRowsPerTablet = Math.max(p.getBaseIndex().getRowCount() / ids.size(), 1); - long tabletCounts = Math.max( - avgRowsPerPartition / avgRowsPerTablet + (avgRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1); - tabletCounts = Math.min(tabletCounts, ids.size()); - - long seek = tableSample.getSeek() != -1 - ? 
tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * ids.size()); - for (int i = 0; i < tabletCounts; i++) { - int seekTid = (int) ((i + seek) % ids.size()); - sampleTabletIds.add(ids.get(seekTid)); + // 5. Sampling tablets. If Seek is specified, the same tablet will be sampled each time. + long tabletSeek = tableSample.getSeek() != -1 + ? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * tablets.size()); + for (int j = 0; j < tablets.size(); j++) { + int seekTid = (int) ((j + tabletSeek) % tablets.size()); + if (tablets.get(seekTid).getRowCount(true) == 0) { + continue; + } + sampleTabletIds.add(tablets.get(seekTid).getId()); + sampleRows -= tablets.get(seekTid).getRowCount(true); + hitRows += tablets.get(seekTid).getRowCount(true); + if (sampleRows <= 0) { + break; + } + } + if (hitRows > totalSampleRows) { + break; } - - hitRows += avgRowsPerTablet * tabletCounts; - totalRows += p.getBaseIndex().getRowCount(); - totalTablet += ids.size(); - } - - // all hit, direct full - if (totalRows < sampleRows) { - // can't fill full sample rows - sampleTabletIds.clear(); - } else if (sampleTabletIds.size() == totalTablet) { - // TODO add limit - sampleTabletIds.clear(); - } else if (!sampleTabletIds.isEmpty()) { - // TODO add limit } + LOG.debug("after computeSampleTabletIds, hitRows {}, selectedRows {}", hitRows, selectedRows); } public boolean isFromPrepareStmt() { @@ -1024,6 +1034,7 @@ private void computeTabletInfo() throws UserException { */ Preconditions.checkState(scanBackendIds.size() == 0); Preconditions.checkState(scanTabletIds.size() == 0); + computeSampleTabletIds(); for (Long partitionId : selectedPartitionIds) { final Partition partition = olapTable.getPartition(partitionId); final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 68115c4c8428b31..b8aebdccab02445 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -25,6 +25,8 @@ import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TPipelineWorkloadGroup; +import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; @@ -290,6 +292,16 @@ public TPipelineWorkloadGroup toThrift() { return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version); } + public TopicInfo toTopicInfo() { + HashMap newHashMap = new HashMap<>(); + newHashMap.put("id", String.valueOf(id)); + TopicInfo topicInfo = new TopicInfo(); + topicInfo.setTopicType(TTopicInfoType.WORKLOAD_GROUP); + topicInfo.setInfoMap(newHashMap); + topicInfo.setTopicKey(name); + return topicInfo; + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 1ec47d053fecfbb..5f0a52cab5583e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -40,6 +40,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TUserIdentity; +import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -52,6 +53,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -133,6 +135,19 @@ public List getWorkloadGroup(ConnectContext context) thr return workloadGroups; } + public List getPublishTopicInfo() { + List workloadGroups = new ArrayList(); + readLock(); + try { + for (WorkloadGroup wg : idToWorkloadGroup.values()) { + workloadGroups.add(wg.toTopicInfo()); + } + } finally { + readUnlock(); + } + return workloadGroups; + } + public QueryQueue getWorkloadGroupQueryQueue(ConnectContext context) throws UserException { String groupName = getWorkloadGroupNameAndCheckPriv(context); readLock(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index 3ab76732f613f1c..a2f3867de2bc2b3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -32,6 +32,8 @@ import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPublishTopicRequest; +import org.apache.doris.thrift.TPublishTopicResult; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TScanBatchResult; import org.apache.doris.thrift.TScanCloseParams; @@ -139,6 +141,11 @@ public TAgentResult publishClusterState(TAgentPublishRequest request) throws TEx return null; } + @Override + public TPublishTopicResult publishTopicInfo(TPublishTopicRequest request) throws TException { + return null; + } + @Override public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException { // TODO Auto-generated method stub diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index d3f5aeeacc25aff..058e356a3254971 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -50,6 +50,8 @@ import org.apache.doris.thrift.TIngestBinlogResult; import org.apache.doris.thrift.TMasterInfo; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPublishTopicRequest; +import org.apache.doris.thrift.TPublishTopicResult; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TScanBatchResult; import org.apache.doris.thrift.TScanCloseParams; @@ -299,6 +301,11 @@ public TAgentResult publishClusterState(TAgentPublishRequest request) throws TEx return new TAgentResult(new TStatus(TStatusCode.OK)); } + @Override + public TPublishTopicResult publishTopicInfo(TPublishTopicRequest request) throws TException { + return new TPublishTopicResult(new TStatus(TStatusCode.OK)); + } + @Override public TStatus submitExportTask(TExportTaskRequest request) throws TException { return new TStatus(TStatusCode.OK); diff --git 
a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 4b4c260b47398df..c161fc99e7139db 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -506,5 +506,4 @@ struct TTopicUpdate { struct TAgentPublishRequest { 1: required TAgentServiceVersion protocol_version 2: required list updates -} - +} \ No newline at end of file diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 3d77eab4cad469f..a7a9c50aed24fcc 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -138,6 +138,24 @@ struct TIngestBinlogResult { 1: optional Status.TStatus status; } +enum TTopicInfoType { + WORKLOAD_GROUP +} + +struct TopicInfo { + 1: optional string topic_key + 2: required TTopicInfoType topic_type + 3: optional map info_map +} + +struct TPublishTopicRequest { + 1: required list topic_list +} + +struct TPublishTopicResult { + 1: required Status.TStatus status +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. @@ -193,4 +211,6 @@ service BackendService { TCheckStorageFormatResult check_storage_format(); TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest ingest_binlog_request); + + TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request); } diff --git a/regression-test/data/correctness_p0/test_avg.out b/regression-test/data/correctness_p0/test_avg.out index 4cfab69a28b3cf2..da5b605234babfd 100644 --- a/regression-test/data/correctness_p0/test_avg.out +++ b/regression-test/data/correctness_p0/test_avg.out @@ -1,10 +1,111 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !count -- -99 +-- !select -- +100000000000001 +100000000000002 +100000000000003 +100000000000004 +100000000000005 +100000000000006 +100000000000007 +100000000000008 +100000000000009 +1000000000000010 +1000000000000011 +1000000000000012 +1000000000000013 +1000000000000014 +1000000000000015 +1000000000000016 +1000000000000017 +1000000000000018 +1000000000000019 +1000000000000020 +1000000000000021 +1000000000000022 +1000000000000023 +1000000000000024 +1000000000000025 +1000000000000026 +1000000000000027 +1000000000000028 +1000000000000029 +1000000000000030 +1000000000000031 +1000000000000032 +1000000000000033 +1000000000000034 +1000000000000035 +1000000000000036 +1000000000000037 +1000000000000038 +1000000000000039 +1000000000000040 +1000000000000041 +1000000000000042 +1000000000000043 +1000000000000044 +1000000000000045 +1000000000000046 +1000000000000047 +1000000000000048 +1000000000000049 +1000000000000050 +1000000000000051 +1000000000000052 +1000000000000053 +1000000000000054 +1000000000000055 +1000000000000056 +1000000000000057 +1000000000000058 +1000000000000059 +1000000000000060 +1000000000000061 +1000000000000062 +1000000000000063 +1000000000000064 +1000000000000065 +1000000000000066 +1000000000000067 +1000000000000068 +1000000000000069 +1000000000000070 +1000000000000071 +1000000000000072 +1000000000000073 +1000000000000074 +1000000000000075 +1000000000000076 +1000000000000077 +1000000000000078 +1000000000000079 +1000000000000080 +1000000000000081 +1000000000000082 +1000000000000083 +1000000000000084 +1000000000000085 +1000000000000086 +1000000000000087 +1000000000000088 +1000000000000089 +1000000000000090 +1000000000000091 +1000000000000092 +1000000000000093 +1000000000000094 +1000000000000095 
+1000000000000096 +1000000000000097 +1000000000000098 +1000000000000099 -- !sum -- 90900000000004950 +-- !count -- +99 + -- !avg -- 9.181818181818681E14 diff --git a/regression-test/data/query_p0/sql_functions/datetime_functions/test_date_function.out b/regression-test/data/query_p0/sql_functions/datetime_functions/test_date_function.out index d67cdee1d83d101..2f1b3ddf7ac591b 100644 --- a/regression-test/data/query_p0/sql_functions/datetime_functions/test_date_function.out +++ b/regression-test/data/query_p0/sql_functions/datetime_functions/test_date_function.out @@ -182,9 +182,6 @@ -- !sql -- 16 --- !sql -- -\N - -- !sql -- 31 diff --git a/regression-test/suites/correctness_p0/test_avg.groovy b/regression-test/suites/correctness_p0/test_avg.groovy index 20e534863403ba2..3fd8e5e95703054 100644 --- a/regression-test/suites/correctness_p0/test_avg.groovy +++ b/regression-test/suites/correctness_p0/test_avg.groovy @@ -38,8 +38,9 @@ suite("test_avg") { sql """ INSERT INTO ${tableName} values (10000000000000${i}) """ } sql "sync" - qt_count """ SELECT COUNT(c_bigint) FROM ${tableName} """ + qt_select """select c_bigint from ${tableName} order by c_bigint""" qt_sum """ SELECT SUM(c_bigint) FROM ${tableName} """ + qt_count """ SELECT COUNT(c_bigint) FROM ${tableName} """ qt_avg """ SELECT AVG(c_bigint) FROM ${tableName} """ sql""" DROP TABLE IF EXISTS ${tableName} """ diff --git a/regression-test/suites/nereids_syntax_p0/analyze_agg.groovy b/regression-test/suites/nereids_syntax_p0/analyze_agg.groovy index 55bb2deab2384bf..9f426e23e3d1334 100644 --- a/regression-test/suites/nereids_syntax_p0/analyze_agg.groovy +++ b/regression-test/suites/nereids_syntax_p0/analyze_agg.groovy @@ -43,7 +43,8 @@ suite("analyze_agg") { d VARCHAR(30), e VARCHAR(32), a VARCHAR(32), - f VARCHAR(32) + f VARCHAR(32), + g DECIMAL(9, 3) )ENGINE = OLAP UNIQUE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 30 @@ -73,4 +74,7 @@ suite("analyze_agg") { sql "select count(distinct t2.b), variance(distinct t2.c) from t2" exception "variance(DISTINCT c#2) can't support multi distinct." 
} + + // should not bind g /g in group by again, otherwise will throw exception + sql "select g / g as nu, sum(c) from t2 group by nu" } \ No newline at end of file diff --git a/regression-test/suites/query_p0/sql_functions/datetime_functions/test_date_function.groovy b/regression-test/suites/query_p0/sql_functions/datetime_functions/test_date_function.groovy index 7ff1afe64458d7e..2aa3dbb85db9c7c 100644 --- a/regression-test/suites/query_p0/sql_functions/datetime_functions/test_date_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/datetime_functions/test_date_function.groovy @@ -263,7 +263,6 @@ suite("test_date_function") { qt_sql """ select datediff(CAST('2007-12-31 23:59:59' AS DATETIME), CAST('2007-12-30' AS DATETIME)) """ qt_sql """ select datediff(CAST('2010-11-30 23:59:59' AS DATETIME), CAST('2010-12-31' AS DATETIME)) """ qt_sql """ select datediff('2010-10-31', '2010-10-15') """ - qt_sql """ select datediff('10000-10-31', '2010-10-15') from ${tableName}; """ // DAY qt_sql """ select day('1987-01-31') """ diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_load.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_load.sql new file mode 100644 index 000000000000000..7bfd2811cda8aaa --- /dev/null +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_load.sql @@ -0,0 +1,6 @@ +LOAD LABEL ${loadLabel} ( + DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/customer.tbl.gz") + INTO TABLE customer + COLUMNS TERMINATED BY "|" + (c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,temp) +) diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_part_delete.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_part_delete.sql index a9d1b34d68cc0f0..4677867ae228513 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_part_delete.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_part_delete.sql @@ -1 +1 @@ -delete from customer where c_custkey > 1500 ; +delete from customer where c_custkey > 1500000 ; diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_sequence_create.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_sequence_create.sql index d1a099e7a5b9c5c..b756ad271b8a9d0 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_sequence_create.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/customer_sequence_create.sql @@ -11,7 +11,7 @@ CREATE TABLE IF NOT EXISTS `customer` ( UNIQUE KEY (`c_custkey`) DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 10 PROPERTIES ( -"function_column.sequence_type" = 'int', +"function_column.sequence_col" = 'c_custkey', "compression"="zstd", "replication_num" = "1", "enable_unique_key_merge_on_write" = "true" diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_load.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_load.sql new file mode 100644 index 000000000000000..3e1511ca69a67a0 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_load.sql @@ -0,0 +1,6 @@ +LOAD LABEL ${loadLabel} ( + DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/date.tbl.gz") + INTO TABLE date + COLUMNS TERMINATED BY "|" + (d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth, 
d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear, d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,temp) +) diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_part_delete.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_part_delete.sql index 0c21b27cc48c536..839aa6da0a40b99 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_part_delete.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_part_delete.sql @@ -1 +1 @@ -delete from `date` where d_datekey >= '19920701' and d_datekey <= '19920731'; +delete from `date` where d_datekey >= '19950702'; diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_sequence_create.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_sequence_create.sql index 36b9fb599188dae..b450b872650c2b7 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_sequence_create.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/date_sequence_create.sql @@ -20,7 +20,7 @@ CREATE TABLE IF NOT EXISTS `date` ( UNIQUE KEY (`d_datekey`) DISTRIBUTED BY HASH(`d_datekey`) BUCKETS 1 PROPERTIES ( -"function_column.sequence_type" = 'int', +"function_column.sequence_col" = 'd_datekey', "compression"="zstd", "replication_num" = "1", "enable_unique_key_merge_on_write" = "true" diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_load.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_load.sql new file mode 100644 index 000000000000000..a5ed2465ea99c6b --- /dev/null +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_load.sql @@ -0,0 +1,6 @@ +LOAD LABEL ${loadLabel} ( + DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/lineorder.tbl.*.gz") + INTO TABLE lineorder + COLUMNS TERMINATED BY "|" + (lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,temp) +) diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_part_delete.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_part_delete.sql index abb7ded4331f2a5..1d56fe6485a8bc0 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_part_delete.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_part_delete.sql @@ -1 +1 @@ -delete from lineorder where lo_orderkey >= 240001 and lo_orderkey <= 360000; +delete from lineorder where lo_orderkey >= 300013154; diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_sequence_create.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_sequence_create.sql index cfb34471364534f..230c4fde44d5dce 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_sequence_create.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/lineorder_sequence_create.sql @@ -28,7 +28,7 @@ PARTITION p1997 VALUES [("19970101"), ("19980101")), PARTITION p1998 VALUES [("19980101"), ("19990101"))) DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 48 PROPERTIES ( -"function_column.sequence_type" = 'int', +"function_column.sequence_col" 
= 'lo_orderkey', "compression"="zstd", "replication_num" = "1", "enable_unique_key_merge_on_write" = "true" diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_load.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_load.sql new file mode 100644 index 000000000000000..f98bb5d486fdafa --- /dev/null +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_load.sql @@ -0,0 +1,6 @@ +LOAD LABEL ${loadLabel} ( + DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/part.tbl.gz") + INTO TABLE part + COLUMNS TERMINATED BY "|" + (p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,temp) +) diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_part_delete.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_part_delete.sql index 32ec2aa18b23973..1c34e3ff4d7785a 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_part_delete.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_part_delete.sql @@ -1 +1 @@ -delete from `part` where p_partkey > 10000; +delete from `part` where p_partkey > 700000; diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_sequence_create.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_sequence_create.sql index 2ffba863adcc3dd..a06acd3d9f8b080 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_sequence_create.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/part_sequence_create.sql @@ -12,7 +12,7 @@ CREATE TABLE IF NOT EXISTS `part` ( UNIQUE KEY (`p_partkey`) DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 10 PROPERTIES ( -"function_column.sequence_type" = 'int', +"function_column.sequence_col" = 'p_partkey', "compression"="zstd", "replication_num" = "1", "enable_unique_key_merge_on_write" = "true" diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_load.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_load.sql new file mode 100644 index 000000000000000..2bdfca469dc27e6 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_load.sql @@ -0,0 +1,6 @@ +LOAD LABEL ${loadLabel} ( + DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/supplier.tbl.gz") + INTO TABLE supplier + COLUMNS TERMINATED BY "|" + (s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,temp) +) diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_part_delete.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_part_delete.sql index ac6a7030fd07b3b..42c2b222a5c7667 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_part_delete.sql +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_part_delete.sql @@ -1 +1 @@ -delete from `supplier` where s_suppkey > 100; +delete from `supplier` where s_suppkey > 100000; diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_sequence_create.sql b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_sequence_create.sql index 5a92faef42b72d8..e88dde59acd588d 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_sequence_create.sql +++ 
b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/ddl/supplier_sequence_create.sql @@ -10,7 +10,7 @@ CREATE TABLE IF NOT EXISTS `supplier` ( UNIQUE KEY (`s_suppkey`) DISTRIBUTED BY HASH(`s_suppkey`) BUCKETS 10 PROPERTIES ( -"function_column.sequence_type" = 'int', +"function_column.sequence_col" = 's_suppkey', "compression"="zstd", "replication_num" = "1", "enable_unique_key_merge_on_write" = "true" diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/four/load_four_step.groovy b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/four/load_four_step.groovy index 81702248a8f0868..f75d7710bf71fde 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/four/load_four_step.groovy +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/four/load_four_step.groovy @@ -20,92 +20,88 @@ // and modified by Doris. suite("load_four_step") { - def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000, "c_custkey", 1500], + def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000000, 1500000], "lineorder": ["""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, - lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600572, "lo_orderkey", 481137], - "part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 20000, "p_partkey", 10000], + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600037902, 300018949], + "part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 1400000, 700000], "date": ["""d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth, d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear, - d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 255, "d_datekey", 224], - "supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200, "s_suppkey", 100]] + d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 2556, 1278], + "supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200000, 100000]] + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" tables.each { tableName, rows -> + // create table sql """ DROP TABLE IF EXISTS $tableName """ sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_sequence_create.sql""").text - for (j in 0..<2) { - streamLoad { - table tableName - set 'column_separator', '|' - set 'compress_type', 'GZ' - set 'columns', rows[0] - set 'function_column.sequence_col', rows[2] - - // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. 
http://xxx/some.csv - file """${getS3Url()}/regression/ssb/sf0.1/${tableName}.tbl.gz""" - - time 10000 // limit inflight 10s - - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + // step 1: load data + // step 2: load all data for 3 times + for (j in 0..<2) { + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + // load data from cos + def loadLabel = tableName + '_' + uniqueID + def loadSql = new File("""${context.file.parentFile.parent}/ddl/${tableName}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql - // if declared a check callback, the default check condition will ignore. - // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ('CANCELLED'.equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ('FINISHED'.equalsIgnoreCase(loadState)) { + break } + sleep(5000) } - sql 'sync' - int flag = 1 - for (int i = 1; i <= 5; i++) { - def loadRowCount = sql "select count(1) from ${tableName}" - logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString()) - assertTrue(loadRowCount[0][0] == rows[1]) - } + rowCount = sql "select count(*) from ${tableName}" + assertEquals(rows[1], rowCount[0][0]) } + + // step 3: delete 50% data sql """ set delete_without_partition = true; """ sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_part_delete.sql""").text + sql 'sync' for (int i = 1; i <= 5; i++) { def loadRowCount = sql "select count(1) from ${tableName}" logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString()) - assertTrue(loadRowCount[0][0] == rows[3]) + assertTrue(loadRowCount[0][0] == rows[2]) } - streamLoad { - table tableName - set 'column_separator', '|' - set 'compress_type', 'GZ' - set 'columns', rows[0] - set 'function_column.sequence_col', rows[2] - - // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/ssb/sf0.1/${tableName}.tbl.gz""" - time 10000 // limit inflight 10s + // step 4: load full data again + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = tableName + '_' + uniqueID + def loadSql = new File("""${context.file.parentFile.parent}/ddl/${tableName}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows - - // if declared a check callback, the default check condition will ignore. 
- // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ('CANCELLED'.equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ('FINISHED'.equalsIgnoreCase(loadState)) { + break } + sleep(5000) } + sql 'sync' for (int i = 1; i <= 5; i++) { def loadRowCount = sql "select count(1) from ${tableName}" diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/one/load_one_step.groovy b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/one/load_one_step.groovy index 31a8344d7309d07..1035fe2865af2a1 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/one/load_one_step.groovy +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/one/load_one_step.groovy @@ -17,51 +17,53 @@ suite("load_one_step") { - def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000], + def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000000], "lineorder": ["""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, - lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600572], - "part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 20000], + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600037902], + "part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 1400000], "date": ["""d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth, d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear, - d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 255], - "supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200]] + d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 2556], + "supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200000]] + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() tables.each { tableName, rows -> + // create table sql """ DROP TABLE IF EXISTS $tableName """ sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_create.sql""").text - streamLoad { - table "${tableName}" - set 'column_separator', '|' - set 'compress_type', 'GZ' - set 'columns', "${rows[0]}" - - // relate to 
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/ssb/sf0.1/${tableName}.tbl.gz""" - time 10000 // limit inflight 10s + // load data from cos + def loadLabel = tableName + '_' + uniqueID + def loadSql = new File("""${context.file.parentFile.parent}/ddl/${tableName}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows - - // if declared a check callback, the default check condition will ignore. - // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ('CANCELLED'.equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ('FINISHED'.equalsIgnoreCase(loadState)) { + break } + sleep(5000) } - sql 'sync' - for (int i = 1; i <= 5; i++) { - def loadRowCount = sql "select count(1) from ${tableName}" - logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString()) - assertTrue(loadRowCount[0][0] == rows[1]) - } + rowCount = sql "select count(*) from ${tableName}" + assertEquals(rows[1], rowCount[0][0]) } } diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/three/load_three_step.groovy b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/three/load_three_step.groovy index aae1b16426c417e..a3553068e7650d9 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/three/load_three_step.groovy +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/three/load_three_step.groovy @@ -17,56 +17,62 @@ suite("load_three_step") { - def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000, "c_custkey"], - "lineorder": ["""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, - lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, - lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600572, "lo_orderkey"], - "part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 20000, "p_partkey"], + def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000000], + "lineorder": ["""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, + lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600037902], + "part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 1400000], "date": ["""d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth, - d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear, - 
d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 255, "d_datekey"], - "supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200, "s_suppkey"]] + d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear, + d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 2556], + "supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200000]] + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" tables.each { tableName, rows -> + // create table sql """ DROP TABLE IF EXISTS $tableName """ sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_sequence_create.sql""").text - for (j in 0..<2) { - streamLoad { - table tableName - set 'column_separator', '|' - set 'compress_type', 'GZ' - set 'columns', rows[0] - set 'function_column.sequence_col', rows[2] - - // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/ssb/sf0.1/${tableName}.tbl.gz""" - time 10000 // limit inflight 10s - - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + // step 1: load data + // step 2: load all data for 3 times + for (j in 0..<2) { + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + // load data from cos + def loadLabel = tableName + '_' + uniqueID + def loadSql = new File("""${context.file.parentFile.parent}/ddl/${tableName}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql - // if declared a check callback, the default check condition will ignore. 
- // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ('CANCELLED'.equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ('FINISHED'.equalsIgnoreCase(loadState)) { + break } + sleep(5000) } - sql 'sync' - for (int i = 1; i <= 5; i++) { - def loadRowCount = sql "select count(1) from ${tableName}" - logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString()) - assertTrue(loadRowCount[0][0] == rows[1]) - } + rowCount = sql "select count(*) from ${tableName}" + assertEquals(rows[1], rowCount[0][0]) } + + // step 3: delete all data sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_delete.sql""").text + sql 'sync' for (int i = 1; i <= 5; i++) { def loadRowCount = sql "select count(1) from ${tableName}" logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString()) diff --git a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/two/load_two_step.groovy b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/two/load_two_step.groovy index f309aaeba3ebcbf..1077d2977fffcff 100644 --- a/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/two/load_two_step.groovy +++ b/regression-test/suites/unique_with_mow_p2/ssb_unique_load_zstd/two/load_two_step.groovy @@ -17,54 +17,58 @@ suite("load_two_step") { - def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000, "c_custkey"], + def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000000], "lineorder": ["""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, - lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600572, "lo_orderkey"], - "part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 20000, "p_partkey"], + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600037902], + "part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 1400000], "date": ["""d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth, d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear, - d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 255, "d_datekey"], - "supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200, "s_suppkey"]] + d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 2556], + "supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200000]] + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}") + |PROPERTIES( + 
|"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() tables.each { tableName, rows -> + // create table sql """ DROP TABLE IF EXISTS $tableName """ sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_sequence_create.sql""").text - streamLoad { - table tableName - set 'column_separator', '|' - set 'compress_type', 'GZ' - set 'columns', rows[0] - set 'function_column.sequence_col', rows[2] - - // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/ssb/sf0.1/${tableName}.tbl.gz""" - time 10000 // limit inflight 10s + // step 1: load data from cos + def loadLabel = tableName + '_' + uniqueID + def loadSql = new File("""${context.file.parentFile.parent}/ddl/${tableName}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows - - // if declared a check callback, the default check condition will ignore. - // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ('CANCELLED'.equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ('FINISHED'.equalsIgnoreCase(loadState)) { + break } + sleep(5000) } - sql 'sync' - for (int i = 1; i <= 5; i++) { - def loadRowCount = sql "select count(1) from ${tableName}" - logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString()) - assertTrue(loadRowCount[0][0] == rows[1]) - } + rowCount = sql "select count(*) from ${tableName}" + assertEquals(rows[1], rowCount[0][0]) + + // step 2: delete all data sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_delete.sql""").text + sql 'sync' for (int i = 1; i <= 5; i++) { def loadRowCount = sql "select count(1) from ${tableName}" logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString())