Skip to content

Commit 6123956

Browse files
authored
Add Decimal type support to arrow-avro (#7832)
# Which issue does this PR close? - Part of #4886 - Related to #6965 # Rationale for this change This PR addresses a feature gap by introducing support for the Avro `decimal` logical type, which is currently unimplemented as indicated by the `test_decimal_logical_type_not_implemented` test case. The `decimal` type is crucial for handling precise numerical data common in financial and scientific applications, making this a necessary addition for broader Avro compatibility. # What changes are included in this PR? This PR introduces the necessary changes to both parse and decode the Avro `decimal` logical type into the corresponding Arrow `Decimal128` or `Decimal256` data types. The main changes are: 1. **Schema Parsing (`codec.rs`):** * Implemented the logic within `make_data_type` to correctly parse the `decimal` logical type from the Avro schema. * The `Codec` enum's `Decimal` variant now correctly stores the precision, scale, and optional fixed-size from the schema's attributes. 2. **Decoding Logic (`record.rs`):** * Added `Decoder::Decimal128` and `Decoder::Decimal256` variants to handle decoding of decimal values from both `bytes` and `fixed` Avro types. * The implementation correctly handles sign extension for negative numbers to ensure accurate representation in Arrow's decimal arrays. # Are these changes tested? This PR includes comprehensive tests to validate the new functionality: * The existing `test_decimal_logical_type_not_implemented` test has been replaced with concrete test cases. * Added unit tests in `record.rs` (`test_decimal_decoding_fixed256`, `test_decimal_decoding_fixed128`, `test_decimal_decoding_bytes_with_nulls`, etc.) to cover various scenarios, including: * Decoding from Avro `fixed` and `bytes` primitive types. * Handling different precisions to select between `Decimal128` and `Decimal256`. * Correctly processing null values within decimal arrays. # Are there any user-facing changes? N/A
1 parent 248ee73 commit 6123956

File tree

4 files changed

+312
-22
lines changed

4 files changed

+312
-22
lines changed

arrow-avro/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ crc = { version = "3.0", optional = true }
5353

5454
[dev-dependencies]
5555
rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }
56-
criterion = { version = "0.5", default-features = false }
56+
criterion = { version = "0.6.0", default-features = false }
5757
tempfile = "3.3"
5858
arrow = { workspace = true }
5959

arrow-avro/benches/avro_reader.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ fn bench_array_creation(c: &mut Criterion) {
163163
)
164164
.unwrap();
165165

166-
criterion::black_box(batch)
166+
std::hint::black_box(batch)
167167
})
168168
});
169169

@@ -187,7 +187,7 @@ fn bench_array_creation(c: &mut Criterion) {
187187
)
188188
.unwrap();
189189

190-
criterion::black_box(batch)
190+
std::hint::black_box(batch)
191191
})
192192
});
193193
}
@@ -214,7 +214,7 @@ fn bench_string_operations(c: &mut Criterion) {
214214
for i in 0..rows {
215215
sum_len += string_array.value(i).len();
216216
}
217-
criterion::black_box(sum_len)
217+
std::hint::black_box(sum_len)
218218
})
219219
});
220220

@@ -224,7 +224,7 @@ fn bench_string_operations(c: &mut Criterion) {
224224
for i in 0..rows {
225225
sum_len += string_view_array.value(i).len();
226226
}
227-
criterion::black_box(sum_len)
227+
std::hint::black_box(sum_len)
228228
})
229229
});
230230
}
@@ -246,15 +246,15 @@ fn bench_avro_reader(c: &mut Criterion) {
246246
b.iter(|| {
247247
let options = ReadOptions::default();
248248
let batch = read_avro_test_file(file_path, &options).unwrap();
249-
criterion::black_box(batch)
249+
std::hint::black_box(batch)
250250
})
251251
});
252252

253253
group.bench_function(format!("string_view_{str_length}_chars"), |b| {
254254
b.iter(|| {
255255
let options = ReadOptions::default().with_utf8view(true);
256256
let batch = read_avro_test_file(file_path, &options).unwrap();
257-
criterion::black_box(batch)
257+
std::hint::black_box(batch)
258258
})
259259
});
260260
}

arrow-avro/src/codec.rs

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
// under the License.
1717

1818
use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, TypeName};
19+
use arrow_schema::DataType::{Decimal128, Decimal256};
1920
use arrow_schema::{
20-
ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, SchemaBuilder, SchemaRef, TimeUnit,
21+
ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, SchemaBuilder, SchemaRef,
22+
TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
2123
};
2224
use std::borrow::Cow;
2325
use std::collections::HashMap;
@@ -192,6 +194,13 @@ pub enum Codec {
192194
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
193195
/// The i32 parameter indicates the fixed binary size
194196
Fixed(i32),
197+
/// Represents Avro decimal type, maps to Arrow's Decimal128 or Decimal256 data types
198+
///
199+
/// The fields are `(precision, scale, fixed_size)`.
200+
/// - `precision` (`usize`): Total number of digits.
201+
/// - `scale` (`Option<usize>`): Number of fractional digits.
202+
/// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
203+
Decimal(usize, Option<usize>, Option<usize>),
195204
/// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
196205
Uuid,
197206
/// Represents Avro array type, maps to Arrow's List data type
@@ -227,6 +236,22 @@ impl Codec {
227236
}
228237
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
229238
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
239+
Self::Decimal(precision, scale, size) => {
240+
let p = *precision as u8;
241+
let s = scale.unwrap_or(0) as i8;
242+
let too_large_for_128 = match *size {
243+
Some(sz) => sz > 16,
244+
None => {
245+
(p as usize) > DECIMAL128_MAX_PRECISION as usize
246+
|| (s as usize) > DECIMAL128_MAX_SCALE as usize
247+
}
248+
};
249+
if too_large_for_128 {
250+
Decimal256(p, s)
251+
} else {
252+
Decimal128(p, s)
253+
}
254+
}
230255
Self::Uuid => DataType::FixedSizeBinary(16),
231256
Self::List(f) => {
232257
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
@@ -267,6 +292,32 @@ impl From<PrimitiveType> for Codec {
267292
}
268293
}
269294

295+
fn parse_decimal_attributes(
296+
attributes: &Attributes,
297+
fallback_size: Option<usize>,
298+
precision_required: bool,
299+
) -> Result<(usize, usize, Option<usize>), ArrowError> {
300+
let precision = attributes
301+
.additional
302+
.get("precision")
303+
.and_then(|v| v.as_u64())
304+
.or(if precision_required { None } else { Some(10) })
305+
.ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
306+
as usize;
307+
let scale = attributes
308+
.additional
309+
.get("scale")
310+
.and_then(|v| v.as_u64())
311+
.unwrap_or(0) as usize;
312+
let size = attributes
313+
.additional
314+
.get("size")
315+
.and_then(|v| v.as_u64())
316+
.map(|s| s as usize)
317+
.or(fallback_size);
318+
Ok((precision, scale, size))
319+
}
320+
270321
impl Codec {
271322
/// Converts a string codec to use Utf8View if requested
272323
///
@@ -412,7 +463,6 @@ fn make_data_type<'a>(
412463
let size = f.size.try_into().map_err(|e| {
413464
ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
414465
})?;
415-
416466
let field = AvroDataType {
417467
nullability: None,
418468
metadata: f.attributes.field_metadata(),
@@ -443,11 +493,27 @@ fn make_data_type<'a>(
443493

444494
// https://avro.apache.org/docs/1.11.1/specification/#logical-types
445495
match (t.attributes.logical_type, &mut field.codec) {
446-
(Some("decimal"), c @ Codec::Fixed(_)) => {
447-
return Err(ArrowError::NotYetImplemented(
448-
"Decimals are not currently supported".to_string(),
449-
))
450-
}
496+
(Some("decimal"), c) => match *c {
497+
Codec::Fixed(sz_val) => {
498+
let (prec, sc, size_opt) =
499+
parse_decimal_attributes(&t.attributes, Some(sz_val as usize), true)?;
500+
let final_sz = if let Some(sz_actual) = size_opt {
501+
sz_actual
502+
} else {
503+
sz_val as usize
504+
};
505+
*c = Codec::Decimal(prec, Some(sc), Some(final_sz));
506+
}
507+
Codec::Binary => {
508+
let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
509+
*c = Codec::Decimal(prec, Some(sc), None);
510+
}
511+
_ => {
512+
return Err(ArrowError::SchemaError(format!(
513+
"Decimal logical type can only be backed by Fixed or Bytes, found {c:?}"
514+
)))
515+
}
516+
},
451517
(Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
452518
(Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
453519
(Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,

0 commit comments

Comments
 (0)