Skip to content

Commit

Permalink
feat: use arrow-ipc to communicate between remote server and client (#…
Browse files Browse the repository at this point in the history
…552)

* feat: use arrow-ipc to communicate between remote server and client

* add benchmarks

* add zstd for ipc

* add RemoteEngineVersion

* fix message
  • Loading branch information
jiacai2050 authored Jan 10, 2023
1 parent abb1bb5 commit 1e6d46b
Show file tree
Hide file tree
Showing 15 changed files with 284 additions and 30 deletions.
32 changes: 28 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ path = "src/bin/ceresdb-server.rs"

[workspace.dependencies]
arrow = { version = "23.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "23.0.0" }
arrow_ext = { path = "components/arrow_ext" }
analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
Expand Down Expand Up @@ -108,6 +109,7 @@ tonic = "0.8.1"
tokio = { version = "1.24", features = ["full"] }
wal = { path = "wal" }
message_queue = { path = "components/message_queue" }
zstd = { version = "0.12", default-features = false }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ analytic_engine = { workspace = true }
arena = { workspace = true }
arrow = { workspace = true }
arrow2 = { version = "0.12.0", features = [ "io_parquet" ] }
arrow_ext = { workspace = true }
base64 = { workspace = true }
clap = { workspace = true }
common_types = { workspace = true }
Expand All @@ -34,6 +35,7 @@ table_engine = { workspace = true }
table_kv = { workspace = true }
tokio = { workspace = true }
wal = { workspace = true }
zstd = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
Expand Down
63 changes: 63 additions & 0 deletions benchmarks/src/bin/ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{sync::Arc, time::Instant};

use arrow::{
array::{Int32Array, StringArray},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use arrow_ext::ipc;
use common_util::{avro, time::InstantExt};

/// Builds a two-column (`Int32` "a", `Utf8` "b") record batch with `rows`
/// rows, used as the benchmark input.
fn create_batch(rows: usize) -> RecordBatch {
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    let int_col = Int32Array::from_iter_values(0..rows as i32);
    let str_col = StringArray::from_iter_values((0..rows).map(|v| v.to_string()));

    RecordBatch::try_new(schema, vec![Arc::new(int_col), Arc::new(str_col)]).unwrap()
}

/// Benchmarks Arrow IPC (zstd) against Avro for one 102400-row batch:
/// each section prints the encoded size and the encode+decode latency.
pub fn main() {
    let arrow_batch = create_batch(102400);

    // Arrow IPC round-trip.
    {
        let batch =
            common_types::record_batch::RecordBatch::try_from(arrow_batch.clone()).unwrap();
        let compression = ipc::Compression::Zstd;

        let begin = Instant::now();

        let bytes =
            ipc::encode_record_batch(&batch.into_arrow_record_batch(), compression).unwrap();
        println!("Arrow IPC encoded size:{}", bytes.len());

        let decoded = ipc::decode_record_batch(bytes, compression).unwrap();
        let _ = common_types::record_batch::RecordBatch::try_from(decoded).unwrap();

        let cost = begin.saturating_elapsed();
        println!("Arrow IPC encode/decode cost:{}ms", cost.as_millis());
    }

    // Avro round-trip.
    {
        let record_schema =
            common_types::schema::RecordSchema::try_from(arrow_batch.schema()).unwrap();
        let batch = common_types::record_batch::RecordBatch::try_from(arrow_batch).unwrap();

        let begin = Instant::now();

        let bytes = avro::record_batch_to_avro_rows(&batch).unwrap();
        let total: usize = bytes.iter().map(|row| row.len()).sum();
        println!("Avro encoded size:{}", total);

        // NOTE(review): unlike the IPC path above, the decode result is not
        // unwrapped here — presumably intentional, but verify the decode
        // actually succeeds so the timing comparison stays fair.
        let _decode_batch = avro::avro_rows_to_record_batch(bytes, record_schema);

        let cost = begin.saturating_elapsed();
        println!("Avro encode/decode cost:{}ms", cost.as_millis());
    }
}
3 changes: 3 additions & 0 deletions common_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod hash;
pub mod projected_schema;
#[cfg(feature = "arrow")]
pub mod record_batch;
pub mod remote_engine;
pub mod request_id;
#[cfg(feature = "arrow")]
pub mod row;
Expand All @@ -30,5 +31,7 @@ pub const MAX_SEQUENCE_NUMBER: u64 = u64::MAX;
/// sequence number should starts from 1.
pub const MIN_SEQUENCE_NUMBER: u64 = 0;

pub use remote_engine::Version as RemoteEngineVersion;

#[cfg(any(test, feature = "test"))]
pub mod tests;
36 changes: 36 additions & 0 deletions common_types/src/remote_engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Common type for remote engine rpc service

use snafu::{Backtrace, Snafu};

/// Errors produced while interpreting remote engine protocol metadata.
#[derive(Debug, Snafu)]
pub enum Error {
    /// The peer sent a protocol version number this build does not know about.
    #[snafu(display("Unknown version, value:{}.\nBacktrace:\n{}", version, backtrace))]
    UnknownVersion { version: u32, backtrace: Backtrace },
}

/// Module-local result alias using [`Error`].
type Result<T> = std::result::Result<T, Error>;

/// Version of the record-batch encoding used between the remote engine
/// server and client.
///
/// The variant is carried on the wire as a `u32` (see `as_u32` and the
/// `TryFrom<u32>` impl elsewhere in this module), so existing numeric
/// values must never be renumbered or reused.
// Derives added: a public wire-protocol enum should be printable in
// diagnostics (Debug), trivially copyable, and comparable in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Version {
    /// Record batches encoded as an Arrow IPC stream compressed with zstd.
    ArrowIPCWithZstd,
}

impl TryFrom<u32> for Version {
    type Error = Error;

    /// Decodes a wire version number; any value not produced by
    /// `Version::as_u32` is rejected as `UnknownVersion`.
    fn try_from(version: u32) -> Result<Self> {
        if version == 0 {
            Ok(Self::ArrowIPCWithZstd)
        } else {
            UnknownVersion { version }.fail()
        }
    }
}

impl Version {
pub fn as_u32(&self) -> u32 {
match self {
Self::ArrowIPCWithZstd => 0,
}
}
}
6 changes: 2 additions & 4 deletions components/arrow_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ workspace = true

[package.edition]
workspace = true
[dependencies.uncover]
git = "https://github.com/matklad/uncover.git"
rev = "1d0770d997e29731b287e9e11e4ffbbea5f456da"

[dependencies]
arrow = { workspace = true }

snafu = { workspace = true }
zstd = { workspace = true }
100 changes: 100 additions & 0 deletions components/arrow_ext/src/ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Utilities for `RecordBatch` serialization using Arrow IPC

use std::io::Cursor;

use arrow::{
ipc::{reader::StreamReader, writer::StreamWriter},
record_batch::RecordBatch,
};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};

#[derive(Snafu, Debug)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Arror error, err:{}.\nBacktrace:\n{}", source, backtrace))]
ArrowError {
source: arrow::error::ArrowError,
backtrace: Backtrace,
},

#[snafu(display("Zstd decode error, err:{}.\nBacktrace:\n{}", source, backtrace))]
ZstdError {
source: std::io::Error,
backtrace: Backtrace,
},

#[snafu(display("Failed to decode record batch.\nBacktrace:\n{}", backtrace))]
Decode { backtrace: Backtrace },
}

type Result<T> = std::result::Result<T, Error>;

/// Compression applied to the serialized Arrow IPC byte stream.
// Derives added: public enums should be Debug for diagnostics, and
// PartialEq/Eq let callers and tests compare compression modes directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Compression {
    /// Raw IPC bytes, no extra compression.
    None,
    /// IPC bytes compressed with zstd at [`ZSTD_LEVEL`].
    Zstd,
}

// https://facebook.github.io/zstd/zstd_manual.html
// The lower the level, the faster the speed (at the cost of compression).
const ZSTD_LEVEL: i32 = 3;

/// Serializes `batch` into Arrow IPC stream bytes, optionally compressing
/// the result with zstd.
pub fn encode_record_batch(batch: &RecordBatch, compression: Compression) -> Result<Vec<u8>> {
    let mut writer = StreamWriter::try_new(Vec::new(), &batch.schema()).context(ArrowError)?;
    writer.write(batch).context(ArrowError)?;
    let encoded = writer.into_inner().context(ArrowError)?;

    match compression {
        Compression::None => Ok(encoded),
        Compression::Zstd => {
            zstd::stream::encode_all(Cursor::new(encoded), ZSTD_LEVEL).context(ZstdError)
        }
    }
}

/// Deserializes bytes produced by [`encode_record_batch`] back into a
/// `RecordBatch`, decompressing first when `compression` requires it.
pub fn decode_record_batch(bytes: Vec<u8>, compression: Compression) -> Result<RecordBatch> {
    let raw = match compression {
        Compression::Zstd => zstd::stream::decode_all(Cursor::new(bytes)).context(ZstdError)?,
        Compression::None => bytes,
    };

    let mut reader = StreamReader::try_new(Cursor::new(raw), None).context(ArrowError)?;
    // A stream with zero batches is reported as a Decode error.
    let first = reader.next().context(Decode)?;
    first.context(ArrowError)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema},
    };

    use super::*;

    /// Builds a small two-column (`Int32`, `Utf8`) batch for the
    /// round-trip test.
    fn create_batch(rows: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]));

        let ints = Int32Array::from_iter_values(0..rows as i32);
        let strs = StringArray::from_iter_values((0..rows).map(|v| v.to_string()));

        RecordBatch::try_new(schema, vec![Arc::new(ints), Arc::new(strs)]).unwrap()
    }

    #[test]
    fn test_ipc_encode_decode() {
        let batch = create_batch(1024);
        // Encode/decode must round-trip losslessly under both modes.
        for compression in &[Compression::None, Compression::Zstd] {
            let encoded = encode_record_batch(&batch, *compression).unwrap();
            let decoded = decode_record_batch(encoded, *compression).unwrap();
            assert_eq!(batch, decoded);
        }
    }
}
1 change: 1 addition & 0 deletions components/arrow_ext/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

pub mod display;
pub mod ipc;
pub mod operation;
2 changes: 1 addition & 1 deletion proto/protos/remote_engine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ message ReadResponse {
ResponseHeader header = 1;
// Version of row encoding method
uint32 version = 2;
repeated bytes rows = 3;
bytes rows = 3;
}

message RowGroup {
Expand Down
1 change: 1 addition & 0 deletions remote_engine_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true
workspace = true

[dependencies]
arrow_ext = { workspace = true }
async-trait = { workspace = true }
ceresdbproto = { workspace = true }
clru = { workspace = true }
Expand Down
Loading

0 comments on commit 1e6d46b

Please sign in to comment.