Skip to content

Commit

Permalink
add RemoteEngineVersion
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Jan 10, 2023
1 parent cd7408b commit a7c4794
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 15 deletions.
3 changes: 3 additions & 0 deletions common_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod hash;
pub mod projected_schema;
#[cfg(feature = "arrow")]
pub mod record_batch;
pub mod remote_engine;
pub mod request_id;
#[cfg(feature = "arrow")]
pub mod row;
Expand All @@ -30,5 +31,7 @@ pub const MAX_SEQUENCE_NUMBER: u64 = u64::MAX;
/// sequence number should starts from 1.
pub const MIN_SEQUENCE_NUMBER: u64 = 0;

pub use remote_engine::Version as RemoteEngineVersion;

#[cfg(any(test, feature = "test"))]
pub mod tests;
42 changes: 42 additions & 0 deletions common_types/src/remote_engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Common type for remote engine rpc service

use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unknown version, value:{}.\nBacktrace:\n{}", version, backtrace))]
UnknownVersion { version: u32, backtrace: Backtrace },
}

type Result<T> = std::result::Result<T, Error>;

pub enum Version {
ArrowIPCWithZstd,
}

impl TryFrom<u32> for Version {
type Error = Error;

fn try_from(value: u32) -> Result<Self> {
match value {
0 => Ok(Self::ArrowIPCWithZstd),
_ => UnknownVersion { version: value }.fail(),
}
}
}

impl Default for Version {
fn default() -> Self {
Self::ArrowIPCWithZstd
}
}

impl Version {
pub fn as_u32(&self) -> u32 {
match self {
Self::ArrowIPCWithZstd => 0,
}
}
}
29 changes: 19 additions & 10 deletions remote_engine_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use arrow_ext::ipc;
use ceresdbproto::storage;
use common_types::{
projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema,
RemoteEngineVersion,
};
use futures::{Stream, StreamExt};
use proto::remote_engine::{self, remote_engine_service_client::*};
Expand Down Expand Up @@ -178,19 +179,27 @@ impl Stream for ClientReadRecordBatchStream {
}.fail()));
}

// TODO(chenxiang): read compression from config
let record_batch = ipc::decode_record_batch(response.rows, ipc::Compression::Zstd)
.map_err(|e| Box::new(e) as _)
.context(ConvertReadResponse {
msg: "decode record batch failed",
})
.and_then(|v| {
RecordBatch::try_from(v)
let record_batch = match RemoteEngineVersion::try_from(response.version)
.context(ConvertVersion)?
{
RemoteEngineVersion::ArrowIPCWithZstd => {
ipc::decode_record_batch(response.rows, ipc::Compression::Zstd)
.map_err(|e| Box::new(e) as _)
.context(ConvertReadResponse {
msg: "convert record batch failed",
msg: "decode record batch failed",
version: response.version,
})
});
.and_then(|v| {
RecordBatch::try_from(v)
.map_err(|e| Box::new(e) as _)
.context(ConvertReadResponse {
msg: "convert record batch failed",
version: response.version,
})
})
}
};

Poll::Ready(Some(record_batch))
}

Expand Down
9 changes: 8 additions & 1 deletion remote_engine_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ pub mod error {
},

#[snafu(display(
"Failed to convert request or response, table, msg:{}, err:{}",
"Failed to convert request or response, table, msg:{}, version:{}, err:{}",
msg,
version,
source
))]
ConvertReadResponse {
msg: String,
version: u32,
source: Box<dyn std::error::Error + Send + Sync>,
},

Expand Down Expand Up @@ -109,6 +111,11 @@ pub mod error {
table_ident: TableIdentifier,
msg: String,
},

#[snafu(display("Failed to convert version, source:{}", source))]
ConvertVersion {
source: common_types::remote_engine::Error,
},
}

define_result!(Error);
Expand Down
6 changes: 2 additions & 4 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use arrow_ext::ipc;
use async_trait::async_trait;
use catalog::manager::ManagerRef;
use common_types::record_batch::RecordBatch;
use common_types::{record_batch::RecordBatch, RemoteEngineVersion};
use futures::stream::{self, BoxStream, StreamExt};
use log::error;
use proto::remote_engine::{
Expand All @@ -34,7 +34,6 @@ use crate::{
pub(crate) mod error;

const STREAM_QUERY_CHANNEL_LEN: usize = 20;
const ENCODE_ROWS_WITH_AVRO: u32 = 0;

#[derive(Clone)]
pub struct RemoteEngineServiceImpl<Q: QueryExecutor + 'static> {
Expand Down Expand Up @@ -137,7 +136,6 @@ impl<Q: QueryExecutor + 'static> RemoteEngineService for RemoteEngineServiceImpl
Ok(stream) => {
let new_stream: Self::ReadStream = Box::pin(stream.map(|res| match res {
Ok(record_batch) => {
// TODO(chenxiang): read compression from config
let resp = match ipc::encode_record_batch(
&record_batch.into_arrow_record_batch(),
ipc::Compression::Zstd,
Expand All @@ -153,7 +151,7 @@ impl<Q: QueryExecutor + 'static> RemoteEngineService for RemoteEngineServiceImpl
},
Ok(rows) => ReadResponse {
header: Some(build_ok_header()),
version: ENCODE_ROWS_WITH_AVRO,
version: RemoteEngineVersion::default().as_u32(),
rows,
},
};
Expand Down

0 comments on commit a7c4794

Please sign in to comment.