Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl getTableInfo in remoteEngine service #793

Merged
merged 2 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ bytes = "1.1.0"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.1"
chrono = "0.4"
clap = "3.0"
clru = "0.6.1"
Expand Down Expand Up @@ -150,6 +149,10 @@ table_engine = { workspace = true }
toml = { workspace = true }
tracing_util = { workspace = true }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "1c3bf4e803ef8b7a1fbc8c3d4a07fdd372e1830b"

[build-dependencies]
vergen = { version = "7", default-features = false, features = ["build", "git", "rustc"] }

Expand Down
2 changes: 1 addition & 1 deletion common_types/src/schema.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Schema of table

Expand Down
3 changes: 2 additions & 1 deletion server/src/grpc/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

// Grpc server metrics

Expand All @@ -24,6 +24,7 @@ make_auto_flush_static_metric! {
pub label_enum RemoteEngineTypeKind {
stream_read,
write,
get_table_info,
}

pub struct RemoteEngineGrpcHandlerDurationHistogramVec: LocalHistogram {
Expand Down
125 changes: 105 additions & 20 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

// Remote engine rpc service implementation.

use std::{sync::Arc, time::Instant};

use arrow_ext::ipc::{self, CompressOptions, CompressOutput, CompressionMethod};
use async_trait::async_trait;
use catalog::manager::ManagerRef;
use catalog::{manager::ManagerRef, schema::SchemaRef};
use ceresdbproto::{
remote_engine::{
read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService,
ReadRequest, ReadResponse, WriteRequest, WriteResponse,
GetTableInfoRequest, GetTableInfoResponse, ReadRequest, ReadResponse, WriteRequest,
WriteResponse,
},
storage::{arrow_payload, ArrowPayload},
};
Expand All @@ -31,9 +32,7 @@ use tonic::{Request, Response, Status};
use crate::{
grpc::{
metrics::REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
remote_engine_service::error::{
build_ok_header, ErrNoCause, ErrWithCause, Result, StatusCode,
},
remote_engine_service::error::{ErrNoCause, ErrWithCause, Result, StatusCode},
},
instance::InstanceRef,
};
Expand Down Expand Up @@ -123,6 +122,39 @@ impl<Q: QueryExecutor + 'static> RemoteEngineServiceImpl<Q> {
Ok(Response::new(resp))
}

async fn get_table_info_internal(
&self,
request: Request<GetTableInfoRequest>,
) -> std::result::Result<Response<GetTableInfoResponse>, Status> {
let begin_instant = Instant::now();
let ctx = self.handler_ctx();
let handle = self.runtimes.read_runtime.spawn(async move {
let request = request.into_inner();
handle_get_table_info(ctx, request).await
});

let res = handle.await.box_err().context(ErrWithCause {
code: StatusCode::Internal,
msg: "fail to join task",
});

let mut resp = GetTableInfoResponse::default();
match res {
Ok(Ok(v)) => {
resp.header = Some(error::build_ok_header());
resp.table_info = v.table_info;
}
Ok(Err(e)) | Err(e) => {
resp.header = Some(error::build_err_header(e));
}
};

REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.get_table_info
.observe(begin_instant.saturating_elapsed().as_secs_f64());
Ok(Response::new(resp))
}

fn handler_ctx(&self) -> HandlerContext {
HandlerContext {
catalog_manager: self.instance.catalog_manager.clone(),
Expand Down Expand Up @@ -170,7 +202,7 @@ impl<Q: QueryExecutor + 'static> RemoteEngineService for RemoteEngineServiceImpl
};

ReadResponse {
header: Some(build_ok_header()),
header: Some(error::build_ok_header()),
output: Some(Arrow(ArrowPayload {
record_batches: vec![payload],
compression: compression as i32,
Expand Down Expand Up @@ -209,6 +241,13 @@ impl<Q: QueryExecutor + 'static> RemoteEngineService for RemoteEngineServiceImpl
) -> std::result::Result<Response<WriteResponse>, Status> {
self.write_internal(request).await
}

async fn get_table_info(
&self,
request: Request<GetTableInfoRequest>,
) -> std::result::Result<Response<GetTableInfoResponse>, Status> {
self.get_table_info_internal(request).await
}
}

async fn handle_stream_read(
Expand Down Expand Up @@ -258,10 +297,68 @@ async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result<Writ
})
}

async fn handle_get_table_info(
ctx: HandlerContext,
request: GetTableInfoRequest,
) -> Result<GetTableInfoResponse> {
let request: table_engine::remote::model::GetTableInfoRequest =
request.try_into().box_err().context(ErrWithCause {
code: StatusCode::BadRequest,
msg: "fail to convert get table info request",
})?;

let schema = find_schema_by_identifier(&ctx, &request.table)?;
let table = schema
.table_by_name(&request.table.table)
.box_err()
.context(ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to get table, table:{}", request.table.table),
})?
.context(ErrNoCause {
code: StatusCode::NotFound,
msg: format!("table is not found, table:{}", request.table.table),
})?;

Ok(GetTableInfoResponse {
header: None,
table_info: Some(ceresdbproto::remote_engine::TableInfo {
catalog_name: request.table.catalog,
schema_name: schema.name().to_string(),
schema_id: schema.id().as_u32(),
table_name: table.name().to_string(),
table_id: table.id().as_u64(),
table_schema: Some((&table.schema()).into()),
engine: table.engine_type().to_string(),
options: table.options(),
partition_info: table.partition_info().map(Into::into),
}),
})
}

fn find_table_by_identifier(
ctx: &HandlerContext,
table_identifier: &TableIdentifier,
) -> Result<TableRef> {
let schema = find_schema_by_identifier(ctx, table_identifier)?;

schema
.table_by_name(&table_identifier.table)
.box_err()
.context(ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to get table, table:{}", table_identifier.table),
})?
.context(ErrNoCause {
code: StatusCode::NotFound,
msg: format!("table is not found, table:{}", table_identifier.table),
})
}

fn find_schema_by_identifier(
ctx: &HandlerContext,
table_identifier: &TableIdentifier,
) -> Result<SchemaRef> {
let catalog = ctx
.catalog_manager
.catalog_by_name(&table_identifier.catalog)
Expand All @@ -274,7 +371,7 @@ fn find_table_by_identifier(
code: StatusCode::NotFound,
msg: format!("catalog is not found, catalog:{}", table_identifier.catalog),
})?;
let schema = catalog
catalog
.schema_by_name(&table_identifier.schema)
.box_err()
.context(ErrWithCause {
Expand All @@ -290,17 +387,5 @@ fn find_table_by_identifier(
"schema of table is not found, schema:{}",
table_identifier.schema
),
})?;

schema
.table_by_name(&table_identifier.table)
.box_err()
.context(ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to get table, table:{}", table_identifier.table),
})?
.context(ErrNoCause {
code: StatusCode::NotFound,
msg: format!("table is not found, table:{}", table_identifier.table),
})
}
Loading