Skip to content

Commit 2d5cc10

Browse files
umanwizardbenesch
authored andcommitted
Correctly decode negative timestamps with Avro (#16609)
1 parent d5f3829 commit 2d5cc10

File tree

2 files changed

+110
-20
lines changed

2 files changed

+110
-20
lines changed

src/avro/src/decode.rs

Lines changed: 106 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
// The original source code is subject to the terms of the MIT license, a copy
2222
// of which can be found in the LICENSE file at the root of this repository.
2323

24-
use std::cmp;
24+
use std::cmp::{self, Ordering};
2525
use std::fmt::{self, Display};
2626
use std::fs::File;
2727
use std::io::{self, Cursor, Read, Seek, SeekFrom};
@@ -110,23 +110,116 @@ impl Display for TsUnit {
110110
}
111111
}
112112

113+
#[cfg(test)]
114+
mod tests {
115+
use chrono::NaiveDateTime;
116+
117+
use crate::types::Value;
118+
use crate::util::TsUnit;
119+
120+
use super::build_ts_value;
121+
122+
#[test]
123+
fn test_negative_timestamps() {
124+
// TODO[btv] The currently released `from_timestamp_millis` is buggy,
125+
// so we use `from_timestamp_opt` everywhere here.
126+
//
127+
// See discussion at https://github.com/chronotope/chrono/issues/903 .
128+
// We should update to the new version of Chrono whenever that
129+
// goes to master.
130+
assert_eq!(
131+
build_ts_value(-1, TsUnit::Millis).unwrap(),
132+
Value::Timestamp(NaiveDateTime::from_timestamp_opt(-1, 999_000_000).unwrap())
133+
);
134+
assert_eq!(
135+
build_ts_value(-1000, TsUnit::Millis).unwrap(),
136+
Value::Timestamp(NaiveDateTime::from_timestamp_opt(-1, 0).unwrap())
137+
);
138+
assert_eq!(
139+
build_ts_value(-1000, TsUnit::Micros).unwrap(),
140+
Value::Timestamp(NaiveDateTime::from_timestamp_opt(-1, 999_000_000).unwrap())
141+
);
142+
assert_eq!(
143+
build_ts_value(-1, TsUnit::Micros).unwrap(),
144+
Value::Timestamp(NaiveDateTime::from_timestamp_opt(-1, 999_999_000).unwrap())
145+
);
146+
assert_eq!(
147+
build_ts_value(-123_456_789_123, TsUnit::Micros).unwrap(),
148+
Value::Timestamp(
149+
NaiveDateTime::from_timestamp_opt(-123_457, (1_000_000 - 789_123) * 1_000).unwrap()
150+
)
151+
);
152+
}
153+
}
154+
113155
fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
156+
// The algorithm here is taken from NaiveDateTime::from_timestamp_millis
157+
// on the unreleased 0.4.x branch,
158+
// made general to work with either millis or micros.
159+
//
160+
// That function is reproduced below for clarity.
161+
//
162+
// pub fn from_timestamp_millis(millis: i64) -> Option<NaiveDateTime> {
163+
// let (secs, subsec_millis) = (millis / 1000, millis % 1000);
164+
165+
// match subsec_millis.cmp(&0) {
166+
// Ordering::Less => {
167+
// // in the case where our subsec part is negative, then we are actually in the earlier second
168+
// // hence we subtract one from the seconds part, and we then add a whole second worth of nanos
169+
// // to our nanos part. Due to the use of u32 datatype, it is more convenient to subtract
170+
// // the absolute value of the subsec nanos from a whole second worth of nanos
171+
// let nsecs = u32::try_from(subsec_millis.abs()).ok()? * NANOS_IN_MILLISECOND;
172+
// NaiveDateTime::from_timestamp_opt(
173+
// secs.checked_sub(1)?,
174+
// NANOS_IN_SECOND.checked_sub(nsecs)?,
175+
// )
176+
// }
177+
// Ordering::Equal => NaiveDateTime::from_timestamp_opt(secs, 0),
178+
// Ordering::Greater => {
179+
// // convert the subsec millis into nanosecond scale so they can be supplied
180+
// // as the nanoseconds parameter
181+
// let nsecs = u32::try_from(subsec_millis).ok()? * NANOS_IN_MILLISECOND;
182+
// NaiveDateTime::from_timestamp_opt(secs, nsecs)
183+
// }
184+
// }
185+
// }
186+
const NANOS_PER_SECOND: u32 = 1_000_000_000;
114187
let units_per_second = match unit {
115188
TsUnit::Millis => 1_000,
116189
TsUnit::Micros => 1_000_000,
117190
};
118-
let nanos_per_unit = 1_000_000_000 / units_per_second as u32;
119-
let seconds = value / units_per_second;
120-
let fraction = (value % units_per_second) as u32;
121-
Ok(Value::Timestamp(
122-
NaiveDateTime::from_timestamp_opt(seconds, fraction * nanos_per_unit).ok_or(
123-
AvroError::Decode(DecodeError::BadTimestamp {
124-
unit,
125-
seconds,
126-
fraction,
127-
}),
128-
)?,
129-
))
191+
let nanos_per_unit = NANOS_PER_SECOND / units_per_second as u32;
192+
193+
let (secs, subsec_units) = (value / units_per_second, value % units_per_second);
194+
// See comment in copied Chrono code above for explanation of what's
195+
// going on in this match statement.
196+
//
197+
// TODO[btv] - The expects below should never fail and are just here to document assumptions.
198+
// Since they're potentially being called in a tight loop,
199+
// we can optimize with `as` or unsafe code, if this ever proves to
200+
// be a bottleneck.
201+
let result = match subsec_units.cmp(&0) {
202+
Ordering::Less => {
203+
let nsecs = u32::try_from(subsec_units.abs())
204+
.expect("abs(subsec_units) can't be greater than 1M")
205+
* nanos_per_unit;
206+
NaiveDateTime::from_timestamp_opt(
207+
secs.checked_sub(1)
208+
.expect("secs is the result of a division by at least 1000"),
209+
NANOS_PER_SECOND
210+
.checked_sub(nsecs)
211+
.expect("abs(nsecs) can't be greater than 1B"),
212+
)
213+
}
214+
Ordering::Equal => NaiveDateTime::from_timestamp_opt(secs, 0),
215+
Ordering::Greater => {
216+
let nsecs = u32::try_from(subsec_units).expect("subsec_units can't be greater than 1M")
217+
* nanos_per_unit;
218+
NaiveDateTime::from_timestamp_opt(secs, nsecs)
219+
}
220+
};
221+
let ndt = result.ok_or(AvroError::Decode(DecodeError::BadTimestamp { unit, value }))?;
222+
Ok(Value::Timestamp(ndt))
130223
}
131224

132225
/// A convenience trait for types that are both readable and skippable.

src/avro/src/error.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ pub enum DecodeError {
4141
ExpectedNonnegInteger(i64),
4242
BadTimestamp {
4343
unit: TsUnit,
44-
seconds: i64,
45-
fraction: u32,
44+
value: i64,
4645
},
4746
BadBoolean(u8),
4847
BadDate(i32),
@@ -108,11 +107,9 @@ impl DecodeError {
108107
DecodeError::ExpectedNonnegInteger(i) => {
109108
write!(f, "Expected non-negative integer, got {}", i)
110109
}
111-
DecodeError::BadTimestamp {
112-
unit,
113-
seconds,
114-
fraction,
115-
} => write!(f, "Invalid {} timestamp {}.{}", unit, seconds, fraction),
110+
DecodeError::BadTimestamp { unit, value } => {
111+
write!(f, "Invalid timestamp {value} {unit}")
112+
}
116113
DecodeError::BadBoolean(byte) => write!(f, "Invalid boolean: {:x}", byte),
117114
DecodeError::BadDate(since_epoch) => {
118115
write!(f, "Invalid num days since epoch: {}", since_epoch)

0 commit comments

Comments
 (0)