diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs index a796fb207c..0c4eb241a9 100644 --- a/common_types/src/lib.rs +++ b/common_types/src/lib.rs @@ -13,6 +13,7 @@ pub mod hash; pub mod projected_schema; #[cfg(feature = "arrow")] pub mod record_batch; +pub mod remote_engine; pub mod request_id; #[cfg(feature = "arrow")] pub mod row; @@ -30,5 +31,7 @@ pub const MAX_SEQUENCE_NUMBER: u64 = u64::MAX; /// sequence number should starts from 1. pub const MIN_SEQUENCE_NUMBER: u64 = 0; +pub use remote_engine::Version as RemoteEngineVersion; + #[cfg(any(test, feature = "test"))] pub mod tests; diff --git a/common_types/src/remote_engine.rs b/common_types/src/remote_engine.rs new file mode 100644 index 0000000000..43a0498b56 --- /dev/null +++ b/common_types/src/remote_engine.rs @@ -0,0 +1,42 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Common type for remote engine rpc service + +use snafu::{Backtrace, Snafu}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Unknown version, value:{}.\nBacktrace:\n{}", version, backtrace))] + UnknownVersion { version: u32, backtrace: Backtrace }, +} + +type Result = std::result::Result; + +pub enum Version { + ArrowIPCWithZstd, +} + +impl TryFrom for Version { + type Error = Error; + + fn try_from(value: u32) -> Result { + match value { + 0 => Ok(Self::ArrowIPCWithZstd), + _ => UnknownVersion { version: value }.fail(), + } + } +} + +impl Default for Version { + fn default() -> Self { + Self::ArrowIPCWithZstd + } +} + +impl Version { + pub fn as_u32(&self) -> u32 { + match self { + Self::ArrowIPCWithZstd => 0, + } + } +} diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs index a2238beb10..4c56cf24f4 100644 --- a/remote_engine_client/src/client.rs +++ b/remote_engine_client/src/client.rs @@ -11,6 +11,7 @@ use arrow_ext::ipc; use ceresdbproto::storage; use common_types::{ projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema, + RemoteEngineVersion, }; use futures::{Stream, StreamExt}; use proto::remote_engine::{self, remote_engine_service_client::*}; @@ -178,19 +179,27 @@ impl Stream for ClientReadRecordBatchStream { }.fail())); } - // TODO(chenxiang): read compression from config - let record_batch = ipc::decode_record_batch(response.rows, ipc::Compression::Zstd) - .map_err(|e| Box::new(e) as _) - .context(ConvertReadResponse { - msg: "decode record batch failed", - }) - .and_then(|v| { - RecordBatch::try_from(v) + let record_batch = match RemoteEngineVersion::try_from(response.version) + .context(ConvertVersion)? + { + RemoteEngineVersion::ArrowIPCWithZstd => { + ipc::decode_record_batch(response.rows, ipc::Compression::Zstd) .map_err(|e| Box::new(e) as _) .context(ConvertReadResponse { - msg: "convert record batch failed", + msg: "decode record batch failed", + version: response.version, }) - }); + .and_then(|v| { + RecordBatch::try_from(v) + .map_err(|e| Box::new(e) as _) + .context(ConvertReadResponse { + msg: "convert record batch failed", + version: response.version, + }) + }) + } + }; + Poll::Ready(Some(record_batch)) } diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs index a90435218e..0e0c179eb0 100644 --- a/remote_engine_client/src/lib.rs +++ b/remote_engine_client/src/lib.rs @@ -55,12 +55,14 @@ pub mod error { }, #[snafu(display( - "Failed to convert request or response, table, msg:{}, err:{}", + "Failed to convert request or response, table, msg:{}, version:{}, err:{}", msg, + version, source ))] ConvertReadResponse { msg: String, + version: u32, source: Box, }, @@ -109,6 +111,11 @@ pub mod error { table_ident: TableIdentifier, msg: String, }, + + #[snafu(display("Failed to convert version, source:{}", source))] + ConvertVersion { + source: common_types::remote_engine::Error, + }, } define_result!(Error); diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 4a77c5deb9..20e48385ce 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use arrow_ext::ipc; use async_trait::async_trait; use catalog::manager::ManagerRef; -use common_types::record_batch::RecordBatch; +use common_types::{record_batch::RecordBatch, RemoteEngineVersion}; use futures::stream::{self, BoxStream, StreamExt}; use log::error; use proto::remote_engine::{ @@ -34,7 +34,6 @@ use crate::{ pub(crate) mod error; const STREAM_QUERY_CHANNEL_LEN: usize = 20; -const ENCODE_ROWS_WITH_AVRO: u32 = 0; #[derive(Clone)] pub struct RemoteEngineServiceImpl { @@ -137,7 +136,6 @@ impl RemoteEngineService for RemoteEngineServiceImpl Ok(stream) => { let new_stream: Self::ReadStream = Box::pin(stream.map(|res| match res { Ok(record_batch) => { - // TODO(chenxiang): read compression from config let resp = match ipc::encode_record_batch( &record_batch.into_arrow_record_batch(), ipc::Compression::Zstd, @@ -153,7 +151,7 @@ impl RemoteEngineService for RemoteEngineServiceImpl }, Ok(rows) => ReadResponse { header: Some(build_ok_header()), - version: ENCODE_ROWS_WITH_AVRO, + version: RemoteEngineVersion::default().as_u32(), rows, }, };