Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3926: [Rust] Allow UUID to serialize to Fixed[16] #2676

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lang/rust/avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ fn main() -> Result<(), Error> {

`apache-avro` also supports the logical types listed in the [Avro specification](https://avro.apache.org/docs/current/spec.html#Logical+Types):

1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/0.2.6/num_bigint) crate
1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/latest/num_bigint) crate
1. UUID using the [`uuid`](https://docs.rs/uuid/latest/uuid) crate
1. Date, Time (milli) as `i32` and Time (micro) as `i64`
1. Timestamp (milli and micro) as `i64`
1. Local timestamp (milli and micro) as `i64`
Expand Down
103 changes: 94 additions & 9 deletions lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
bigdecimal::deserialize_big_decimal,
decimal::Decimal,
duration::Duration,
encode::encode_long,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Schema,
Expand Down Expand Up @@ -121,15 +122,60 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
value => Err(Error::BytesValue(value.into())),
}
}
Schema::Uuid => Ok(Value::Uuid(
Uuid::from_str(
match decode_internal(&Schema::String, names, enclosing_namespace, reader)? {
Value::String(ref s) => s,
value => return Err(Error::GetUuidFromStringValue(value.into())),
},
)
.map_err(Error::ConvertStrToUuid)?,
)),
Schema::Uuid => {
let len = decode_len(reader)?;
let mut bytes = vec![0u8; len];
reader.read_exact(&mut bytes).map_err(Error::ReadIntoBuf)?;

// use a Vec to be able re-read the bytes more than once if needed
let mut reader = Vec::with_capacity(len + 1);
encode_long(len as i64, &mut reader);
reader.extend_from_slice(&bytes);

let decode_from_string = |reader| match decode_internal(
&Schema::String,
names,
enclosing_namespace,
reader,
)? {
Value::String(ref s) => Uuid::from_str(s).map_err(Error::ConvertStrToUuid),
value => Err(Error::GetUuidFromStringValue(value.into())),
};

let uuid: Uuid = if len == 16 {
// most probably a Fixed schema
let fixed_result = decode_internal(
&Schema::Fixed(FixedSchema {
size: 16,
name: "uuid".into(),
aliases: None,
doc: None,
attributes: Default::default(),
}),
names,
enclosing_namespace,
&mut bytes.as_slice(),
);
if fixed_result.is_ok() {
match fixed_result? {
Value::Fixed(ref size, ref bytes) => {
if *size != 16 {
return Err(Error::ConvertFixedToUuid(*size));
}
Uuid::from_slice(bytes).map_err(Error::ConvertSliceToUuid)?
}
_ => decode_from_string(&mut reader.as_slice())?,
}
} else {
// try to decode as string
decode_from_string(&mut reader.as_slice())?
}
} else {
// definitely a string
decode_from_string(&mut reader.as_slice())?
};
Ok(Value::Uuid(uuid))
}
Schema::Int => decode_int(reader),
Schema::Date => zag_i32(reader).map(Value::Date),
Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
Expand Down Expand Up @@ -317,6 +363,7 @@ mod tests {
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;

#[test]
fn test_decode_array_without_size() -> TestResult {
Expand Down Expand Up @@ -815,4 +862,42 @@ mod tests {

Ok(())
}

#[test]
fn avro_3926_encode_decode_uuid_to_string() -> TestResult {
use crate::encode::encode;

let schema = Schema::String;
let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);

let mut buffer = Vec::new();
encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));

let result = decode(&Schema::Uuid, &mut &buffer[..])?;
assert_eq!(result, value);

Ok(())
}

#[test]
fn avro_3926_encode_decode_uuid_to_fixed() -> TestResult {
use crate::encode::encode;

let schema = Schema::Fixed(FixedSchema {
size: 16,
name: "uuid".into(),
aliases: None,
doc: None,
attributes: Default::default(),
});
let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);

let mut buffer = Vec::new();
encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));

let result = decode(&Schema::Uuid, &mut &buffer[..])?;
assert_eq!(result, value);

Ok(())
}
}
52 changes: 46 additions & 6 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,28 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
let slice: [u8; 12] = duration.into();
buffer.extend_from_slice(&slice);
}
Value::Uuid(uuid) => encode_bytes(
// we need the call .to_string() to properly convert ASCII to UTF-8
#[allow(clippy::unnecessary_to_owned)]
&uuid.to_string(),
buffer,
),
Value::Uuid(uuid) => match *schema {
Schema::Uuid | Schema::String => encode_bytes(
// we need the call .to_string() to properly convert ASCII to UTF-8
#[allow(clippy::unnecessary_to_owned)]
&uuid.to_string(),
buffer,
),
Schema::Fixed(FixedSchema { size, .. }) => {
if size != 16 {
return Err(Error::ConvertFixedToUuid(size));
}

let bytes = uuid.as_bytes();
encode_bytes(bytes, buffer)
}
_ => {
return Err(Error::EncodeValueAsSchemaError {
value_kind: ValueKind::Uuid,
supported_schema: vec![SchemaKind::Uuid, SchemaKind::Fixed],
});
}
},
Value::BigDecimal(bg) => {
let buf: Vec<u8> = serialize_big_decimal(bg);
buffer.extend_from_slice(buf.as_slice());
Expand Down Expand Up @@ -293,8 +309,10 @@ pub fn encode_to_vec(value: &Value, schema: &Schema) -> AvroResult<Vec<u8>> {
#[allow(clippy::expect_fun_call)]
pub(crate) mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;

pub(crate) fn success(value: &Value, schema: &Schema) -> String {
format!(
Expand Down Expand Up @@ -886,4 +904,26 @@ pub(crate) mod tests {
assert!(encoded.is_ok());
assert!(!buffer.is_empty());
}

#[test]
fn avro_3926_encode_decode_uuid_to_fixed_wrong_schema_size() -> TestResult {
let schema = Schema::Fixed(FixedSchema {
size: 15,
name: "uuid".into(),
aliases: None,
doc: None,
attributes: Default::default(),
});
let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);

let mut buffer = Vec::new();
match encode(&value, &schema, &mut buffer) {
Err(Error::ConvertFixedToUuid(actual)) => {
assert_eq!(actual, 15);
}
_ => panic!("Expected Error::ConvertFixedToUuid"),
}

Ok(())
}
}
6 changes: 6 additions & 0 deletions lang/rust/avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ pub enum Error {
#[error("Failed to convert &str to UUID")]
ConvertStrToUuid(#[source] uuid::Error),

#[error("Failed to convert Fixed bytes to UUID. It must be exactly 16 bytes, got {0}")]
ConvertFixedToUuid(usize),

#[error("Failed to convert Fixed bytes to UUID")]
ConvertSliceToUuid(#[source] uuid::Error),

#[error("Map key is not a string; key type is {0:?}")]
MapKeyType(ValueKind),

Expand Down
4 changes: 2 additions & 2 deletions lang/rust/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,8 @@
//!
//! `apache-avro` also supports the logical types listed in the [Avro specification](https://avro.apache.org/docs/current/spec.html#Logical+Types):
//!
//! 1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/0.2.6/num_bigint) crate
//! 1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
//! 1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/latest/num_bigint) crate
//! 1. UUID using the [`uuid`](https://docs.rs/uuid/latest/uuid) crate
//! 1. Date, Time (milli) as `i32` and Time (micro) as `i64`
//! 1. Timestamp (milli and micro) as `i64`
//! 1. Local timestamp (milli and micro) as `i64`
Expand Down
65 changes: 50 additions & 15 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1404,8 +1404,22 @@ impl Parser {
return try_convert_to_logical_type(
"uuid",
parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::String],
|_| -> AvroResult<Schema> { Ok(Schema::Uuid) },
&[SchemaKind::String, SchemaKind::Fixed],
|schema| match schema {
Schema::String => Ok(Schema::Uuid),
Schema::Fixed(FixedSchema { size: 16, .. }) => Ok(Schema::Uuid),
Schema::Fixed(FixedSchema { size, .. }) => {
warn!("Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {:?}", schema);
Ok(schema)
}
_ => {
warn!(
"Ignoring invalid uuid logical type for schema: {:?}",
schema
);
Ok(schema)
}
},
);
}
"date" => {
Expand Down Expand Up @@ -2367,8 +2381,10 @@ pub mod derive {
#[cfg(test)]
mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use apache_avro_test_helper::{
logger::{assert_logged, assert_not_logged},
TestResult,
};
use serde_json::json;

#[test]
Expand Down Expand Up @@ -6299,7 +6315,7 @@ mod tests {
}

#[test]
fn test_avro_3896_uuid_schema() -> TestResult {
fn avro_3896_uuid_schema_for_string() -> TestResult {
// string uuid, represents as native logical type.
let schema = json!(
{
Expand All @@ -6310,8 +6326,11 @@ mod tests {
let parse_result = Schema::parse(&schema)?;
assert_eq!(parse_result, Schema::Uuid);

// uuid logical type is not supported for SchemaKind::Fixed, so it is parsed as Schema::Fixed
// and the `logicalType` is preserved as an attribute.
Ok(())
}

#[test]
fn avro_3926_uuid_schema_for_fixed_with_size_16() -> TestResult {
let schema = json!(
{
"type": "fixed",
Expand All @@ -6320,19 +6339,37 @@ mod tests {
"logicalType": "uuid"
});
let parse_result = Schema::parse(&schema)?;
assert_eq!(parse_result, Schema::Uuid);
assert_not_logged(
r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, attributes: {"logicalType": String("uuid")} })"#,
);

Ok(())
}

#[test]
fn avro_3926_uuid_schema_for_fixed_with_size_different_than_16() -> TestResult {
let schema = json!(
{
"type": "fixed",
"name": "FixedUUID",
"size": 6,
"logicalType": "uuid"
});
let parse_result = Schema::parse(&schema)?;
assert_eq!(
parse_result,
Schema::Fixed(FixedSchema {
name: Name::new("FixedUUID")?,
doc: None,
aliases: None,
size: 16,
attributes: BTreeMap::from([(
"logicalType".to_string(),
Value::String(String::from("uuid")),
)]),
doc: None,
size: 6,
attributes: BTreeMap::from([("logicalType".to_string(), "uuid".into())]),
})
);
assert_logged(
r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, attributes: {"logicalType": String("uuid")} })"#,
);

Ok(())
}
Expand Down Expand Up @@ -6380,8 +6417,6 @@ mod tests {

#[test]
fn test_avro_3899_parse_decimal_type() -> TestResult {
use apache_avro_test_helper::logger::{assert_logged, assert_not_logged};

let schema = Schema::parse_str(
r#"{
"name": "InvalidDecimal",
Expand Down