From c083f518776197cc1151a3b4b882756e4285814d Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Fri, 31 Mar 2023 10:30:36 +0800 Subject: [PATCH] feat: impl get_table_info in remote_engine_client (#798) --- remote_engine_client/src/client.rs | 82 +++++++++++++++++++++++++++--- remote_engine_client/src/lib.rs | 12 ++++- table_engine/src/remote/mod.rs | 12 ++++- table_engine/src/remote/model.rs | 31 ++++++++++- 4 files changed, 125 insertions(+), 12 deletions(-) diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs index 483bdc863c..e38a617e8d 100644 --- a/remote_engine_client/src/client.rs +++ b/remote_engine_client/src/client.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Client for accessing remote table engine @@ -18,8 +18,11 @@ use common_types::{ use common_util::error::BoxError; use futures::{Stream, StreamExt}; use router::RouterRef; -use snafu::{ensure, ResultExt}; -use table_engine::remote::model::{ReadRequest, TableIdentifier, WriteRequest}; +use snafu::{ensure, OptionExt, ResultExt}; +use table_engine::{ + remote::model::{GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo, WriteRequest}, + table::{SchemaId, TableId}, +}; use tonic::{transport::Channel, Request, Streaming}; use crate::{cached_router::CachedRouter, config::Config, error::*, status_code}; @@ -46,7 +49,7 @@ impl Client { let request_pb = ceresdbproto::remote_engine::ReadRequest::try_from(request) .box_err() .context(Convert { - msg: "convert ReadRequest to pb", + msg: "Failed to convert ReadRequest to pb", })?; let result = rpc_client @@ -54,7 +57,7 @@ impl Client { .await .with_context(|| Rpc { table_ident: table_ident.clone(), - msg: "read from remote", + msg: "Failed to read from remote engine", }); let response = match result { @@ -85,7 +88,7 @@ impl Client { let request_pb = ceresdbproto::remote_engine::WriteRequest::try_from(request) .box_err() .context(Convert { - msg: "convert WriteRequest to pb failed", + msg: "Failed to convert WriteRequest to pb", })?; let mut rpc_client = RemoteEngineServiceClient::::new(channel); @@ -94,7 +97,7 @@ impl Client { .await .with_context(|| Rpc { table_ident: table_ident.clone(), - msg: "write to remote failed", + msg: "Failed to write to remote engine", }); let response = match result { @@ -119,6 +122,71 @@ impl Client { Ok(response.affected_rows as usize) } } + + pub async fn get_table_info(&self, request: GetTableInfoRequest) -> Result { + // Find the channel from router firstly. + let channel = self.cached_router.route(&request.table).await?; + let table_ident = request.table.clone(); + let request_pb = ceresdbproto::remote_engine::GetTableInfoRequest::try_from(request) + .box_err() + .context(Convert { + msg: "Failed to convert GetTableInfoRequest to pb", + })?; + + let mut rpc_client = RemoteEngineServiceClient::::new(channel); + + let result = rpc_client + .get_table_info(Request::new(request_pb)) + .await + .with_context(|| Rpc { + table_ident: table_ident.clone(), + msg: "Failed to get table info", + }); + + let response = match result { + Ok(response) => response, + Err(e) => { + // If occurred error, we simply evict the corresponding channel now. + // TODO: evict according to the type of error. + self.cached_router.evict(&table_ident).await; + return Err(e); + } + }; + + let response = response.into_inner(); + if let Some(header) = response.header && !status_code::is_ok(header.code) { + Server { + table_ident: table_ident.clone(), + code: header.code, + msg: header.error, + }.fail() + } else { + let table_info = response.table_info.context(Server { + table_ident: table_ident.clone(), + code: status_code::StatusCode::Internal.as_u32(), + msg: "Table info is empty", + })?; + + Ok(TableInfo { + catalog_name: table_info.catalog_name, + schema_name: table_info.schema_name, + schema_id: SchemaId::from(table_info.schema_id), + table_name: table_info.table_name, + table_id: TableId::from(table_info.table_id), + table_schema: table_info.table_schema.map(TryInto::try_into).transpose().box_err() + .context(Convert { msg: "Failed to covert table schema" })? + .context(Server { + table_ident, + code: status_code::StatusCode::Internal.as_u32(), + msg: "Table schema is empty", + })?, + engine: table_info.engine, + options: table_info.options, + partition_info: table_info.partition_info.map(TryInto::try_into).transpose().box_err() + .context(Convert { msg: "Failed to covert partition info" })?, + }) + } + } } pub struct ClientReadRecordBatchStream { diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs index 9033f82dcb..0dcba650eb 100644 --- a/remote_engine_client/src/lib.rs +++ b/remote_engine_client/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Remote table engine implementation @@ -25,7 +25,7 @@ use snafu::ResultExt; use table_engine::{ remote::{ self, - model::{ReadRequest, WriteRequest}, + model::{GetTableInfoRequest, ReadRequest, TableInfo, WriteRequest}, RemoteEngine, }, stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream}, @@ -121,6 +121,14 @@ impl RemoteEngine for RemoteEngineImpl { async fn write(&self, request: WriteRequest) -> remote::Result { self.0.write(request).await.box_err().context(remote::Write) } + + async fn get_table_info(&self, request: GetTableInfoRequest) -> remote::Result { + self.0 + .get_table_info(request) + .await + .box_err() + .context(remote::GetTableInfo) + } } struct RemoteReadRecordBatchStream(ClientReadRecordBatchStream); diff --git a/table_engine/src/remote/mod.rs b/table_engine/src/remote/mod.rs index 1818495986..afb17751dd 100644 --- a/table_engine/src/remote/mod.rs +++ b/table_engine/src/remote/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Remote table engine @@ -11,7 +11,10 @@ use common_util::{define_result, error::GenericError}; use model::{ReadRequest, WriteRequest}; use snafu::Snafu; -use crate::stream::SendableRecordBatchStream; +use crate::{ + remote::model::{GetTableInfoRequest, TableInfo}, + stream::SendableRecordBatchStream, +}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -21,6 +24,9 @@ pub enum Error { #[snafu(display("Failed to write to remote, err:{}", source))] Write { source: GenericError }, + + #[snafu(display("Failed to get table info from remote, err:{}", source))] + GetTableInfo { source: GenericError }, } define_result!(Error); @@ -33,6 +39,8 @@ pub trait RemoteEngine: Send + Sync { /// Write to the remote engine. async fn write(&self, request: WriteRequest) -> Result; + + async fn get_table_info(&self, request: GetTableInfoRequest) -> Result; } /// Remote engine reference diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs index 56cd78fe6f..38f56ae4ee 100644 --- a/table_engine/src/remote/model.rs +++ b/table_engine/src/remote/model.rs @@ -2,6 +2,8 @@ //! Model for remote table engine +use std::collections::HashMap; + use arrow_ext::{ ipc, ipc::{CompressOptions, CompressionMethod}, @@ -13,11 +15,17 @@ use ceresdbproto::{ use common_types::{ record_batch::{RecordBatch, RecordBatchWithKeyBuilder}, row::{RowGroup, RowGroupBuilder}, + schema::Schema, }; use common_util::error::{BoxError, GenericError}; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; -use crate::table::{ReadRequest as TableReadRequest, WriteRequest as TableWriteRequest}; +use crate::{ + partition::PartitionInfo, + table::{ + ReadRequest as TableReadRequest, SchemaId, TableId, WriteRequest as TableWriteRequest, + }, +}; #[derive(Debug, Snafu)] pub enum Error { @@ -253,6 +261,27 @@ impl TryFrom for ceresdbproto::remote_engine::GetTableInfoR } } +pub struct TableInfo { + /// Catalog name + pub catalog_name: String, + /// Schema name + pub schema_name: String, + /// Schema id + pub schema_id: SchemaId, + /// Table name + pub table_name: String, + /// Table id + pub table_id: TableId, + /// Table schema + pub table_schema: Schema, + /// Table engine type + pub engine: String, + /// Table options + pub options: HashMap, + /// Partition Info + pub partition_info: Option, +} + fn build_row_group_from_record_batch( record_batches: Vec, ) -> Result {