Skip to content

Commit

Permalink
fix: fix conversion by avro (#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint authored Dec 29, 2022
1 parent 56081b0 commit 1660efe
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 38 deletions.
5 changes: 5 additions & 0 deletions common_types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ impl RecordSchema {
self.column_schemas.column(i)
}

pub fn column_by_name(&self, name: &str) -> Option<&ColumnSchema> {
let index = self.column_schemas.index_of(name)?;
Some(self.column_schemas.column(index))
}

pub fn to_arrow_schema_ref(&self) -> ArrowSchemaRef {
self.arrow_schema.clone()
}
Expand Down
122 changes: 84 additions & 38 deletions common_util/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub enum Error {
},

#[snafu(display("Failed to convert to avro record, err:{}", source))]
ConvertToAvroRecord {
ParseAvroRecord {
source: Box<dyn std::error::Error + Send + Sync>,
},

Expand All @@ -48,6 +48,23 @@ pub enum Error {
))]
InvalidAvroRecord { value: Value, backtrace: Backtrace },

#[snafu(display(
"Column not found in record schema, column:{}.\nBacktrace:\n{}",
column,
backtrace
))]
ColumnNotFound {
column: String,
backtrace: Backtrace,
},

#[snafu(display(
"Invalid avro record, expect record, value:{:?}.\nBacktrace:\n{}",
value,
backtrace
))]
AvroRowsToRowNoCause { value: Value, backtrace: Backtrace },

#[snafu(display(
"Failed to convert avro rows to record batch, msg:{}, err:{}",
msg,
Expand Down Expand Up @@ -85,8 +102,12 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Unsupported arvo type, value:{:?}.\nBacktrace:\n{}", value, backtrace))]
UnsupportedType { value: Value, backtrace: Backtrace },
#[snafu(display("Unsupported conversion from avro value to datum, value:{:?}, datum_type:{}.\nBacktrace:\n{}", value, datum_type, backtrace))]
UnsupportedConversion {
value: Value,
datum_type: DatumKind,
backtrace: Backtrace,
},
}

define_result!(Error);
Expand Down Expand Up @@ -166,7 +187,7 @@ pub fn avro_rows_to_record_batch(

for raw in raws {
row_buf.clear();
avro_row_to_row(&avro_schema, &raw, &mut row_buf)
avro_row_to_row(&avro_schema, &record_schema, &raw, &mut row_buf)
.map_err(|e| Box::new(e) as _)
.context(AvroRowsToRecordBatch {
msg: format!(
Expand Down Expand Up @@ -203,11 +224,12 @@ pub fn avro_rows_to_record_batch(
}

pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec<u8>]) -> Result<RowGroup> {
let avro_schema = to_avro_schema(RECORD_NAME, &schema.to_record_schema());
let record_schema = schema.to_record_schema();
let avro_schema = to_avro_schema(RECORD_NAME, &record_schema);
let mut builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len());
for raw_row in rows {
let mut row = Vec::with_capacity(schema.num_columns());
avro_row_to_row(&avro_schema, raw_row, &mut row)?;
avro_row_to_row(&avro_schema, &record_schema, raw_row, &mut row)?;
builder.push_checked_row(Row::from_datums(row));
}

Expand Down Expand Up @@ -315,10 +337,9 @@ pub fn datum_to_avro_value(datum: Datum, is_nullable: bool) -> Value {
Datum::Float(v) => may_union(Value::Float(v), is_nullable),
Datum::Varbinary(v) => may_union(Value::Bytes(v.to_vec()), is_nullable),
Datum::String(v) => may_union(Value::String(v.to_string()), is_nullable),
// TODO(yingwen): Should we return error if overflow? Avro does not support uint64.
Datum::UInt64(v) => may_union(Value::Long(v as i64), is_nullable),
Datum::Int64(v) => may_union(Value::Long(v), is_nullable),
Datum::UInt32(v) => may_union(Value::Long(i64::from(v)), is_nullable),
Datum::UInt32(v) => may_union(Value::Int(v as i32), is_nullable),
Datum::UInt16(v) => may_union(Value::Int(i32::from(v)), is_nullable),
Datum::UInt8(v) => may_union(Value::Int(i32::from(v)), is_nullable),
Datum::Int32(v) => may_union(Value::Int(v), is_nullable),
Expand All @@ -332,32 +353,31 @@ pub fn datum_to_avro_value(datum: Datum, is_nullable: bool) -> Value {
///
/// Some types defined by avro are not used and the conversion rule is totally
/// based on the implementation in the server.
fn avro_value_to_datum(value: Value) -> Result<Datum> {
let datum = match value {
Value::Null => Datum::Null,
Value::TimestampMillis(v) => Datum::Timestamp(Timestamp::new(v)),
Value::Double(v) => Datum::Double(v),
Value::Float(v) => Datum::Float(v),
Value::Bytes(v) => Datum::Varbinary(Bytes::from(v)),
Value::String(v) => Datum::String(StringBytes::from(v)),
// FIXME: Now the server converts both uint64 and int64 into`Value::Long` because uint64 is
// not supported by avro, that is to say something may go wrong in some corner case.
Value::Long(v) => Datum::Int64(v),
Value::Int(v) => Datum::Int32(v),
Value::Boolean(v) => Datum::Boolean(v),
Value::Union(inner_val) => avro_value_to_datum(*inner_val)?,
Value::Fixed(_, _)
| Value::Enum(_, _)
| Value::Array(_)
| Value::Map(_)
| Value::Record(_)
| Value::Date(_)
| Value::Decimal(_)
| Value::TimeMillis(_)
| Value::TimeMicros(_)
| Value::TimestampMicros(_)
| Value::Duration(_)
| Value::Uuid(_) => return UnsupportedType { value }.fail(),
fn avro_value_to_datum(value: Value, datum_type: DatumKind) -> Result<Datum> {
let datum = match (value, datum_type) {
(Value::Null, _) => Datum::Null,
(Value::TimestampMillis(v), DatumKind::Timestamp) => Datum::Timestamp(Timestamp::new(v)),
(Value::Double(v), DatumKind::Double) => Datum::Double(v),
(Value::Float(v), DatumKind::Float) => Datum::Float(v),
(Value::Bytes(v), DatumKind::Varbinary) => Datum::Varbinary(Bytes::from(v)),
(Value::String(v), DatumKind::String) => Datum::String(StringBytes::from(v)),
(Value::Boolean(v), DatumKind::Boolean) => Datum::Boolean(v),
(Value::Long(v), DatumKind::Int64) => Datum::Int64(v),
(Value::Long(v), DatumKind::UInt64) => Datum::UInt64(v as u64),
(Value::Int(v), DatumKind::Int8) => Datum::Int8(v as i8),
(Value::Int(v), DatumKind::UInt8) => Datum::UInt8(v as u8),
(Value::Int(v), DatumKind::Int16) => Datum::Int16(v as i16),
(Value::Int(v), DatumKind::UInt16) => Datum::UInt16(v as u16),
(Value::Int(v), DatumKind::Int32) => Datum::Int32(v),
(Value::Int(v), DatumKind::UInt32) => Datum::UInt32(v as u32),
(Value::Union(inner_val), _) => avro_value_to_datum(*inner_val, datum_type)?,
(other_value, _) => {
return UnsupportedConversion {
value: other_value,
datum_type,
}
.fail()
}
};

Ok(datum)
Expand All @@ -372,13 +392,24 @@ fn may_union(val: Value, is_nullable: bool) -> Value {
}
}

fn avro_row_to_row(schema: &AvroSchema, mut raw: &[u8], row: &mut Vec<Datum>) -> Result<()> {
fn avro_row_to_row(
schema: &AvroSchema,
record_schema: &RecordSchema,
mut raw: &[u8],
row: &mut Vec<Datum>,
) -> Result<()> {
let record = avro_rs::from_avro_datum(schema, &mut raw, None)
.map_err(|e| Box::new(e) as _)
.context(ConvertToAvroRecord)?;
.context(ParseAvroRecord)?;
if let Value::Record(cols) = record {
for (_, column_value) in cols {
let datum = avro_value_to_datum(column_value)?;
for (column_name, column_value) in cols {
let column_schema =
record_schema
.column_by_name(&column_name)
.context(ColumnNotFound {
column: column_name,
})?;
let datum = avro_value_to_datum(column_value, column_schema.data_type)?;
row.push(datum);
}

Expand All @@ -387,3 +418,18 @@ fn avro_row_to_row(schema: &AvroSchema, mut raw: &[u8], row: &mut Vec<Datum>) ->
InvalidAvroRecord { value: record }.fail()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_avro_value_to_datum_overflow() {
let overflow_value = i64::MAX as u64 + 1;
let avro_value = Value::Long(overflow_value as i64);
let datum = avro_value_to_datum(avro_value, DatumKind::UInt64).unwrap();
let expected = Datum::UInt64(overflow_value);

assert_eq!(datum, expected);
}
}

0 comments on commit 1660efe

Please sign in to comment.