diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 326758ffeb486c..964a898129b09f 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -76,6 +76,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, heartbeat_result.backend_info.__set_http_port(config::webserver_port); heartbeat_result.backend_info.__set_be_rpc_port(-1); heartbeat_result.backend_info.__set_brpc_port(config::brpc_port); + heartbeat_result.backend_info.__set_arrow_flight_sql_port(config::arrow_flight_sql_port); heartbeat_result.backend_info.__set_version(get_short_version()); heartbeat_result.backend_info.__set_be_start_time(_be_epoch); heartbeat_result.backend_info.__set_be_node_role(config::be_node_role); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 74980aa41d53d1..5b3a8dafd294f4 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -59,7 +59,7 @@ DEFINE_Int32(be_port, "9060"); // port for brpc DEFINE_Int32(brpc_port, "8060"); -DEFINE_Int32(arrow_flight_port, "-1"); +DEFINE_Int32(arrow_flight_sql_port, "-1"); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/common/config.h b/be/src/common/config.h index 4c41e6f0e16423..b34a6c98ccda5b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -96,9 +96,9 @@ DECLARE_Int32(be_port); // port for brpc DECLARE_Int32(brpc_port); -// port for arrow flight -// Default -1, do not start arrow flight server. -DECLARE_Int32(arrow_flight_port); +// port for arrow flight sql +// Default -1, do not start arrow flight sql server. +DECLARE_Int32(arrow_flight_sql_port); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index c51ced1aeb2248..adbaf7fbb0d787 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -144,7 +144,6 @@ Status BufferControlBlock::add_batch(std::unique_ptr& result) _fe_result_batch_queue.push_back(std::move(result)); } _buffer_rows += num_rows; - _data_arrival.notify_one(); } else { auto ctx = _waiting_rpc.front(); _waiting_rpc.pop_front(); diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index a3b99300f209c5..c4d0f148ed2a23 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -21,6 +21,9 @@ #include #include #include + +#include +#include // IWYU pragma: no_include #include // IWYU pragma: keep #include @@ -33,6 +36,7 @@ #include "util/doris_metrics.h" #include "util/metrics.h" #include "util/thread.h" +#include "util/uid_util.h" namespace doris { @@ -42,7 +46,7 @@ ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) { // Each BufferControlBlock has a limited queue size of 1024, it's not needed to count the // actual size of all BufferControlBlock. REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() { - // std::lock_guard l(_lock); + // std::lock_guard l(_buffer_map_lock); return _buffer_map.size(); }); } @@ -80,7 +84,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size } { - std::lock_guard l(_lock); + std::unique_lock wlock(_buffer_map_lock); _buffer_map.insert(std::make_pair(query_id, control_block)); // BufferControlBlock should destroy after max_timeout // for exceed max_timeout FE will return timeout to client @@ -95,8 +99,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size } std::shared_ptr ResultBufferMgr::find_control_block(const TUniqueId& query_id) { - // TODO(zhaochun): this lock can be bottleneck? - std::lock_guard l(_lock); + std::shared_lock rlock(_buffer_map_lock); BufferMap::iterator iter = _buffer_map.find(query_id); if (_buffer_map.end() != iter) { @@ -108,14 +111,12 @@ std::shared_ptr ResultBufferMgr::find_control_block(const TU void ResultBufferMgr::register_row_descriptor(const TUniqueId& query_id, const RowDescriptor& row_desc) { - { - std::lock_guard l(_lock); - _row_descriptor_map.insert(std::make_pair(query_id, row_desc)); - } + std::unique_lock wlock(_row_descriptor_map_lock); + _row_descriptor_map.insert(std::make_pair(query_id, row_desc)); } RowDescriptor ResultBufferMgr::find_row_descriptor(const TUniqueId& query_id) { - std::lock_guard l(_lock); + std::shared_lock rlock(_row_descriptor_map_lock); RowDescriptorMap::iterator iter = _row_descriptor_map.find(query_id); if (_row_descriptor_map.end() != iter) { @@ -150,18 +151,23 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, } Status ResultBufferMgr::cancel(const TUniqueId& query_id) { - std::lock_guard l(_lock); - BufferMap::iterator iter = _buffer_map.find(query_id); + { + std::unique_lock wlock(_buffer_map_lock); + BufferMap::iterator iter = _buffer_map.find(query_id); - if (_buffer_map.end() != iter) { - iter->second->cancel(); - _buffer_map.erase(iter); + if (_buffer_map.end() != iter) { + iter->second->cancel(); + _buffer_map.erase(iter); + } } - RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id); + { + std::unique_lock wlock(_row_descriptor_map_lock); + RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id); - if (_row_descriptor_map.end() != row_desc_iter) { - _row_descriptor_map.erase(row_desc_iter); + if (_row_descriptor_map.end() != row_desc_iter) { + _row_descriptor_map.erase(row_desc_iter); + } } return Status::OK(); diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 8c9b621968a296..4e5cd38a7264b7 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -86,9 +87,11 @@ class ResultBufferMgr { void cancel_thread(); // lock for buffer map - std::mutex _lock; + std::shared_mutex _buffer_map_lock; // buffer block map BufferMap _buffer_map; + // lock for descriptor map + std::shared_mutex _row_descriptor_map_lock; // for arrow flight RowDescriptorMap _row_descriptor_map; diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index d7a648c54b6108..8a0f1e67859494 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -57,12 +57,17 @@ arrow::Result> ArrowFlightBatchReader::C } arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr* out) { - CHECK(*out == nullptr); + // *out not nullptr + *out = nullptr; auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out); if (UNLIKELY(!st.ok())) { - LOG(WARNING) << st.to_string(); + LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string(); ARROW_RETURN_NOT_OK(to_arrow_status(st)); } + if (*out != nullptr) { + VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns(); + } return arrow::Status::OK(); } diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index abfa7913e7d541..3e2552ef23b337 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -536,7 +536,7 @@ int main(int argc, char** argv) { // 5. arrow flight service std::shared_ptr flight_server = std::move(doris::flight::FlightSqlServer::create()).ValueOrDie(); - status = flight_server->init(doris::config::arrow_flight_port); + status = flight_server->init(doris::config::arrow_flight_sql_port); // 6. start daemon thread to do clean or gc jobs doris::Daemon daemon; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 525e9ea0248470..5367135c134ada 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -96,6 +96,7 @@ #include "runtime/thread_context.h" #include "runtime/types.h" #include "service/point_query_executor.h" +#include "util/arrow/row_batch.h" #include "util/async_io.h" #include "util/brpc_client_cache.h" #include "util/doris_metrics.h" @@ -704,6 +705,40 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c } } +void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcController* controller, + const PFetchArrowFlightSchemaRequest* request, + PFetchArrowFlightSchemaResult* result, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([request, result, done]() { + brpc::ClosureGuard closure_guard(done); + RowDescriptor row_desc = ExecEnv::GetInstance()->result_mgr()->find_row_descriptor( + UniqueId(request->finst_id()).to_thrift()); + if (row_desc.equals(RowDescriptor())) { + auto st = Status::NotFound("not found row descriptor"); + st.to_protobuf(result->mutable_status()); + return; + } + + std::shared_ptr schema; + auto st = convert_to_arrow_schema(row_desc, &schema); + if (UNLIKELY(!st.ok())) { + st.to_protobuf(result->mutable_status()); + return; + } + + std::string schema_str; + st = serialize_arrow_schema(row_desc, &schema, &schema_str); + if (st.ok()) { + result->set_schema(std::move(schema_str)); + } + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + offer_failed(result, done, _heavy_work_pool); + return; + } +} + Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response) { PointQueryExecutor lookup_util; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index e7a5914274cee7..db0ee07581ef02 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -82,6 +82,11 @@ class PInternalServiceImpl : public PBackendService { PFetchTableSchemaResult* result, google::protobuf::Closure* done) override; + void fetch_arrow_flight_schema(google::protobuf::RpcController* controller, + const PFetchArrowFlightSchemaRequest* request, + PFetchArrowFlightSchemaResult* result, + google::protobuf::Closure* done) override; + void tablet_writer_open(google::protobuf::RpcController* controller, const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index 78fe346be6cb1b..b60034696acd4f 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -37,6 +37,8 @@ #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/types.h" +#include "util/arrow/block_convertor.h" +#include "vec/core/block.h" namespace doris { @@ -188,4 +190,16 @@ Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::strin return Status::OK(); } +Status serialize_arrow_schema(RowDescriptor row_desc, std::shared_ptr* schema, + std::string* result) { + std::vector slots; + for (auto tuple_desc : row_desc.tuple_descriptors()) { + slots.insert(slots.end(), tuple_desc->slots().begin(), tuple_desc->slots().end()); + } + auto block = vectorized::Block(slots, 0); + std::shared_ptr batch; + RETURN_IF_ERROR(convert_to_arrow_batch(block, *schema, arrow::default_memory_pool(), &batch)); + return serialize_record_batch(*batch, result); +} + } // namespace doris diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index 8946502f839cd3..b5e9d8d3c3781c 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -43,4 +43,7 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc, Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result); +Status serialize_arrow_schema(RowDescriptor row_desc, std::shared_ptr* schema, + std::string* result); + } // namespace doris diff --git a/conf/be.conf b/conf/be.conf index e91eb7d52d66e9..52ac34a91b6c76 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -40,7 +40,7 @@ be_port = 9060 webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 -arrow_flight_port = -1 +arrow_flight_sql_port = -1 # HTTPS configures enable_https = false diff --git a/conf/fe.conf b/conf/fe.conf index 82701115b95bd5..fd145e743efe58 100644 --- a/conf/fe.conf +++ b/conf/fe.conf @@ -52,6 +52,7 @@ http_port = 8030 rpc_port = 9020 query_port = 9030 edit_log_port = 9010 +arrow_flight_sql_port = -1 # Choose one if there are more than one ip except loopback address. # Note that there should at most one ip match this list. diff --git a/docs/en/community/developer-guide/be-vscode-dev.md b/docs/en/community/developer-guide/be-vscode-dev.md index bf70f93c2cd30c..a01dabddd3b3d9 100644 --- a/docs/en/community/developer-guide/be-vscode-dev.md +++ b/docs/en/community/developer-guide/be-vscode-dev.md @@ -115,6 +115,7 @@ be_rpc_port = 9070 webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 +arrow_flight_sql_port = -1 # Note that there should be at most one ip that matches this list. # If no ip matches this rule, it will choose one randomly. diff --git a/docs/en/community/developer-guide/fe-idea-dev.md b/docs/en/community/developer-guide/fe-idea-dev.md index b873bc8d153223..d52454cdb6b00e 100644 --- a/docs/en/community/developer-guide/fe-idea-dev.md +++ b/docs/en/community/developer-guide/fe-idea-dev.md @@ -174,6 +174,7 @@ sys_log_level = INFO http_port = 8030 rpc_port = 9020 query_port = 9030 +arrow_flight_sql_port = -1 edit_log_port = 9010 # Choose one if there are more than one ip except loopback address. diff --git a/docs/en/docs/admin-manual/cluster-management/upgrade.md b/docs/en/docs/admin-manual/cluster-management/upgrade.md index 41ae6baf619397..5cdb67c72aca5f 100644 --- a/docs/en/docs/admin-manual/cluster-management/upgrade.md +++ b/docs/en/docs/admin-manual/cluster-management/upgrade.md @@ -144,6 +144,7 @@ admin set frontend config("disable_tablet_scheduler" = "true"); http_port = 18030 rpc_port = 19020 query_port = 19030 + arrow_flight_sql_port = 19040 edit_log_port = 19010 ... ``` diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 08c120674601fe..dac22a43a86ae2 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -127,6 +127,12 @@ There are two ways to configure BE configuration items: * Description: The port of BRPC on BE, used for communication between BEs * Default value: 8060 +#### `arrow_flight_sql_port` + +* Type: int32 +* Description: The port of Arrow Flight SQL server on BE, used for communication between Arrow Flight Client and BE +* Default value: -1 + #### `enable_https` * Type: bool diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index cdca4f048d1448..8fc84e6ef41f33 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -384,6 +384,12 @@ Default:9030 FE MySQL server port +#### `arrow_flight_sql_port` + +Default:-1 + +Arrow Flight SQL server port + #### `frontend_address` Status: Deprecated, not recommended use. This parameter may be deleted later diff --git a/docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md b/docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md index d08b932dd22b05..17d4dd9a5a28e5 100644 --- a/docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md +++ b/docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md @@ -76,6 +76,7 @@ none "data": { "queryPort": 9030, "rpcPort": 9020, + "arrowFlightSqlPort": 9040, "maxReplayedJournal": 17287 }, "count": 0 @@ -85,6 +86,7 @@ none * `queryPort` is the MySQL protocol port of the FE node. * `rpcPort` is the thrift RPC port of the FE node. * `maxReplayedJournal` represents the maximum metadata journal id currently played back by the FE node. + * `arrowFlightSqlPort` is the Arrow Flight SQL port of the FE node. ## Examples @@ -114,6 +116,7 @@ none "data": { "queryPort": 9030, "rpcPort": 9020, + "arrowFlightSqlPort": 9040, "maxReplayedJournal": 17287 }, "count": 0 diff --git a/docs/en/docs/admin-manual/http-actions/fe/node-action.md b/docs/en/docs/admin-manual/http-actions/fe/node-action.md index e1189b7e4e0af6..842d58c7a360e7 100644 --- a/docs/en/docs/admin-manual/http-actions/fe/node-action.md +++ b/docs/en/docs/admin-manual/http-actions/fe/node-action.md @@ -80,6 +80,7 @@ frontends: "HttpPort", "QueryPort", "RpcPort", + "ArrowFlightSqlPort", "Role", "IsMaster", "ClusterId", diff --git a/docs/en/docs/admin-manual/maint-monitor/metadata-operation.md b/docs/en/docs/admin-manual/maint-monitor/metadata-operation.md index 1975b448e5f45d..a8483da8f83915 100644 --- a/docs/en/docs/admin-manual/maint-monitor/metadata-operation.md +++ b/docs/en/docs/admin-manual/maint-monitor/metadata-operation.md @@ -239,6 +239,7 @@ FE currently has the following ports * http_port: http port, also used to push image * rpc_port: thrift server port of Frontend * query_port: Mysql connection port +* arrow_flight_sql_port: Arrow Flight SQL connection port 1. edit_log_port @@ -256,6 +257,10 @@ FE currently has the following ports After modifying the configuration, restart FE directly. This only affects mysql's connection target. +5. arrow_flight_sql_port + + After modifying the configuration, restart FE directly. This only affects arrow flight sql server connection target. + ### Recover metadata from FE memory In some extreme cases, the image file on the disk may be damaged, but the metadata in the memory is intact. At this point, we can dump the metadata from the memory and replace the image file on the disk to recover the metadata. the entire non-stop query service operation steps are as follows: diff --git a/docs/en/docs/get-starting/quick-start.md b/docs/en/docs/get-starting/quick-start.md index 414fe140185d25..db145540cc7106 100644 --- a/docs/en/docs/get-starting/quick-start.md +++ b/docs/en/docs/get-starting/quick-start.md @@ -143,6 +143,7 @@ mysql> show frontends\G; HttpPort: 8030 QueryPort: 9030 RpcPort: 9020 +ArrowFlightSqlPort: 9040 Role: FOLLOWER IsMaster: true ClusterId: 1685821635 diff --git a/docs/en/docs/install/standard-deployment.md b/docs/en/docs/install/standard-deployment.md index e546d64c584930..5622ea84e1efae 100644 --- a/docs/en/docs/install/standard-deployment.md +++ b/docs/en/docs/install/standard-deployment.md @@ -123,6 +123,7 @@ Doris instances communicate directly over the network. The following table shows | FE | http_port | 8030 | FE <--> FE, user <--> FE | HTTP server port on FE | | FE | rpc_port | 9020 | BE --> FE, FE <--> FE | Thrift server port on FE; The configurations of each FE should be consistent. | | FE | query_port | 9030 | user <--> FE | MySQL server port on FE | +| FE | arrow_flight_sql_port | 9040 | user <--> FE | Arrow Flight SQL server port on FE | | FE | edit\_log_port | 9010 | FE <--> FE | Port on FE for BDBJE communication | | Broker | broker ipc_port | 8000 | FE --> Broker, BE --> Broker | Thrift server port on Broker for receiving requests | diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/frontends.md b/docs/en/docs/sql-manual/sql-functions/table-functions/frontends.md index 52bc90cd7bda65..c78f19a64987b8 100644 --- a/docs/en/docs/sql-manual/sql-functions/table-functions/frontends.md +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/frontends.md @@ -56,6 +56,7 @@ mysql> desc function frontends(); | HttpPort | TEXT | No | false | NULL | NONE | | QueryPort | TEXT | No | false | NULL | NONE | | RpcPort | TEXT | No | false | NULL | NONE | +| ArrowFlightSqlPort| TEXT | No | false | NULL | NONE | | Role | TEXT | No | false | NULL | NONE | | IsMaster | TEXT | No | false | NULL | NONE | | ClusterId | TEXT | No | false | NULL | NONE | @@ -85,6 +86,7 @@ mysql> select * from frontends()\G HttpPort: 8034 QueryPort: 9033 RpcPort: 9023 +ArrowFlightSqlPort: 9040 Role: FOLLOWER IsMaster: true ClusterId: 1258341841 diff --git a/docs/zh-CN/community/developer-guide/be-vscode-dev.md b/docs/zh-CN/community/developer-guide/be-vscode-dev.md index 7a18a186f37491..9e8a1855fa0c81 100644 --- a/docs/zh-CN/community/developer-guide/be-vscode-dev.md +++ b/docs/zh-CN/community/developer-guide/be-vscode-dev.md @@ -114,6 +114,7 @@ be_rpc_port = 9070 webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 +arrow_flight_sql_port = -1 # Note that there should at most one ip match this list. # If no ip match this rule, will choose one randomly. diff --git a/docs/zh-CN/community/developer-guide/fe-idea-dev.md b/docs/zh-CN/community/developer-guide/fe-idea-dev.md index 62813adc3b5647..5eb1a70548c45a 100644 --- a/docs/zh-CN/community/developer-guide/fe-idea-dev.md +++ b/docs/zh-CN/community/developer-guide/fe-idea-dev.md @@ -169,6 +169,7 @@ sys_log_level = INFO http_port = 8030 rpc_port = 9020 query_port = 9030 +arrow_flight_sql_port = -1 edit_log_port = 9010 # Choose one if there are more than one ip except loopback address. diff --git a/docs/zh-CN/docs/admin-manual/cluster-management/upgrade.md b/docs/zh-CN/docs/admin-manual/cluster-management/upgrade.md index 27f2994b8ed434..0b2145b9c74881 100644 --- a/docs/zh-CN/docs/admin-manual/cluster-management/upgrade.md +++ b/docs/zh-CN/docs/admin-manual/cluster-management/upgrade.md @@ -144,6 +144,7 @@ admin set frontend config("disable_tablet_scheduler" = "true"); http_port = 18030 rpc_port = 19020 query_port = 19030 + arrow_flight_sql_port = 19040 edit_log_port = 19010 ... ``` diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 7501a800199c59..9e91afd12ea20b 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -123,6 +123,12 @@ BE 重启后该配置将失效。如果想持久化修改结果,使用如下 * 描述:BE 上的 brpc 的端口,用于 BE 之间通讯 * 默认值:8060 +#### `arrow_flight_sql_port` + +* 类型:int32 +* 描述:FE 上的 Arrow Flight SQL server 的端口,用于从 Arrow Flight Client 和 BE 之间通讯 +* 默认值:-1 + #### `enable_https` * 类型:bool diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index 1079b604312113..e4c02ef8b468f8 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -384,6 +384,12 @@ heartbeat_mgr 中处理心跳事件的线程数。 Doris FE 通过 mysql 协议查询连接端口 +#### `arrow_flight_sql_port` + +默认值:-1 + +Doris FE 通过 Arrow Flight SQL 协议查询连接端口 + #### `frontend_address` 状态:已弃用,不建议使用。 diff --git a/docs/zh-CN/docs/admin-manual/http-actions/fe/bootstrap-action.md b/docs/zh-CN/docs/admin-manual/http-actions/fe/bootstrap-action.md index 689da335c984b1..d767aa1cc52620 100644 --- a/docs/zh-CN/docs/admin-manual/http-actions/fe/bootstrap-action.md +++ b/docs/zh-CN/docs/admin-manual/http-actions/fe/bootstrap-action.md @@ -76,6 +76,7 @@ under the License. "data": { "queryPort": 9030, "rpcPort": 9020, + "arrowFlightSqlPort": 9040, "maxReplayedJournal": 17287 }, "count": 0 @@ -85,6 +86,7 @@ under the License. * `queryPort` 是 FE 节点的 MySQL 协议端口。 * `rpcPort` 是 FE 节点的 thrift RPC 端口。 * `maxReplayedJournal` 表示 FE 节点当前回放的最大元数据日志id。 + * `arrowFlightSqlPort` 是 FE 节点的 Arrow Flight SQL 协议端口。 ## Examples @@ -114,6 +116,7 @@ under the License. "data": { "queryPort": 9030, "rpcPort": 9020, + "arrowFlightSqlPort": 9040, "maxReplayedJournal": 17287 }, "count": 0 diff --git a/docs/zh-CN/docs/admin-manual/http-actions/fe/node-action.md b/docs/zh-CN/docs/admin-manual/http-actions/fe/node-action.md index 9960ad1551485c..53cc693b6f986b 100644 --- a/docs/zh-CN/docs/admin-manual/http-actions/fe/node-action.md +++ b/docs/zh-CN/docs/admin-manual/http-actions/fe/node-action.md @@ -80,6 +80,7 @@ frontends: "HttpPort", "QueryPort", "RpcPort", + "ArrowFlightSqlPort", "Role", "IsMaster", "ClusterId", diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/metadata-operation.md b/docs/zh-CN/docs/admin-manual/maint-monitor/metadata-operation.md index 12ef2de434c15c..beb10e06b3c058 100644 --- a/docs/zh-CN/docs/admin-manual/maint-monitor/metadata-operation.md +++ b/docs/zh-CN/docs/admin-manual/maint-monitor/metadata-operation.md @@ -240,6 +240,7 @@ FE 目前有以下几个端口 * http_port:http 端口,也用于推送 image * rpc_port:FE 的 thrift server port * query_port:Mysql 连接端口 +* arrow_flight_sql_port: Arrow Flight SQL 连接端口 1. edit_log_port @@ -257,6 +258,9 @@ FE 目前有以下几个端口 修改配置后,直接重启 FE 即可。这个只影响到 mysql 的连接目标。 +5. arrow_flight_sql_port + + 修改配置后,直接重启 FE 即可。这个只影响到 Arrow Flight SQL 的连接目标。 ### 从 FE 内存中恢复元数据 diff --git a/docs/zh-CN/docs/get-starting/quick-start.md b/docs/zh-CN/docs/get-starting/quick-start.md index 8df83dab60048c..ff9e75bdfa0d0e 100644 --- a/docs/zh-CN/docs/get-starting/quick-start.md +++ b/docs/zh-CN/docs/get-starting/quick-start.md @@ -147,6 +147,7 @@ mysql> show frontends\G HttpPort: 8030 QueryPort: 9030 RpcPort: 9020 +ArrowFlightSqlPort: 9040 Role: FOLLOWER IsMaster: true ClusterId: 1685821635 @@ -277,6 +278,7 @@ mysql> SHOW BACKENDS\G BePort: 9060 HttpPort: 8040 BrpcPort: 8060 + ArrowFlightSqlPort: 8070 LastStartTime: 2022-08-16 15:31:37 LastHeartbeat: 2022-08-17 13:33:17 Alive: true diff --git a/docs/zh-CN/docs/install/standard-deployment.md b/docs/zh-CN/docs/install/standard-deployment.md index a338ab5f35f929..923dc52a3d90d1 100644 --- a/docs/zh-CN/docs/install/standard-deployment.md +++ b/docs/zh-CN/docs/install/standard-deployment.md @@ -119,6 +119,7 @@ Doris 各个实例直接通过网络进行通讯。以下表格展示了所有 | FE | http_port | 8030 | FE <--> FE,用户 <--> FE |FE 上的 http server 端口 | | FE | rpc_port | 9020 | BE --> FE, FE <--> FE | FE 上的 thrift server 端口,每个fe的配置需要保持一致| | FE | query_port | 9030 | 用户 <--> FE | FE 上的 mysql server 端口 | +| FE | arrow_flight_sql_port | 9040 | 用户 <--> FE | FE 上的 Arrow Flight SQL server 端口 | | FE | edit\_log_port | 9010 | FE <--> FE | FE 上的 bdbje 之间通信用的端口 | | Broker | broker\_ipc_port | 8000 | FE --> Broker, BE --> Broker | Broker 上的 thrift server,用于接收请求 | diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/frontends.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/frontends.md index f367fd30141eb2..a2c85ec75456ba 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/frontends.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/frontends.md @@ -55,6 +55,7 @@ mysql> desc function frontends(); | HttpPort | TEXT | No | false | NULL | NONE | | QueryPort | TEXT | No | false | NULL | NONE | | RpcPort | TEXT | No | false | NULL | NONE | +| ArrowFlightSqlPort| TEXT | No | false | NULL | NONE | | Role | TEXT | No | false | NULL | NONE | | IsMaster | TEXT | No | false | NULL | NONE | | ClusterId | TEXT | No | false | NULL | NONE | @@ -84,6 +85,7 @@ mysql> select * from frontends()\G HttpPort: 8034 QueryPort: 9033 RpcPort: 9023 +ArrowFlightSqlPort: 9040 Role: FOLLOWER IsMaster: true ClusterId: 1258341841 diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 1d4f35fef78e25..a2aa07ba24b7fe 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -390,6 +390,9 @@ public class Config extends ConfigBase { @ConfField(description = {"FE MySQL server 的端口号", "The port of FE MySQL server"}) public static int query_port = 9030; + @ConfField(description = {"FE Arrow-Flight-SQL server 的端口号", "The port of FE Arrow-Flight-SQ server"}) + public static int arrow_flight_sql_port = -1; + @ConfField(description = {"MySQL 服务的 IO 线程数", "The number of IO threads in MySQL service"}) public static int mysql_service_io_threads_num = 4; diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 26e49d276e6432..0c00f1f4425a8d 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -496,6 +496,10 @@ under the License. org.apache.httpcomponents httpclient + + com.google.flatbuffers + flatbuffers-java + org.apache.doris hive-catalog-shade @@ -706,6 +710,64 @@ under the License. quartz 2.3.2 + + + + org.apache.arrow + arrow-memory-netty + + + io.grpc + grpc-netty + + + io.grpc + grpc-core + + + io.grpc + grpc-context + + + io.netty + netty-buffer + + + io.netty + netty-handler + + + io.netty + netty-transport + + + io.grpc + grpc-api + + + org.apache.arrow + flight-core + + + org.apache.arrow + arrow-memory-core + + + org.apache.arrow + arrow-jdbc + + + org.apache.arrow + arrow-vector + + + org.hamcrest + hamcrest + + + org.apache.arrow + flight-sql + @@ -777,7 +839,7 @@ under the License. - + de.jflex @@ -1053,5 +1115,12 @@ under the License. + + + kr.motd.maven + os-maven-plugin + 1.7.0 + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index f204d11edb5a85..59dd3d96b01a88 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -193,7 +193,8 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star } if (options.enableQeService) { - QeService qeService = new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler()); + QeService qeService = new QeService(Config.query_port, Config.arrow_flight_sql_port, + ExecuteEnv.getInstance().getScheduler()); qeService.start(); } @@ -231,6 +232,11 @@ private static void checkAllPorts() throws IOException { "Rpc port", NetUtils.RPC_PORT_SUGGESTION)) { throw new IOException("port " + Config.rpc_port + " already in use"); } + if (Config.arrow_flight_sql_port != -1 + && !NetUtils.isPortAvailable(FrontendOptions.getLocalHostAddress(), Config.arrow_flight_sql_port, + "Arrow Flight SQL port", NetUtils.ARROW_FLIGHT_SQL_SUGGESTION)) { + throw new IOException("port " + Config.arrow_flight_sql_port + " already in use"); + } } /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java index 7658cfae0715db..647e4caf570d92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java @@ -44,10 +44,10 @@ public class BackendsProcDir implements ProcDirInterface { private static final Logger LOG = LogManager.getLogger(BackendsProcDir.class); public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("BackendId") - .add("Host").add("HeartbeatPort").add("BePort").add("HttpPort").add("BrpcPort").add("LastStartTime") - .add("LastHeartbeat").add("Alive").add("SystemDecommissioned").add("TabletNum").add("DataUsedCapacity") - .add("TrashUsedCapcacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct").add("MaxDiskUsedPct") - .add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status") + .add("Host").add("HeartbeatPort").add("BePort").add("HttpPort").add("BrpcPort").add("ArrowFlightSqlPort") + .add("LastStartTime").add("LastHeartbeat").add("Alive").add("SystemDecommissioned").add("TabletNum") + .add("DataUsedCapacity").add("TrashUsedCapcacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct") + .add("MaxDiskUsedPct").add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status") .add("HeartbeatFailureCounter").add("NodeRole") .build(); @@ -107,6 +107,7 @@ public static List> getBackendInfos() { backendInfo.add(String.valueOf(backend.getBePort())); backendInfo.add(String.valueOf(backend.getHttpPort())); backendInfo.add(String.valueOf(backend.getBrpcPort())); + backendInfo.add(String.valueOf(backend.getArrowFlightSqlPort())); backendInfo.add(TimeUtils.longToTimeString(backend.getLastStartTime())); backendInfo.add(TimeUtils.longToTimeString(backend.getLastUpdateMs())); backendInfo.add(String.valueOf(backend.isAlive())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index e11cd81058cc2c..0e500259639c15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -46,7 +46,7 @@ public class FrontendsProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("Name").add("Host").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort") - .add("Role").add("IsMaster").add("ClusterId").add("Join").add("Alive") + .add("ArrowFlightSqlPort").add("Role").add("IsMaster").add("ClusterId").add("Join").add("Alive") .add("ReplayedJournalId").add("LastStartTime").add("LastHeartbeat") .add("IsHelper").add("ErrMsg").add("Version") .add("CurrentConnected") @@ -119,9 +119,11 @@ public static void getFrontendsInfo(Env env, List> infos) { if (fe.getHost().equals(env.getSelfNode().getHost())) { info.add(Integer.toString(Config.query_port)); info.add(Integer.toString(Config.rpc_port)); + info.add(Integer.toString(Config.arrow_flight_sql_port)); } else { info.add(Integer.toString(fe.getQueryPort())); info.add(Integer.toString(fe.getRpcPort())); + info.add(Integer.toString(fe.getArrowFlightSqlPort())); } info.add(fe.getRole().name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java index 334dd11564d35f..0c1ac130cdea0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java @@ -48,6 +48,8 @@ public class NetUtils { public static final String HTTPS_PORT_SUGGESTION = "Please change the 'https_port' in fe.conf and try again. " + "But you need to make sure that ALL FEs https_port are same."; public static final String RPC_PORT_SUGGESTION = "Please change the 'rpc_port' in fe.conf and try again."; + public static final String ARROW_FLIGHT_SQL_SUGGESTION = + "Please change the 'arrow_flight_sql_port' in fe.conf and try again."; // Target format is "host:port" public static InetSocketAddress createSocketAddr(String target) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BootstrapFinishAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BootstrapFinishAction.java index b2878522edc0af..fb503f7feea11d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BootstrapFinishAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BootstrapFinishAction.java @@ -40,6 +40,7 @@ * "data": { * "queryPort": 9030, * "rpcPort": 9020, + * "arrowFlightSqlPort": 9040, * "maxReplayedJournal": 17287 * }, * "count": 0 @@ -53,6 +54,7 @@ public class BootstrapFinishAction extends RestBaseController { public static final String REPLAYED_JOURNAL_ID = "replayedJournalId"; public static final String QUERY_PORT = "queryPort"; + public static final String ARROW_FLIGHT_SQL_PORT = "arrowFlightSqlPort"; public static final String RPC_PORT = "rpcPort"; public static final String VERSION = "version"; @@ -91,6 +93,7 @@ public ResponseEntity execute(HttpServletRequest request, HttpServletResponse re result.setReplayedJournalId(replayedJournalId); result.setQueryPort(Config.query_port); result.setRpcPort(Config.rpc_port); + result.setArrowFlightSqlPort(Config.arrow_flight_sql_port); result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH); } @@ -107,6 +110,7 @@ private static class BootstrapResult { private long replayedJournalId = 0; private int queryPort = 0; private int rpcPort = 0; + private int arrowFlightSqlPort = 0; private String version = ""; public BootstrapResult() { @@ -125,10 +129,18 @@ public void setQueryPort(int queryPort) { this.queryPort = queryPort; } + public void setArrowFlightSqlPort(int arrowFlightSqlPort) { + this.arrowFlightSqlPort = arrowFlightSqlPort; + } + public int getQueryPort() { return queryPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public void setRpcPort(int rpcPort) { this.rpcPort = rpcPort; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java index 983bafc8522419..929e46101321cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java @@ -65,6 +65,8 @@ public Object clusterInfo(HttpServletRequest request, HttpServletResponse respon result.put("mysql", frontends.stream().map(ip -> ip + ":" + Config.query_port).collect(Collectors.toList())); result.put("http", frontends.stream().map(ip -> ip + ":" + Config.http_port).collect(Collectors.toList())); + result.put("arrow flight sql server", frontends.stream().map( + ip -> ip + ":" + Config.arrow_flight_sql_port).collect(Collectors.toList())); return ResponseEntityBuilder.ok(result); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java index e6b0f872f4be19..ae48526515af88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java @@ -87,7 +87,8 @@ private void startService(String dorisHomeDir) throws Exception { httpServer.start(); // MySQl server - QeService qeService = new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler()); + QeService qeService = new QeService(Config.query_port, Config.arrow_flight_sql_port, + ExecuteEnv.getInstance().getScheduler()); qeService.start(); ThreadPoolManager.registerAllThreadPoolMetric(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6a67a17e3c34f8..1982023cc5130f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -333,7 +333,8 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d public PlanFragment visitPhysicalResultSink(PhysicalResultSink physicalResultSink, PlanTranslatorContext context) { PlanFragment planFragment = physicalResultSink.child().accept(this, context); - planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId())); + planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId(), + ConnectContext.get().getResultSinkType())); return planFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index a719081496b05b..7dc45029e7e509 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -268,6 +268,7 @@ private PlanFragment createMergeFragment(PlanFragment inputFragment) mergePlan.init(ctx.getRootAnalyzer()); Preconditions.checkState(mergePlan.hasValidStats()); PlanFragment fragment = new PlanFragment(ctx.getNextFragmentId(), mergePlan, DataPartition.UNPARTITIONED); + fragment.setResultSinkType(ctx.getRootAnalyzer().getContext().getResultSinkType()); inputFragment.setDestination(mergePlan); return fragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 3d74bfc0dfc3d9..16be7e17a09810 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -31,6 +31,7 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanFragment; +import org.apache.doris.thrift.TResultSinkType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -145,6 +146,8 @@ public class PlanFragment extends TreeNode { // has colocate plan node private boolean hasColocatePlanNode = false; + private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; + /** * C'tor for fragment with specific partition; the output is by default broadcast. */ @@ -234,6 +237,10 @@ public void setHasColocatePlanNode(boolean hasColocatePlanNode) { this.hasColocatePlanNode = hasColocatePlanNode; } + public void setResultSinkType(TResultSinkType resultSinkType) { + this.resultSinkType = resultSinkType; + } + public boolean hasColocatePlanNode() { return hasColocatePlanNode; } @@ -269,7 +276,7 @@ public void finalize(StatementBase stmtBase) { } else { // add ResultSink // we're streaming to an result sink - sink = new ResultSink(planRoot.getId()); + sink = new ResultSink(planRoot.getId(), resultSinkType); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java index 49a4ca3333aa11..1b0b745223caa9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java @@ -22,6 +22,7 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TResultSink; +import org.apache.doris.thrift.TResultSinkType; /** * Result sink that forwards data to @@ -33,10 +34,17 @@ public class ResultSink extends DataSink { // Two phase fetch option private TFetchOption fetchOption; + private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; + public ResultSink(PlanNodeId exchNodeId) { this.exchNodeId = exchNodeId; } + public ResultSink(PlanNodeId exchNodeId, TResultSinkType resultSinkType) { + this.exchNodeId = exchNodeId; + this.resultSinkType = resultSinkType; + } + @Override public String getExplainString(String prefix, TExplainLevel explainLevel) { StringBuilder strBuilder = new StringBuilder(); @@ -49,6 +57,7 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { strBuilder.append(prefix).append(" ").append("FETCH ROW STORE\n"); } } + strBuilder.append(prefix).append(" ").append(resultSinkType).append("\n"); return strBuilder.toString(); } @@ -63,6 +72,7 @@ protected TDataSink toThrift() { if (fetchOption != null) { tResultSink.setFetchOption(fetchOption); } + tResultSink.setType(resultSinkType); result.setResultSink(tResultSink); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index bf7c84a2c4db6c..5a427a108010ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -42,6 +42,7 @@ import org.apache.doris.statistics.Histogram; import org.apache.doris.system.Backend; import org.apache.doris.task.LoadTaskInfo; +import org.apache.doris.thrift.TResultSinkType; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionStatus; @@ -176,6 +177,8 @@ public class ConnectContext { private String workloadGroupName = ""; private Map insertGroupCommitTableToBeMap = new HashMap<>(); + private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; + public void setUserQueryTimeout(int queryTimeout) { if (queryTimeout > 0) { sessionVariable.setQueryTimeoutS(queryTimeout); @@ -221,6 +224,10 @@ public MysqlSslContext getMysqlSslContext() { return mysqlSslContext; } + public TResultSinkType getResultSinkType() { + return resultSinkType; + } + public void setOrUpdateInsertResult(long txnId, String label, String db, String tbl, TransactionStatus txnStatus, long loadedRows, int filteredRows) { if (isTxnModel() && insertResult != null) { @@ -644,6 +651,10 @@ public void setStatementContext(StatementContext statementContext) { this.statementContext = statementContext; } + public void setResultSinkType(TResultSinkType resultSinkType) { + this.resultSinkType = resultSinkType; + } + // kill operation with no protect. public void kill(boolean killConnection) { LOG.warn("kill query from {}, kill connection: {}", getMysqlChannel().getRemoteHostPortString(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 48f71915310603..feff5b7ebec69e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; @@ -207,6 +208,11 @@ public class Coordinator implements CoordInterface { private final List needCheckBackendExecStates = Lists.newArrayList(); private final List needCheckPipelineExecContexts = Lists.newArrayList(); private ResultReceiver receiver; + private TNetworkAddress resultFlightServerAddr; + private TNetworkAddress resultInternalServiceAddr; + private ArrayList resultOutputExprs; + + private TUniqueId finstId; private final List scanNodes; private int scanRangeNum = 0; // number of instances of this query, equals to @@ -274,6 +280,22 @@ public ExecutionProfile getExecutionProfile() { return executionProfile; } + public TNetworkAddress getResultFlightServerAddr() { + return resultFlightServerAddr; + } + + public TNetworkAddress getResultInternalServiceAddr() { + return resultInternalServiceAddr; + } + + public ArrayList getResultOutputExprs() { + return resultOutputExprs; + } + + public TUniqueId getFinstId() { + return finstId; + } + // True if all scan node are ExternalScanNode. private boolean isAllExternalScan = true; @@ -598,6 +620,10 @@ public void exec() throws Exception { TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId, addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline); + finstId = topParams.instanceExecParams.get(0).instanceId; + resultFlightServerAddr = toArrowFlightHost(execBeAddr); + resultInternalServiceAddr = toBrpcHost(execBeAddr); + resultOutputExprs = fragments.get(0).getOutputExprs(); if (LOG.isDebugEnabled()) { LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host); @@ -1595,6 +1621,18 @@ private TNetworkAddress toBrpcHost(TNetworkAddress host) throws Exception { return new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); } + private TNetworkAddress toArrowFlightHost(TNetworkAddress host) throws Exception { + Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort( + host.getHostname(), host.getPort()); + if (backend == null) { + throw new UserException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); + } + if (backend.getArrowFlightSqlPort() < 0) { + return null; + } + return new TNetworkAddress(backend.getHost(), backend.getArrowFlightSqlPort()); + } + // estimate if this fragment contains UnionNode private boolean containsUnionNode(PlanNode node) { if (node instanceof UnionNode) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index 0ffb5b989d8370..50c422a1979340 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -247,10 +247,10 @@ private RowBatch getNextInternal(Status status, Backend backend) throws TExcepti while (pResult == null) { InternalService.PTabletKeyLookupRequest request = requestBuilder.build(); Future futureResponse = - BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request); + BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(), request); long currentTs = System.currentTimeMillis(); if (currentTs >= timeoutTs) { - LOG.warn("fetch result timeout {}", backend.getBrpcAdress()); + LOG.warn("fetch result timeout {}", backend.getBrpcAddress()); status.setStatus("query timeout"); return null; } @@ -265,18 +265,18 @@ private RowBatch getNextInternal(Status status, Backend backend) throws TExcepti } } catch (TimeoutException e) { futureResponse.cancel(true); - LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAdress()); + LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress()); status.setStatus("query timeout"); return null; } } } catch (RpcException e) { - LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAdress(), e); + LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAddress(), e); status.setRpcStatus(e.getMessage()); SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); return null; } catch (ExecutionException e) { - LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAdress()); + LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAddress()); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java index 36eb4e8db59c97..f1e9a653454120 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java @@ -18,6 +18,7 @@ package org.apache.doris.qe; import org.apache.doris.mysql.MysqlServer; +import org.apache.doris.service.arrowflight.FlightSqlService; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -33,13 +34,18 @@ public class QeService { // MySQL protocol service private MysqlServer mysqlServer; + private int arrowFlightSQLPort; + private FlightSqlService flightSqlService; + @Deprecated - public QeService(int port) { + public QeService(int port, int arrowFlightSQLPort) { this.port = port; + this.arrowFlightSQLPort = arrowFlightSQLPort; } - public QeService(int port, ConnectScheduler scheduler) { + public QeService(int port, int arrowFlightSQLPort, ConnectScheduler scheduler) { this.port = port; + this.arrowFlightSQLPort = arrowFlightSQLPort; this.mysqlServer = new MysqlServer(port, scheduler); } @@ -56,6 +62,14 @@ public void start() throws Exception { LOG.error("mysql server start failed"); System.exit(-1); } + if (arrowFlightSQLPort != -1) { + this.flightSqlService = new FlightSqlService(arrowFlightSQLPort); + if (!flightSqlService.start()) { + System.exit(-1); + } + } else { + LOG.info("No Arrow Flight SQL service that needs to be started."); + } LOG.info("QE service start."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ecedfa3aa289f9..6b30fc58e20f9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -146,6 +146,7 @@ import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.service.arrowflight.FlightStatementExecutor; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; import org.apache.doris.system.Backend; @@ -2597,7 +2598,8 @@ public List executeInternalQuery() { planner = new NereidsPlanner(statementContext); planner.plan(parsedStmt, context.getSessionVariable().toThrift()); } catch (Exception e) { - LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e); + LOG.warn("Arrow Flight SQL fall back to legacy planner, because: {}", + e.getMessage(), e); parsedStmt = null; planner = null; context.getState().setNereids(false); @@ -2612,7 +2614,6 @@ public List executeInternalQuery() { LOG.warn("Failed to run internal SQL: {}", originStmt, e); throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e); } - planner.getFragments(); RowBatch batch; coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); profile.addExecutionProfile(coord.getExecutionProfile()); @@ -2646,7 +2647,7 @@ public List executeInternalQuery() { } } catch (Exception e) { fetchResultSpan.recordException(e); - throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e); + throw new RuntimeException("Failed to fetch internal SQL result. " + Util.getRootCauseMessage(e), e); } finally { fetchResultSpan.end(); } @@ -2657,6 +2658,64 @@ public List executeInternalQuery() { } } + public void executeArrowFlightQuery(FlightStatementExecutor flightStatementExecutor) { + LOG.debug("ARROW FLIGHT QUERY: " + originStmt.toString()); + try { + try { + if (ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { + try { + parseByNereids(); + Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, + "Nereids only process LogicalPlanAdapter," + + " but parsedStmt is " + parsedStmt.getClass().getName()); + context.getState().setNereids(true); + context.getState().setIsQuery(true); + planner = new NereidsPlanner(statementContext); + planner.plan(parsedStmt, context.getSessionVariable().toThrift()); + } catch (Exception e) { + LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e); + parsedStmt = null; + context.getState().setNereids(false); + analyzer = new Analyzer(context.getEnv(), context); + analyze(context.getSessionVariable().toThrift()); + } + } else { + analyzer = new Analyzer(context.getEnv(), context); + analyze(context.getSessionVariable().toThrift()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to execute Arrow Flight SQL. " + Util.getRootCauseMessage(e), e); + } + coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); + profile.addExecutionProfile(coord.getExecutionProfile()); + try { + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), + new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + } catch (UserException e) { + throw new RuntimeException("Failed to execute Arrow Flight SQL. " + Util.getRootCauseMessage(e), e); + } + + Span queryScheduleSpan = context.getTracer() + .spanBuilder("Arrow Flight SQL schedule").setParent(Context.current()).startSpan(); + try (Scope scope = queryScheduleSpan.makeCurrent()) { + coord.exec(); + } catch (Exception e) { + queryScheduleSpan.recordException(e); + LOG.warn("Failed to coord exec Arrow Flight SQL, because: {}", e.getMessage(), e); + throw new RuntimeException(e.getMessage() + Util.getRootCauseMessage(e), e); + } finally { + queryScheduleSpan.end(); + } + } finally { + QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); // TODO for query profile + } + flightStatementExecutor.setFinstId(coord.getFinstId()); + flightStatementExecutor.setResultFlightServerAddr(coord.getResultFlightServerAddr()); + flightStatementExecutor.setResultInternalServiceAddr(coord.getResultInternalServiceAddr()); + flightStatementExecutor.setResultOutputExprs(coord.getResultOutputExprs()); + } + private List convertResultBatchToResultRows(TResultBatch batch) { List columns = parsedStmt.getColLabels(); List resultRows = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index e1f5a2c95b9095..3250d186889260 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -100,6 +100,11 @@ public InternalService.PFetchDataResult fetchDataSync(InternalService.PFetchData return blockingStub.fetchData(request); } + public Future fetchArrowFlightSchema( + InternalService.PFetchArrowFlightSchemaRequest request) { + return stub.fetchArrowFlightSchema(request); + } + public Future fetchTableStructureAsync( InternalService.PFetchTableSchemaRequest request) { return stub.fetchTableSchema(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index b30b8e0c5a2f86..55881b4cf91418 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -259,6 +259,18 @@ public InternalService.PFetchDataResult fetchDataSync( } } + public Future fetchArrowFlightSchema( + TNetworkAddress address, InternalService.PFetchArrowFlightSchemaRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.fetchArrowFlightSchema(request); + } catch (Throwable e) { + LOG.warn("fetch arrow flight schema catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future fetchTableStructureAsync( TNetworkAddress address, InternalService.PFetchTableSchemaRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b0796a372de69e..4238e012c5ef2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2226,6 +2226,7 @@ public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) th result.setReplayedJournalId(replayedJournalId); result.setQueryPort(Config.query_port); result.setRpcPort(Config.rpc_port); + result.setArrowFlightSqlPort(Config.arrow_flight_sql_port); result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH); result.setLastStartupTime(exeEnv.getStartupTime()); result.setProcessUUID(exeEnv.getProcessUUID()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java new file mode 100644 index 00000000000000..e0ec4bf10c08ee --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight; + +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; + +/** + * flight sql protocol implementation based on nio. + */ +public class FlightSqlService { + private static final Logger LOG = LogManager.getLogger(FlightSqlService.class); + private final FlightServer flightServer; + private volatile boolean running; + + public FlightSqlService(int port) { + BufferAllocator allocator = new RootAllocator(); + Location location = Location.forGrpcInsecure("0.0.0.0", port); + FlightSqlServiceImpl producer = new FlightSqlServiceImpl(location); + flightServer = FlightServer.builder(allocator, location, producer).build(); + } + + // start Arrow Flight SQL service, return true if success, otherwise false + public boolean start() { + try { + flightServer.start(); + running = true; + LOG.info("Arrow Flight SQL service is started."); + } catch (IOException e) { + LOG.error("Start Arrow Flight SQL service failed.", e); + return false; + } + return true; + } + + public void stop() { + if (running) { + running = false; + try { + flightServer.close(); + } catch (InterruptedException e) { + LOG.warn("close Arrow Flight SQL server failed.", e); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java new file mode 100644 index 00000000000000..38e275b1d5dd0a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/arrow/blob/main/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java +// and modified by Doris + +package org.apache.doris.service.arrowflight; + +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.Util; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.Criteria; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.SchemaResult; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.FlightSqlProducer; +import org.apache.arrow.flight.sql.SqlInfoBuilder; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionClosePreparedStatementRequest; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementRequest; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCrossReference; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetExportedKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetImportedKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetXdbcTypeInfo; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementUpdate; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate; +import org.apache.arrow.flight.sql.impl.FlightSql.SqlSupportedCaseSensitivity; +import org.apache.arrow.flight.sql.impl.FlightSql.TicketStatementQuery; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; + +public class FlightSqlServiceImpl implements FlightSqlProducer, AutoCloseable { + private static final Logger LOG = LogManager.getLogger(FlightSqlServiceImpl.class); + private final Location location; + private final BufferAllocator rootAllocator = new RootAllocator(); + private final SqlInfoBuilder sqlInfoBuilder; + + public FlightSqlServiceImpl(final Location location) { + this.location = location; + sqlInfoBuilder = new SqlInfoBuilder(); + sqlInfoBuilder.withFlightSqlServerName("DorisFE") + .withFlightSqlServerVersion("1.0") + .withFlightSqlServerArrowVersion("13.0") + .withFlightSqlServerReadOnly(false) + .withSqlIdentifierQuoteChar("`") + .withSqlDdlCatalog(true) + .withSqlDdlSchema(false) + .withSqlDdlTable(false) + .withSqlIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE) + .withSqlQuotedIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE); + } + + @Override + public void getStreamPreparedStatement(final CommandPreparedStatementQuery command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPreparedStatement unimplemented").toRuntimeException(); + } + + @Override + public void closePreparedStatement(final ActionClosePreparedStatementRequest request, final CallContext context, + final StreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("closePreparedStatement unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context, + final FlightDescriptor descriptor) { + try { + final String query = request.getQuery(); + final FlightStatementExecutor flightStatementExecutor = new FlightStatementExecutor(query); + + flightStatementExecutor.executeQuery(); + + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(ByteString.copyFromUtf8( + DebugUtil.printId(flightStatementExecutor.getFinstId()) + ":" + query)).build(); + final Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); + // TODO Support multiple endpoints. + Location location = Location.forGrpcInsecure(flightStatementExecutor.getResultFlightServerAddr().hostname, + flightStatementExecutor.getResultFlightServerAddr().port); + List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); + + Schema schema; + schema = flightStatementExecutor.fetchArrowFlightSchema(5000); + if (schema == null) { + throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException(); + } + return new FlightInfo(schema, descriptor, endpoints, -1, -1); + } catch (Exception e) { + LOG.warn("get flight info statement failed, " + e.getMessage(), e); + throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException(); + } + } + + @Override + public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQuery command, + final CallContext context, + final FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED.withDescription("getFlightInfoPreparedStatement unimplemented") + .toRuntimeException(); + } + + @Override + public SchemaResult getSchemaStatement(final CommandStatementQuery command, final CallContext context, + final FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED.withDescription("getSchemaStatement unimplemented").toRuntimeException(); + } + + @Override + public void close() throws Exception { + AutoCloseables.close(rootAllocator); + } + + @Override + public void listFlights(CallContext context, Criteria criteria, StreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("listFlights unimplemented").toRuntimeException(); + } + + @Override + public void createPreparedStatement(final ActionCreatePreparedStatementRequest request, final CallContext context, + final StreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("createPreparedStatement unimplemented").toRuntimeException(); + } + + @Override + public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) { + throw CallStatus.UNIMPLEMENTED.withDescription("doExchange unimplemented").toRuntimeException(); + } + + @Override + public Runnable acceptPutStatement(CommandStatementUpdate command, + CallContext context, FlightStream flightStream, + StreamListener ackStream) { + throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutStatement unimplemented").toRuntimeException(); + } + + @Override + public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate command, CallContext context, + FlightStream flightStream, StreamListener ackStream) { + throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutPreparedStatementUpdate unimplemented") + .toRuntimeException(); + } + + @Override + public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command, CallContext context, + FlightStream flightStream, StreamListener ackStream) { + throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutPreparedStatementQuery unimplemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoSqlInfo(final CommandGetSqlInfo request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA); + } + + @Override + public void getStreamSqlInfo(final CommandGetSqlInfo command, final CallContext context, + final ServerStreamListener listener) { + this.sqlInfoBuilder.send(command.getInfoList(), listener); + } + + @Override + public FlightInfo getFlightInfoTypeInfo(CommandGetXdbcTypeInfo request, CallContext context, + FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_TYPE_INFO_SCHEMA); + } + + @Override + public void getStreamTypeInfo(CommandGetXdbcTypeInfo request, CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTypeInfo unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_CATALOGS_SCHEMA); + } + + @Override + public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCatalogs unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoSchemas(final CommandGetDbSchemas request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_SCHEMAS_SCHEMA); + } + + @Override + public void getStreamSchemas(final CommandGetDbSchemas command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamSchemas unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoTables(final CommandGetTables request, final CallContext context, + final FlightDescriptor descriptor) { + Schema schemaToUse = Schemas.GET_TABLES_SCHEMA; + if (!request.getIncludeSchema()) { + schemaToUse = Schemas.GET_TABLES_SCHEMA_NO_SCHEMA; + } + return getFlightInfoForSchema(request, descriptor, schemaToUse); + } + + @Override + public void getStreamTables(final CommandGetTables command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTables unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoTableTypes(final CommandGetTableTypes request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_TABLE_TYPES_SCHEMA); + } + + @Override + public void getStreamTableTypes(final CallContext context, final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTableTypes unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_PRIMARY_KEYS_SCHEMA); + } + + @Override + public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final CallContext context, + final ServerStreamListener listener) { + + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPrimaryKeys unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoExportedKeys(final CommandGetExportedKeys request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_EXPORTED_KEYS_SCHEMA); + } + + @Override + public void getStreamExportedKeys(final CommandGetExportedKeys command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamExportedKeys unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoImportedKeys(final CommandGetImportedKeys request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_IMPORTED_KEYS_SCHEMA); + } + + @Override + public void getStreamImportedKeys(final CommandGetImportedKeys command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamImportedKeys unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoCrossReference(CommandGetCrossReference request, CallContext context, + FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_CROSS_REFERENCE_SCHEMA); + } + + @Override + public void getStreamCrossReference(CommandGetCrossReference command, CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCrossReference unimplemented").toRuntimeException(); + } + + @Override + public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamStatement unimplemented").toRuntimeException(); + } + + private FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor, + final Schema schema) { + final Ticket ticket = new Ticket(Any.pack(request).toByteArray()); + // TODO Support multiple endpoints. + final List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); + + return new FlightInfo(schema, descriptor, endpoints, -1, -1); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java new file mode 100644 index 00000000000000..ced03350de4e05 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/arrow/blob/main/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/StatementContext.java +// and modified by Doris + +package org.apache.doris.service.arrowflight; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Status; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TResultSinkType; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TUniqueId; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public final class FlightStatementExecutor { + private AutoCloseConnectContext acConnectContext; + private final String query; + private TUniqueId queryId; + private TUniqueId finstId; + private TNetworkAddress resultFlightServerAddr; + private TNetworkAddress resultInternalServiceAddr; + private ArrayList resultOutputExprs; + + public FlightStatementExecutor(final String query) { + this.query = query; + acConnectContext = buildConnectContext(); + } + + public void setQueryId(TUniqueId queryId) { + this.queryId = queryId; + } + + public void setFinstId(TUniqueId finstId) { + this.finstId = finstId; + } + + public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) { + this.resultFlightServerAddr = resultFlightServerAddr; + } + + public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) { + this.resultInternalServiceAddr = resultInternalServiceAddr; + } + + public void setResultOutputExprs(ArrayList resultOutputExprs) { + this.resultOutputExprs = resultOutputExprs; + } + + public String getQuery() { + return query; + } + + public TUniqueId getQueryId() { + return queryId; + } + + public TUniqueId getFinstId() { + return finstId; + } + + public TNetworkAddress getResultFlightServerAddr() { + return resultFlightServerAddr; + } + + public TNetworkAddress getResultInternalServiceAddr() { + return resultInternalServiceAddr; + } + + public ArrayList getResultOutputExprs() { + return resultOutputExprs; + } + + @Override + public boolean equals(final Object other) { + if (!(other instanceof FlightStatementExecutor)) { + return false; + } + return this == other; + } + + @Override + public int hashCode() { + return Objects.hash(this); + } + + public static AutoCloseConnectContext buildConnectContext() { + ConnectContext connectContext = new ConnectContext(); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.internalSession = true; + sessionVariable.setEnablePipelineEngine(false); // TODO + sessionVariable.setEnablePipelineXEngine(false); // TODO + connectContext.setEnv(Env.getCurrentEnv()); + connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser()); // TODO + connectContext.setCurrentUserIdentity(UserIdentity.ROOT); // TODO + connectContext.setStartTime(); + connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER); + connectContext.setResultSinkType(TResultSinkType.ARROW_FLIGHT_PROTOCAL); + return new AutoCloseConnectContext(connectContext); + } + + public void executeQuery() { + try { + UUID uuid = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + setQueryId(queryId); + acConnectContext.connectContext.setQueryId(queryId); + StmtExecutor stmtExecutor = new StmtExecutor(acConnectContext.connectContext, getQuery()); + acConnectContext.connectContext.setExecutor(stmtExecutor); + stmtExecutor.executeArrowFlightQuery(this); + } catch (Exception e) { + throw new RuntimeException("Failed to coord exec", e); + } + } + + public Schema fetchArrowFlightSchema(int timeoutMs) { + TNetworkAddress address = getResultInternalServiceAddr(); + TUniqueId tid = getFinstId(); + ArrayList resultOutputExprs = getResultOutputExprs(); + Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); + try { + InternalService.PFetchArrowFlightSchemaRequest request = + InternalService.PFetchArrowFlightSchemaRequest.newBuilder() + .setFinstId(finstId) + .build(); + + Future future + = BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request); + InternalService.PFetchArrowFlightSchemaResult pResult; + pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); + if (pResult == null) { + throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s", + DebugUtil.printId(tid))); + } + TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + Status status = null; + status.setPstatus(pResult.getStatus()); + throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s", + DebugUtil.printId(tid), status)); + } + if (pResult.hasSchema() && pResult.getSchema().size() > 0) { + RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); + ArrowStreamReader arrowStreamReader = new ArrowStreamReader( + new ByteArrayInputStream(pResult.getSchema().toByteArray()), + rootAllocator + ); + try { + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); + List fieldVectors = root.getFieldVectors(); + if (fieldVectors.size() != resultOutputExprs.size()) { + throw new RuntimeException(String.format( + "Schema size %s' is not equal to arrow field size %s, finstId: %s.", + fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); + } + return root.getSchema(); + } catch (Exception e) { + throw new RuntimeException("Read Arrow Flight Schema failed.", e); + } + } else { + throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s", + DebugUtil.printId(tid))); + } + } catch (RpcException e) { + throw new RuntimeException(String.format( + "arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (InterruptedException e) { + throw new RuntimeException(String.format( + "arrow flight schema future get interrupted exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (ExecutionException e) { + throw new RuntimeException(String.format( + "arrow flight schema future get execution exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (TimeoutException e) { + throw new RuntimeException(String.format( + "arrow flight schema fetch timeout, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index db470fb91ddcb6..fcb5e63e8383fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -76,6 +76,8 @@ public class Backend implements Writable { private volatile int beRpcPort; // be rpc port @SerializedName("brpcPort") private volatile int brpcPort = -1; + @SerializedName("arrowFlightSqlPort") + private volatile int arrowFlightSqlPort = -1; @SerializedName("lastUpdateMs") private volatile long lastUpdateMs; @@ -204,6 +206,10 @@ public int getBrpcPort() { return brpcPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public String getHeartbeatErrMsg() { return heartbeatErrMsg; } @@ -289,6 +295,10 @@ public void setBrpcPort(int brpcPort) { this.brpcPort = brpcPort; } + public void setArrowFlightSqlPort(int arrowFlightSqlPort) { + this.arrowFlightSqlPort = arrowFlightSqlPort; + } + public void setCpuCores(int cpuCores) { this.cpuCores = cpuCores; } @@ -670,6 +680,11 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) this.brpcPort = hbResponse.getBrpcPort(); } + if (this.arrowFlightSqlPort != hbResponse.getArrowFlightSqlPort() && !FeConstants.runningUnitTest) { + isChanged = true; + this.arrowFlightSqlPort = hbResponse.getArrowFlightSqlPort(); + } + if (this.isShutDown.get() != hbResponse.isShutDown()) { isChanged = true; LOG.info("{} shutdown state is changed", this.toString()); @@ -796,10 +811,14 @@ public Map getTagMap() { return tagMap; } - public TNetworkAddress getBrpcAdress() { + public TNetworkAddress getBrpcAddress() { return new TNetworkAddress(getHost(), getBrpcPort()); } + public TNetworkAddress getArrowFlightAddress() { + return new TNetworkAddress(getHost(), getArrowFlightSqlPort()); + } + public String getTagMapString() { return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java index 18c5b94568b9c0..a91dd12b049b39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java @@ -37,6 +37,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { private int httpPort; @SerializedName(value = "brpcPort") private int brpcPort; + @SerializedName(value = "arrowFlightSqlPort") + private int arrowFlightSqlPort; @SerializedName(value = "nodeRole") private String nodeRole = Tag.VALUE_MIX; @@ -54,7 +56,7 @@ public BackendHbResponse() { } public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime, - String version, String nodeRole, boolean isShutDown) { + String version, String nodeRole, boolean isShutDown, int arrowFlightSqlPort) { super(HeartbeatResponse.Type.BACKEND); this.beId = beId; this.status = HbStatus.OK; @@ -66,6 +68,7 @@ public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long this.version = version; this.nodeRole = nodeRole; this.isShutDown = isShutDown; + this.arrowFlightSqlPort = arrowFlightSqlPort; } public BackendHbResponse(long beId, String errMsg) { @@ -99,6 +102,10 @@ public int getBrpcPort() { return brpcPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public long getBeStartTime() { return beStartTime; } @@ -122,6 +129,7 @@ protected void readFields(DataInput in) throws IOException { bePort = in.readInt(); httpPort = in.readInt(); brpcPort = in.readInt(); + arrowFlightSqlPort = in.readInt(); } @Override @@ -133,6 +141,7 @@ public String toString() { sb.append(", bePort: ").append(bePort); sb.append(", httpPort: ").append(httpPort); sb.append(", brpcPort: ").append(brpcPort); + sb.append(", arrowFlightSqlPort: ").append(arrowFlightSqlPort); return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index 95937b9d42013a..51e236974602b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -51,6 +51,7 @@ public class Frontend implements Writable { private int queryPort; private int rpcPort; + private int arrowFlightSqlPort; private long replayedJournalId; private long lastStartupTime; @@ -100,6 +101,10 @@ public int getRpcPort() { return rpcPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public boolean isAlive() { return isAlive; } @@ -153,6 +158,7 @@ public boolean handleHbResponse(FrontendHbResponse hbResponse, boolean isReplay) version = hbResponse.getVersion(); queryPort = hbResponse.getQueryPort(); rpcPort = hbResponse.getRpcPort(); + arrowFlightSqlPort = hbResponse.getArrowFlightSqlPort(); replayedJournalId = hbResponse.getReplayedJournalId(); lastUpdateTime = hbResponse.getHbTime(); heartbeatErrMsg = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java index f7d7e90624d022..c9afcef49b37f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java @@ -39,6 +39,8 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { private int queryPort; @SerializedName(value = "rpcPort") private int rpcPort; + @SerializedName(value = "arrowFlightSqlPort") + private int arrowFlightSqlPort; @SerializedName(value = "replayedJournalId") private long replayedJournalId; private String version; @@ -50,7 +52,7 @@ public FrontendHbResponse() { super(HeartbeatResponse.Type.FRONTEND); } - public FrontendHbResponse(String name, int queryPort, int rpcPort, + public FrontendHbResponse(String name, int queryPort, int rpcPort, int arrowFlightSqlPort, long replayedJournalId, long hbTime, String version, long feStartTime, List diskInfos, long processUUID) { @@ -59,6 +61,7 @@ public FrontendHbResponse(String name, int queryPort, int rpcPort, this.name = name; this.queryPort = queryPort; this.rpcPort = rpcPort; + this.arrowFlightSqlPort = arrowFlightSqlPort; this.replayedJournalId = replayedJournalId; this.hbTime = hbTime; this.version = version; @@ -87,6 +90,10 @@ public int getRpcPort() { return rpcPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public long getReplayedJournalId() { return replayedJournalId; } @@ -113,6 +120,7 @@ public void readFields(DataInput in) throws IOException { name = Text.readString(in); queryPort = in.readInt(); rpcPort = in.readInt(); + arrowFlightSqlPort = in.readInt(); replayedJournalId = in.readLong(); } @@ -124,6 +132,7 @@ public String toString() { sb.append(", version: ").append(version); sb.append(", queryPort: ").append(queryPort); sb.append(", rpcPort: ").append(rpcPort); + sb.append(", arrowFlightSqlPort: ").append(arrowFlightSqlPort); sb.append(", replayedJournalId: ").append(replayedJournalId); sb.append(", festartTime: ").append(processUUID); return sb.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 7dc1275afe79a1..a285d529a268f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -238,6 +238,7 @@ public HeartbeatResponse call() { backendInfo.setHttpPort(2); backendInfo.setBeRpcPort(3); backendInfo.setBrpcPort(4); + backendInfo.setArrowFlightSqlPort(8); backendInfo.setVersion("test-1234"); result = new THeartbeatResult(); result.setStatus(new TStatus(TStatusCode.OK)); @@ -253,6 +254,10 @@ public HeartbeatResponse call() { if (tBackendInfo.isSetBrpcPort()) { brpcPort = tBackendInfo.getBrpcPort(); } + int arrowFlightSqlPort = -1; + if (tBackendInfo.isSetArrowFlightSqlPort()) { + arrowFlightSqlPort = tBackendInfo.getArrowFlightSqlPort(); + } String version = ""; if (tBackendInfo.isSetVersion()) { version = tBackendInfo.getVersion(); @@ -267,7 +272,7 @@ public HeartbeatResponse call() { isShutDown = tBackendInfo.isIsShutdown(); } return new BackendHbResponse(backendId, bePort, httpPort, brpcPort, - System.currentTimeMillis(), beStartTime, version, nodeRole, isShutDown); + System.currentTimeMillis(), beStartTime, version, nodeRole, isShutDown, arrowFlightSqlPort); } else { return new BackendHbResponse(backendId, backend.getHost(), result.getStatus().getErrorMsgs().isEmpty() @@ -308,7 +313,8 @@ public HeartbeatResponse call() { // heartbeat to self if (Env.getCurrentEnv().isReady()) { return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port, - Env.getCurrentEnv().getMaxJournalId(), System.currentTimeMillis(), + Config.arrow_flight_sql_port, Env.getCurrentEnv().getMaxJournalId(), + System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH, ExecuteEnv.getInstance().getStartupTime(), ExecuteEnv.getInstance().getDiskInfos(), ExecuteEnv.getInstance().getProcessUUID()); @@ -331,7 +337,7 @@ private HeartbeatResponse getHeartbeatResponse() { ok = true; if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { return new FrontendHbResponse(fe.getNodeName(), result.getQueryPort(), - result.getRpcPort(), result.getReplayedJournalId(), + result.getRpcPort(), result.getArrowFlightSqlPort(), result.getReplayedJournalId(), System.currentTimeMillis(), result.getVersion(), result.getLastStartupTime(), FeDiskInfo.fromThrifts(result.getDiskInfos()), result.getProcessUUID()); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 9cf9f5ad27988e..69af28dffda8f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -788,6 +788,7 @@ public void updateBackendState(Backend be) { memoryBe.setHttpPort(be.getHttpPort()); memoryBe.setBeRpcPort(be.getBeRpcPort()); memoryBe.setBrpcPort(be.getBrpcPort()); + memoryBe.setArrowFlightSqlPort(be.getArrowFlightSqlPort()); memoryBe.setLastUpdateMs(be.getLastUpdateMs()); memoryBe.setLastStartTime(be.getLastStartTime()); memoryBe.setDisks(be.getDisks()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java index 3141c240fb31c4..4e6fe27398cf38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java @@ -45,6 +45,7 @@ public class BackendsTableValuedFunction extends MetadataTableValuedFunction { new Column("BePort", ScalarType.createType(PrimitiveType.INT)), new Column("HttpPort", ScalarType.createType(PrimitiveType.INT)), new Column("BrpcPort", ScalarType.createType(PrimitiveType.INT)), + new Column("ArrowFlightSqlPort", ScalarType.createType(PrimitiveType.INT)), new Column("LastStartTime", ScalarType.createStringType()), new Column("LastHeartbeat", ScalarType.createStringType()), new Column("Alive", ScalarType.createType(PrimitiveType.BOOLEAN)), diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java index 92109d05a9a71c..d23c9dfd6fdc41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java @@ -44,6 +44,7 @@ public class FrontendsTableValuedFunction extends MetadataTableValuedFunction { new Column("HttpPort", ScalarType.createStringType()), new Column("QueryPort", ScalarType.createStringType()), new Column("RpcPort", ScalarType.createStringType()), + new Column("ArrowFlightSqlPort", ScalarType.createStringType()), new Column("Role", ScalarType.createStringType()), new Column("IsMaster", ScalarType.createStringType()), new Column("ClusterId", ScalarType.createStringType()), diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java index 129c3f930c75bc..bf78faec95eb13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java @@ -92,7 +92,7 @@ private void getFileListFromBackend() throws AnalysisException { } BackendServiceProxy proxy = BackendServiceProxy.getInstance(); - TNetworkAddress address = be.getBrpcAdress(); + TNetworkAddress address = be.getBrpcAddress(); InternalService.PGlobRequest.Builder requestBuilder = InternalService.PGlobRequest.newBuilder(); requestBuilder.setPattern(filePath); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index a2d2599599ab7b..7b9d3f892eacc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -188,6 +188,7 @@ private static TFetchSchemaTableDataResult backendsMetadataResult(TMetadataTable trow.addToColumnValue(new TCell().setIntVal(backend.getBePort())); trow.addToColumnValue(new TCell().setIntVal(backend.getHttpPort())); trow.addToColumnValue(new TCell().setIntVal(backend.getBrpcPort())); + trow.addToColumnValue(new TCell().setIntVal(backend.getArrowFlightSqlPort())); } trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime()))); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs()))); diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java index d8c3877706b9e5..0ad1b0bb645d8f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java @@ -95,6 +95,7 @@ public void invalidateObject(TNetworkAddress address, FrontendService.Client obj normalResult.setReplayedJournalId(191224); normalResult.setQueryPort(9131); normalResult.setRpcPort(9121); + normalResult.setArrowFlightSqlPort(9141); normalResult.setVersion("test"); TFrontendPingFrontendRequest badRequest = new TFrontendPingFrontendRequest(12345, "abcde"); @@ -123,6 +124,7 @@ public void invalidateObject(TNetworkAddress address, FrontendService.Client obj Assert.assertEquals(191224, hbResponse.getReplayedJournalId()); Assert.assertEquals(9131, hbResponse.getQueryPort()); Assert.assertEquals(9121, hbResponse.getRpcPort()); + Assert.assertEquals(9141, hbResponse.getArrowFlightSqlPort()); Assert.assertEquals(HbStatus.OK, hbResponse.getStatus()); Assert.assertEquals("test", hbResponse.getVersion()); @@ -135,6 +137,7 @@ public void invalidateObject(TNetworkAddress address, FrontendService.Client obj Assert.assertEquals(0, hbResponse.getReplayedJournalId()); Assert.assertEquals(0, hbResponse.getQueryPort()); Assert.assertEquals(0, hbResponse.getRpcPort()); + Assert.assertEquals(0, hbResponse.getArrowFlightSqlPort()); Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus()); Assert.assertEquals("not ready", hbResponse.getMsg()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index 3c50bd47c8de13..7c5556e8cf2d49 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -97,7 +97,7 @@ public void testBackendHbResponseSerialization() throws IOException { System.out.println(Env.getCurrentEnvJournalVersion()); BackendHbResponse writeResponse = new BackendHbResponse(1L, 1234, 1234, 1234, 1234, 1234, "test", - Tag.VALUE_COMPUTATION, false); + Tag.VALUE_COMPUTATION, false, 1234); // Write objects to file File file1 = new File("./BackendHbResponseSerialization"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java index 6f3cf22d4444d4..98a3893123e4fb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java @@ -52,12 +52,14 @@ public class AnotherDemoTest { private static int fe_http_port; private static int fe_rpc_port; private static int fe_query_port; + private static int fe_arrow_flight_sql_port; private static int fe_edit_log_port; private static int be_heartbeat_port; private static int be_thrift_port; private static int be_brpc_port; private static int be_http_port; + private static int be_arrow_flight_sql_port; // use a unique dir so that it won't be conflict with other unit test which // may also start a Mocked Frontend @@ -81,12 +83,14 @@ private static void getPorts() { fe_http_port = UtFrameUtils.findValidPort(); fe_rpc_port = UtFrameUtils.findValidPort(); fe_query_port = UtFrameUtils.findValidPort(); + fe_arrow_flight_sql_port = UtFrameUtils.findValidPort(); fe_edit_log_port = UtFrameUtils.findValidPort(); be_heartbeat_port = UtFrameUtils.findValidPort(); be_thrift_port = UtFrameUtils.findValidPort(); be_brpc_port = UtFrameUtils.findValidPort(); be_http_port = UtFrameUtils.findValidPort(); + be_arrow_flight_sql_port = UtFrameUtils.findValidPort(); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index 83c75f052b1211..10f94adeb8ca9d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -198,7 +198,8 @@ public void testCreateDbAndTable() throws Exception { BackendsProcDir dir = new BackendsProcDir(Env.getCurrentSystemInfo()); ProcResult result = dir.fetchResult(); Assert.assertEquals(BackendsProcDir.TITLE_NAMES.size(), result.getColumnNames().size()); - Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(18)); + Assert.assertEquals("{\"location\" : \"default\"}", + result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 6)); Assert.assertEquals( "{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 3)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 3a09cae73bb434..55abcf9542acbe 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -86,10 +86,11 @@ public class MockedBackendFactory { public static final int BE_DEFAULT_THRIFT_PORT = 9060; public static final int BE_DEFAULT_BRPC_PORT = 8060; public static final int BE_DEFAULT_HTTP_PORT = 8040; + public static final int BE_DEFAULT_ARROW_FLIGHT_SQL_PORT = 8070; // create a mocked backend with customize parameters public static MockedBackend createBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, - int httpPort, + int httpPort, int arrowFlightSqlPort, HeartbeatService.Iface hbService, BeThriftService beThriftService, PBackendServiceGrpc.PBackendServiceImplBase pBackendService) throws IOException { @@ -105,16 +106,20 @@ public static class DefaultHeartbeatServiceImpl implements HeartbeatService.Ifac private int beHttpPort; private int beBrpcPort; - public DefaultHeartbeatServiceImpl(int beThriftPort, int beHttpPort, int beBrpcPort) { + private int beArrowFlightSqlPort; + + public DefaultHeartbeatServiceImpl(int beThriftPort, int beHttpPort, int beBrpcPort, int beArrowFlightSqlPort) { this.beThriftPort = beThriftPort; this.beHttpPort = beHttpPort; this.beBrpcPort = beBrpcPort; + this.beArrowFlightSqlPort = beArrowFlightSqlPort; } @Override public THeartbeatResult heartbeat(TMasterInfo masterInfo) throws TException { TBackendInfo backendInfo = new TBackendInfo(beThriftPort, beHttpPort); backendInfo.setBrpcPort(beBrpcPort); + backendInfo.setArrowFlightSqlPort(beArrowFlightSqlPort); THeartbeatResult result = new THeartbeatResult(new TStatus(TStatusCode.OK), backendInfo); return result; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java index 4ee38199713844..6382621d9051a5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java @@ -79,6 +79,7 @@ public class MockedFrontend { MIN_FE_CONF.put("http_port", "8030"); MIN_FE_CONF.put("rpc_port", "9020"); MIN_FE_CONF.put("query_port", "9030"); + MIN_FE_CONF.put("arrow_flight_sql_port", "9040"); MIN_FE_CONF.put("edit_log_port", "9010"); MIN_FE_CONF.put("priority_networks", "127.0.0.1/24"); MIN_FE_CONF.put("sys_log_verbose_modules", "org"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index ec0a87ef346970..8861112624f06f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -364,12 +364,14 @@ protected int startFEServerWithoutRetry(String runningDir) int feHttpPort = findValidPort(); int feRpcPort = findValidPort(); int feQueryPort = findValidPort(); + int arrowFlightSqlPort = findValidPort(); int feEditLogPort = findValidPort(); Map feConfMap = Maps.newHashMap(); // set additional fe config feConfMap.put("http_port", String.valueOf(feHttpPort)); feConfMap.put("rpc_port", String.valueOf(feRpcPort)); feConfMap.put("query_port", String.valueOf(feQueryPort)); + feConfMap.put("arrow_flight_sql_port", String.valueOf(arrowFlightSqlPort)); feConfMap.put("edit_log_port", String.valueOf(feEditLogPort)); feConfMap.put("tablet_create_timeout_second", "10"); // start fe in "DORIS_HOME/fe/mocked/" @@ -449,10 +451,11 @@ private Backend createBackendWithoutRetry(String beHost, int feRpcPort) throws I int beThriftPort = findValidPort(); int beBrpcPort = findValidPort(); int beHttpPort = findValidPort(); + int beArrowFlightSqlPort = findValidPort(); // start be MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, - beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), + beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort, beArrowFlightSqlPort), new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); @@ -471,6 +474,7 @@ beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort be.setBePort(beThriftPort); be.setHttpPort(beHttpPort); be.setBrpcPort(beBrpcPort); + be.setArrowFlightSqlPort(beArrowFlightSqlPort); Env.getCurrentSystemInfo().addBackend(be); return be; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index 2e2d53edb7a952..407171a69c7ec9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -193,6 +193,7 @@ private static int startFEServerWithoutRetry(String runningDir) throws EnvVarNot int feHttpPort = findValidPort(); int feRpcPort = findValidPort(); int feQueryPort = findValidPort(); + int arrowFlightSqlPort = findValidPort(); int feEditLogPort = findValidPort(); // start fe in "DORIS_HOME/fe/mocked/" @@ -202,6 +203,7 @@ private static int startFEServerWithoutRetry(String runningDir) throws EnvVarNot feConfMap.put("http_port", String.valueOf(feHttpPort)); feConfMap.put("rpc_port", String.valueOf(feRpcPort)); feConfMap.put("query_port", String.valueOf(feQueryPort)); + feConfMap.put("arrow_flight_sql_port", String.valueOf(arrowFlightSqlPort)); feConfMap.put("edit_log_port", String.valueOf(feEditLogPort)); feConfMap.put("tablet_create_timeout_second", "10"); frontend.init(dorisHome + "/" + runningDir, feConfMap); @@ -278,10 +280,11 @@ private static Backend createBackendWithoutRetry(String beHost, int feRpcPort) t int beThriftPort = findValidPort(); int beBrpcPort = findValidPort(); int beHttpPort = findValidPort(); + int beArrowFlightSqlPort = findValidPort(); // start be MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, - beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), + beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort, beArrowFlightSqlPort), new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); @@ -299,6 +302,7 @@ beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort be.setBePort(beThriftPort); be.setHttpPort(beHttpPort); be.setBrpcPort(beBrpcPort); + be.setArrowFlightSqlPort(beArrowFlightSqlPort); Env.getCurrentSystemInfo().addBackend(be); return be; } diff --git a/fe/pom.xml b/fe/pom.xml index a9ba53de1e2274..3e04562d551f03 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -233,16 +233,17 @@ under the License. 2.18.0 2.0.6 4.0.2 - 4.1.94.Final + + 4.1.96.Final 3.10.6.Final 2.1 - 1.30.0 + 1.56.0 3.32.0 - 3.21.12 + 3.24.3 - 3.21.9 + 3.24.3 com.google.protobuf:protoc:${protoc.artifact.version} io.grpc:protoc-gen-grpc-java:${grpc.version} 3.1.5 @@ -275,7 +276,7 @@ under the License. 1.1.0 3.0.0rc1 0.43.3-public - 9.0.0 + 13.0.0 1.11.1 0.13.1 @@ -313,6 +314,9 @@ under the License. 0.4.0-incubating 3.4.4 + + shade-format-flatbuffers + 1.12.0 @@ -1426,6 +1430,65 @@ under the License. client ${vesoft.client.version} + + io.grpc + grpc-netty + ${grpc.version} + + + io.grpc + grpc-core + ${grpc.version} + + + io.grpc + grpc-context + ${grpc.version} + + + io.netty + netty-tcnative-boringssl-static + + + io.grpc + grpc-api + ${grpc.version} + + + com.google.flatbuffers + flatbuffers-java + ${flatbuffers.version} + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + + + org.apache.arrow + flight-core + ${arrow.version} + + + org.apache.arrow + flight-sql + ${arrow.version} + + + org.apache.arrow + arrow-memory-core + ${arrow.version} + + + org.apache.arrow + arrow-jdbc + ${arrow.version} + diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index a3d14593144e61..878544e74b0bd4 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -249,6 +249,16 @@ message PFetchDataResult { optional bool empty_batch = 6; }; +message PFetchArrowFlightSchemaRequest { + optional PUniqueId finst_id = 1; +}; + +message PFetchArrowFlightSchemaResult { + optional PStatus status = 1; + // valid when status is ok + optional bytes schema = 2; +}; + message KeyTuple { repeated string key_column_rep = 1; } @@ -812,5 +822,6 @@ service PBackendService { rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns (PReportStreamLoadStatusResponse); rpc glob(PGlobRequest) returns (PGlobResponse); rpc group_commit_insert(PGroupCommitInsertRequest) returns (PGroupCommitInsertResponse); + rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult); }; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index e5c7b9bb0b600b..6c85c0290cc014 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -824,6 +824,7 @@ struct TFrontendPingFrontendResult { 7: optional i64 lastStartupTime 8: optional list diskInfos 9: optional i64 processUUID + 10: optional i32 arrowFlightSqlPort } struct TPropertyVal { diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 7fcf45804d85d3..5a7e47d982b8fb 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -50,6 +50,7 @@ struct TBackendInfo { 6: optional i64 be_start_time // This field will also be uesd to identify a be process 7: optional string be_node_role 8: optional bool is_shutdown + 9: optional Types.TPort arrow_flight_sql_port } struct THeartbeatResult { diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out b/regression-test/data/performance_p0/redundant_conjuncts.out index f82e0c94536911..6a7fd4fd9318ac 100644 --- a/regression-test/data/performance_p0/redundant_conjuncts.out +++ b/regression-test/data/performance_p0/redundant_conjuncts.out @@ -6,6 +6,7 @@ PLAN FRAGMENT 0 PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1` VRESULT SINK + MYSQL_PROTOCAL 0:VOlapScanNode TABLE: default_cluster:regression_test_performance_p0.redundant_conjuncts(redundant_conjuncts), PREAGGREGATION: OFF. Reason: No AggregateInfo @@ -21,6 +22,7 @@ PLAN FRAGMENT 0 PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1` VRESULT SINK + MYSQL_PROTOCAL 0:VOlapScanNode TABLE: default_cluster:regression_test_performance_p0.redundant_conjuncts(redundant_conjuncts), PREAGGREGATION: OFF. Reason: No AggregateInfo diff --git a/regression-test/suites/demo_p0/httpTest_action.groovy b/regression-test/suites/demo_p0/httpTest_action.groovy index 6d03e081f8f131..3120a92b5f864c 100644 --- a/regression-test/suites/demo_p0/httpTest_action.groovy +++ b/regression-test/suites/demo_p0/httpTest_action.groovy @@ -24,7 +24,7 @@ suite("http_test_action") { def backendIdToBackendIP = [:] def backendIdToBackendBrpcPort = [:] for (String[] backend in backends) { - if (backend[8].equals("true")) { + if (backend[9].equals("true")) { backendIdToBackendIP.put(backend[0], backend[1]) backendIdToBackendBrpcPort.put(backend[0], backend[5]) } diff --git a/regression-test/suites/external_table_p0/tvf/test_backends_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_backends_tvf.groovy index be497ee25a881f..2f6f774ad85332 100644 --- a/regression-test/suites/external_table_p0/tvf/test_backends_tvf.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_backends_tvf.groovy @@ -19,7 +19,7 @@ suite("test_backends_tvf","p0,external,tvf,external_docker") { List> table = sql """ select * from backends(); """ assertTrue(table.size() > 0) - assertEquals(24, table[0].size) + assertEquals(25, table[0].size) // filter columns table = sql """ select BackendId, Host, Alive, TotalCapacity, Version, NodeRole from backends();""" diff --git a/regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy index e247f8bdf1a1c0..0f7a4f1b2d3c21 100644 --- a/regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy @@ -19,7 +19,7 @@ suite("test_frontends_tvf","p0,external,tvf,external_docker") { List> table = sql """ select * from `frontends`(); """ assertTrue(table.size() > 0) - assertTrue(table[0].size == 17) + assertTrue(table[0].size == 18) // filter columns table = sql """ select Name from `frontends`();""" @@ -43,7 +43,7 @@ suite("test_frontends_tvf","p0,external,tvf,external_docker") { assertTrue(res[0][0] > 0) sql """ select Name, Host, EditLogPort - HttpPort, QueryPort, RpcPort, `Role`, IsMaster, ClusterId + HttpPort, QueryPort, RpcPort, ArrowFlightSqlPort, `Role`, IsMaster, ClusterId `Join`, Alive, ReplayedJournalId, LastHeartbeat IsHelper, ErrMsg, Version, CurrentConnected from frontends(); """ diff --git a/regression-test/suites/load_p0/stream_load/test_map_load_and_compaction.groovy b/regression-test/suites/load_p0/stream_load/test_map_load_and_compaction.groovy index c5eb2689bdedaa..4a3b99c1d06f2d 100644 --- a/regression-test/suites/load_p0/stream_load/test_map_load_and_compaction.groovy +++ b/regression-test/suites/load_p0/stream_load/test_map_load_and_compaction.groovy @@ -134,7 +134,7 @@ suite("test_map_load_and_compaction", "p0") { backends = sql """ show backends; """ assertTrue(backends.size() > 0) for (String[] b : backends) { - assertEquals("true", b[8]) + assertEquals("true", b[9]) } } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") diff --git a/regression-test/suites/nereids_syntax_p0/information_schema.groovy b/regression-test/suites/nereids_syntax_p0/information_schema.groovy index c4fead201798d5..59ab91ab97d9ef 100644 --- a/regression-test/suites/nereids_syntax_p0/information_schema.groovy +++ b/regression-test/suites/nereids_syntax_p0/information_schema.groovy @@ -18,7 +18,7 @@ suite("information_schema") { List> table = sql """ select * from backends(); """ assertTrue(table.size() > 0) - assertTrue(table[0].size == 24) + assertTrue(table[0].size == 25) sql "SELECT DATABASE();" sql "select USER();"