From 1e6d46b736ddd2211602f247b79a211b747d8a31 Mon Sep 17 00:00:00 2001
From: Jiacai Liu
Date: Tue, 10 Jan 2023 15:21:12 +0800
Subject: [PATCH] feat: use arrow-ipc to communicate between remote server and
 client (#552)

* feat: use arrow-ipc to communicate between remote server and client

* add benchmarks

* add zstd for ipc

* add RemoteEngineVersion

* fix message
---
 Cargo.lock                                   |  32 +++++-
 Cargo.toml                                   |   2 +
 benchmarks/Cargo.toml                        |   2 +
 benchmarks/src/bin/ipc.rs                    |  63 ++++++++++++
 common_types/src/lib.rs                      |   3 +
 common_types/src/remote_engine.rs            |  36 +++++++
 components/arrow_ext/Cargo.toml              |   6 +-
 components/arrow_ext/src/ipc.rs              | 100 +++++++++++++++++++
 components/arrow_ext/src/lib.rs              |   1 +
 proto/protos/remote_engine.proto             |   2 +-
 remote_engine_client/Cargo.toml              |   1 +
 remote_engine_client/src/client.rs           |  34 +++++--
 remote_engine_client/src/lib.rs              |   9 +-
 server/Cargo.toml                            |   1 +
 server/src/grpc/remote_engine_service/mod.rs |  22 ++--
 15 files changed, 284 insertions(+), 30 deletions(-)
 create mode 100644 benchmarks/src/bin/ipc.rs
 create mode 100644 common_types/src/remote_engine.rs
 create mode 100644 components/arrow_ext/src/ipc.rs

diff --git a/Cargo.lock b/Cargo.lock
index bf90eb750e..5ce3eb79c1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -271,7 +271,8 @@ name = "arrow_ext"
 version = "1.0.0-alpha02"
 dependencies = [
  "arrow",
- "uncover",
+ "snafu 0.6.10",
+ "zstd 0.12.1+zstd.1.5.2",
 ]
 
 [[package]]
@@ -437,6 +438,7 @@ dependencies = [
  "arena",
  "arrow",
  "arrow2",
+ "arrow_ext 1.0.0-alpha02",
  "base64 0.13.0",
  "clap 3.2.23",
  "common_types 1.0.0-alpha02",
@@ -457,6 +459,7 @@ dependencies = [
  "table_kv",
  "tokio 1.24.1",
  "wal",
+ "zstd 0.12.1+zstd.1.5.2",
 ]
 
 [[package]]
@@ -3738,7 +3741,7 @@ dependencies = [
  "spin",
  "tokio-codec",
  "uuid 0.7.4",
- "zstd",
+ "zstd 0.11.2+zstd.1.5.2",
 ]
 
 [[package]]
@@ -3964,7 +3967,7 @@ dependencies = [
  "snap",
  "thrift 0.16.0",
  "tokio 1.24.1",
- "zstd",
+ "zstd 0.11.2+zstd.1.5.2",
 ]
 
 [[package]]
@@ -4821,6 +4824,7 @@ checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
 name = "remote_engine_client"
 version = "1.0.0-alpha02"
 dependencies = [
+ "arrow_ext 1.0.0-alpha02",
  "async-trait",
  "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6cc1754ffc31ddfe0710972c94f8fe5acd61af98)",
  "clru",
@@ -5281,6 +5285,7 @@ version = "1.0.0-alpha02"
 dependencies = [
  "analytic_engine",
  "arrow",
+ "arrow_ext 1.0.0-alpha02",
  "async-trait",
  "bytes 1.2.1",
  "catalog",
@@ -7247,7 +7252,16 @@ version = "0.11.2+zstd.1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4"
 dependencies = [
- "zstd-safe",
+ "zstd-safe 5.0.2+zstd.1.5.2",
+]
+
+[[package]]
+name = "zstd"
+version = "0.12.1+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c947d2adc84ff9a59f2e3c03b81aa4128acf28d6ad7d56273f7e8af14e47bea"
+dependencies = [
+ "zstd-safe 6.0.2+zstd.1.5.2",
 ]
 
 [[package]]
@@ -7260,6 +7274,16 @@ dependencies = [
  "zstd-sys",
 ]
 
+[[package]]
+name = "zstd-safe"
+version = "6.0.2+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6cf39f730b440bab43da8fb5faf5f254574462f73f260f85f7987f32154ff17"
+dependencies = [
+ "libc",
+ "zstd-sys",
+]
+
 [[package]]
 name = "zstd-sys"
 version = "2.0.1+zstd.1.5.2"
"23.0.0", features = ["prettyprint"] } +arrow_ipc = { version = "23.0.0" } arrow_ext = { path = "components/arrow_ext" } analytic_engine = { path = "analytic_engine" } arena = { path = "components/arena" } @@ -108,6 +109,7 @@ tonic = "0.8.1" tokio = { version = "1.24", features = ["full"] } wal = { path = "wal" } message_queue = { path = "components/message_queue" } +zstd = { version = "0.12", default-features = false } [workspace.dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index ecd0d7636c..65f4a67a5a 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -15,6 +15,7 @@ analytic_engine = { workspace = true } arena = { workspace = true } arrow = { workspace = true } arrow2 = { version = "0.12.0", features = [ "io_parquet" ] } +arrow_ext = { workspace = true } base64 = { workspace = true } clap = { workspace = true } common_types = { workspace = true } @@ -34,6 +35,7 @@ table_engine = { workspace = true } table_kv = { workspace = true } tokio = { workspace = true } wal = { workspace = true } +zstd = { workspace = true } [dev-dependencies] criterion = { workspace = true } diff --git a/benchmarks/src/bin/ipc.rs b/benchmarks/src/bin/ipc.rs new file mode 100644 index 0000000000..ad9298b344 --- /dev/null +++ b/benchmarks/src/bin/ipc.rs @@ -0,0 +1,63 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::{sync::Arc, time::Instant}; + +use arrow::{ + array::{Int32Array, StringArray}, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, +}; +use arrow_ext::ipc; +use common_util::{avro, time::InstantExt}; + +fn create_batch(rows: usize) -> RecordBatch { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ]); + + let a = Int32Array::from_iter_values(0..rows as i32); + let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string())); + + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap() +} + +pub fn main() { + let arrow_batch = create_batch(102400); + + // Arrow IPC + { + let batch = common_types::record_batch::RecordBatch::try_from(arrow_batch.clone()).unwrap(); + let compression = ipc::Compression::Zstd; + + let begin = Instant::now(); + let bytes = + ipc::encode_record_batch(&batch.into_arrow_record_batch(), compression).unwrap(); + println!("Arrow IPC encoded size:{}", bytes.len()); + + let decode_batch = ipc::decode_record_batch(bytes, compression).unwrap(); + let _ = common_types::record_batch::RecordBatch::try_from(decode_batch).unwrap(); + + let cost = begin.saturating_elapsed(); + println!("Arrow IPC encode/decode cost:{}ms", cost.as_millis()); + } + + // Avro + { + let record_schema = + common_types::schema::RecordSchema::try_from(arrow_batch.schema()).unwrap(); + let batch = common_types::record_batch::RecordBatch::try_from(arrow_batch).unwrap(); + + let begin = Instant::now(); + let bytes = avro::record_batch_to_avro_rows(&batch).unwrap(); + println!( + "Avro encoded size:{}", + bytes.iter().map(|row| row.len()).sum::() + ); + + let _decode_batch = avro::avro_rows_to_record_batch(bytes, record_schema); + + let cost = begin.saturating_elapsed(); + println!("Avro encode/decode cost:{}ms", cost.as_millis()); + } +} diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs index a796fb207c..0c4eb241a9 100644 --- a/common_types/src/lib.rs +++ b/common_types/src/lib.rs @@ -13,6 +13,7 @@ pub mod hash; pub mod projected_schema; 
#[cfg(feature = "arrow")] pub mod record_batch; +pub mod remote_engine; pub mod request_id; #[cfg(feature = "arrow")] pub mod row; @@ -30,5 +31,7 @@ pub const MAX_SEQUENCE_NUMBER: u64 = u64::MAX; /// sequence number should starts from 1. pub const MIN_SEQUENCE_NUMBER: u64 = 0; +pub use remote_engine::Version as RemoteEngineVersion; + #[cfg(any(test, feature = "test"))] pub mod tests; diff --git a/common_types/src/remote_engine.rs b/common_types/src/remote_engine.rs new file mode 100644 index 0000000000..52e8ac94c9 --- /dev/null +++ b/common_types/src/remote_engine.rs @@ -0,0 +1,36 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Common type for remote engine rpc service + +use snafu::{Backtrace, Snafu}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Unknown version, value:{}.\nBacktrace:\n{}", version, backtrace))] + UnknownVersion { version: u32, backtrace: Backtrace }, +} + +type Result = std::result::Result; + +pub enum Version { + ArrowIPCWithZstd, +} + +impl TryFrom for Version { + type Error = Error; + + fn try_from(version: u32) -> Result { + match version { + 0 => Ok(Self::ArrowIPCWithZstd), + _ => UnknownVersion { version }.fail(), + } + } +} + +impl Version { + pub fn as_u32(&self) -> u32 { + match self { + Self::ArrowIPCWithZstd => 0, + } + } +} diff --git a/components/arrow_ext/Cargo.toml b/components/arrow_ext/Cargo.toml index 6bffdb255e..b30364e005 100644 --- a/components/arrow_ext/Cargo.toml +++ b/components/arrow_ext/Cargo.toml @@ -9,10 +9,8 @@ workspace = true [package.edition] workspace = true -[dependencies.uncover] -git = "https://github.com/matklad/uncover.git" -rev = "1d0770d997e29731b287e9e11e4ffbbea5f456da" [dependencies] arrow = { workspace = true } - +snafu = { workspace = true } +zstd = { workspace = true } diff --git a/components/arrow_ext/src/ipc.rs b/components/arrow_ext/src/ipc.rs new file mode 100644 index 0000000000..9d0fde7c84 --- /dev/null +++ b/components/arrow_ext/src/ipc.rs @@ -0,0 +1,100 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Utilities for `RecordBatch` serialization using Arrow IPC + +use std::io::Cursor; + +use arrow::{ + ipc::{reader::StreamReader, writer::StreamWriter}, + record_batch::RecordBatch, +}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; + +#[derive(Snafu, Debug)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Arror error, err:{}.\nBacktrace:\n{}", source, backtrace))] + ArrowError { + source: arrow::error::ArrowError, + backtrace: Backtrace, + }, + + #[snafu(display("Zstd decode error, err:{}.\nBacktrace:\n{}", source, backtrace))] + ZstdError { + source: std::io::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to decode record batch.\nBacktrace:\n{}", backtrace))] + Decode { backtrace: Backtrace }, +} + +type Result = std::result::Result; + +#[derive(Copy, Clone)] +pub enum Compression { + None, + Zstd, +} + +// https://facebook.github.io/zstd/zstd_manual.html +// The lower the level, the faster the speed (at the cost of compression). 
diff --git a/components/arrow_ext/src/lib.rs b/components/arrow_ext/src/lib.rs
index 4fa4baf124..d25f223ab3 100644
--- a/components/arrow_ext/src/lib.rs
+++ b/components/arrow_ext/src/lib.rs
@@ -1,4 +1,5 @@
 // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
 
 pub mod display;
+pub mod ipc;
 pub mod operation;
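The wire change itself is deliberately small: `ReadResponse.rows` collapses from `repeated bytes` (one Avro-encoded row per element) to a single `bytes` blob holding one IPC-encoded batch, and `version` tells the client how to decode that blob. A sketch of the dispatch a client performs, using a hypothetical `decode_rows` helper and a boxed error for brevity (the real implementation in `remote_engine_client/src/client.rs` below wraps errors with snafu contexts instead):

```rust
use arrow::record_batch::RecordBatch;
use arrow_ext::ipc;
use common_types::RemoteEngineVersion;

// Hypothetical helper: map the response's version tag to the matching decoder.
fn decode_rows(
    version: u32,
    rows: Vec<u8>,
) -> Result<RecordBatch, Box<dyn std::error::Error>> {
    match RemoteEngineVersion::try_from(version)? {
        // Version 0: a single zstd-compressed Arrow IPC stream per response.
        RemoteEngineVersion::ArrowIPCWithZstd => {
            Ok(ipc::decode_record_batch(rows, ipc::Compression::Zstd)?)
        }
    }
}
```

Because the match on `Version` is exhaustive, adding a new encoding later means adding a variant plus a new arm, and older clients fail loudly with `UnknownVersion` rather than mis-decoding the payload.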
diff --git a/proto/protos/remote_engine.proto b/proto/protos/remote_engine.proto
index 764e80068d..f3c97cd889 100644
--- a/proto/protos/remote_engine.proto
+++ b/proto/protos/remote_engine.proto
@@ -54,7 +54,7 @@ message ReadResponse {
   ResponseHeader header = 1;
   // Version of row encoding method
   uint32 version = 2;
-  repeated bytes rows = 3;
+  bytes rows = 3;
 }
 
 message RowGroup {
diff --git a/remote_engine_client/Cargo.toml b/remote_engine_client/Cargo.toml
index 824d03ef6c..6bc10825ef 100644
--- a/remote_engine_client/Cargo.toml
+++ b/remote_engine_client/Cargo.toml
@@ -11,6 +11,7 @@ workspace = true
 workspace = true
 
 [dependencies]
+arrow_ext = { workspace = true }
 async-trait = { workspace = true }
 ceresdbproto = { workspace = true }
 clru = { workspace = true }
diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs
index c3f66e7dbd..4c56cf24f4 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -7,11 +7,12 @@ use std::{
     task::{Context, Poll},
 };
 
+use arrow_ext::ipc;
 use ceresdbproto::storage;
 use common_types::{
     projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema,
+    RemoteEngineVersion,
 };
-use common_util::avro;
 use futures::{Stream, StreamExt};
 use proto::remote_engine::{self, remote_engine_service_client::*};
 use router::{endpoint::Endpoint, RouterRef};
@@ -178,15 +179,28 @@ impl Stream for ClientReadRecordBatchStream {
                     }.fail()));
                 }
 
-                // It's ok, try to convert rows to record batch and return.
-                let record_schema = this.projected_schema.to_record_schema();
-                let record_batch_result =
-                    avro::avro_rows_to_record_batch(response.rows, record_schema)
-                        .map_err(|e| Box::new(e) as _)
-                        .context(ConvertReadResponse {
-                            msg: "build record batch failed",
-                        });
-                Poll::Ready(Some(record_batch_result))
+                let record_batch = match RemoteEngineVersion::try_from(response.version)
+                    .context(ConvertVersion)?
+                {
+                    RemoteEngineVersion::ArrowIPCWithZstd => {
+                        ipc::decode_record_batch(response.rows, ipc::Compression::Zstd)
+                            .map_err(|e| Box::new(e) as _)
+                            .context(ConvertReadResponse {
+                                msg: "decode record batch failed",
+                                version: response.version,
+                            })
+                            .and_then(|v| {
+                                RecordBatch::try_from(v)
+                                    .map_err(|e| Box::new(e) as _)
+                                    .context(ConvertReadResponse {
+                                        msg: "convert record batch failed",
+                                        version: response.version,
+                                    })
+                            })
+                    }
+                };
+
+                Poll::Ready(Some(record_batch))
             }
 
             Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e).context(Rpc {
diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs
index a90435218e..0e0c179eb0 100644
--- a/remote_engine_client/src/lib.rs
+++ b/remote_engine_client/src/lib.rs
@@ -55,12 +55,14 @@ pub mod error {
         },
 
         #[snafu(display(
-            "Failed to convert request or response, table, msg:{}, err:{}",
+            "Failed to convert request or response, table, msg:{}, version:{}, err:{}",
             msg,
+            version,
             source
         ))]
         ConvertReadResponse {
             msg: String,
+            version: u32,
             source: Box<dyn std::error::Error + Send + Sync>,
         },
 
@@ -109,6 +111,11 @@ pub mod error {
             table_ident: TableIdentifier,
             msg: String,
         },
+
+        #[snafu(display("Failed to convert version, source:{}", source))]
+        ConvertVersion {
+            source: common_types::remote_engine::Error,
+        },
     }
 
     define_result!(Error);
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 595b444e29..9fad2b34c2 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -13,6 +13,7 @@ workspace = true
 [dependencies]
 analytic_engine = { workspace = true }
 arrow = { workspace = true }
+arrow_ext = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 catalog = { workspace = true }
diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs
index 637ef5e802..5531cb31d3 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -4,10 +4,10 @@
 
 use std::sync::Arc;
 
+use arrow_ext::ipc;
 use async_trait::async_trait;
 use catalog::manager::ManagerRef;
-use common_types::record_batch::RecordBatch;
-use common_util::avro;
+use common_types::{record_batch::RecordBatch, RemoteEngineVersion};
 use futures::stream::{self, BoxStream, StreamExt};
 use log::error;
 use proto::remote_engine::{
@@ -34,7 +34,6 @@ use crate::{
 pub(crate) mod error;
 
 const STREAM_QUERY_CHANNEL_LEN: usize = 20;
-const ENCODE_ROWS_WITH_AVRO: u32 = 0;
 
 #[derive(Clone)]
 pub struct RemoteEngineServiceImpl<Q: QueryExecutor + 'static> {
@@ -137,19 +136,22 @@ impl<Q: QueryExecutor + 'static> RemoteEngineService for RemoteEngineServiceImpl<Q>
             Ok(stream) => {
                 let new_stream: Self::ReadStream = Box::pin(stream.map(|res| match res {
                     Ok(record_batch) => {
-                        let resp = match avro::record_batch_to_avro_rows(&record_batch)
-                            .map_err(|e| Box::new(e) as _)
-                            .context(ErrWithCause {
-                                code: StatusCode::Internal,
-                                msg: "fail to convert record batch to avro",
-                            }) {
+                        let resp = match ipc::encode_record_batch(
+                            &record_batch.into_arrow_record_batch(),
+                            ipc::Compression::Zstd,
+                        )
+                        .map_err(|e| Box::new(e) as _)
+                        .context(ErrWithCause {
+                            code: StatusCode::Internal,
+                            msg: "encode record batch failed",
+                        }) {
                             Err(e) => ReadResponse {
                                 header: Some(error::build_err_header(e)),
                                 ..Default::default()
                             },
                             Ok(rows) => ReadResponse {
                                 header: Some(build_ok_header()),
-                                version: ENCODE_ROWS_WITH_AVRO,
+                                version: RemoteEngineVersion::ArrowIPCWithZstd.as_u32(),
                                 rows,
                             },
                         };