Skip to content

Commit

Permalink
Fix bug in Avro schema resolution with defaulted logical types (Mater…
Browse files Browse the repository at this point in the history
…ializeInc#24094)

H/t @benesch for diagnosing the issue.

When reading default values of fields, we were only taking into account
physical types, not logical types.

### Motivation

  * This PR fixes a recognized bug.

Discussed in Slack:
https://materializeinc.slack.com/archives/CUFU852KT/p1703154411455399


### Tips for reviewer

### Checklist

- [x] This PR has adequate test coverage / QA involvement has been duly
considered.
- [x] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [x] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [x] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](https://github.com/MaterializeInc/cloud/pull/5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [x] This PR includes the following [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note):
- <!-- Add release notes here or explicitly state that there are no
user-facing behavior changes. -->
  • Loading branch information
umanwizard authored Dec 22, 2023
1 parent 57bf39a commit b3ef64d
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 41 deletions.
3 changes: 2 additions & 1 deletion src/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ mod tests {
}
}

fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
/// A convenience function to build timestamp values from underlying longs.
pub fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
let result = match unit {
TsUnit::Millis => NaiveDateTime::from_timestamp_millis(value),
TsUnit::Micros => NaiveDateTime::from_timestamp_micros(value),
Expand Down
103 changes: 64 additions & 39 deletions src/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ use serde::{Serialize, Serializer};
use serde_json::{self, Map, Value};
use tracing::{debug, warn};

use crate::decode::build_ts_value;
use crate::error::Error as AvroError;
use crate::reader::SchemaResolver;
use crate::types::{self, DecimalValue, Value as AvroValue};
use crate::util::MapHelper;
use crate::util::{MapHelper, TsUnit};

pub fn resolve_schemas(
writer_schema: &Schema,
Expand Down Expand Up @@ -317,14 +318,32 @@ pub enum SchemaPiece {
impl SchemaPiece {
/// Returns whether the schema node is "underlyingly" an Int, i.e. either the plain
/// `Int` piece or a logicalType typedef over it (`Date`).
pub fn is_underlying_int(&self) -> bool {
matches!(self, SchemaPiece::Int | SchemaPiece::Date)
// The argument `0` is only a probe: the constructed value is discarded and
// only the `Some`/`None` outcome is observed.
self.underlying_int_value(0).is_some()
}
/// Returns whether the schema node is "underlyingly" a Long, i.e. either the plain
/// `Long` piece or a logicalType typedef over it (`TimestampMilli`, `TimestampMicro`).
pub fn is_underlying_long(&self) -> bool {
matches!(
self,
SchemaPiece::Long | SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro
)
// The argument `0` is only a probe: the constructed value (or error) is
// discarded and only the `Some`/`None` outcome is observed.
self.underlying_long_value(0).is_some()
}
/// Wraps `int` in the `avro::Value` variant matching this schema node.
///
/// Yields `Some(Ok(..))` for `Int` and `Date` nodes and `None` for every
/// other node, so the result is `Some` whenever `is_underlying_int` is `true`.
pub fn underlying_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
    let value = match self {
        SchemaPiece::Int => AvroValue::Int(int),
        // TODO[btv] - should we bounds-check the date here? We
        // don't elsewhere... maybe we should everywhere.
        SchemaPiece::Date => AvroValue::Date(int),
        // Not an int-backed node: nothing to construct.
        _ => return None,
    };
    Some(Ok(value))
}
/// Wraps `long` in the `avro::Value` variant matching this schema node.
///
/// `Long` nodes produce `AvroValue::Long` directly; the timestamp nodes
/// delegate to `build_ts_value` with the corresponding unit, whose `Result`
/// is passed through unchanged. Every other node yields `None`, so the result
/// is `Some` whenever `is_underlying_long` is `true`.
pub fn underlying_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
    // Pick the timestamp unit first; the non-timestamp cases return early.
    let unit = match self {
        SchemaPiece::Long => return Some(Ok(AvroValue::Long(long))),
        SchemaPiece::TimestampMilli => TsUnit::Millis,
        SchemaPiece::TimestampMicro => TsUnit::Micros,
        _ => return None,
    };
    Some(build_ts_value(long, unit))
}
}

Expand Down Expand Up @@ -1706,41 +1725,47 @@ impl<'a> SchemaNode<'a> {
},
(Null, SchemaPiece::Null) => AvroValue::Null,
(Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
(Number(n), piece) => match piece {
SchemaPiece::Int => {
let i = n
.as_i64()
.and_then(|i| i32::try_from(i).ok())
.ok_or_else(|| {
ParseSchemaError(format!("{} is not a 32-bit integer", n))
(Number(n), piece) => {
match piece {
piece if piece.is_underlying_int() => {
let i =
n.as_i64()
.and_then(|i| i32::try_from(i).ok())
.ok_or_else(|| {
ParseSchemaError(format!("{} is not a 32-bit integer", n))
})?;
piece.underlying_int_value(i).unwrap().map_err(|e| {
ParseSchemaError(format!("invalid default int {i}: {e}"))
})?
}
piece if piece.is_underlying_long() => {
let i = n.as_i64().ok_or_else(|| {
ParseSchemaError(format!("{} is not a 64-bit integer", n))
})?;
AvroValue::Int(i)
}
SchemaPiece::Long => {
let i = n.as_i64().ok_or_else(|| {
ParseSchemaError(format!("{} is not a 64-bit integer", n))
})?;
AvroValue::Long(i)
}
SchemaPiece::Float => {
let f = n
.as_f64()
.ok_or_else(|| ParseSchemaError(format!("{} is not a 32-bit float", n)))?;
AvroValue::Float(f as f32)
}
SchemaPiece::Double => {
let f = n
.as_f64()
.ok_or_else(|| ParseSchemaError(format!("{} is not a 64-bit float", n)))?;
AvroValue::Double(f)
}
_ => {
return Err(ParseSchemaError(format!(
"Unexpected number in default: {}",
n
)))
piece.underlying_long_value(i).unwrap().map_err(|e| {
ParseSchemaError(format!("invalid default long {i}: {e}"))
})?
}
SchemaPiece::Float => {
let f = n.as_f64().ok_or_else(|| {
ParseSchemaError(format!("{} is not a 32-bit float", n))
})?;
AvroValue::Float(f as f32)
}
SchemaPiece::Double => {
let f = n.as_f64().ok_or_else(|| {
ParseSchemaError(format!("{} is not a 64-bit float", n))
})?;
AvroValue::Double(f)
}
_ => {
return Err(ParseSchemaError(format!(
"Unexpected number in default: {}",
n
)))
}
}
},
}
(String(s), piece)
if s.eq_ignore_ascii_case("nan")
&& (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
Expand Down
39 changes: 39 additions & 0 deletions test/testdrive/avro-resolution-logical-type-default.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

#
# Attempt to resolve schemas with a logical type involving a default
#

$ set writer={"type": "record", "name": "row", "fields": [ ] }
$ set reader={"type": "record", "name": "row", "fields": [ {"name": "f1", "default": 0, "type": { "logicalType": "timestamp-micros", "type": "long" } } ] }

$ kafka-create-topic topic=resolution

$ kafka-ingest format=avro topic=resolution schema=${writer} timestamp=1
{}

$ kafka-ingest format=avro topic=resolution schema=${reader} timestamp=2
{"f1": 123 }

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
URL '${testdrive.schema-registry-url}'
);

> CREATE CONNECTION kafka_conn
TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE resolution
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-resolution-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE

> SELECT f1 FROM resolution
"1970-01-01 00:00:00"
"1970-01-01 00:00:00.000123"
1 change: 0 additions & 1 deletion test/testdrive/avro-resolution-union-concrete.td
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
Expand Down

0 comments on commit b3ef64d

Please sign in to comment.