diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 863da0d7a6..945d54b658 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -416,6 +416,11 @@ impl RecordSchema { self.column_schemas.column(i) } + pub fn column_by_name(&self, name: &str) -> Option<&ColumnSchema> { + let index = self.column_schemas.index_of(name)?; + Some(self.column_schemas.column(index)) + } + pub fn to_arrow_schema_ref(&self) -> ArrowSchemaRef { self.arrow_schema.clone() } diff --git a/common_util/src/avro.rs b/common_util/src/avro.rs index 9c37c7fe07..a56585f8ca 100644 --- a/common_util/src/avro.rs +++ b/common_util/src/avro.rs @@ -37,7 +37,7 @@ pub enum Error { }, #[snafu(display("Failed to convert to avro record, err:{}", source))] - ConvertToAvroRecord { + ParseAvroRecord { source: Box, }, @@ -48,6 +48,23 @@ pub enum Error { ))] InvalidAvroRecord { value: Value, backtrace: Backtrace }, + #[snafu(display( + "Column not found in record schema, column:{}.\nBacktrace:\n{}", + column, + backtrace + ))] + ColumnNotFound { + column: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Invalid avro record, expect record, value:{:?}.\nBacktrace:\n{}", + value, + backtrace + ))] + AvroRowsToRowNoCause { value: Value, backtrace: Backtrace }, + #[snafu(display( "Failed to convert avro rows to record batch, msg:{}, err:{}", msg, @@ -85,8 +102,12 @@ pub enum Error { source: Box, }, - #[snafu(display("Unsupported arvo type, value:{:?}.\nBacktrace:\n{}", value, backtrace))] - UnsupportedType { value: Value, backtrace: Backtrace }, + #[snafu(display("Unsupported conversion from avro value to datum, value:{:?}, datum_type:{}.\nBacktrace:\n{}", value, datum_type, backtrace))] + UnsupportedConversion { + value: Value, + datum_type: DatumKind, + backtrace: Backtrace, + }, } define_result!(Error); @@ -166,7 +187,7 @@ pub fn avro_rows_to_record_batch( for raw in raws { row_buf.clear(); - avro_row_to_row(&avro_schema, &raw, &mut row_buf) + avro_row_to_row(&avro_schema, &record_schema, &raw, &mut row_buf) .map_err(|e| Box::new(e) as _) .context(AvroRowsToRecordBatch { msg: format!( @@ -203,11 +224,12 @@ pub fn avro_rows_to_record_batch( } pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec]) -> Result { - let avro_schema = to_avro_schema(RECORD_NAME, &schema.to_record_schema()); + let record_schema = schema.to_record_schema(); + let avro_schema = to_avro_schema(RECORD_NAME, &record_schema); let mut builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len()); for raw_row in rows { let mut row = Vec::with_capacity(schema.num_columns()); - avro_row_to_row(&avro_schema, raw_row, &mut row)?; + avro_row_to_row(&avro_schema, &record_schema, raw_row, &mut row)?; builder.push_checked_row(Row::from_datums(row)); } @@ -315,10 +337,9 @@ pub fn datum_to_avro_value(datum: Datum, is_nullable: bool) -> Value { Datum::Float(v) => may_union(Value::Float(v), is_nullable), Datum::Varbinary(v) => may_union(Value::Bytes(v.to_vec()), is_nullable), Datum::String(v) => may_union(Value::String(v.to_string()), is_nullable), - // TODO(yingwen): Should we return error if overflow? Avro does not support uint64. Datum::UInt64(v) => may_union(Value::Long(v as i64), is_nullable), Datum::Int64(v) => may_union(Value::Long(v), is_nullable), - Datum::UInt32(v) => may_union(Value::Long(i64::from(v)), is_nullable), + Datum::UInt32(v) => may_union(Value::Int(v as i32), is_nullable), Datum::UInt16(v) => may_union(Value::Int(i32::from(v)), is_nullable), Datum::UInt8(v) => may_union(Value::Int(i32::from(v)), is_nullable), Datum::Int32(v) => may_union(Value::Int(v), is_nullable), @@ -332,32 +353,31 @@ pub fn datum_to_avro_value(datum: Datum, is_nullable: bool) -> Value { /// /// Some types defined by avro are not used and the conversion rule is totally /// based on the implementation in the server. -fn avro_value_to_datum(value: Value) -> Result { - let datum = match value { - Value::Null => Datum::Null, - Value::TimestampMillis(v) => Datum::Timestamp(Timestamp::new(v)), - Value::Double(v) => Datum::Double(v), - Value::Float(v) => Datum::Float(v), - Value::Bytes(v) => Datum::Varbinary(Bytes::from(v)), - Value::String(v) => Datum::String(StringBytes::from(v)), - // FIXME: Now the server converts both uint64 and int64 into`Value::Long` because uint64 is - // not supported by avro, that is to say something may go wrong in some corner case. - Value::Long(v) => Datum::Int64(v), - Value::Int(v) => Datum::Int32(v), - Value::Boolean(v) => Datum::Boolean(v), - Value::Union(inner_val) => avro_value_to_datum(*inner_val)?, - Value::Fixed(_, _) - | Value::Enum(_, _) - | Value::Array(_) - | Value::Map(_) - | Value::Record(_) - | Value::Date(_) - | Value::Decimal(_) - | Value::TimeMillis(_) - | Value::TimeMicros(_) - | Value::TimestampMicros(_) - | Value::Duration(_) - | Value::Uuid(_) => return UnsupportedType { value }.fail(), +fn avro_value_to_datum(value: Value, datum_type: DatumKind) -> Result { + let datum = match (value, datum_type) { + (Value::Null, _) => Datum::Null, + (Value::TimestampMillis(v), DatumKind::Timestamp) => Datum::Timestamp(Timestamp::new(v)), + (Value::Double(v), DatumKind::Double) => Datum::Double(v), + (Value::Float(v), DatumKind::Float) => Datum::Float(v), + (Value::Bytes(v), DatumKind::Varbinary) => Datum::Varbinary(Bytes::from(v)), + (Value::String(v), DatumKind::String) => Datum::String(StringBytes::from(v)), + (Value::Boolean(v), DatumKind::Boolean) => Datum::Boolean(v), + (Value::Long(v), DatumKind::Int64) => Datum::Int64(v), + (Value::Long(v), DatumKind::UInt64) => Datum::UInt64(v as u64), + (Value::Int(v), DatumKind::Int8) => Datum::Int8(v as i8), + (Value::Int(v), DatumKind::UInt8) => Datum::UInt8(v as u8), + (Value::Int(v), DatumKind::Int16) => Datum::Int16(v as i16), + (Value::Int(v), DatumKind::UInt16) => Datum::UInt16(v as u16), + (Value::Int(v), DatumKind::Int32) => Datum::Int32(v), + (Value::Int(v), DatumKind::UInt32) => Datum::UInt32(v as u32), + (Value::Union(inner_val), _) => avro_value_to_datum(*inner_val, datum_type)?, + (other_value, _) => { + return UnsupportedConversion { + value: other_value, + datum_type, + } + .fail() + } }; Ok(datum) @@ -372,13 +392,24 @@ fn may_union(val: Value, is_nullable: bool) -> Value { } } -fn avro_row_to_row(schema: &AvroSchema, mut raw: &[u8], row: &mut Vec) -> Result<()> { +fn avro_row_to_row( + schema: &AvroSchema, + record_schema: &RecordSchema, + mut raw: &[u8], + row: &mut Vec, +) -> Result<()> { let record = avro_rs::from_avro_datum(schema, &mut raw, None) .map_err(|e| Box::new(e) as _) - .context(ConvertToAvroRecord)?; + .context(ParseAvroRecord)?; if let Value::Record(cols) = record { - for (_, column_value) in cols { - let datum = avro_value_to_datum(column_value)?; + for (column_name, column_value) in cols { + let column_schema = + record_schema + .column_by_name(&column_name) + .context(ColumnNotFound { + column: column_name, + })?; + let datum = avro_value_to_datum(column_value, column_schema.data_type)?; row.push(datum); } @@ -387,3 +418,18 @@ fn avro_row_to_row(schema: &AvroSchema, mut raw: &[u8], row: &mut Vec) -> InvalidAvroRecord { value: record }.fail() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_avro_value_to_datum_overflow() { + let overflow_value = i64::MAX as u64 + 1; + let avro_value = Value::Long(overflow_value as i64); + let datum = avro_value_to_datum(avro_value, DatumKind::UInt64).unwrap(); + let expected = Datum::UInt64(overflow_value); + + assert_eq!(datum, expected); + } +}