Skip to content

Commit

Permalink
repr: support binary encoding of adt::Range
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean Loiselle committed Feb 24, 2023
1 parent c7e5a35 commit 7d26720
Show file tree
Hide file tree
Showing 5 changed files with 1,196 additions and 1,130 deletions.
24 changes: 22 additions & 2 deletions src/pgrepr/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::char;
use mz_repr::adt::date::Date;
use mz_repr::adt::jsonb::JsonbRef;
use mz_repr::adt::range::Range;
use mz_repr::adt::range::{Range, RangeInner};
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::strconv::{self, Nestable};
use mz_repr::{Datum, RelationType, Row, RowArena, ScalarType};
Expand Down Expand Up @@ -482,7 +482,27 @@ impl Value {
Value::Uuid(u) => u.to_sql(&PgType::UUID, buf),
Value::Numeric(a) => a.to_sql(&PgType::NUMERIC, buf),
Value::MzTimestamp(t) => t.to_string().to_sql(&PgType::TEXT, buf),
Value::Range(_) => Err("binary encodings of range types not yet implemented".into()),
Value::Range(range) => {
buf.put_u8(range.pg_flag_bits());

let elem_type = match ty {
Type::Range { element_type } => element_type,
_ => unreachable!(),
};

if let Some(RangeInner { lower, upper }) = &range.inner {
for bound in [&lower.bound, &upper.bound] {
if let Some(bound) = bound {
let base = buf.len();
buf.put_i32(0);
bound.encode_binary(elem_type, buf)?;
let len = pg_len("encoded range bound", buf.len() - base - 4)?;
buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
}
}
}
Ok(postgres_types::IsNull::No)
}
}
.expect("encode_binary should never trigger a to_sql failure");
if let IsNull::Yes = is_null {
Expand Down
62 changes: 61 additions & 1 deletion src/repr/src/adt/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use super::numeric::Numeric;
include!(concat!(env!("OUT_DIR"), "/mz_repr.adt.range.rs"));

bitflags! {
pub(crate) struct Flags: u8 {
pub(crate) struct InternalFlags: u8 {
const EMPTY = 1;
const LB_INCLUSIVE = 1 << 1;
const LB_INFINITE = 1 << 2;
Expand All @@ -39,6 +39,16 @@ bitflags! {
}
}

bitflags! {
pub(crate) struct PgFlags: u8 {
const EMPTY = 0b0000_0001;
const LB_INCLUSIVE = 0b0000_0010;
const UB_INCLUSIVE = 0b0000_0100;
const LB_INFINITE = 0b0000_1000;
const UB_INFINITE = 0b0001_0000;
}
}

/// A range of values along the domain `D`.
///
/// `D` is generic to facilitate interoperating over multiple representation,
Expand Down Expand Up @@ -173,6 +183,56 @@ impl<D> Range<D> {
}
}

/// Get the flag bits appropriate to use in our internal (i.e. row) encoding
/// of range values.
///
/// Note that this differs from the flags appropriate to encode with
/// Postgres, which has `UB_INFINITE` and `LB_INCLUSIVE` in the alternate
/// position.
pub fn internal_flag_bits(&self) -> u8 {
let mut flags = InternalFlags::empty();

match &self.inner {
None => {
flags.set(InternalFlags::EMPTY, true);
}
Some(RangeInner { lower, upper }) => {
flags.set(InternalFlags::EMPTY, false);
flags.set(InternalFlags::LB_INFINITE, lower.bound.is_none());
flags.set(InternalFlags::UB_INFINITE, upper.bound.is_none());
flags.set(InternalFlags::LB_INCLUSIVE, lower.inclusive);
flags.set(InternalFlags::UB_INCLUSIVE, upper.inclusive);
}
}

flags.bits()
}

/// Get the flag bits appropriate to use in PG-compatible encodings of range
/// values.
///
/// Note that this differs from the flags appropriate for our internal
/// encoding, which has `UB_INFINITE` and `LB_INCLUSIVE` in the alternate
/// position.
pub fn pg_flag_bits(&self) -> u8 {
let mut flags = PgFlags::empty();

match &self.inner {
None => {
flags.set(PgFlags::EMPTY, true);
}
Some(RangeInner { lower, upper }) => {
flags.set(PgFlags::EMPTY, false);
flags.set(PgFlags::LB_INFINITE, lower.bound.is_none());
flags.set(PgFlags::UB_INFINITE, upper.bound.is_none());
flags.set(PgFlags::LB_INCLUSIVE, lower.inclusive);
flags.set(PgFlags::UB_INCLUSIVE, upper.inclusive);
}
}

flags.bits()
}

/// Converts `self` from having bounds of type `D` to type `O`, converting
/// the current bounds using `conv`.
pub fn into_bounds<F, O>(self, conv: F) -> Range<O>
Expand Down
59 changes: 23 additions & 36 deletions src/repr/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,37 +617,37 @@ unsafe fn read_datum<'a>(data: &'a [u8], offset: &mut usize) -> Datum<'a> {
Tag::Range => {
// See notes on `push_range_with` for details about encoding.
let flag_byte = read_byte(data, offset);
let flags =
range::Flags::from_bits(flag_byte).expect("range flags must be encoded validly");
let flags = range::InternalFlags::from_bits(flag_byte)
.expect("range flags must be encoded validly");

if flags.contains(range::Flags::EMPTY) {
if flags.contains(range::InternalFlags::EMPTY) {
assert!(
flags == range::Flags::EMPTY,
flags == range::InternalFlags::EMPTY,
"empty ranges contain only RANGE_EMPTY flag"
);

return Datum::Range(Range { inner: None });
}

let lower_bound = if flags.contains(range::Flags::LB_INFINITE) {
let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
None
} else {
Some(DatumNested::extract(data, offset))
};

let lower = RangeBound {
inclusive: flags.contains(range::Flags::LB_INCLUSIVE),
inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
bound: lower_bound,
};

let upper_bound = if flags.contains(range::Flags::UB_INFINITE) {
let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
None
} else {
Some(DatumNested::extract(data, offset))
};

let upper = RangeBound {
inclusive: flags.contains(range::Flags::UB_INCLUSIVE),
inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
bound: upper_bound,
};

Expand Down Expand Up @@ -875,30 +875,17 @@ where
}
}
}
Datum::Range(Range { inner }) => {
Datum::Range(range) => {
// See notes on `push_range_with` for details about encoding.
data.push(Tag::Range.into());

match inner {
None => {
data.push(range::Flags::EMPTY.bits());
}
Some(RangeInner { lower, upper }) => {
let mut flags = range::Flags::empty();

flags.set(range::Flags::LB_INFINITE, lower.bound.is_none());
flags.set(range::Flags::UB_INFINITE, upper.bound.is_none());
flags.set(range::Flags::LB_INCLUSIVE, lower.inclusive);
flags.set(range::Flags::UB_INCLUSIVE, upper.inclusive);

data.push(flags.bits());

for bound in [lower.bound, upper.bound] {
if let Some(bound) = bound {
match bound.datum() {
Datum::Null => panic!("cannot push Datum::Null into range"),
d => push_datum(data, d),
}
data.push(range.internal_flag_bits());

if let Some(RangeInner { lower, upper }) = range.inner {
for bound in [lower.bound, upper.bound] {
if let Some(bound) = bound {
match bound.datum() {
Datum::Null => panic!("cannot push Datum::Null into range"),
d => push_datum(data, d),
}
}
}
Expand Down Expand Up @@ -1389,7 +1376,7 @@ impl RowPacker<'_> {
None => {
self.row.data.push(Tag::Range.into());
// Untagged bytes only contains the `RANGE_EMPTY` flag value.
self.row.data.push(range::Flags::EMPTY.bits());
self.row.data.push(range::InternalFlags::EMPTY.bits());
Ok(())
}
Some(inner) => self.push_range_with(
Expand Down Expand Up @@ -1446,12 +1433,12 @@ impl RowPacker<'_> {
let start = self.row.data.len();
self.row.data.push(Tag::Range.into());

let mut flags = range::Flags::empty();
let mut flags = range::InternalFlags::empty();

flags.set(range::Flags::LB_INFINITE, lower.bound.is_none());
flags.set(range::Flags::UB_INFINITE, upper.bound.is_none());
flags.set(range::Flags::LB_INCLUSIVE, lower.inclusive);
flags.set(range::Flags::UB_INCLUSIVE, upper.inclusive);
flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);

let mut expected_datums = 0;

Expand Down
Loading

0 comments on commit 7d26720

Please sign in to comment.