Skip to content

Commit

Permalink
feat: impl remote_engine grpc service (#508)
Browse files Browse the repository at this point in the history
* feat: impl remote_engine grpc service

* chore: refactor code

* chore: refactor code

* refactor by CR

* refactor by CR

* make CI happy
  • Loading branch information
chunshao90 authored Dec 27, 2022
1 parent cd3f976 commit 03f5f78
Show file tree
Hide file tree
Showing 19 changed files with 761 additions and 49 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 27 additions & 1 deletion common_types/src/projected_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::{fmt, sync::Arc};

use snafu::{ensure, Backtrace, ResultExt, Snafu};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

use crate::{
column_schema::{ColumnSchema, ReadOp},
Expand Down Expand Up @@ -36,6 +36,14 @@ pub enum Error {
backtrace
))]
MissingReadColumn { name: String, backtrace: Backtrace },

#[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))]
EmptyTableSchema { backtrace: Backtrace },

/// Converting the protobuf table schema into a `Schema` failed; `source`
/// carries the boxed underlying conversion error.
#[snafu(display("Failed to convert table schema, err:{}", source))]
ConvertTableSchema {
    source: Box<dyn std::error::Error + Send + Sync>,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -147,6 +155,24 @@ impl ProjectedSchema {
}
}

impl TryFrom<proto::common::ProjectedSchema> for ProjectedSchema {
    type Error = Error;

    /// Build a `ProjectedSchema` from its protobuf representation.
    ///
    /// Fails with `EmptyTableSchema` when `table_schema` is unset and with
    /// `ConvertTableSchema` when the protobuf table schema cannot be turned
    /// into a `Schema`. The result of `ProjectedSchema::new` is returned
    /// unchanged.
    fn try_from(pb: proto::common::ProjectedSchema) -> std::result::Result<Self, Self::Error> {
        // The table schema is mandatory; a missing one is reported explicitly.
        let pb_table_schema = pb.table_schema.context(EmptyTableSchema)?;
        let schema: Schema = pb_table_schema
            .try_into()
            .map_err(|e| Box::new(e) as _)
            .context(ConvertTableSchema)?;

        // The projection is optional; when present, every index is cast from
        // `u64` to `usize`.
        // NOTE(review): `as usize` truncates silently where `usize` is narrower
        // than 64 bits.
        let projection = match pb.projection {
            Some(pb_projection) => Some(
                pb_projection
                    .idx
                    .into_iter()
                    .map(|idx| idx as usize)
                    .collect(),
            ),
            None => None,
        };

        Self::new(schema, projection)
    }
}

/// Schema with projection information
struct ProjectedSchemaInner {
/// The schema before projection that the reader intended to read, may
Expand Down
2 changes: 2 additions & 0 deletions common_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ test = ["env_logger"]
[dependencies]
# In alphabetical order
arrow = { workspace = true }
avro-rs = "0.13"
backtrace = "0.3.9"
chrono = { workspace = true }
common_types = { workspace = true, features = ["test"] }
Expand All @@ -30,6 +31,7 @@ prometheus = { workspace = true }
proto = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
time = "0.1"
tokio = { workspace = true }
Expand Down
99 changes: 95 additions & 4 deletions server/src/avro_util.rs → common_util/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ use std::collections::HashMap;
use avro_rs::{
schema::{Name, RecordField, RecordFieldOrder},
types::{Record, Value},
Schema as AvroSchema,
};
use common_types::{
bytes::ByteVec,
bytes::{ByteVec, Bytes},
column::ColumnBlock,
datum::{Datum, DatumKind},
record_batch::RecordBatch,
schema::RecordSchema,
row::{Row, RowGroup, RowGroupBuilder},
schema::{RecordSchema, Schema},
string::StringBytes,
time::Timestamp,
};
use common_util::define_result;
use snafu::{Backtrace, ResultExt, Snafu};

/// Schema name of the record
const RECORD_NAME: &str = "Result";

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
Expand All @@ -29,6 +35,21 @@ pub enum Error {
source: avro_rs::Error,
backtrace: Backtrace,
},

#[snafu(display("Failed to convert to avro record, err:{}", source))]
ConvertToAvroRecord {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display(
"Invalid avro record, expect record, value:{:?}.\nBacktrace:\n{}",
value,
backtrace
))]
InvalidAvroRecord { value: Value, backtrace: Backtrace },

/// The avro value has a type that `avro_value_to_datum` cannot map to a
/// `Datum`.
#[snafu(display("Unsupported avro type, value:{:?}.\nBacktrace:\n{}", value, backtrace))]
UnsupportedType { value: Value, backtrace: Backtrace },
}

define_result!(Error);
Expand Down Expand Up @@ -85,6 +106,25 @@ pub fn to_avro_schema(name: &str, schema: &RecordSchema) -> avro_rs::Schema {
}
}

/// Encode `record_batch` into avro rows.
///
/// The avro schema is derived from the batch's own schema under the record
/// name `RECORD_NAME`; the encoding itself is delegated to
/// `record_batch_to_avro`, whose error (if any) is returned as is.
pub fn record_batch_to_avro_rows(record_batch: &RecordBatch) -> Result<Vec<ByteVec>> {
    let avro_schema = to_avro_schema(RECORD_NAME, record_batch.schema());
    let mut encoded_rows = Vec::new();
    record_batch_to_avro(record_batch, &avro_schema, &mut encoded_rows).map(|_| encoded_rows)
}

/// Decode avro-encoded `rows` into a `RowGroup` built for `schema`.
///
/// Every entry of `rows` is decoded by `avro_row_to_row` using the avro schema
/// derived from `schema` (record name `RECORD_NAME`) and pushed to the row
/// group in input order.
///
/// # Errors
///
/// Returns the first error reported by `avro_row_to_row`; rows decoded before
/// the failure are discarded together with the builder.
pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec<u8>]) -> Result<RowGroup> {
    let avro_schema = to_avro_schema(RECORD_NAME, &schema.to_record_schema());
    // Read the column count once, before `schema` is moved into the builder;
    // this avoids both a per-row lookup and an extra clone of the schema.
    let num_columns = schema.num_columns();
    let mut builder = RowGroupBuilder::with_capacity(schema, rows.len());

    for raw_row in rows {
        let mut datums = Vec::with_capacity(num_columns);
        avro_row_to_row(&avro_schema, raw_row, &mut datums)?;
        // NOTE(review): `push_checked_row` presumably trusts the row to match
        // the schema without validating it - confirm against `RowGroupBuilder`.
        builder.push_checked_row(Row::from_datums(datums));
    }

    Ok(builder.build())
}

fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema {
match data_type {
DatumKind::Null => avro_rs::Schema::Null,
Expand All @@ -104,7 +144,7 @@ fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema {
}

/// Convert record batch to avro format
pub fn record_batch_to_avro(
fn record_batch_to_avro(
record_batch: &RecordBatch,
schema: &avro_rs::Schema,
rows: &mut Vec<ByteVec>,
Expand Down Expand Up @@ -158,6 +198,41 @@ fn column_to_value(array: &ColumnBlock, row_idx: usize, is_nullable: bool) -> Va
}
}

/// Convert the avro `Value` into the `Datum`.
///
/// Some types defined by avro are not used and the conversion rule is totally
/// based on the implementation in the server.
fn avro_value_to_datum(value: Value) -> Result<Datum> {
let datum = match value {
Value::Null => Datum::Null,
Value::TimestampMillis(v) => Datum::Timestamp(Timestamp::new(v)),
Value::Double(v) => Datum::Double(v),
Value::Float(v) => Datum::Float(v),
Value::Bytes(v) => Datum::Varbinary(Bytes::from(v)),
Value::String(v) => Datum::String(StringBytes::from(v)),
// FIXME: Now the server converts both uint64 and int64 into`Value::Long` because uint64 is
// not supported by avro, that is to say something may go wrong in some corner case.
Value::Long(v) => Datum::Int64(v),
Value::Int(v) => Datum::Int32(v),
Value::Boolean(v) => Datum::Boolean(v),
Value::Union(inner_val) => avro_value_to_datum(*inner_val)?,
Value::Fixed(_, _)
| Value::Enum(_, _)
| Value::Array(_)
| Value::Map(_)
| Value::Record(_)
| Value::Date(_)
| Value::Decimal(_)
| Value::TimeMillis(_)
| Value::TimeMicros(_)
| Value::TimestampMicros(_)
| Value::Duration(_)
| Value::Uuid(_) => return UnsupportedType { value }.fail(),
};

Ok(datum)
}

#[inline]
fn may_union(val: Value, is_nullable: bool) -> Value {
if is_nullable {
Expand All @@ -166,3 +241,19 @@ fn may_union(val: Value, is_nullable: bool) -> Value {
val
}
}

/// Decode one avro datum from `raw` and append its column values to `row`.
///
/// `raw` is decoded with `schema` as the writer schema and no reader schema.
/// The decoded value must be a record; each of its fields is converted by
/// `avro_value_to_datum` and pushed to `row` in field order.
///
/// Fails with `ConvertToAvroRecord` when decoding fails, with
/// `InvalidAvroRecord` when the decoded value is not a record, and with the
/// error of `avro_value_to_datum` when a field cannot be converted. In the last
/// case the datums pushed before the failing field stay in `row`.
fn avro_row_to_row(schema: &AvroSchema, mut raw: &[u8], row: &mut Vec<Datum>) -> Result<()> {
    let decoded = avro_rs::from_avro_datum(schema, &mut raw, None)
        .map_err(|e| Box::new(e) as _)
        .context(ConvertToAvroRecord)?;

    // Anything other than a record cannot be a row.
    let fields = match decoded {
        Value::Record(fields) => fields,
        other => return InvalidAvroRecord { value: other }.fail(),
    };

    for (_, field_value) in fields {
        row.push(avro_value_to_datum(field_value)?);
    }

    Ok(())
}
1 change: 1 addition & 0 deletions common_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod macros;

// TODO(yingwen): Move some mod into components as a crate
pub mod alloc_tracker;
pub mod avro;
pub mod codec;
pub mod config;
pub mod error;
Expand Down
10 changes: 10 additions & 0 deletions proto/protos/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,13 @@ message TimeRange {
// exclusive end
int64 end = 2;
}

// Projected Schema
//
// A table schema paired with a projection.
message ProjectedSchema {
// Table schema. The Rust `TryFrom` conversion into `ProjectedSchema` fails
// with `EmptyTableSchema` when this field is unset.
common.TableSchema table_schema = 1;
// Projection. May be left unset; the Rust conversion then passes no
// projection to `ProjectedSchema::new`.
Projection projection = 2;
}

// The index list carried by a `ProjectedSchema`.
message Projection {
// Index values; the Rust conversion casts each one to `usize`.
// Presumably positions of the selected columns within the table schema -
// TODO confirm.
repeated uint64 idx = 1;
}
23 changes: 9 additions & 14 deletions proto/protos/remote_engine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ message ReadOptions {
uint64 read_parallelism = 2;
}

// The index list referenced by `ProjectedSchema.projection`.
message Projection {
// Presumably positions of the selected columns within the table schema -
// TODO confirm.
repeated uint64 idx = 1;
}

// A table schema paired with a projection.
message ProjectedSchema {
// Table schema, using the shared `common.TableSchema` message.
common.TableSchema table_schema = 1;
// Projection to apply; presumably optional - TODO confirm against readers.
Projection projection = 2;
}

message Predicate {
repeated bytes exprs = 1;
common.TimeRange time_range = 2;
Expand All @@ -49,7 +40,7 @@ enum ReadOrder {
message TableReadRequest {
uint64 request_id = 1;
ReadOptions opts = 2;
ProjectedSchema projected_schema = 3;
common.ProjectedSchema projected_schema = 3;
Predicate predicate = 4;
ReadOrder order = 5;
}
Expand All @@ -61,14 +52,18 @@ message ReadRequest {

message ReadResponse {
ResponseHeader header = 1;
repeated bytes rows = 2;
// Version of row encoding method
uint32 version = 2;
repeated bytes rows = 3;
}

message RowGroup {
// Version of row encoding method
common.TableSchema table_schema = 1;
repeated bytes rows = 2;
int64 min_timestamp = 3;
int64 max_timestamp = 4;
int64 min_timestamp = 2;
int64 max_timestamp = 3;
uint32 version = 4;
repeated bytes rows = 5;
}

message WriteRequest {
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ workspace = true
analytic_engine = { workspace = true }
arrow = { workspace = true }
async-trait = { workspace = true }
avro-rs = "0.13"
bytes = { workspace = true }
catalog = { workspace = true }
ceresdbproto = { workspace = true }
Expand All @@ -36,6 +35,7 @@ profile = { workspace = true }
prometheus = { workspace = true }
prometheus-static-metric = { workspace = true }
prost = { workspace = true }
proto = { workspace = true }
query_engine = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
Expand Down
Loading

0 comments on commit 03f5f78

Please sign in to comment.