Skip to content

Commit

Permalink
feat: impl remote sdk (#509)
Browse files Browse the repository at this point in the history
* move router from server to a dependent crate.

* move avro util to common util.

* impl remote sdk.

* modify global Cargo.toml and Cargo.lock.

* rename some variables.

* address CR.
  • Loading branch information
Rachelint authored Dec 28, 2022
1 parent 03f5f78 commit 3b21fd1
Show file tree
Hide file tree
Showing 35 changed files with 1,085 additions and 184 deletions.
39 changes: 37 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ members = [
"meta_client",
"proto",
"query_engine",
"remote_engine_client",
"router",
"server",
"sql",
"system_catalog",
Expand All @@ -61,6 +63,7 @@ catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
chrono = "0.4"
clap = "3.0"
clru = "0.6.1"
cluster = { path = "cluster" }
criterion = "0.3"
common_types = { path = "common_types" }
Expand All @@ -86,6 +89,8 @@ proto = { path = "proto" }
prost = "0.11"
query_engine = { path = "query_engine" }
rand = "0.7"
remote_engine_client = { path = "remote_engine_client" }
router = { path = "router" }
snafu = { version = "0.6.10", features = ["backtraces"] }
serde = "1.0"
serde_derive = "1.0"
Expand Down Expand Up @@ -133,6 +138,7 @@ log = { workspace = true }
logger = { workspace = true }
meta_client = { workspace = true }
query_engine = { workspace = true }
router = { workspace = true }
server = { workspace = true }
signal-hook = "0.3"
sort = "0.8.5"
Expand Down
1 change: 0 additions & 1 deletion cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ workspace = true
workspace = true

[dependencies]
analytic_engine = { workspace = true }
async-trait = { workspace = true }
ceresdbproto = { workspace = true }
common_types = { workspace = true }
Expand Down
18 changes: 18 additions & 0 deletions common_types/src/projected_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ impl ProjectedSchema {
}
}

/// Converts the in-memory `ProjectedSchema` into its protobuf representation.
impl From<ProjectedSchema> for proto::common::ProjectedSchema {
    fn from(request: ProjectedSchema) -> Self {
        // Projection is optional: absent means all columns are selected.
        let projection = match &request.0.projection {
            Some(indices) => {
                // Column indices are widened to u64 for the wire format.
                let idx: Vec<u64> = indices.iter().map(|&i| i as u64).collect();
                Some(proto::common::Projection { idx })
            }
            None => None,
        };

        Self {
            table_schema: Some((&request.0.original_schema).into()),
            projection,
        }
    }
}

impl TryFrom<proto::common::ProjectedSchema> for ProjectedSchema {
type Error = Error;

Expand Down
5 changes: 5 additions & 0 deletions common_types/src/request_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ impl RequestId {

Self(id)
}

    /// Returns the raw numeric value of this request id.
    #[inline]
    pub fn as_u64(&self) -> u64 {
        self.0
    }
}

impl fmt::Display for RequestId {
Expand Down
134 changes: 132 additions & 2 deletions common_util/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use avro_rs::{
};
use common_types::{
bytes::{ByteVec, Bytes},
column::ColumnBlock,
column::{ColumnBlock, ColumnBlockBuilder},
datum::{Datum, DatumKind},
record_batch::RecordBatch,
row::{Row, RowGroup, RowGroupBuilder},
schema::{RecordSchema, Schema},
string::StringBytes,
time::Timestamp,
};
use snafu::{Backtrace, ResultExt, Snafu};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};

/// Schema name of the record
const RECORD_NAME: &str = "Result";
Expand Down Expand Up @@ -48,6 +48,43 @@ pub enum Error {
))]
InvalidAvroRecord { value: Value, backtrace: Backtrace },

#[snafu(display(
"Failed to convert avro rows to record batch, msg:{}, err:{}",
msg,
source
))]
AvroRowsToRecordBatch {
msg: String,
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display(
"Failed to convert avro rows to row group, msg:{}, err:{}",
msg,
source
))]
AvroRowsToRowGroup {
msg: String,
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display(
"Failed to convert row group to avro rows with no cause, msg:{}.\nBacktrace:\n{}",
msg,
backtrace
))]
RowGroupToAvroRowsNoCause { msg: String, backtrace: Backtrace },

#[snafu(display(
"Failed to convert row group to avro rows with cause, msg:{}, err:{}",
msg,
source
))]
RowGroupToAvroRowsWithCause {
msg: String,
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Unsupported arvo type, value:{:?}.\nBacktrace:\n{}", value, backtrace))]
UnsupportedType { value: Value, backtrace: Backtrace },
}
Expand Down Expand Up @@ -113,6 +150,58 @@ pub fn record_batch_to_avro_rows(record_batch: &RecordBatch) -> Result<Vec<ByteV
Ok(rows)
}

/// Decodes a batch of avro-encoded rows (`raws`) into a [`RecordBatch`]
/// described by `record_schema`.
///
/// Returns an error if a raw row cannot be parsed against the avro schema,
/// a datum cannot be appended to its column builder, or the final record
/// batch fails to build.
pub fn avro_rows_to_record_batch(
    raws: Vec<Vec<u8>>,
    record_schema: RecordSchema,
) -> Result<RecordBatch> {
    let avro_schema = to_avro_schema(RECORD_NAME, &record_schema);

    // One builder per column; decoded rows are appended column-wise.
    let mut builders = record_schema
        .columns()
        .iter()
        .map(|col| ColumnBlockBuilder::new(&col.data_type))
        .collect::<Vec<_>>();

    // Scratch buffer for one decoded row, reused across iterations.
    let mut datums = Vec::with_capacity(record_schema.num_columns());
    for raw in raws {
        datums.clear();
        avro_row_to_row(&avro_schema, &raw, &mut datums)
            .map_err(|e| Box::new(e) as _)
            .context(AvroRowsToRecordBatch {
                msg: format!(
                    "parse avro raw to row failed, avro schema:{:?}, raw:{:?}",
                    avro_schema, raw
                ),
            })?;
        // Decoded row width must match the schema's column count.
        assert_eq!(datums.len(), builders.len());

        for (builder, datum) in builders.iter_mut().zip(datums.iter()) {
            builder
                .append(datum.clone())
                .map_err(|e| Box::new(e) as _)
                .context(AvroRowsToRecordBatch {
                    msg: format!(
                        "append datum to column block builder failed, datum:{:?}, builder:{:?}",
                        datum, builder
                    ),
                })?;
        }
    }

    // Build `RecordBatch` from the per-column blocks.
    let column_blocks = builders
        .into_iter()
        .map(|mut builder| builder.build())
        .collect::<Vec<_>>();
    RecordBatch::new(record_schema, column_blocks)
        .map_err(|e| Box::new(e) as _)
        .context(AvroRowsToRecordBatch {
            msg: "build record batch failed",
        })
}

pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec<u8>]) -> Result<RowGroup> {
let avro_schema = to_avro_schema(RECORD_NAME, &schema.to_record_schema());
let mut builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len());
Expand All @@ -125,6 +214,43 @@ pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec<u8>]) -> Result<RowGro
Ok(builder.build())
}

/// Encodes every row of `row_group` into its avro binary form.
///
/// Returns one encoded byte vector per row, in row order. Fails if an avro
/// record cannot be created from the schema or a record cannot be serialized.
pub fn row_group_to_avro_rows(row_group: RowGroup) -> Result<Vec<Vec<u8>>> {
    let record_schema = row_group.schema().to_record_schema();
    let column_schemas = record_schema.columns();
    let avro_schema = to_avro_schema(RECORD_NAME, &record_schema);

    let row_len = row_group.num_rows();
    let mut rows = Vec::with_capacity(row_len);
    for row_idx in 0..row_len {
        // Convert `Row` to `Record` in avro.
        // `row_idx` is within `0..num_rows`, so `get_row` cannot fail here.
        let row = row_group.get_row(row_idx).unwrap();
        let mut avro_record = Record::new(&avro_schema).context(RowGroupToAvroRowsNoCause {
            msg: format!(
                "new avro record with schema failed, schema:{:?}",
                avro_schema
            ),
        })?;

        for (col_idx, column_schema) in column_schemas.iter().enumerate() {
            let column_value = row[col_idx].clone();
            let avro_value = datum_to_avro_value(column_value, column_schema.is_nullable);
            avro_record.put(&column_schema.name, avro_value);
        }

        // Fix: this error message previously said "new avro record with
        // schema failed" (copy-pasted from the branch above); it actually
        // reports a serialization failure.
        let row_bytes = avro_rs::to_avro_datum(&avro_schema, avro_record)
            .map_err(|e| Box::new(e) as _)
            .context(RowGroupToAvroRowsWithCause {
                msg: format!(
                    "encode avro record to bytes failed, schema:{:?}",
                    avro_schema
                ),
            })?;
        rows.push(row_bytes);
    }

    Ok(rows)
}

fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema {
match data_type {
DatumKind::Null => avro_rs::Schema::Null,
Expand Down Expand Up @@ -178,6 +304,10 @@ fn record_batch_to_avro(
/// Panic if row_idx is out of bound.
fn column_to_value(array: &ColumnBlock, row_idx: usize, is_nullable: bool) -> Value {
    // Fetch the datum at `row_idx` and delegate to the shared converter.
    datum_to_avro_value(array.datum(row_idx), is_nullable)
}

pub fn datum_to_avro_value(datum: Datum, is_nullable: bool) -> Value {
match datum {
Datum::Null => may_union(Value::Null, is_nullable),
Datum::Timestamp(v) => may_union(Value::TimestampMillis(v.as_i64()), is_nullable),
Expand Down
2 changes: 1 addition & 1 deletion components/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ workspace = true
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clru = "0.6.1"
clru = { workspace = true }
common_util = { workspace = true }
crc = "3.0.0"
futures = { workspace = true }
Expand Down
25 changes: 25 additions & 0 deletions remote_engine_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "remote_engine_client"

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true

[dependencies]
async-trait = { workspace = true }
ceresdbproto = { workspace = true }
clru = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
futures = { workspace = true }
proto = { workspace = true }
router = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
Loading

0 comments on commit 3b21fd1

Please sign in to comment.