From 6ef6c955ed8de5b56ec3f2e868a013168caaa965 Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Tue, 20 Jun 2023 18:05:06 +0800 Subject: [PATCH] feat: fork Column and ColumnData from influxdb_iox --- Cargo.lock | 1 + common_types/Cargo.toml | 1 + common_types/src/bitset.rs | 593 ++++++++++++++ common_types/src/column.rs | 1162 +++++---------------------- common_types/src/column_block.rs | 1018 +++++++++++++++++++++++ common_types/src/lib.rs | 6 +- common_types/src/record_batch.rs | 6 +- df_operator/src/functions.rs | 8 +- df_operator/src/udfs/time_bucket.rs | 8 +- interpreters/src/insert.rs | 10 +- proxy/src/grpc/prom_query.rs | 2 +- proxy/src/influxdb/types.rs | 2 +- table_engine/src/memory.rs | 2 +- 13 files changed, 1837 insertions(+), 982 deletions(-) create mode 100644 common_types/src/bitset.rs create mode 100644 common_types/src/column_block.rs diff --git a/Cargo.lock b/Cargo.lock index d4183bacca..0adbfab939 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1333,6 +1333,7 @@ dependencies = [ "murmur3", "paste 1.0.12", "prost", + "rand 0.7.3", "seahash", "serde", "serde_json", diff --git a/common_types/Cargo.toml b/common_types/Cargo.toml index 5f9a0c5b6b..5338347010 100644 --- a/common_types/Cargo.toml +++ b/common_types/Cargo.toml @@ -27,6 +27,7 @@ datafusion = { workspace = true, optional = true } murmur3 = "0.4.1" paste = { workspace = true } prost = { workspace = true } +rand = { workspace = true } seahash = "4.1.0" serde = { workspace = true } serde_json = { workspace = true } diff --git a/common_types/src/bitset.rs b/common_types/src/bitset.rs new file mode 100644 index 0000000000..6113c8d81e --- /dev/null +++ b/common_types/src/bitset.rs @@ -0,0 +1,593 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +// Fork from https://github.com/influxdata/influxdb_iox/blob/123ba32fb9d8941f888d79f7608448e0cd722253/arrow_util/src/bitset.rs. 
+ +use std::ops::Range; + +use arrow::buffer::{BooleanBuffer, Buffer}; + +/// An arrow-compatible mutable bitset implementation +/// +/// Note: This currently operates on individual bytes at a time +/// it could be optimised to instead operate on usize blocks +#[derive(Debug, Default, Clone)] +pub struct BitSet { + /// The underlying data + /// + /// Data is stored in the least significant bit of a byte first + buffer: Vec, + + /// The length of this mask in bits + len: usize, +} + +impl BitSet { + /// Creates a new BitSet + pub fn new() -> Self { + Self::default() + } + + /// Creates a new BitSet with `count` unset bits. + pub fn with_size(count: usize) -> Self { + let mut bitset = Self::default(); + bitset.append_unset(count); + bitset + } + + /// Reserve space for `count` further bits + pub fn reserve(&mut self, count: usize) { + let new_buf_len = (self.len + count + 7) >> 3; + self.buffer.reserve(new_buf_len); + } + + /// Appends `count` unset bits + pub fn append_unset(&mut self, count: usize) { + self.len += count; + let new_buf_len = (self.len + 7) >> 3; + self.buffer.resize(new_buf_len, 0); + } + + /// Appends `count` set bits + pub fn append_set(&mut self, count: usize) { + let new_len = self.len + count; + let new_buf_len = (new_len + 7) >> 3; + + let skew = self.len & 7; + if skew != 0 { + *self.buffer.last_mut().unwrap() |= 0xFF << skew; + } + + self.buffer.resize(new_buf_len, 0xFF); + + let rem = new_len & 7; + if rem != 0 { + *self.buffer.last_mut().unwrap() &= (1 << rem) - 1; + } + + self.len = new_len; + } + + /// Truncates the bitset to the provided length + pub fn truncate(&mut self, len: usize) { + let new_buf_len = (len + 7) >> 3; + self.buffer.truncate(new_buf_len); + let overrun = len & 7; + if overrun > 0 { + *self.buffer.last_mut().unwrap() &= (1 << overrun) - 1; + } + self.len = len; + } + + /// Extends this [`BitSet`] by the context of `other` + pub fn extend_from(&mut self, other: &BitSet) { + self.append_bits(other.len, &other.buffer) + 
} + + /// Extends this [`BitSet`] by `range` elements in `other` + pub fn extend_from_range(&mut self, other: &BitSet, range: Range) { + let count = range.end - range.start; + if count == 0 { + return; + } + + let start_byte = range.start >> 3; + let end_byte = (range.end + 7) >> 3; + let skew = range.start & 7; + + // `append_bits` requires the provided `to_set` to be byte aligned, therefore + // if the range being copied is not byte aligned we must first append + // the leading bits to reach a byte boundary + if skew == 0 { + // No skew can simply append bytes directly + self.append_bits(count, &other.buffer[start_byte..end_byte]) + } else if start_byte + 1 == end_byte { + // Append bits from single byte + self.append_bits(count, &[other.buffer[start_byte] >> skew]) + } else { + // Append trailing bits from first byte to reach byte boundary, then append + // bits from the remaining byte-aligned mask + let offset = 8 - skew; + self.append_bits(offset, &[other.buffer[start_byte] >> skew]); + self.append_bits(count - offset, &other.buffer[(start_byte + 1)..end_byte]); + } + } + + /// Appends `count` boolean values from the slice of packed bits + pub fn append_bits(&mut self, count: usize, to_set: &[u8]) { + assert_eq!((count + 7) >> 3, to_set.len()); + + let new_len = self.len + count; + let new_buf_len = (new_len + 7) >> 3; + self.buffer.reserve(new_buf_len - self.buffer.len()); + + let whole_bytes = count >> 3; + let overrun = count & 7; + + let skew = self.len & 7; + if skew == 0 { + self.buffer.extend_from_slice(&to_set[..whole_bytes]); + if overrun > 0 { + let masked = to_set[whole_bytes] & ((1 << overrun) - 1); + self.buffer.push(masked) + } + + self.len = new_len; + debug_assert_eq!(self.buffer.len(), new_buf_len); + return; + } + + for to_set_byte in &to_set[..whole_bytes] { + let low = *to_set_byte << skew; + let high = *to_set_byte >> (8 - skew); + + *self.buffer.last_mut().unwrap() |= low; + self.buffer.push(high); + } + + if overrun > 0 { + let masked = 
to_set[whole_bytes] & ((1 << overrun) - 1); + let low = masked << skew; + *self.buffer.last_mut().unwrap() |= low; + + if overrun > 8 - skew { + let high = masked >> (8 - skew); + self.buffer.push(high) + } + } + + self.len = new_len; + debug_assert_eq!(self.buffer.len(), new_buf_len); + } + + /// Sets a given bit + pub fn set(&mut self, idx: usize) { + assert!(idx <= self.len); + + let byte_idx = idx >> 3; + let bit_idx = idx & 7; + self.buffer[byte_idx] |= 1 << bit_idx; + } + + /// Returns if the given index is set + pub fn get(&self, idx: usize) -> bool { + assert!(idx <= self.len); + + let byte_idx = idx >> 3; + let bit_idx = idx & 7; + (self.buffer[byte_idx] >> bit_idx) & 1 != 0 + } + + /// Converts this BitSet to a buffer compatible with arrows boolean encoding + pub fn to_arrow(&self) -> BooleanBuffer { + let offset = 0; + BooleanBuffer::new(Buffer::from(&self.buffer), offset, self.len) + } + + /// Returns the number of values stored in the bitset + pub fn len(&self) -> usize { + self.len + } + + /// Returns if this bitset is empty + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Returns the number of bytes used by this bitset + pub fn byte_len(&self) -> usize { + self.buffer.len() + } + + /// Return the raw packed bytes used by this bitset + pub fn bytes(&self) -> &[u8] { + &self.buffer + } + + /// Return `true` if all bits in the [`BitSet`] are currently set. + pub fn is_all_set(&self) -> bool { + // An empty bitmap has no set bits. + if self.len == 0 { + return false; + } + + // Check all the bytes in the bitmap that have all their bits considered + // part of the bit set. + let full_blocks = (self.len / 8).saturating_sub(1); + if !self.buffer.iter().take(full_blocks).all(|&v| v == u8::MAX) { + return false; + } + + // Check the last byte of the bitmap that may only be partially part of + // the bit set, and therefore need masking to check only the relevant + // bits. 
+ let mask = match self.len % 8 { + 1..=8 => !(0xFF << (self.len % 8)), // LSB mask + 0 => 0xFF, + _ => unreachable!(), + }; + *self.buffer.last().unwrap() == mask + } + + /// Return `true` if all bits in the [`BitSet`] are currently unset. + pub fn is_all_unset(&self) -> bool { + self.buffer.iter().all(|&v| v == 0) + } +} + +/// Returns an iterator over set bit positions in increasing order +pub fn iter_set_positions(bytes: &[u8]) -> impl Iterator + '_ { + iter_set_positions_with_offset(bytes, 0) +} + +/// Returns an iterator over set bit positions in increasing order starting +/// at the provided bit offset +pub fn iter_set_positions_with_offset( + bytes: &[u8], + offset: usize, +) -> impl Iterator + '_ { + let mut byte_idx = offset >> 3; + let mut in_progress = bytes.get(byte_idx).cloned().unwrap_or(0); + + let skew = offset & 7; + in_progress &= 0xFF << skew; + + std::iter::from_fn(move || loop { + if in_progress != 0 { + let bit_pos = in_progress.trailing_zeros(); + in_progress ^= 1 << bit_pos; + return Some((byte_idx << 3) + (bit_pos as usize)); + } + byte_idx += 1; + in_progress = *bytes.get(byte_idx)?; + }) +} + +#[cfg(test)] +mod tests { + use arrow::array::BooleanBufferBuilder; + use rand::{prelude::*, rngs::OsRng}; + + use super::*; + + /// Computes a compacted representation of a given bool array + fn compact_bools(bools: &[bool]) -> Vec { + bools + .chunks(8) + .map(|x| { + let mut collect = 0_u8; + for (idx, set) in x.iter().enumerate() { + if *set { + collect |= 1 << idx + } + } + collect + }) + .collect() + } + + fn iter_set_bools(bools: &[bool]) -> impl Iterator + '_ { + bools.iter().enumerate().filter_map(|(x, y)| y.then(|| x)) + } + + #[test] + fn test_compact_bools() { + let bools = &[ + false, false, true, true, false, false, true, false, true, false, + ]; + let collected = compact_bools(bools); + let indexes: Vec<_> = iter_set_bools(bools).collect(); + assert_eq!(collected.as_slice(), &[0b01001100, 0b00000001]); + 
assert_eq!(indexes.as_slice(), &[2, 3, 6, 8]) + } + + #[test] + fn test_bit_mask() { + let mut mask = BitSet::new(); + + mask.append_bits(8, &[0b11111111]); + let d1 = mask.buffer.clone(); + + mask.append_bits(3, &[0b01010010]); + let d2 = mask.buffer.clone(); + + mask.append_bits(5, &[0b00010100]); + let d3 = mask.buffer.clone(); + + mask.append_bits(2, &[0b11110010]); + let d4 = mask.buffer.clone(); + + mask.append_bits(15, &[0b11011010, 0b01010101]); + let d5 = mask.buffer.clone(); + + assert_eq!(d1.as_slice(), &[0b11111111]); + assert_eq!(d2.as_slice(), &[0b11111111, 0b00000010]); + assert_eq!(d3.as_slice(), &[0b11111111, 0b10100010]); + assert_eq!(d4.as_slice(), &[0b11111111, 0b10100010, 0b00000010]); + assert_eq!( + d5.as_slice(), + &[0b11111111, 0b10100010, 0b01101010, 0b01010111, 0b00000001] + ); + + assert!(mask.get(0)); + assert!(!mask.get(8)); + assert!(mask.get(9)); + assert!(mask.get(19)); + } + + fn make_rng() -> StdRng { + let seed = OsRng::default().next_u64(); + println!("Seed: {seed}"); + StdRng::seed_from_u64(seed) + } + + #[test] + fn test_bit_mask_all_set() { + let mut mask = BitSet::new(); + let mut all_bools = vec![]; + let mut rng = make_rng(); + + for _ in 0..100 { + let mask_length = (rng.next_u32() % 50) as usize; + let bools: Vec<_> = std::iter::repeat(true).take(mask_length).collect(); + + let collected = compact_bools(&bools); + mask.append_bits(mask_length, &collected); + all_bools.extend_from_slice(&bools); + } + + let collected = compact_bools(&all_bools); + assert_eq!(mask.buffer, collected); + + let expected_indexes: Vec<_> = iter_set_bools(&all_bools).collect(); + let actual_indexes: Vec<_> = iter_set_positions(&mask.buffer).collect(); + assert_eq!(expected_indexes, actual_indexes); + } + + #[test] + fn test_bit_mask_fuzz() { + let mut mask = BitSet::new(); + let mut all_bools = vec![]; + let mut rng = make_rng(); + + for _ in 0..100 { + let mask_length = (rng.next_u32() % 50) as usize; + let bools: Vec<_> = std::iter::from_fn(|| 
Some(rng.next_u32() & 1 == 0)) + .take(mask_length) + .collect(); + + let collected = compact_bools(&bools); + mask.append_bits(mask_length, &collected); + all_bools.extend_from_slice(&bools); + } + + let collected = compact_bools(&all_bools); + assert_eq!(mask.buffer, collected); + + let expected_indexes: Vec<_> = iter_set_bools(&all_bools).collect(); + let actual_indexes: Vec<_> = iter_set_positions(&mask.buffer).collect(); + assert_eq!(expected_indexes, actual_indexes); + + if !all_bools.is_empty() { + for _ in 0..10 { + let offset = rng.next_u32() as usize % all_bools.len(); + + let expected_indexes: Vec<_> = iter_set_bools(&all_bools[offset..]) + .map(|x| x + offset) + .collect(); + + let actual_indexes: Vec<_> = + iter_set_positions_with_offset(&mask.buffer, offset).collect(); + + assert_eq!(expected_indexes, actual_indexes); + } + } + + for index in actual_indexes { + assert!(mask.get(index)); + } + } + + #[test] + fn test_append_fuzz() { + let mut mask = BitSet::new(); + let mut all_bools = vec![]; + let mut rng = make_rng(); + + for _ in 0..100 { + let len = (rng.next_u32() % 32) as usize; + let set = rng.next_u32() & 1 == 0; + + match set { + true => mask.append_set(len), + false => mask.append_unset(len), + } + + all_bools.extend(std::iter::repeat(set).take(len)); + + let collected = compact_bools(&all_bools); + assert_eq!(mask.buffer, collected); + } + } + + #[test] + fn test_truncate_fuzz() { + let mut mask = BitSet::new(); + let mut all_bools = vec![]; + let mut rng = make_rng(); + + for _ in 0..100 { + let mask_length = (rng.next_u32() % 32) as usize; + let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0)) + .take(mask_length) + .collect(); + + let collected = compact_bools(&bools); + mask.append_bits(mask_length, &collected); + all_bools.extend_from_slice(&bools); + + if !all_bools.is_empty() { + let truncate = rng.next_u32() as usize % all_bools.len(); + mask.truncate(truncate); + all_bools.truncate(truncate); + } + + let 
collected = compact_bools(&all_bools); + assert_eq!(mask.buffer, collected); + } + } + + #[test] + fn test_extend_range_fuzz() { + let mut rng = make_rng(); + let src_len = 32; + let src_bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0)) + .take(src_len) + .collect(); + + let mut src_mask = BitSet::new(); + src_mask.append_bits(src_len, &compact_bools(&src_bools)); + + let mut dst_bools = Vec::new(); + let mut dst_mask = BitSet::new(); + + for _ in 0..100 { + let a = rng.next_u32() as usize % src_len; + let b = rng.next_u32() as usize % src_len; + + let start = a.min(b); + let end = a.max(b); + + dst_bools.extend_from_slice(&src_bools[start..end]); + dst_mask.extend_from_range(&src_mask, start..end); + + let collected = compact_bools(&dst_bools); + assert_eq!(dst_mask.buffer, collected); + } + } + + #[test] + fn test_arrow_compat() { + let bools = &[ + false, false, true, true, false, false, true, false, true, false, false, true, + ]; + + let mut builder = BooleanBufferBuilder::new(bools.len()); + builder.append_slice(bools); + let buffer = builder.finish(); + + let collected = compact_bools(bools); + let mut mask = BitSet::new(); + mask.append_bits(bools.len(), &collected); + let mask_buffer = mask.to_arrow(); + + assert_eq!(collected.as_slice(), buffer.as_slice()); + assert_eq!(buffer.as_slice(), mask_buffer.into_inner().as_slice()); + } + + #[test] + #[should_panic = "idx <= self.len"] + fn test_bitset_set_get_out_of_bounds() { + let mut v = BitSet::with_size(4); + + // The bitset is of length 4, which is backed by a single byte with 8 + // bits of storage capacity. + // + // Accessing bits past the 4 the bitset "contains" should not succeed. 
+ + v.get(5); + v.set(5); + } + + #[test] + fn test_all_set_unset() { + for i in 1..100 { + let mut v = BitSet::new(); + v.append_set(i); + assert!(v.is_all_set()); + assert!(!v.is_all_unset()); + } + } + + #[test] + fn test_all_set_unset_multi_byte() { + let mut v = BitSet::new(); + + // Bitmap is composed of entirely set bits. + v.append_set(100); + assert!(v.is_all_set()); + assert!(!v.is_all_unset()); + + // Now the bitmap is neither composed of entirely set, nor entirely + // unset bits. + v.append_unset(1); + assert!(!v.is_all_set()); + assert!(!v.is_all_unset()); + + let mut v = BitSet::new(); + + // Bitmap is composed of entirely unset bits. + v.append_unset(100); + assert!(!v.is_all_set()); + assert!(v.is_all_unset()); + + // And once again, it is neither all set, nor all unset. + v.append_set(1); + assert!(!v.is_all_set()); + assert!(!v.is_all_unset()); + } + + #[test] + fn test_all_set_unset_single_byte() { + let mut v = BitSet::new(); + + // Bitmap is composed of entirely set bits. + v.append_set(2); + assert!(v.is_all_set()); + assert!(!v.is_all_unset()); + + // Now the bitmap is neither composed of entirely set, nor entirely + // unset bits. + v.append_unset(1); + assert!(!v.is_all_set()); + assert!(!v.is_all_unset()); + + let mut v = BitSet::new(); + + // Bitmap is composed of entirely unset bits. + v.append_unset(2); + assert!(!v.is_all_set()); + assert!(v.is_all_unset()); + + // And once again, it is neither all set, nor all unset. + v.append_set(1); + assert!(!v.is_all_set()); + assert!(!v.is_all_unset()); + } + + #[test] + fn test_all_set_unset_empty() { + let v = BitSet::new(); + assert!(!v.is_all_set()); + assert!(v.is_all_unset()); + } +} diff --git a/common_types/src/column.rs b/common_types/src/column.rs index 4c09a84644..570994a421 100644 --- a/common_types/src/column.rs +++ b/common_types/src/column.rs @@ -1,1018 +1,250 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2023 CeresDB Project Authors. 
Licensed under Apache-2.0. -//! Column -use std::sync::Arc; +// Fork from https://github.com/influxdata/influxdb_iox/blob/7d878b21bd78cf7d0618804c1ccf8506521703bd/mutable_batch/src/column.rs. + +//! A [`Column`] stores the rows for a given column name + +use std::{fmt::Formatter, mem, sync::Arc}; use arrow::{ array::{ - Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, - Date32Array as DateArray, Date32Builder as DateBuilder, Float32Array as FloatArray, - Float32Builder as FloatBuilder, Float64Array as DoubleArray, - Float64Builder as DoubleBuilder, Int16Array, Int16Builder, Int32Array, Int32Builder, - Int64Array, Int64Builder, Int8Array, Int8Builder, NullArray, StringArray, StringBuilder, - Time64NanosecondArray as TimeArray, Time64NanosecondBuilder as TimeBuilder, - TimestampMillisecondArray, TimestampMillisecondBuilder, UInt16Array, UInt16Builder, - UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, UInt8Array, UInt8Builder, + ArrayDataBuilder, ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, + TimestampNanosecondArray, UInt64Array, }, - datatypes::{DataType, TimeUnit}, + buffer::NullBuffer, + datatypes::DataType, error::ArrowError, }; -use datafusion::physical_plan::{ - expressions::{cast_column, DEFAULT_DATAFUSION_CAST_OPTIONS}, - ColumnarValue, -}; -use paste::paste; -use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; +use snafu::{ResultExt, Snafu}; -use crate::{ - bytes::Bytes, - datum::{Datum, DatumKind, DatumView}, - string::StringBytes, - time::Timestamp, -}; +use crate::{bitset::BitSet, datum::DatumKind}; #[derive(Debug, Snafu)] +#[allow(missing_copy_implementations, missing_docs)] pub enum Error { #[snafu(display( - "Invalid array type, datum_kind:{:?}, data_type:{:?}.\nBacktrace:\n{}", - datum_kind, - data_type, - backtrace - ))] - InvalidArrayType { - datum_kind: DatumKind, - data_type: DataType, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to append value, 
err:{}.\nBacktrace:\n{}", source, backtrace))] - Append { - source: ArrowError, - backtrace: Backtrace, - }, - - #[snafu(display( - "Data type conflict, expect:{:?}, given:{:?}.\nBacktrace:\n{}", - expect, - given, - backtrace - ))] - ConflictType { - expect: DatumKind, - given: DatumKind, - backtrace: Backtrace, - }, - - #[snafu(display( - "Failed to convert arrow data type, data_type:{}.\nBacktrace:\n{}", - data_type, - backtrace + "Invalid null mask, expected to be {} bytes but was {}", + expected_bytes, + actual_bytes ))] - UnsupportedArray { - data_type: DataType, - backtrace: Backtrace, + InvalidNullMask { + expected_bytes: usize, + actual_bytes: usize, }, - #[snafu(display( - "Failed to cast nanosecond to millisecond, data_type:{}. err:{}", - data_type, - source, - ))] - CastTimestamp { - data_type: DataType, - source: datafusion::error::DataFusionError, - }, - - #[snafu(display("Operation not yet implemented."))] - NotImplemented, -} - -pub type Result = std::result::Result; - -#[derive(Debug)] -pub struct NullColumn(NullArray); - -impl NullColumn { - fn new_null(rows: usize) -> Self { - Self(NullArray::new(rows)) - } - - /// Only the first datum of NullColumn is considered not duplicated. - #[inline] - pub fn dedup(&self, selected: &mut [bool]) { - if !self.0.is_empty() { - selected[0] = true; - } - } -} - -// TODO(yingwen): Builder for columns. - -macro_rules! define_numeric_column { - ($($Kind: ident), *) => { - $(paste! 
{ - #[derive(Debug)] - pub struct [<$Kind Column>]([<$Kind Array>]); - - #[inline] - fn [](array: &[<$Kind Array>], index: usize) -> Datum { - let value = array.value(index); - Datum::$Kind(value) - } - - #[inline] - fn [](array: &[<$Kind Array>], index: usize) -> DatumView { - let value = array.value(index); - DatumView::$Kind(value) - } - })* - } -} - -define_numeric_column!( - Float, Double, UInt64, UInt32, UInt16, UInt8, Int64, Int32, Int16, Int8, Boolean -); - -#[derive(Debug)] -pub struct TimestampColumn(TimestampMillisecondArray); - -#[derive(Debug)] -pub struct VarbinaryColumn(BinaryArray); - -#[derive(Debug)] -pub struct StringColumn(StringArray); - -#[derive(Debug)] -pub struct DateColumn(DateArray); - -#[derive(Debug)] -pub struct TimeColumn(TimeArray); - -#[inline] -fn get_null_datum_view(_array: &NullArray, _index: usize) -> DatumView { - DatumView::Null -} - -#[inline] -fn get_timestamp_datum_view(array: &TimestampMillisecondArray, index: usize) -> DatumView { - let value = array.value(index); - DatumView::Timestamp(Timestamp::new(value)) -} - -#[inline] -fn get_varbinary_datum_view(array: &BinaryArray, index: usize) -> DatumView { - let value = array.value(index); - DatumView::Varbinary(value) -} - -#[inline] -fn get_string_datum_view(array: &StringArray, index: usize) -> DatumView { - let value = array.value(index); - DatumView::String(value) -} - -#[inline] -fn get_date_datum_view(array: &DateArray, index: usize) -> DatumView { - let value = array.value(index); - DatumView::Date(value) -} - -#[inline] -fn get_time_datum_view(array: &TimeArray, index: usize) -> DatumView { - let value = array.value(index); - DatumView::Time(value) -} - -#[inline] -fn get_null_datum(_array: &NullArray, _index: usize) -> Datum { - Datum::Null -} - -#[inline] -fn get_timestamp_datum(array: &TimestampMillisecondArray, index: usize) -> Datum { - let value = array.value(index); - Datum::Timestamp(Timestamp::new(value)) -} - -// TODO(yingwen): Avoid clone of data. 
-// Require a clone. -#[inline] -fn get_varbinary_datum(array: &BinaryArray, index: usize) -> Datum { - let value = array.value(index); - Datum::Varbinary(Bytes::copy_from_slice(value)) -} - -// TODO(yingwen): Avoid clone of data. -// Require a clone. -#[inline] -fn get_string_datum(array: &StringArray, index: usize) -> Datum { - let value = array.value(index); - Datum::String(StringBytes::copy_from_str(value)) -} - -#[inline] -fn get_date_datum(array: &DateArray, index: usize) -> Datum { - let value = array.value(index); - Datum::Date(value) -} - -#[inline] -fn get_time_datum(array: &TimeArray, index: usize) -> Datum { - let value = array.value(index); - Datum::Time(value) -} - -macro_rules! impl_column { - ($Column: ident, $get_datum: expr, $get_datum_view: expr) => { - impl $Column { - /// Get datum by index. - pub fn datum_opt(&self, index: usize) -> Option { - // Do bound check. - if index >= self.0.len() { - return None; - } - - Some(self.datum(index)) - } - - pub fn datum_view_opt(&self, index: usize) -> Option { - if index >= self.0.len() { - return None; - } - - Some(self.datum_view(index)) - } - - pub fn datum_view(&self, index: usize) -> DatumView { - // If this datum is null. - if self.0.is_null(index) { - return DatumView::Null; - } - - $get_datum_view(&self.0, index) - } - - pub fn datum(&self, index: usize) -> Datum { - // If this datum is null. - if self.0.is_null(index) { - return Datum::Null; - } - - $get_datum(&self.0, index) - } - - #[inline] - pub fn num_rows(&self) -> usize { - self.0.len() - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.num_rows() == 0 - } - } - }; -} - -impl_column!(NullColumn, get_null_datum, get_null_datum_view); -impl_column!( - TimestampColumn, - get_timestamp_datum, - get_timestamp_datum_view -); -impl_column!( - VarbinaryColumn, - get_varbinary_datum, - get_varbinary_datum_view -); -impl_column!(StringColumn, get_string_datum, get_string_datum_view); - -macro_rules! 
impl_dedup { - ($Column: ident) => { - impl $Column { - /// If datum i is not equal to previous datum i - 1, mark `selected[i]` to - /// true. - /// - /// The first datum is marked to true. - /// - /// The size of selected must equal to the size of this column and - /// initialized to false. - #[allow(clippy::float_cmp)] - pub fn dedup(&self, selected: &mut [bool]) { - if self.0.is_empty() { - return; - } - - selected[0] = true; - for i in 1..self.0.len() { - let current = self.0.value(i); - let prev = self.0.value(i - 1); - - if current != prev { - selected[i] = true; - } - } - } - } - }; -} - -impl_dedup!(TimestampColumn); -impl_dedup!(VarbinaryColumn); -impl_dedup!(StringColumn); - -macro_rules! impl_new_null { - ($Column: ident, $Builder: ident) => { - impl $Column { - /// Create a column that all values are null. - fn new_null(num_rows: usize) -> Self { - let mut builder = $Builder::with_capacity(num_rows); - for _ in 0..num_rows { - builder.append_null(); - } - let array = builder.finish(); - - Self(array) - } - } - }; -} - -impl_new_null!(TimestampColumn, TimestampMillisecondBuilder); - -macro_rules! impl_from_array_and_slice { - ($Column: ident, $ArrayType: ident) => { - impl From<$ArrayType> for $Column { - fn from(array: $ArrayType) -> Self { - Self(array) - } - } - - impl From<&$ArrayType> for $Column { - fn from(array_ref: &$ArrayType) -> Self { - // We need to clone the [arrow::array::ArrayData], which clones - // the underlying vector of [arrow::buffer::Buffer] and Bitmap (also - // holds a Buffer), thus require some allocation. However, the Buffer is - // managed by Arc, so cloning the buffer is not too expensive. - let array_data = array_ref.into_data(); - let array = $ArrayType::from(array_data); - - Self(array) - } - } - - impl $Column { - fn to_arrow_array(&self) -> $ArrayType { - // Clone the array data. 
- let array_data = self.0.clone().into_data(); - $ArrayType::from(array_data) - } - - /// Returns a zero-copy slice of this array with the indicated offset and - /// length. - /// - /// Panics if offset with length is greater than column length. - fn slice(&self, offset: usize, length: usize) -> Self { - let array_slice = self.0.slice(offset, length); - // Clone the slice data. - let array_data = array_slice.into_data(); - let array = $ArrayType::from(array_data); - - Self(array) - } - } - }; -} - -impl_from_array_and_slice!(NullColumn, NullArray); -impl_from_array_and_slice!(TimestampColumn, TimestampMillisecondArray); -impl_from_array_and_slice!(VarbinaryColumn, BinaryArray); -impl_from_array_and_slice!(StringColumn, StringArray); - -macro_rules! impl_iter { - ($Column: ident, $Value: ident) => { - impl $Column { - /// Iter column values. - pub fn iter(&self) -> impl Iterator> + '_ { - self.0.iter() - } - } - }; -} - -macro_rules! impl_iter_map { - ($Column: ident, $Value: ident) => { - impl $Column { - /// Iter column values. - pub fn iter(&self) -> impl Iterator> + '_ { - self.0.iter().map(|v| v.map($Value::from)) - } - } - }; -} - -impl_iter_map!(TimestampColumn, Timestamp); - -impl VarbinaryColumn { - fn new_null(num_rows: usize) -> Self { - let mut builder = BinaryBuilder::with_capacity(num_rows, 0usize); - for _ in 0..num_rows { - builder.append_null(); - } - let array = builder.finish(); - - Self(array) - } + #[snafu(display("Internal MUB error constructing Arrow Array: {}", source))] + CreatingArrowArray { source: ArrowError }, } -impl StringColumn { - /// Create a column that all values are null. 
- fn new_null(num_rows: usize) -> Self { - let mut builder = StringBuilder::with_capacity(num_rows, 0usize); - for _ in 0..num_rows { - builder.append_null(); - } - let array = builder.finish(); +/// A specialized `Error` for [`Column`] errors +pub type Result = std::result::Result; - Self(array) - } +/// Stores the actual data for columns in a chunk along with summary +/// statistics +#[derive(Debug, Clone)] +pub struct Column { + pub(crate) datum_kind: DatumKind, + pub(crate) valid: BitSet, + pub(crate) data: ColumnData, } -macro_rules! impl_numeric_column { - ($(($Kind: ident, $type: ty)), *) => { - $( - paste! { - impl_column!([<$Kind Column>], [], []); - impl_from_array_and_slice!([<$Kind Column>], [<$Kind Array>]); - impl_new_null!([<$Kind Column>], [<$Kind Builder>]); - impl_iter!([<$Kind Column>], $type); - impl_dedup!([<$Kind Column>]); - } - )* - } +/// The data for a column +#[derive(Debug, Clone)] +#[allow(missing_docs)] +pub enum ColumnData { + F64(Vec), + I64(Vec), + U64(Vec), + String(Vec), + Bool(BitSet), } -impl_numeric_column!( - (Double, f64), - (Float, f32), - (UInt64, u64), - (UInt32, u32), - (UInt16, u16), - (UInt8, u8), - (Int64, i64), - (Int32, i32), - (Int16, i16), - (Int8, i8), - (Boolean, bool), - (Date, i32), - (Time, i64) -); - -macro_rules! impl_numeric_value { - ($Column: ident, $Value: ident) => { - impl $Column { - /// Get value at index. - pub fn value(&self, index: usize) -> Option<$Value> { - if self.0.is_valid(index) { - unsafe { Some(self.0.value_unchecked(index)) } - } else { - None - } - } - } - }; -} - -macro_rules! batch_impl_numeric_value { - ($(($Kind: ident, $type: ty)), *) => { - $( - paste! 
{ - impl_numeric_value!([<$Kind Column>], $type); - } - )* - } -} - -batch_impl_numeric_value!( - (Timestamp, i64), - (Double, f64), - (Float, f32), - (UInt64, u64), - (UInt32, u32), - (UInt16, u16), - (UInt8, u8), - (Int64, i64), - (Int32, i32), - (Int16, i16), - (Int8, i8), - (Boolean, bool), - (Date, i32), - (Time, i64) -); - -impl VarbinaryColumn { - pub fn iter(&self) -> impl Iterator> + '_ { - self.0.iter() - } - - pub fn value(&self, index: usize) -> Option<&[u8]> { - if self.0.is_valid(index) { - unsafe { Some(self.0.value_unchecked(index)) } - } else { - None - } - } -} - -impl StringColumn { - pub fn iter(&self) -> impl Iterator> + '_ { - self.0.iter() - } - - pub fn value(&self, index: usize) -> Option<&str> { - if self.0.is_valid(index) { - unsafe { Some(self.0.value_unchecked(index)) } - } else { - None +impl std::fmt::Display for ColumnData { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::F64(col_data) => write!(f, "F64({})", col_data.len()), + Self::I64(col_data) => write!(f, "I64({})", col_data.len()), + Self::U64(col_data) => write!(f, "U64({})", col_data.len()), + Self::String(col_data) => write!(f, "String({})", col_data.len()), + Self::Bool(col_data) => write!(f, "Bool({})", col_data.len()), } } } -macro_rules! impl_column_block { - ($($Kind: ident), *) => { - impl ColumnBlock { - pub fn datum_kind(&self) -> DatumKind { - match self { - $(ColumnBlock::$Kind(_) => DatumKind::$Kind,)* - } - } - - pub fn datum_opt(&self, index: usize) -> Option { - match self { - $(ColumnBlock::$Kind(col) => col.datum_opt(index),)* - } - } - - pub fn datum_view_opt(&self, index: usize) -> Option { - match self { - $(ColumnBlock::$Kind(col) => col.datum_view_opt(index),)* - } - } - - /// Panic if index is out fo bound. - pub fn datum_view(&self, index: usize) -> DatumView { - match self { - $(ColumnBlock::$Kind(col) => col.datum_view(index),)* - } - } - - /// Panic if index is out fo bound. 
- pub fn datum(&self, index: usize) -> Datum { - match self { - $(ColumnBlock::$Kind(col) => col.datum(index),)* - } - } - - pub fn num_rows(&self) -> usize { - match self { - $(ColumnBlock::$Kind(col) => col.num_rows(),)* - } - } +impl Column { + #[allow(dead_code)] + pub(crate) fn new(row_count: usize, datum_kind: DatumKind) -> Self { + let mut valid = BitSet::new(); + valid.append_unset(row_count); - pub fn to_arrow_array_ref(&self) -> ArrayRef { - match self { - $(ColumnBlock::$Kind(col) => Arc::new(col.to_arrow_array()),)* - } + let data = match datum_kind { + DatumKind::Boolean => { + let mut data = BitSet::new(); + data.append_unset(row_count); + ColumnData::Bool(data) } + DatumKind::UInt64 => ColumnData::U64(vec![0; row_count]), + DatumKind::Double => ColumnData::F64(vec![0.0; row_count]), + DatumKind::Int64 | DatumKind::Timestamp => ColumnData::I64(vec![0; row_count]), + DatumKind::String => ColumnData::String(vec!["".to_string(); row_count]), + _ => todo!(), + }; - /// If datum i is not equal to previous datum i - 1, mark `selected[i]` to true. - /// - /// The first datum is not marked to true. - pub fn dedup(&self, selected: &mut [bool]) { - match self { - $(ColumnBlock::$Kind(col) => col.dedup(selected),)* - } - } - - /// Returns a zero-copy slice of this array with the indicated offset and length. - /// - /// Panics if offset with length is greater than column length. - #[must_use] - pub fn slice(&self, offset: usize, length: usize) -> Self { - match self { - $(ColumnBlock::$Kind(col) => ColumnBlock::$Kind(col.slice(offset, length)),)* - } - } + Self { + datum_kind, + valid, + data, } - - $(paste! 
{ - impl From<[<$Kind Column>]> for ColumnBlock { - fn from(column: [<$Kind Column>]) -> Self { - Self::$Kind(column) - } - } - })* - }; -} - -impl_column_block!( - Null, Timestamp, Double, Float, Varbinary, String, UInt64, UInt32, UInt16, UInt8, Int64, Int32, - Int16, Int8, Boolean, Date, Time -); - -// TODO(yingwen): We can add a unsafe function that don't do bound check. - -macro_rules! define_column_block { - ($($Kind: ident), *) => { - paste! { - #[derive(Debug)] - pub enum ColumnBlock { - Null(NullColumn), - $( - $Kind([<$Kind Column>]), - )* - } - - impl ColumnBlock { - pub fn try_from_arrow_array_ref(datum_kind: &DatumKind, array: &ArrayRef) -> Result { - let column = match datum_kind { - DatumKind::Null => ColumnBlock::Null(NullColumn::new_null(array.len())), - $( - DatumKind::$Kind => { - let mills_array; - let cast_column = match array.data_type() { - DataType::Timestamp(TimeUnit::Nanosecond, None) => { - mills_array = cast_nanosecond_to_mills(array)?; - cast_array(datum_kind, &mills_array)? - }, - _ => { - cast_array(datum_kind, array)? - } - }; - - ColumnBlock::$Kind([<$Kind Column>]::from(cast_column)) - } - )* - }; - Ok(column) - } - - pub fn new_null_with_type(kind: &DatumKind, rows: usize) -> Result { - let block = match kind { - DatumKind::Null => ColumnBlock::Null(NullColumn::new_null(rows)), - $( - DatumKind::$Kind => ColumnBlock::$Kind([<$Kind Column>]::new_null(rows)), - )* - }; - - Ok(block) - } - } - } - } -} - -// Define column blocks, Null is defined explicitly in macro. 
-define_column_block!( - Timestamp, Double, Float, Varbinary, String, UInt64, UInt32, UInt16, UInt8, Int64, Int32, - Int16, Int8, Boolean, Date, Time -); - -impl ColumnBlock { - pub fn try_cast_arrow_array_ref(array: &ArrayRef) -> Result { - let datum_kind = - DatumKind::from_data_type(array.data_type()).with_context(|| UnsupportedArray { - data_type: array.data_type().clone(), - })?; - Self::try_from_arrow_array_ref(&datum_kind, array) } - pub fn new_null(rows: usize) -> Self { - Self::Null(NullColumn::new_null(rows)) + /// Returns the [`DatumKind`] of this column + pub fn datum_kine(&self) -> DatumKind { + self.datum_kind } - pub fn as_timestamp(&self) -> Option<&TimestampColumn> { - match self { - ColumnBlock::Timestamp(c) => Some(c), - _ => None, - } + /// Returns the validity bitmask of this column + pub fn valid_mask(&self) -> &BitSet { + &self.valid } -} - -// TODO: This is a temp workaround to support nanoseconds, a better way -// is to support nanoseconds natively. -// This is also required for influxql. -pub fn cast_nanosecond_to_mills(array: &ArrayRef) -> Result> { - let column = ColumnarValue::Array(array.clone()); - let mills_column = cast_column( - &column, - &DataType::Timestamp(TimeUnit::Millisecond, None), - &DEFAULT_DATAFUSION_CAST_OPTIONS, - ) - .with_context(|| CastTimestamp { - data_type: DataType::Timestamp(TimeUnit::Millisecond, None), - })?; - match mills_column { - ColumnarValue::Array(array) => Ok(array), - _ => Err(Error::NotImplemented), + /// Returns a reference to this column's data + pub fn data(&self) -> &ColumnData { + &self.data } -} -fn cast_array<'a, T: 'static>(datum_kind: &DatumKind, array: &'a ArrayRef) -> Result<&'a T> { - array - .as_any() - .downcast_ref::() - .with_context(|| InvalidArrayType { - datum_kind: *datum_kind, - data_type: array.data_type().clone(), - }) -} - -macro_rules! 
append_datum { - ($Kind: ident, $builder: ident, $DatumType: ident, $datum: ident) => { - match $datum { - $DatumType::Null => Ok($builder.append_null()), - $DatumType::$Kind(v) => Ok($builder.append_value(v)), - _ => ConflictType { - expect: DatumKind::$Kind, - given: $datum.kind(), - } - .fail(), + /// Ensures that the total length of this column is `len` rows, + /// padding it with trailing NULLs if necessary + #[allow(dead_code)] + pub(crate) fn push_nulls_to_len(&mut self, len: usize) { + if self.valid.len() == len { + return; } - }; -} - -macro_rules! append_datum_into { - ($Kind: ident, $builder: ident, $DatumType: ident, $datum: ident) => { - match $datum { - $DatumType::Null => Ok($builder.append_null()), - $DatumType::$Kind(v) => Ok($builder.append_value(v.into())), - _ => ConflictType { - expect: DatumKind::$Kind, - given: $datum.kind(), - } - .fail(), - } - }; -} + assert!(len > self.valid.len(), "cannot shrink column"); + let delta = len - self.valid.len(); + self.valid.append_unset(delta); -macro_rules! append_block { - ($Kind: ident, $builder: ident, $BlockType: ident, $block: ident, $start: ident, $len: ident) => { - match $block { - $BlockType::Null(v) => { - let end = std::cmp::min($start + $len, v.num_rows()); - for _ in $start..end { - $builder.append_null(); - } - Ok(()) + match &mut self.data { + ColumnData::F64(data) => { + data.resize(len, 0.); } - $BlockType::$Kind(v) => { - // There is no convenient api to copy a range of data from array to builder, so - // we still need to clone value one by one using a for loop. 
- let end = std::cmp::min($start + $len, v.num_rows()); - for i in $start..end { - let value_opt = v.value(i); - match value_opt { - Some(value) => { - $builder.append_value(value); - } - None => { - $builder.append_null(); - } - } - } - Ok(()) + ColumnData::I64(data) => { + data.resize(len, 0); } - _ => ConflictType { - expect: DatumKind::$Kind, - given: $block.datum_kind(), + ColumnData::U64(data) => { + data.resize(len, 0); } - .fail(), - } - }; -} - -macro_rules! define_column_block_builder { - ($(($Kind: ident, $Builder: ident)), *) => { - paste! { - #[derive(Debug)] - pub enum ColumnBlockBuilder { - Null { rows: usize }, - Timestamp(TimestampMillisecondBuilder), - Varbinary(BinaryBuilder), - String(StringBuilder), - Date(DateBuilder), - Time(TimeBuilder), - $( - $Kind($Builder), - )* + ColumnData::String(data) => { + data.resize(len, "".to_string()); } - - impl ColumnBlockBuilder { - /// Create by data type with initial capacity - pub fn with_capacity(data_type: &DatumKind, item_capacity: usize) -> Self { - match data_type { - DatumKind::Null => Self::Null { rows: 0 }, - DatumKind::Timestamp => Self::Timestamp(TimestampMillisecondBuilder::with_capacity(item_capacity)), - // The data_capacity is set as 1024, because the item is variable-size type. 
- DatumKind::Varbinary => Self::Varbinary(BinaryBuilder::with_capacity(item_capacity, 1024)), - DatumKind::String => Self::String(StringBuilder::with_capacity(item_capacity, 1024)), - DatumKind::Date => Self::Date(DateBuilder::with_capacity(item_capacity)), - DatumKind::Time => Self::Time(TimeBuilder::with_capacity(item_capacity)), - $( - DatumKind::$Kind => Self::$Kind($Builder::with_capacity(item_capacity)), - )* - } - } - - /// Append the datum into the builder, the datum should have same the data - /// type of builder - pub fn append(&mut self, datum: Datum) -> Result<()> { - let given = datum.kind(); - match self { - Self::Null { rows } => match datum { - Datum::Null => { - *rows += 1; - Ok(()) - } - _ => ConflictType { - expect: DatumKind::Null, - given, - } - .fail(), - }, - Self::Timestamp(builder) => append_datum_into!(Timestamp, builder, Datum, datum), - Self::Varbinary(builder) => append_datum!(Varbinary, builder, Datum, datum), - Self::String(builder) => append_datum!(String, builder, Datum, datum), - Self::Date(builder) => append_datum!(Date, builder, Datum, datum), - Self::Time(builder) => append_datum!(Time, builder, Datum, datum), - $( - Self::$Kind(builder) => append_datum!($Kind, builder, Datum, datum), - )* - } - } - - /// Append the [DatumView] into the builder, the datum view should have same the data - /// type of builder - pub fn append_view<'a>(&mut self, datum: DatumView<'a>) -> Result<()> { - let given = datum.kind(); - match self { - Self::Null { rows } => match datum { - DatumView::Null => { - *rows += 1; - Ok(()) - } - _ => ConflictType { - expect: DatumKind::Null, - given, - } - .fail(), - }, - Self::Timestamp(builder) => append_datum_into!(Timestamp, builder, DatumView, datum), - Self::Varbinary(builder) => append_datum!(Varbinary, builder, DatumView, datum), - Self::String(builder) => append_datum!(String, builder, DatumView, datum), - Self::Date(builder) => append_datum!(Date, builder, DatumView, datum), - Self::Time(builder) => 
append_datum!(Time, builder, DatumView, datum), - $( - Self::$Kind(builder) => append_datum!($Kind, builder, DatumView, datum), - )* - } - } - - /// Append rows in [start..start + len) from `block` to the builder. - /// - /// Returns rows actually appended. - pub fn append_block_range(&mut self, block: &ColumnBlock, start: usize, len: usize) -> Result<()> { - match self { - Self::Null { rows } => { - if start + len >= block.num_rows() { - *rows += block.num_rows() - start; - } else { - *rows += len; - } - Ok(()) - }, - Self::Timestamp(builder) => append_block!(Timestamp, builder, ColumnBlock, block, start, len), - Self::Varbinary(builder) => append_block!(Varbinary, builder, ColumnBlock, block, start, len), - Self::String(builder) => append_block!(String, builder, ColumnBlock, block, start, len), - Self::Date(builder) => append_block!(Date, builder, ColumnBlock, block, start, len), - Self::Time(builder) => append_block!(Time, builder, ColumnBlock, block, start, len), - $( - Self::$Kind(builder) => append_block!($Kind, builder, ColumnBlock, block, start, len), - )* - } - } - - pub fn len(&self) -> usize { - match &self { - Self::Null { rows } => *rows, - Self::Timestamp(builder) => builder.len(), - Self::Varbinary(builder) => builder.len(), - Self::String(builder) => builder.len(), - Self::Date(builder) => builder.len(), - Self::Time(builder) => builder.len(), - $( - Self::$Kind(builder) => builder.len(), - )* - } - } - - // Build and reset the builder. 
- pub fn build(&mut self) -> ColumnBlock { - match self { - Self::Null { rows } => { - let block = ColumnBlock::new_null(*rows); - *rows = 0; - block - } - Self::Timestamp(builder) => TimestampColumn::from(builder.finish()).into(), - Self::Varbinary(builder) => VarbinaryColumn::from(builder.finish()).into(), - Self::String(builder) => StringColumn::from(builder.finish()).into(), - Self::Date(builder) => DateColumn::from(builder.finish()).into(), - Self::Time(builder) => TimeColumn::from(builder.finish()).into(), - $( - Self::$Kind(builder) => [<$Kind Column>]::from(builder.finish()).into(), - )* - } - } + ColumnData::Bool(data) => { + data.append_unset(delta); } } } -} - -// Define column block builders, Null and Timestamp are defined explicitly in -// macro. -define_column_block_builder!( - (Double, DoubleBuilder), - (Float, FloatBuilder), - (UInt64, UInt64Builder), - (UInt32, UInt32Builder), - (UInt16, UInt16Builder), - (UInt8, UInt8Builder), - (Int64, Int64Builder), - (Int32, Int32Builder), - (Int16, Int16Builder), - (Int8, Int8Builder), - (Boolean, BooleanBuilder) -); -impl ColumnBlockBuilder { - /// Create by data type - pub fn new(data_type: &DatumKind) -> Self { - Self::with_capacity(data_type, 0) + /// Returns the number of rows in this column + pub fn len(&self) -> usize { + self.valid.len() } + /// Returns true if this column contains no rows pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Clear the builder by calling `build()` and drop the built result. 
- pub fn clear(&mut self) { - let _ = self.build(); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::tests::{build_rows, build_schema}; - - #[test] - fn test_column_block_builder() { - let schema = build_schema(); - let rows = build_rows(); - // DatumKind::Varbinary - let column = schema.column(0); - let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2); - - // append - builder.append(rows[0][0].clone()).unwrap(); - let ret = builder.append(rows[0][1].clone()); - assert!(ret.is_err()); - - // append_view - builder.append_view(rows[1][0].as_view()).unwrap(); - let ret = builder.append_view(rows[0][1].as_view()); - assert!(ret.is_err()); - - let column_block = builder.build(); - assert_eq!(column_block.num_rows(), 2); - let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2); - - // append_block_range - builder.append_block_range(&column_block, 0, 1).unwrap(); - builder.append_block_range(&column_block, 1, 1).unwrap(); - - let column_block = builder.build(); - assert_eq!(column_block.num_rows(), 2); - assert_eq!( - column_block.datum(0), - Datum::Varbinary(Bytes::copy_from_slice(b"binary key")) - ); - assert_eq!( - column_block.datum(1), - Datum::Varbinary(Bytes::copy_from_slice(b"binary key1")) - ); + self.valid.is_empty() + } + + /// The approximate memory size of the data in the column. + /// + /// This includes the size of `self`. 
+ pub fn size(&self) -> usize { + let data_size = match &self.data { + ColumnData::F64(v) => mem::size_of::() * v.capacity(), + ColumnData::I64(v) => mem::size_of::() * v.capacity(), + ColumnData::U64(v) => mem::size_of::() * v.capacity(), + ColumnData::Bool(v) => v.byte_len(), + ColumnData::String(v) => { + v.iter().map(|s| s.len()).sum::() + + (v.capacity() - v.len()) * mem::size_of::() + } + }; + mem::size_of::() + data_size + self.valid.byte_len() + } + + /// The approximate memory size of the data in the column, not counting for + /// stats or self or whatever extra space has been allocated for the + /// vecs + pub fn size_data(&self) -> usize { + match &self.data { + ColumnData::F64(_) => mem::size_of::() * self.len(), + ColumnData::I64(_) => mem::size_of::() * self.len(), + ColumnData::U64(_) => mem::size_of::() * self.len(), + ColumnData::Bool(_) => mem::size_of::() * self.len(), + ColumnData::String(v) => v.iter().map(|s| s.len()).sum(), + } + } + + /// Converts this column to an arrow [`ArrayRef`] + pub fn to_arrow(&self) -> Result { + let nulls = Some(NullBuffer::new(self.valid.to_arrow())); + + let data: ArrayRef = match &self.data { + ColumnData::F64(data) => { + let data = ArrayDataBuilder::new(DataType::Float64) + .len(data.len()) + .add_buffer(data.iter().cloned().collect()) + .nulls(nulls) + .build() + .context(CreatingArrowArray)?; + Arc::new(Float64Array::from(data)) + } + ColumnData::I64(data) => match self.datum_kind { + DatumKind::Timestamp => { + let data = ArrayDataBuilder::new(DatumKind::Timestamp.to_arrow_data_type()) + .len(data.len()) + .add_buffer(data.iter().cloned().collect()) + .nulls(nulls) + .build() + .context(CreatingArrowArray)?; + Arc::new(TimestampNanosecondArray::from(data)) + } + + DatumKind::Int64 => { + let data = ArrayDataBuilder::new(DataType::Int64) + .len(data.len()) + .add_buffer(data.iter().cloned().collect()) + .nulls(nulls) + .build() + .context(CreatingArrowArray)?; + Arc::new(Int64Array::from(data)) + } + _ => 
unreachable!(), + }, + ColumnData::U64(data) => { + let data = ArrayDataBuilder::new(DataType::UInt64) + .len(data.len()) + .add_buffer(data.iter().cloned().collect()) + .nulls(nulls) + .build() + .context(CreatingArrowArray)?; + Arc::new(UInt64Array::from(data)) + } + ColumnData::String(data) => { + let data = + StringArray::from(data.iter().map(|s| Some(s.as_str())).collect::>()); + Arc::new(data) + } + ColumnData::Bool(data) => { + let data = ArrayDataBuilder::new(DataType::Boolean) + .len(data.len()) + .add_buffer(data.to_arrow().into_inner()) + .nulls(nulls) + .build() + .context(CreatingArrowArray)?; + Arc::new(BooleanArray::from(data)) + } + }; + + assert_eq!(data.len(), self.len()); + + Ok(data) } } diff --git a/common_types/src/column_block.rs b/common_types/src/column_block.rs new file mode 100644 index 0000000000..cb661e7c5d --- /dev/null +++ b/common_types/src/column_block.rs @@ -0,0 +1,1018 @@ +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! 
Column +use std::sync::Arc; + +use arrow::{ + array::{ + Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, + Date32Array as DateArray, Date32Builder as DateBuilder, Float32Array as FloatArray, + Float32Builder as FloatBuilder, Float64Array as DoubleArray, + Float64Builder as DoubleBuilder, Int16Array, Int16Builder, Int32Array, Int32Builder, + Int64Array, Int64Builder, Int8Array, Int8Builder, NullArray, StringArray, StringBuilder, + Time64NanosecondArray as TimeArray, Time64NanosecondBuilder as TimeBuilder, + TimestampMillisecondArray, TimestampMillisecondBuilder, UInt16Array, UInt16Builder, + UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, UInt8Array, UInt8Builder, + }, + datatypes::{DataType, TimeUnit}, + error::ArrowError, +}; +use datafusion::physical_plan::{ + expressions::{cast_column, DEFAULT_DATAFUSION_CAST_OPTIONS}, + ColumnarValue, +}; +use paste::paste; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; + +use crate::{ + bytes::Bytes, + datum::{Datum, DatumKind, DatumView}, + string::StringBytes, + time::Timestamp, +}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display( + "Invalid array type, datum_kind:{:?}, data_type:{:?}.\nBacktrace:\n{}", + datum_kind, + data_type, + backtrace + ))] + InvalidArrayType { + datum_kind: DatumKind, + data_type: DataType, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to append value, err:{}.\nBacktrace:\n{}", source, backtrace))] + Append { + source: ArrowError, + backtrace: Backtrace, + }, + + #[snafu(display( + "Data type conflict, expect:{:?}, given:{:?}.\nBacktrace:\n{}", + expect, + given, + backtrace + ))] + ConflictType { + expect: DatumKind, + given: DatumKind, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to convert arrow data type, data_type:{}.\nBacktrace:\n{}", + data_type, + backtrace + ))] + UnsupportedArray { + data_type: DataType, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to cast nanosecond to 
millisecond, data_type:{}. err:{}", + data_type, + source, + ))] + CastTimestamp { + data_type: DataType, + source: datafusion::error::DataFusionError, + }, + + #[snafu(display("Operation not yet implemented."))] + NotImplemented, +} + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub struct NullColumn(NullArray); + +impl NullColumn { + fn new_null(rows: usize) -> Self { + Self(NullArray::new(rows)) + } + + /// Only the first datum of NullColumn is considered not duplicated. + #[inline] + pub fn dedup(&self, selected: &mut [bool]) { + if !self.0.is_empty() { + selected[0] = true; + } + } +} + +// TODO(yingwen): Builder for columns. + +macro_rules! define_numeric_column { + ($($Kind: ident), *) => { + $(paste! { + #[derive(Debug)] + pub struct [<$Kind Column>]([<$Kind Array>]); + + #[inline] + fn [](array: &[<$Kind Array>], index: usize) -> Datum { + let value = array.value(index); + Datum::$Kind(value) + } + + #[inline] + fn [](array: &[<$Kind Array>], index: usize) -> DatumView { + let value = array.value(index); + DatumView::$Kind(value) + } + })* + } +} + +define_numeric_column!( + Float, Double, UInt64, UInt32, UInt16, UInt8, Int64, Int32, Int16, Int8, Boolean +); + +#[derive(Debug)] +pub struct TimestampColumn(TimestampMillisecondArray); + +#[derive(Debug)] +pub struct VarbinaryColumn(BinaryArray); + +#[derive(Debug)] +pub struct StringColumn(StringArray); + +#[derive(Debug)] +pub struct DateColumn(DateArray); + +#[derive(Debug)] +pub struct TimeColumn(TimeArray); + +#[inline] +fn get_null_datum_view(_array: &NullArray, _index: usize) -> DatumView { + DatumView::Null +} + +#[inline] +fn get_timestamp_datum_view(array: &TimestampMillisecondArray, index: usize) -> DatumView { + let value = array.value(index); + DatumView::Timestamp(Timestamp::new(value)) +} + +#[inline] +fn get_varbinary_datum_view(array: &BinaryArray, index: usize) -> DatumView { + let value = array.value(index); + DatumView::Varbinary(value) +} + +#[inline] +fn 
get_string_datum_view(array: &StringArray, index: usize) -> DatumView { + let value = array.value(index); + DatumView::String(value) +} + +#[inline] +fn get_date_datum_view(array: &DateArray, index: usize) -> DatumView { + let value = array.value(index); + DatumView::Date(value) +} + +#[inline] +fn get_time_datum_view(array: &TimeArray, index: usize) -> DatumView { + let value = array.value(index); + DatumView::Time(value) +} + +#[inline] +fn get_null_datum(_array: &NullArray, _index: usize) -> Datum { + Datum::Null +} + +#[inline] +fn get_timestamp_datum(array: &TimestampMillisecondArray, index: usize) -> Datum { + let value = array.value(index); + Datum::Timestamp(Timestamp::new(value)) +} + +// TODO(yingwen): Avoid clone of data. +// Require a clone. +#[inline] +fn get_varbinary_datum(array: &BinaryArray, index: usize) -> Datum { + let value = array.value(index); + Datum::Varbinary(Bytes::copy_from_slice(value)) +} + +// TODO(yingwen): Avoid clone of data. +// Require a clone. +#[inline] +fn get_string_datum(array: &StringArray, index: usize) -> Datum { + let value = array.value(index); + Datum::String(StringBytes::copy_from_str(value)) +} + +#[inline] +fn get_date_datum(array: &DateArray, index: usize) -> Datum { + let value = array.value(index); + Datum::Date(value) +} + +#[inline] +fn get_time_datum(array: &TimeArray, index: usize) -> Datum { + let value = array.value(index); + Datum::Time(value) +} + +macro_rules! impl_column { + ($Column: ident, $get_datum: expr, $get_datum_view: expr) => { + impl $Column { + /// Get datum by index. + pub fn datum_opt(&self, index: usize) -> Option { + // Do bound check. + if index >= self.0.len() { + return None; + } + + Some(self.datum(index)) + } + + pub fn datum_view_opt(&self, index: usize) -> Option { + if index >= self.0.len() { + return None; + } + + Some(self.datum_view(index)) + } + + pub fn datum_view(&self, index: usize) -> DatumView { + // If this datum is null. 
+ if self.0.is_null(index) { + return DatumView::Null; + } + + $get_datum_view(&self.0, index) + } + + pub fn datum(&self, index: usize) -> Datum { + // If this datum is null. + if self.0.is_null(index) { + return Datum::Null; + } + + $get_datum(&self.0, index) + } + + #[inline] + pub fn num_rows(&self) -> usize { + self.0.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.num_rows() == 0 + } + } + }; +} + +impl_column!(NullColumn, get_null_datum, get_null_datum_view); +impl_column!( + TimestampColumn, + get_timestamp_datum, + get_timestamp_datum_view +); +impl_column!( + VarbinaryColumn, + get_varbinary_datum, + get_varbinary_datum_view +); +impl_column!(StringColumn, get_string_datum, get_string_datum_view); + +macro_rules! impl_dedup { + ($Column: ident) => { + impl $Column { + /// If datum i is not equal to previous datum i - 1, mark `selected[i]` to + /// true. + /// + /// The first datum is marked to true. + /// + /// The size of selected must equal to the size of this column and + /// initialized to false. + #[allow(clippy::float_cmp)] + pub fn dedup(&self, selected: &mut [bool]) { + if self.0.is_empty() { + return; + } + + selected[0] = true; + for i in 1..self.0.len() { + let current = self.0.value(i); + let prev = self.0.value(i - 1); + + if current != prev { + selected[i] = true; + } + } + } + } + }; +} + +impl_dedup!(TimestampColumn); +impl_dedup!(VarbinaryColumn); +impl_dedup!(StringColumn); + +macro_rules! impl_new_null { + ($Column: ident, $Builder: ident) => { + impl $Column { + /// Create a column that all values are null. + fn new_null(num_rows: usize) -> Self { + let mut builder = $Builder::with_capacity(num_rows); + for _ in 0..num_rows { + builder.append_null(); + } + let array = builder.finish(); + + Self(array) + } + } + }; +} + +impl_new_null!(TimestampColumn, TimestampMillisecondBuilder); + +macro_rules! 
impl_from_array_and_slice { + ($Column: ident, $ArrayType: ident) => { + impl From<$ArrayType> for $Column { + fn from(array: $ArrayType) -> Self { + Self(array) + } + } + + impl From<&$ArrayType> for $Column { + fn from(array_ref: &$ArrayType) -> Self { + // We need to clone the [arrow::array::ArrayData], which clones + // the underlying vector of [arrow::buffer::Buffer] and Bitmap (also + // holds a Buffer), thus require some allocation. However, the Buffer is + // managed by Arc, so cloning the buffer is not too expensive. + let array_data = array_ref.into_data(); + let array = $ArrayType::from(array_data); + + Self(array) + } + } + + impl $Column { + fn to_arrow_array(&self) -> $ArrayType { + // Clone the array data. + let array_data = self.0.clone().into_data(); + $ArrayType::from(array_data) + } + + /// Returns a zero-copy slice of this array with the indicated offset and + /// length. + /// + /// Panics if offset with length is greater than column length. + fn slice(&self, offset: usize, length: usize) -> Self { + let array_slice = self.0.slice(offset, length); + // Clone the slice data. + let array_data = array_slice.into_data(); + let array = $ArrayType::from(array_data); + + Self(array) + } + } + }; +} + +impl_from_array_and_slice!(NullColumn, NullArray); +impl_from_array_and_slice!(TimestampColumn, TimestampMillisecondArray); +impl_from_array_and_slice!(VarbinaryColumn, BinaryArray); +impl_from_array_and_slice!(StringColumn, StringArray); + +macro_rules! impl_iter { + ($Column: ident, $Value: ident) => { + impl $Column { + /// Iter column values. + pub fn iter(&self) -> impl Iterator> + '_ { + self.0.iter() + } + } + }; +} + +macro_rules! impl_iter_map { + ($Column: ident, $Value: ident) => { + impl $Column { + /// Iter column values. 
+ pub fn iter(&self) -> impl Iterator> + '_ { + self.0.iter().map(|v| v.map($Value::from)) + } + } + }; +} + +impl_iter_map!(TimestampColumn, Timestamp); + +impl VarbinaryColumn { + fn new_null(num_rows: usize) -> Self { + let mut builder = BinaryBuilder::with_capacity(num_rows, 0usize); + for _ in 0..num_rows { + builder.append_null(); + } + let array = builder.finish(); + + Self(array) + } +} + +impl StringColumn { + /// Create a column that all values are null. + fn new_null(num_rows: usize) -> Self { + let mut builder = StringBuilder::with_capacity(num_rows, 0usize); + for _ in 0..num_rows { + builder.append_null(); + } + let array = builder.finish(); + + Self(array) + } +} + +macro_rules! impl_numeric_column { + ($(($Kind: ident, $type: ty)), *) => { + $( + paste! { + impl_column!([<$Kind Column>], [], []); + impl_from_array_and_slice!([<$Kind Column>], [<$Kind Array>]); + impl_new_null!([<$Kind Column>], [<$Kind Builder>]); + impl_iter!([<$Kind Column>], $type); + impl_dedup!([<$Kind Column>]); + } + )* + } +} + +impl_numeric_column!( + (Double, f64), + (Float, f32), + (UInt64, u64), + (UInt32, u32), + (UInt16, u16), + (UInt8, u8), + (Int64, i64), + (Int32, i32), + (Int16, i16), + (Int8, i8), + (Boolean, bool), + (Date, i32), + (Time, i64) +); + +macro_rules! impl_numeric_value { + ($Column: ident, $Value: ident) => { + impl $Column { + /// Get value at index. + pub fn value(&self, index: usize) -> Option<$Value> { + if self.0.is_valid(index) { + unsafe { Some(self.0.value_unchecked(index)) } + } else { + None + } + } + } + }; +} + +macro_rules! batch_impl_numeric_value { + ($(($Kind: ident, $type: ty)), *) => { + $( + paste! 
{ + impl_numeric_value!([<$Kind Column>], $type); + } + )* + } +} + +batch_impl_numeric_value!( + (Timestamp, i64), + (Double, f64), + (Float, f32), + (UInt64, u64), + (UInt32, u32), + (UInt16, u16), + (UInt8, u8), + (Int64, i64), + (Int32, i32), + (Int16, i16), + (Int8, i8), + (Boolean, bool), + (Date, i32), + (Time, i64) +); + +impl VarbinaryColumn { + pub fn iter(&self) -> impl Iterator> + '_ { + self.0.iter() + } + + pub fn value(&self, index: usize) -> Option<&[u8]> { + if self.0.is_valid(index) { + unsafe { Some(self.0.value_unchecked(index)) } + } else { + None + } + } +} + +impl StringColumn { + pub fn iter(&self) -> impl Iterator> + '_ { + self.0.iter() + } + + pub fn value(&self, index: usize) -> Option<&str> { + if self.0.is_valid(index) { + unsafe { Some(self.0.value_unchecked(index)) } + } else { + None + } + } +} + +macro_rules! impl_column_block { + ($($Kind: ident), *) => { + impl ColumnBlock { + pub fn datum_kind(&self) -> DatumKind { + match self { + $(ColumnBlock::$Kind(_) => DatumKind::$Kind,)* + } + } + + pub fn datum_opt(&self, index: usize) -> Option { + match self { + $(ColumnBlock::$Kind(col) => col.datum_opt(index),)* + } + } + + pub fn datum_view_opt(&self, index: usize) -> Option { + match self { + $(ColumnBlock::$Kind(col) => col.datum_view_opt(index),)* + } + } + + /// Panic if index is out fo bound. + pub fn datum_view(&self, index: usize) -> DatumView { + match self { + $(ColumnBlock::$Kind(col) => col.datum_view(index),)* + } + } + + /// Panic if index is out fo bound. + pub fn datum(&self, index: usize) -> Datum { + match self { + $(ColumnBlock::$Kind(col) => col.datum(index),)* + } + } + + pub fn num_rows(&self) -> usize { + match self { + $(ColumnBlock::$Kind(col) => col.num_rows(),)* + } + } + + pub fn to_arrow_array_ref(&self) -> ArrayRef { + match self { + $(ColumnBlock::$Kind(col) => Arc::new(col.to_arrow_array()),)* + } + } + + /// If datum i is not equal to previous datum i - 1, mark `selected[i]` to true. 
+ /// + /// The first datum is not marked to true. + pub fn dedup(&self, selected: &mut [bool]) { + match self { + $(ColumnBlock::$Kind(col) => col.dedup(selected),)* + } + } + + /// Returns a zero-copy slice of this array with the indicated offset and length. + /// + /// Panics if offset with length is greater than column length. + #[must_use] + pub fn slice(&self, offset: usize, length: usize) -> Self { + match self { + $(ColumnBlock::$Kind(col) => ColumnBlock::$Kind(col.slice(offset, length)),)* + } + } + } + + $(paste! { + impl From<[<$Kind Column>]> for ColumnBlock { + fn from(column: [<$Kind Column>]) -> Self { + Self::$Kind(column) + } + } + })* + }; +} + +impl_column_block!( + Null, Timestamp, Double, Float, Varbinary, String, UInt64, UInt32, UInt16, UInt8, Int64, Int32, + Int16, Int8, Boolean, Date, Time +); + +// TODO(yingwen): We can add a unsafe function that don't do bound check. + +macro_rules! define_column_block { + ($($Kind: ident), *) => { + paste! { + #[derive(Debug)] + pub enum ColumnBlock { + Null(NullColumn), + $( + $Kind([<$Kind Column>]), + )* + } + + impl ColumnBlock { + pub fn try_from_arrow_array_ref(datum_kind: &DatumKind, array: &ArrayRef) -> Result { + let column = match datum_kind { + DatumKind::Null => ColumnBlock::Null(NullColumn::new_null(array.len())), + $( + DatumKind::$Kind => { + let mills_array; + let cast_column = match array.data_type() { + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + mills_array = cast_nanosecond_to_mills(array)?; + cast_array(datum_kind, &mills_array)? + }, + _ => { + cast_array(datum_kind, array)? 
+ } + }; + + ColumnBlock::$Kind([<$Kind Column>]::from(cast_column)) + } + )* + }; + Ok(column) + } + + pub fn new_null_with_type(kind: &DatumKind, rows: usize) -> Result { + let block = match kind { + DatumKind::Null => ColumnBlock::Null(NullColumn::new_null(rows)), + $( + DatumKind::$Kind => ColumnBlock::$Kind([<$Kind Column>]::new_null(rows)), + )* + }; + + Ok(block) + } + } + } + } +} + +// Define column blocks, Null is defined explicitly in macro. +define_column_block!( + Timestamp, Double, Float, Varbinary, String, UInt64, UInt32, UInt16, UInt8, Int64, Int32, + Int16, Int8, Boolean, Date, Time +); + +impl ColumnBlock { + pub fn try_cast_arrow_array_ref(array: &ArrayRef) -> Result { + let datum_kind = + DatumKind::from_data_type(array.data_type()).with_context(|| UnsupportedArray { + data_type: array.data_type().clone(), + })?; + Self::try_from_arrow_array_ref(&datum_kind, array) + } + + pub fn new_null(rows: usize) -> Self { + Self::Null(NullColumn::new_null(rows)) + } + + pub fn as_timestamp(&self) -> Option<&TimestampColumn> { + match self { + ColumnBlock::Timestamp(c) => Some(c), + _ => None, + } + } +} + +// TODO: This is a temp workaround to support nanoseconds, a better way +// is to support nanoseconds natively. +// This is also required for influxql. 
+pub fn cast_nanosecond_to_mills(array: &ArrayRef) -> Result> { + let column = ColumnarValue::Array(array.clone()); + let mills_column = cast_column( + &column, + &DataType::Timestamp(TimeUnit::Millisecond, None), + &DEFAULT_DATAFUSION_CAST_OPTIONS, + ) + .with_context(|| CastTimestamp { + data_type: DataType::Timestamp(TimeUnit::Millisecond, None), + })?; + + match mills_column { + ColumnarValue::Array(array) => Ok(array), + _ => Err(Error::NotImplemented), + } +} + +fn cast_array<'a, T: 'static>(datum_kind: &DatumKind, array: &'a ArrayRef) -> Result<&'a T> { + array + .as_any() + .downcast_ref::() + .with_context(|| InvalidArrayType { + datum_kind: *datum_kind, + data_type: array.data_type().clone(), + }) +} + +macro_rules! append_datum { + ($Kind: ident, $builder: ident, $DatumType: ident, $datum: ident) => { + match $datum { + $DatumType::Null => Ok($builder.append_null()), + $DatumType::$Kind(v) => Ok($builder.append_value(v)), + _ => ConflictType { + expect: DatumKind::$Kind, + given: $datum.kind(), + } + .fail(), + } + }; +} + +macro_rules! append_datum_into { + ($Kind: ident, $builder: ident, $DatumType: ident, $datum: ident) => { + match $datum { + $DatumType::Null => Ok($builder.append_null()), + $DatumType::$Kind(v) => Ok($builder.append_value(v.into())), + _ => ConflictType { + expect: DatumKind::$Kind, + given: $datum.kind(), + } + .fail(), + } + }; +} + +macro_rules! append_block { + ($Kind: ident, $builder: ident, $BlockType: ident, $block: ident, $start: ident, $len: ident) => { + match $block { + $BlockType::Null(v) => { + let end = std::cmp::min($start + $len, v.num_rows()); + for _ in $start..end { + $builder.append_null(); + } + Ok(()) + } + $BlockType::$Kind(v) => { + // There is no convenient api to copy a range of data from array to builder, so + // we still need to clone value one by one using a for loop. 
+ let end = std::cmp::min($start + $len, v.num_rows()); + for i in $start..end { + let value_opt = v.value(i); + match value_opt { + Some(value) => { + $builder.append_value(value); + } + None => { + $builder.append_null(); + } + } + } + Ok(()) + } + _ => ConflictType { + expect: DatumKind::$Kind, + given: $block.datum_kind(), + } + .fail(), + } + }; +} + +macro_rules! define_column_block_builder { + ($(($Kind: ident, $Builder: ident)), *) => { + paste! { + #[derive(Debug)] + pub enum ColumnBlockBuilder { + Null { rows: usize }, + Timestamp(TimestampMillisecondBuilder), + Varbinary(BinaryBuilder), + String(StringBuilder), + Date(DateBuilder), + Time(TimeBuilder), + $( + $Kind($Builder), + )* + } + + impl ColumnBlockBuilder { + /// Creates a builder for `data_type` with room for `item_capacity` items. + pub fn with_capacity(data_type: &DatumKind, item_capacity: usize) -> Self { + match data_type { + DatumKind::Null => Self::Null { rows: 0 }, + DatumKind::Timestamp => Self::Timestamp(TimestampMillisecondBuilder::with_capacity(item_capacity)), + // The data capacity is set to 1024 because the items are of variable size. 
+ DatumKind::Varbinary => Self::Varbinary(BinaryBuilder::with_capacity(item_capacity, 1024)), + DatumKind::String => Self::String(StringBuilder::with_capacity(item_capacity, 1024)), + DatumKind::Date => Self::Date(DateBuilder::with_capacity(item_capacity)), + DatumKind::Time => Self::Time(TimeBuilder::with_capacity(item_capacity)), + $( + DatumKind::$Kind => Self::$Kind($Builder::with_capacity(item_capacity)), + )* + } + } + + /// Append the datum into the builder. The datum must be `Null` or have the same data + /// type as the builder, otherwise a `ConflictType` error is returned. + pub fn append(&mut self, datum: Datum) -> Result<()> { + let given = datum.kind(); + match self { + Self::Null { rows } => match datum { + Datum::Null => { + *rows += 1; + Ok(()) + } + _ => ConflictType { + expect: DatumKind::Null, + given, + } + .fail(), + }, + Self::Timestamp(builder) => append_datum_into!(Timestamp, builder, Datum, datum), + Self::Varbinary(builder) => append_datum!(Varbinary, builder, Datum, datum), + Self::String(builder) => append_datum!(String, builder, Datum, datum), + Self::Date(builder) => append_datum!(Date, builder, Datum, datum), + Self::Time(builder) => append_datum!(Time, builder, Datum, datum), + $( + Self::$Kind(builder) => append_datum!($Kind, builder, Datum, datum), + )* + } + } + + /// Append the [DatumView] into the builder. The view must be `Null` or have the same data + /// type as the builder, otherwise a `ConflictType` error is returned. + pub fn append_view<'a>(&mut self, datum: DatumView<'a>) -> Result<()> { + let given = datum.kind(); + match self { + Self::Null { rows } => match datum { + DatumView::Null => { + *rows += 1; + Ok(()) + } + _ => ConflictType { + expect: DatumKind::Null, + given, + } + .fail(), + }, + Self::Timestamp(builder) => append_datum_into!(Timestamp, builder, DatumView, datum), + Self::Varbinary(builder) => append_datum!(Varbinary, builder, DatumView, datum), + Self::String(builder) => append_datum!(String, builder, DatumView, datum), + Self::Date(builder) => append_datum!(Date, builder, DatumView, datum), + Self::Time(builder) => 
append_datum!(Time, builder, DatumView, datum), + $( + Self::$Kind(builder) => append_datum!($Kind, builder, DatumView, datum), + )* + } + } + + /// Append rows in [start..start + len) from `block` to the builder. + /// + /// The range is clamped to the rows available in `block`; when `start` is at or + /// past the end of `block`, nothing is appended. + /// + /// Typed builders accept a `Null` block (appending nulls) or a block of their own + /// kind, and fail with `ConflictType` otherwise. + pub fn append_block_range(&mut self, block: &ColumnBlock, start: usize, len: usize) -> Result<()> { + match self { + Self::Null { rows } => { + // Clamp the end like `append_block!` does. `saturating_sub` keeps the + // count at zero instead of underflowing when `start` exceeds the block. + let end = std::cmp::min(start + len, block.num_rows()); + *rows += end.saturating_sub(start); + Ok(()) + }, + Self::Timestamp(builder) => append_block!(Timestamp, builder, ColumnBlock, block, start, len), + Self::Varbinary(builder) => append_block!(Varbinary, builder, ColumnBlock, block, start, len), + Self::String(builder) => append_block!(String, builder, ColumnBlock, block, start, len), + Self::Date(builder) => append_block!(Date, builder, ColumnBlock, block, start, len), + Self::Time(builder) => append_block!(Time, builder, ColumnBlock, block, start, len), + $( + Self::$Kind(builder) => append_block!($Kind, builder, ColumnBlock, block, start, len), + )* + } + } + + /// Returns the builder's current length: the `rows` counter for `Null`, otherwise + /// the inner builder's `len()`. + pub fn len(&self) -> usize { + match &self { + Self::Null { rows } => *rows, + Self::Timestamp(builder) => builder.len(), + Self::Varbinary(builder) => builder.len(), + Self::String(builder) => builder.len(), + Self::Date(builder) => builder.len(), + Self::Time(builder) => builder.len(), + $( + Self::$Kind(builder) => builder.len(), + )* + } + } + + // Build and reset the builder. 
+ pub fn build(&mut self) -> ColumnBlock { + match self { + Self::Null { rows } => { + let block = ColumnBlock::new_null(*rows); + *rows = 0; + block + } + Self::Timestamp(builder) => TimestampColumn::from(builder.finish()).into(), + Self::Varbinary(builder) => VarbinaryColumn::from(builder.finish()).into(), + Self::String(builder) => StringColumn::from(builder.finish()).into(), + Self::Date(builder) => DateColumn::from(builder.finish()).into(), + Self::Time(builder) => TimeColumn::from(builder.finish()).into(), + $( + Self::$Kind(builder) => [<$Kind Column>]::from(builder.finish()).into(), + )* + } + } + } + } + } +} + +// Define column block builders. Null, Timestamp, Varbinary, String, Date and +// Time are defined explicitly in the macro. +define_column_block_builder!( + (Double, DoubleBuilder), + (Float, FloatBuilder), + (UInt64, UInt64Builder), + (UInt32, UInt32Builder), + (UInt16, UInt16Builder), + (UInt8, UInt8Builder), + (Int64, Int64Builder), + (Int32, Int32Builder), + (Int16, Int16Builder), + (Int8, Int8Builder), + (Boolean, BooleanBuilder) +); + +impl ColumnBlockBuilder { + /// Creates a builder for `data_type` with an initial capacity of zero. + pub fn new(data_type: &DatumKind) -> Self { + Self::with_capacity(data_type, 0) + } + + /// Returns `true` if `len()` is zero. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Clears the builder by calling `build()` and dropping the built result. 
+ pub fn clear(&mut self) { + let _ = self.build(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::{build_rows, build_schema}; + + #[test] + fn test_column_block_builder() { + let schema = build_schema(); + let rows = build_rows(); + // Column 0 is expected to be a DatumKind::Varbinary column. + let column = schema.column(0); + let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2); + + // append: the datum from column 0 is accepted, the one from column 1 must be rejected. + builder.append(rows[0][0].clone()).unwrap(); + let ret = builder.append(rows[0][1].clone()); + assert!(ret.is_err()); + + // append_view: same checks through the borrowed view API. + builder.append_view(rows[1][0].as_view()).unwrap(); + let ret = builder.append_view(rows[0][1].as_view()); + assert!(ret.is_err()); + + let column_block = builder.build(); + assert_eq!(column_block.num_rows(), 2); + let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2); + + // append_block_range: copy the two built rows into a fresh builder, one row per call. + builder.append_block_range(&column_block, 0, 1).unwrap(); + builder.append_block_range(&column_block, 1, 1).unwrap(); + + let column_block = builder.build(); + assert_eq!(column_block.num_rows(), 2); + assert_eq!( + column_block.datum(0), + Datum::Varbinary(Bytes::copy_from_slice(b"binary key")) + ); + assert_eq!( + column_block.datum(1), + Datum::Varbinary(Bytes::copy_from_slice(b"binary key1")) + ); + } +} diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs index a796fb207c..eb2d740589 100644 --- a/common_types/src/lib.rs +++ b/common_types/src/lib.rs @@ -1,11 +1,13 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! 
Contains common types +pub mod bitset; pub mod bytes; -#[cfg(feature = "arrow")] pub mod column; #[cfg(feature = "arrow")] +pub mod column_block; +#[cfg(feature = "arrow")] pub mod column_schema; pub mod datum; pub mod hash; diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs index a7a73c9381..4cc35b691d 100644 --- a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -15,7 +15,7 @@ use arrow_ext::operation; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use crate::{ - column::{cast_nanosecond_to_mills, ColumnBlock, ColumnBlockBuilder}, + column_block::{cast_nanosecond_to_mills, ColumnBlock, ColumnBlockBuilder}, datum::DatumKind, projected_schema::{ProjectedSchema, RowProjector}, row::{ @@ -31,7 +31,7 @@ pub enum Error { SchemaLen { backtrace: Backtrace }, #[snafu(display("Failed to create column block, err:{}", source))] - CreateColumnBlock { source: crate::column::Error }, + CreateColumnBlock { source: crate::column_block::Error }, #[snafu(display( "Failed to create arrow record batch, err:{}.\nBacktrace:\n{}", @@ -47,7 +47,7 @@ pub enum Error { IterateDatum { source: crate::row::Error }, #[snafu(display("Failed to append datum, err:{}", source))] - AppendDatum { source: crate::column::Error }, + AppendDatum { source: crate::column_block::Error }, #[snafu(display( "Column not in schema with key, column_name:{}.\nBacktrace:\n{}", diff --git a/df_operator/src/functions.rs b/df_operator/src/functions.rs index 8d918d42b5..5ce7923429 100644 --- a/df_operator/src/functions.rs +++ b/df_operator/src/functions.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Functions. 
@@ -8,7 +8,7 @@ use std::{ }; use arrow::datatypes::DataType; -use common_types::{column::ColumnBlock, datum::DatumKind}; +use common_types::{column_block::ColumnBlock, datum::DatumKind}; use common_util::{define_result, error::GenericError}; use datafusion::{ error::DataFusionError, @@ -31,7 +31,9 @@ const FUNC_ARG_NUM: usize = 5; #[snafu(visibility(pub(crate)))] pub enum Error { #[snafu(display("Failed to convert array to ColumnarValue, err:{}", source))] - InvalidArray { source: common_types::column::Error }, + InvalidArray { + source: common_types::column_block::Error, + }, #[snafu(display("Invalid function arguments, err:{}", source))] InvalidArguments { source: GenericError }, diff --git a/df_operator/src/udfs/time_bucket.rs b/df_operator/src/udfs/time_bucket.rs index 1ea693d954..5e2af92181 100644 --- a/df_operator/src/udfs/time_bucket.rs +++ b/df_operator/src/udfs/time_bucket.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! time_bucket UDF. 
@@ -6,7 +6,7 @@ use std::time::Duration; use chrono::{Datelike, FixedOffset, TimeZone}; use common_types::{ - column::{ColumnBlock, ColumnBlockBuilder, TimestampColumn}, + column_block::{ColumnBlock, ColumnBlockBuilder, TimestampColumn}, datum::{Datum, DatumKind}, time::Timestamp, }; @@ -56,7 +56,9 @@ pub enum Error { TruncateTimestamp { timestamp: i64, period: Period }, #[snafu(display("Failed to build result column, err:{}", source))] - BuildColumn { source: common_types::column::Error }, + BuildColumn { + source: common_types::column_block::Error, + }, } define_result!(Error); diff --git a/interpreters/src/insert.rs b/interpreters/src/insert.rs index 95bc47a0cb..63a9f02e21 100644 --- a/interpreters/src/insert.rs +++ b/interpreters/src/insert.rs @@ -11,7 +11,7 @@ use std::{ use arrow::{array::ArrayRef, error::ArrowError, record_batch::RecordBatch}; use async_trait::async_trait; use common_types::{ - column::{ColumnBlock, ColumnBlockBuilder}, + column_block::{ColumnBlock, ColumnBlockBuilder}, column_schema::ColumnId, datum::Datum, hash::hash64, @@ -69,13 +69,17 @@ pub enum Error { }, #[snafu(display("Failed to convert arrow array to column block, err:{}", source))] - ConvertColumnBlock { source: common_types::column::Error }, + ConvertColumnBlock { + source: common_types::column_block::Error, + }, #[snafu(display("Failed to find input columns of expr, column_name:{}", column_name))] FindExpressionInput { column_name: String }, #[snafu(display("Failed to build column block, err:{}", source))] - BuildColumnBlock { source: common_types::column::Error }, + BuildColumnBlock { + source: common_types::column_block::Error, + }, } define_result!(Error); diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs index 4ef9ebeecf..7a48ab114d 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -334,7 +334,7 @@ impl RecordConverter { mod tests { use common_types::{ - column::{ColumnBlock, ColumnBlockBuilder}, + 
column_block::{ColumnBlock, ColumnBlockBuilder}, column_schema, datum::{Datum, DatumKind}, row::Row, diff --git a/proxy/src/influxdb/types.rs b/proxy/src/influxdb/types.rs index 58cba675ab..40cacefd99 100644 --- a/proxy/src/influxdb/types.rs +++ b/proxy/src/influxdb/types.rs @@ -604,7 +604,7 @@ mod tests { use arrow::datatypes::{Field as ArrowField, Fields, Schema as ArrowSchema}; use common_types::{ - column::{ColumnBlock, ColumnBlockBuilder}, + column_block::{ColumnBlock, ColumnBlockBuilder}, column_schema, datum::DatumKind, schema, diff --git a/table_engine/src/memory.rs b/table_engine/src/memory.rs index 4e79a291fb..67fcd3785d 100644 --- a/table_engine/src/memory.rs +++ b/table_engine/src/memory.rs @@ -12,7 +12,7 @@ use std::{ use async_trait::async_trait; use common_types::{ - column::{ColumnBlock, ColumnBlockBuilder}, + column_block::{ColumnBlock, ColumnBlockBuilder}, datum::{Datum, DatumKind}, record_batch::RecordBatch, row::{Row, RowGroup},