Skip to content

Commit

Permalink
AVRO-3631: Add serde serialize_with functions
Browse files Browse the repository at this point in the history
These functions should be used to hint to the serialization process whether a byte array should be serialized as Value::Bytes or Value::Fixed

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
  • Loading branch information
martin-g committed Oct 6, 2022
1 parent 3e3f125 commit 7c9e938
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 22 deletions.
1 change: 1 addition & 0 deletions lang/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions lang/rust/avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ uuid = { default-features = false, version = "1.1.2", features = ["serde", "std"
xz2 = { default-features = false, version = "0.1.7", optional = true }
zerocopy = { default-features = false, version = "0.6.1" }
zstd = { default-features = false, version = "0.11.2+zstd.1.5.2", optional = true }
ref_thread_local = { default-features = false, version = "0.1.1" }


[target.'cfg(target_arch = "wasm32")'.dependencies]
quad-rand = { default-features = false, version = "0.2.1" }
Expand Down
2 changes: 1 addition & 1 deletion lang/rust/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ pub use reader::{
from_avro_datum, read_marker, GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
};
pub use schema::{AvroSchema, Schema};
pub use ser::to_value;
pub use ser::{avro_serialize_bytes, avro_serialize_fixed, to_value};
pub use util::max_allocation_bytes;
pub use writer::{to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer};

Expand Down
135 changes: 120 additions & 15 deletions lang/rust/avro/src/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,25 @@

//! Logic for serde-compatible serialization.
use crate::{types::Value, Error};
use ref_thread_local::ref_thread_local;
use ref_thread_local::RefThreadLocal;
use serde::{ser, Serialize};
use std::{collections::HashMap, iter::once};

ref_thread_local! {
/// A thread local that is used to decide how to serialize
/// a byte array into Avro `types::Value`.
///
/// The initial value is `BytesType::Bytes`. `serialize_bytes_type` overwrites
/// it right before calling `serialize_bytes` and puts it back to
/// `BytesType::Bytes` afterwards; `Serializer::serialize_bytes` reads it to
/// choose between `Value::Bytes` and `Value::Fixed`.
///
/// Depends on the fact that serde's serialization process is single-threaded!
static managed BYTES_TYPE: BytesType = BytesType::Bytes;
}

/// A hint that tells the serializer how to represent a byte array
/// (`&[u8]`, `[u8; N]`) as an Avro `Value`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BytesType {
    /// Serialize the byte array as `Value::Bytes`.
    Bytes,
    /// Serialize the byte array as `Value::Fixed`, sized by the array length.
    Fixed,
}

#[derive(Clone, Default)]
pub struct Serializer {}

Expand Down Expand Up @@ -174,7 +190,10 @@ impl<'b> ser::Serializer for &'b mut Serializer {
}

fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
Ok(Value::Fixed(v.len(), v.to_owned()))
match *BYTES_TYPE.borrow() {
BytesType::Bytes => Ok(Value::Bytes(v.to_owned())),
BytesType::Fixed => Ok(Value::Fixed(v.len(), v.to_owned())),
}
}

fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Expand Down Expand Up @@ -473,7 +492,7 @@ impl<'a> ser::SerializeStructVariant for StructVariantSerializer<'a> {
}
}

/// Interpret a serializeable instance as a `Value`.
/// Interpret a serializable instance as a `Value`.
///
/// This conversion can fail if the value is not valid as per the Avro specification.
/// e.g: HashMap with non-string keys
Expand All @@ -482,6 +501,42 @@ pub fn to_value<S: Serialize>(value: S) -> Result<Value, Error> {
value.serialize(&mut serializer)
}

/// Serializes a byte array as an Avro `Value::Fixed`.
///
/// Intended to be used as `#[serde(serialize_with = "avro_serialize_fixed")]`
/// on a field holding bytes (e.g. `[u8; N]` or `&[u8]`). It gives Avro's
/// `Serializer` the hint to produce `Value::Fixed` for that field; the size of
/// the fixed value is the length of `value`.
///
/// The function itself only sets the hint and forwards `value` to
/// `serializer.serialize_bytes`, so with any other serializer the result is
/// whatever that serializer's `serialize_bytes` returns.
///
/// # Errors
///
/// Returns the error produced by `serializer.serialize_bytes`, if any.
pub fn avro_serialize_fixed<S>(value: &[u8], serializer: S) -> Result<S::Ok, S::Error>
where
    S: ser::Serializer,
{
    serialize_bytes_type(value, serializer, BytesType::Fixed)
}

/// Serializes a byte array as an Avro `Value::Bytes`.
///
/// Intended to be used as `#[serde(serialize_with = "avro_serialize_bytes")]`
/// on a field holding bytes (e.g. `&[u8]` or `[u8; N]`). It gives Avro's
/// `Serializer` the hint to produce `Value::Bytes` for that field.
///
/// The function itself only sets the hint and forwards `value` to
/// `serializer.serialize_bytes`, so with any other serializer the result is
/// whatever that serializer's `serialize_bytes` returns.
///
/// # Errors
///
/// Returns the error produced by `serializer.serialize_bytes`, if any.
pub fn avro_serialize_bytes<S>(value: &[u8], serializer: S) -> Result<S::Ok, S::Error>
where
    S: ser::Serializer,
{
    serialize_bytes_type(value, serializer, BytesType::Bytes)
}

/// Calls `serializer.serialize_bytes(value)` while the thread-local
/// `BYTES_TYPE` hint is set to `bytes_type`.
///
/// The hint is put back to `BytesType::Bytes` when this function exits, both
/// on a normal return and when `serialize_bytes` panics, so a failed
/// serialization cannot leave the hint behind for later serializations on the
/// same thread.
///
/// # Errors
///
/// Returns the error produced by `serializer.serialize_bytes`, if any.
fn serialize_bytes_type<S>(
    value: &[u8],
    serializer: S,
    bytes_type: BytesType,
) -> Result<S::Ok, S::Error>
where
    S: ser::Serializer,
{
    /// Resets the hint to its default when dropped (including during unwinding).
    struct ResetHint;

    impl Drop for ResetHint {
        fn drop(&mut self) {
            *BYTES_TYPE.borrow_mut() = BytesType::Bytes;
        }
    }

    *BYTES_TYPE.borrow_mut() = bytes_type;
    // Bound to a named local so it lives until the end of the function body;
    // `let _ = ...` would drop it immediately and reset the hint too early.
    let _reset = ResetHint;
    serializer.serialize_bytes(value)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1002,20 +1057,70 @@ mod tests {
#[test]
fn avro_3631_test_to_value_fixed_field() {
#[derive(Debug, Serialize, Deserialize)]
struct TestStructFixedField {
#[serde(with = "serde_bytes")]
field: [u8; 6],
struct TestStructFixedField<'a> {
// will be serialized as Value::Array<Vec<Value::Int>>
array_field: &'a [u8],

// will be serialized as Value::Fixed
#[serde(serialize_with = "avro_serialize_fixed")]
fixed_field: [u8; 6],
#[serde(serialize_with = "avro_serialize_fixed")]
fixed_field2: &'a [u8],

// will be serialized as Value::Bytes
#[serde(serialize_with = "avro_serialize_bytes")]
bytes_field: &'a [u8],
#[serde(serialize_with = "avro_serialize_bytes")]
bytes_field2: [u8; 6],

// will be serialized as Value::Array<Vec<Value::Int>>
vec_field: Vec<u8>,
}

let test = TestStructFixedField { field: [1; 6] };
let expected = Value::Record(vec![(
"field".to_owned(),
Value::Fixed(6, Vec::from(test.field.clone())),
)]);
assert_eq!(
expected,
to_value(test).unwrap(),
"error serializing fixed array"
);
let test = TestStructFixedField {
array_field: &[1, 11, 111],
bytes_field: &[2, 22, 222],
bytes_field2: [2; 6],
fixed_field: [1; 6],
fixed_field2: &[6, 66],
vec_field: vec![3, 33],
};
let expected = Value::Record(vec![
(
"array_field".to_owned(),
Value::Array(
test.array_field
.iter()
.map(|i| Value::Int(*i as i32))
.collect(),
),
),
(
"fixed_field".to_owned(),
Value::Fixed(6, Vec::from(test.fixed_field.clone())),
),
(
"fixed_field2".to_owned(),
Value::Fixed(2, Vec::from(test.fixed_field2.clone())),
),
(
"bytes_field".to_owned(),
Value::Bytes(Vec::from(test.bytes_field.clone())),
),
(
"bytes_field2".to_owned(),
Value::Bytes(Vec::from(test.bytes_field2.clone())),
),
(
"vec_field".to_owned(),
Value::Array(
test.vec_field
.iter()
.map(|i| Value::Int(*i as i32))
.collect(),
),
),
]);
assert_eq!(expected, to_value(test).unwrap());
}
}
31 changes: 25 additions & 6 deletions lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,10 @@ impl Value {
}
})
}
(_v, _s) => Some("Unsupported value-schema combination".to_string()),
(v, s) => Some(format!(
"Unsupported value-schema combination: \nValue: {:?},\nSchema: {:?}",
v, s
)),
}
}

Expand Down Expand Up @@ -2397,12 +2400,20 @@ Field with name '"b"' is not a member of the map items"#,
#[test]
fn avro_3631_test_serialize_fixed_fields() {
#[derive(Debug, Serialize, Deserialize)]
struct TestStructFixedField {
struct TestStructFixedField<'a> {
bytes_field: &'a [u8],
vec_field: Vec<u8>,
#[serde(with = "serde_bytes")]
field: [u8; 6],
fixed_field: [u8; 6],
// #[serde(with = "serde_bytes")]
// #[serde(with = "serde_bytes")]
}

let test = TestStructFixedField { field: [1; 6] };
let test = TestStructFixedField {
bytes_field: &[1, 2, 3],
fixed_field: [1; 6],
vec_field: vec![2, 3, 4],
};
let value: Value = to_value(test).unwrap();
let schema = Schema::parse_str(
r#"
Expand All @@ -2411,9 +2422,17 @@ Field with name '"b"' is not a member of the map items"#,
"name": "TestStructFixedField",
"fields": [
{
"name": "field",
"name": "bytes_field",
"type": "bytes"
},
{
"name": "vec_field",
"type": "bytes"
},
{
"name": "fixed_field",
"type": {
"name": "field",
"name": "fixed_field",
"type": "fixed",
"size": 6
}
Expand Down
37 changes: 37 additions & 0 deletions lang/rust/avro/tests/ser.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use apache_avro::to_value;
use apache_avro::types::Value;
use serde::{Deserialize, Serialize};

// Integration test: the `avro_serialize_*` helpers must be importable from the
// crate root and usable in `#[serde(serialize_with = ...)]` outside the crate.
#[test]
fn avro_3631_visibility_of_avro_serialize_bytes_type() {
    use apache_avro::{avro_serialize_bytes, avro_serialize_fixed};

    #[derive(Debug, Serialize, Deserialize)]
    struct TestStructFixedField<'a> {
        // will be serialized as Value::Bytes
        #[serde(serialize_with = "avro_serialize_bytes")]
        bytes_field: &'a [u8],

        // will be serialized as Value::Fixed
        #[serde(serialize_with = "avro_serialize_fixed")]
        fixed_field: [u8; 6],
    }

    let test = TestStructFixedField {
        bytes_field: &[2, 22, 222],
        fixed_field: [1; 6],
    };

    // `to_vec()` copies the bytes directly; no intermediate `clone()` of the
    // slice reference or of the `Copy` array is needed.
    let expected = Value::Record(vec![
        (
            "bytes_field".to_owned(),
            Value::Bytes(test.bytes_field.to_vec()),
        ),
        (
            "fixed_field".to_owned(),
            Value::Fixed(6, test.fixed_field.to_vec()),
        ),
    ]);

    assert_eq!(expected, to_value(test).unwrap());
}

0 comments on commit 7c9e938

Please sign in to comment.