Skip to content

Commit

Permalink
[feature-wip](arrow-flight)(step2) FE support Arrow Flight server (ap…
Browse files Browse the repository at this point in the history
…ache#24314)

This is a POC; the design documentation will be updated soon.
  • Loading branch information
xinyiZzz authored Sep 20, 2023
1 parent a3361df commit fc12362
Show file tree
Hide file tree
Showing 88 changed files with 1,226 additions and 71 deletions.
1 change: 1 addition & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
heartbeat_result.backend_info.__set_http_port(config::webserver_port);
heartbeat_result.backend_info.__set_be_rpc_port(-1);
heartbeat_result.backend_info.__set_brpc_port(config::brpc_port);
heartbeat_result.backend_info.__set_arrow_flight_sql_port(config::arrow_flight_sql_port);
heartbeat_result.backend_info.__set_version(get_short_version());
heartbeat_result.backend_info.__set_be_start_time(_be_epoch);
heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ DEFINE_Int32(be_port, "9060");
// port for brpc
DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_port, "-1");
DEFINE_Int32(arrow_flight_sql_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
Expand Down
6 changes: 3 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ DECLARE_Int32(be_port);
// port for brpc
DECLARE_Int32(brpc_port);

// port for arrow flight
// Default -1, do not start arrow flight server.
DECLARE_Int32(arrow_flight_port);
// port for arrow flight sql
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
_fe_result_batch_queue.push_back(std::move(result));
}
_buffer_rows += num_rows;
_data_arrival.notify_one();
} else {
auto ctx = _waiting_rpc.front();
_waiting_rpc.pop_front();
Expand Down
40 changes: 23 additions & 17 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <stdint.h>

#include <chrono>
#include <thread>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <memory>
Expand All @@ -33,6 +36,7 @@
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/thread.h"
#include "util/uid_util.h"

namespace doris {

Expand All @@ -42,7 +46,7 @@ ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) {
// Each BufferControlBlock has a limited queue size of 1024, it's not needed to count the
// actual size of all BufferControlBlock.
REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() {
// std::lock_guard<std::mutex> l(_lock);
// std::lock_guard<std::mutex> l(_buffer_map_lock);
return _buffer_map.size();
});
}
Expand Down Expand Up @@ -80,7 +84,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
}

{
std::lock_guard<std::mutex> l(_lock);
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
_buffer_map.insert(std::make_pair(query_id, control_block));
// BufferControlBlock should destroy after max_timeout
// for exceed max_timeout FE will return timeout to client
Expand All @@ -95,8 +99,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
}

std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TUniqueId& query_id) {
// TODO(zhaochun): this lock can be bottleneck?
std::lock_guard<std::mutex> l(_lock);
std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
Expand All @@ -108,14 +111,12 @@ std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TU

void ResultBufferMgr::register_row_descriptor(const TUniqueId& query_id,
const RowDescriptor& row_desc) {
{
std::lock_guard<std::mutex> l(_lock);
_row_descriptor_map.insert(std::make_pair(query_id, row_desc));
}
std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
_row_descriptor_map.insert(std::make_pair(query_id, row_desc));
}

RowDescriptor ResultBufferMgr::find_row_descriptor(const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_lock);
std::shared_lock<std::shared_mutex> rlock(_row_descriptor_map_lock);
RowDescriptorMap::iterator iter = _row_descriptor_map.find(query_id);

if (_row_descriptor_map.end() != iter) {
Expand Down Expand Up @@ -150,18 +151,23 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
}

Status ResultBufferMgr::cancel(const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
iter->second->cancel();
_buffer_map.erase(iter);
if (_buffer_map.end() != iter) {
iter->second->cancel();
_buffer_map.erase(iter);
}
}

RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id);
{
std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id);

if (_row_descriptor_map.end() != row_desc_iter) {
_row_descriptor_map.erase(row_desc_iter);
if (_row_descriptor_map.end() != row_desc_iter) {
_row_descriptor_map.erase(row_desc_iter);
}
}

return Status::OK();
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/result_buffer_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -86,9 +87,11 @@ class ResultBufferMgr {
void cancel_thread();

// lock for buffer map
std::mutex _lock;
std::shared_mutex _buffer_map_lock;
// buffer block map
BufferMap _buffer_map;
// lock for descriptor map
std::shared_mutex _row_descriptor_map_lock;
// for arrow flight
RowDescriptorMap _row_descriptor_map;

Expand Down
9 changes: 7 additions & 2 deletions be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,17 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::C
}

arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
CHECK(*out == nullptr);
// *out not nullptr
*out = nullptr;
auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out);
if (UNLIKELY(!st.ok())) {
LOG(WARNING) << st.to_string();
LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string();
ARROW_RETURN_NOT_OK(to_arrow_status(st));
}
if (*out != nullptr) {
VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", "
<< (*out)->num_columns();
}
return arrow::Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ int main(int argc, char** argv) {
// 5. arrow flight service
std::shared_ptr<doris::flight::FlightSqlServer> flight_server =
std::move(doris::flight::FlightSqlServer::create()).ValueOrDie();
status = flight_server->init(doris::config::arrow_flight_port);
status = flight_server->init(doris::config::arrow_flight_sql_port);

// 6. start daemon thread to do clean or gc jobs
doris::Daemon daemon;
Expand Down
35 changes: 35 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "service/point_query_executor.h"
#include "util/arrow/row_batch.h"
#include "util/async_io.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
Expand Down Expand Up @@ -704,6 +705,40 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
}
}

// Handles the fetch_arrow_flight_schema RPC.
//
// Looks up the RowDescriptor registered in the result manager under request->finst_id(),
// converts it to an Arrow schema, serializes it and stores the bytes in result->schema.
// The outcome (OK or the first error) is always written to result->status.
//
// The work is queued on _light_work_pool. `done` is completed by the ClosureGuard inside the
// queued task, or handed to offer_failed() when the pool does not accept the task.
// `controller` is not used by this handler.
void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcController* controller,
                                                     const PFetchArrowFlightSchemaRequest* request,
                                                     PFetchArrowFlightSchemaResult* result,
                                                     google::protobuf::Closure* done) {
    bool ret = _light_work_pool.try_offer([request, result, done]() {
        brpc::ClosureGuard closure_guard(done);
        RowDescriptor row_desc = ExecEnv::GetInstance()->result_mgr()->find_row_descriptor(
                UniqueId(request->finst_id()).to_thrift());
        // A descriptor equal to a default-constructed RowDescriptor is treated as
        // "nothing registered for this id".
        if (row_desc.equals(RowDescriptor())) {
            auto st = Status::NotFound("not found row descriptor");
            st.to_protobuf(result->mutable_status());
            return;
        }

        std::shared_ptr<arrow::Schema> schema;
        auto st = convert_to_arrow_schema(row_desc, &schema);
        if (UNLIKELY(!st.ok())) {
            st.to_protobuf(result->mutable_status());
            return;
        }

        std::string schema_str;
        st = serialize_arrow_schema(row_desc, &schema, &schema_str);
        // Only publish the serialized schema on success; the status is reported either way.
        if (st.ok()) {
            result->set_schema(std::move(schema_str));
        }
        st.to_protobuf(result->mutable_status());
    });
    if (!ret) {
        // Pass the pool that actually rejected the task. The original passed
        // _heavy_work_pool here although the task was offered to _light_work_pool.
        // NOTE(review): offer_failed() is defined outside this view; presumably it uses the
        // pool argument for the failure message - confirm.
        offer_failed(result, done, _light_work_pool);
        return;
    }
}

Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request,
PTabletKeyLookupResponse* response) {
PointQueryExecutor lookup_util;
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ class PInternalServiceImpl : public PBackendService {
PFetchTableSchemaResult* result,
google::protobuf::Closure* done) override;

void fetch_arrow_flight_schema(google::protobuf::RpcController* controller,
const PFetchArrowFlightSchemaRequest* request,
PFetchArrowFlightSchemaResult* result,
google::protobuf::Closure* done) override;

void tablet_writer_open(google::protobuf::RpcController* controller,
const PTabletWriterOpenRequest* request,
PTabletWriterOpenResult* response,
Expand Down
14 changes: 14 additions & 0 deletions be/src/util/arrow/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/types.h"
#include "util/arrow/block_convertor.h"
#include "vec/core/block.h"

namespace doris {

Expand Down Expand Up @@ -188,4 +190,16 @@ Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::strin
return Status::OK();
}

// Serializes the Arrow schema that corresponds to `row_desc` into `*result`.
//
// The slot descriptors of every tuple in `row_desc` are flattened into one list, a Block is
// built from that list, the Block is converted to an Arrow RecordBatch using `*schema`, and the
// batch is serialized with serialize_record_batch().
//
// `schema` is dereferenced unconditionally, so it must point to a valid schema.
// Returns the first non-OK Status from the conversion or the serialization.
Status serialize_arrow_schema(RowDescriptor row_desc, std::shared_ptr<arrow::Schema>* schema,
                              std::string* result) {
    // Collect all slots, tuple by tuple, preserving their order.
    std::vector<SlotDescriptor*> all_slots;
    for (const auto& tuple : row_desc.tuple_descriptors()) {
        const auto& tuple_slots = tuple->slots();
        all_slots.insert(all_slots.end(), tuple_slots.begin(), tuple_slots.end());
    }

    // NOTE(review): the second constructor argument is 0 - presumably this yields a block with
    // no rows, so the resulting batch carries only the schema; confirm against Block's ctor.
    vectorized::Block schema_block(all_slots, 0);

    std::shared_ptr<arrow::RecordBatch> schema_batch;
    RETURN_IF_ERROR(convert_to_arrow_batch(schema_block, *schema, arrow::default_memory_pool(),
                                           &schema_batch));
    return serialize_record_batch(*schema_batch, result);
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/util/arrow/row_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,

Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);

Status serialize_arrow_schema(RowDescriptor row_desc, std::shared_ptr<arrow::Schema>* schema,
std::string* result);

} // namespace doris
2 changes: 1 addition & 1 deletion conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ be_port = 9060
webserver_port = 8040
heartbeat_service_port = 9050
brpc_port = 8060
arrow_flight_port = -1
arrow_flight_sql_port = -1

# HTTPS configures
enable_https = false
Expand Down
1 change: 1 addition & 0 deletions conf/fe.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ http_port = 8030
rpc_port = 9020
query_port = 9030
edit_log_port = 9010
arrow_flight_sql_port = -1

# Choose one if there are more than one ip except loopback address.
# Note that there should at most one ip match this list.
Expand Down
1 change: 1 addition & 0 deletions docs/en/community/developer-guide/be-vscode-dev.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ be_rpc_port = 9070
webserver_port = 8040
heartbeat_service_port = 9050
brpc_port = 8060
arrow_flight_sql_port = -1
# Note that there should be at most one ip that matches this list.
# If no ip matches this rule, it will choose one randomly.
Expand Down
1 change: 1 addition & 0 deletions docs/en/community/developer-guide/fe-idea-dev.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ sys_log_level = INFO
http_port = 8030
rpc_port = 9020
query_port = 9030
arrow_flight_sql_port = -1
edit_log_port = 9010

# Choose one if there are more than one ip except loopback address.
Expand Down
1 change: 1 addition & 0 deletions docs/en/docs/admin-manual/cluster-management/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ admin set frontend config("disable_tablet_scheduler" = "true");
http_port = 18030
rpc_port = 19020
query_port = 19030
arrow_flight_sql_port = 19040
edit_log_port = 19010
...
```
Expand Down
6 changes: 6 additions & 0 deletions docs/en/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ There are two ways to configure BE configuration items:
* Description: The port of BRPC on BE, used for communication between BEs
* Default value: 8060
#### `arrow_flight_sql_port`
* Type: int32
* Description: The port of Arrow Flight SQL server on BE, used for communication between Arrow Flight Client and BE
* Default value: -1
#### `enable_https`
* Type: bool
Expand Down
6 changes: 6 additions & 0 deletions docs/en/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ Default:9030
FE MySQL server port
#### `arrow_flight_sql_port`
Default:-1
Arrow Flight SQL server port
#### `frontend_address`
Status: Deprecated, not recommended for use. This parameter may be deleted later.
Expand Down
3 changes: 3 additions & 0 deletions docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ none
"data": {
"queryPort": 9030,
"rpcPort": 9020,
"arrowFlightSqlPort": 9040,
"maxReplayedJournal": 17287
},
"count": 0
Expand All @@ -85,6 +86,7 @@ none
* `queryPort` is the MySQL protocol port of the FE node.
* `rpcPort` is the thrift RPC port of the FE node.
* `maxReplayedJournal` represents the maximum metadata journal id currently played back by the FE node.
* `arrowFlightSqlPort` is the Arrow Flight SQL port of the FE node.
## Examples
Expand Down Expand Up @@ -114,6 +116,7 @@ none
"data": {
"queryPort": 9030,
"rpcPort": 9020,
"arrowFlightSqlPort": 9040,
"maxReplayedJournal": 17287
},
"count": 0
Expand Down
1 change: 1 addition & 0 deletions docs/en/docs/admin-manual/http-actions/fe/node-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ frontends:
"HttpPort",
"QueryPort",
"RpcPort",
"ArrowFlightSqlPort",
"Role",
"IsMaster",
"ClusterId",
Expand Down
5 changes: 5 additions & 0 deletions docs/en/docs/admin-manual/maint-monitor/metadata-operation.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ FE currently has the following ports
* http_port: http port, also used to push image
* rpc_port: thrift server port of Frontend
* query_port: Mysql connection port
* arrow_flight_sql_port: Arrow Flight SQL connection port

1. edit_log_port

Expand All @@ -256,6 +257,10 @@ FE currently has the following ports

After modifying the configuration, restart FE directly. This only affects mysql's connection target.

5. arrow_flight_sql_port

After modifying the configuration, restart FE directly. This only affects the connection target of the Arrow Flight SQL server.

### Recover metadata from FE memory
In some extreme cases, the image file on the disk may be damaged, but the metadata in the memory is intact. At this point, we can dump the metadata from the memory and replace the image file on the disk to recover the metadata. The complete procedure, which does not interrupt the query service, is as follows:

Expand Down
Loading

0 comments on commit fc12362

Please sign in to comment.