diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index c8d52a03d4..1a83ffb651 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -1253,6 +1253,13 @@ impl<'a> DatumView<'a> { } } + pub fn as_bool(&self) -> Option { + match self { + DatumView::Boolean(v) => Some(*v), + _ => None, + } + } + pub fn as_timestamp(&self) -> Option { match self { DatumView::Timestamp(v) => Some(*v), diff --git a/common_types/src/row/bitset.rs b/common_types/src/row/bitset.rs index 82be35ab8a..044b0d6a48 100644 --- a/common_types/src/row/bitset.rs +++ b/common_types/src/row/bitset.rs @@ -28,6 +28,24 @@ const UNSET_BIT_MASK: [u8; 8] = [ 255 - 128, ]; +/// A bit set representing at most 8 bits with a underlying u8. +pub struct OneByteBitSet(pub u8); + +impl OneByteBitSet { + /// Create from a given boolean slice. + /// + /// The values in the `bits` whose index is greater than 8 will be ignored. + pub fn from_slice(bits: &[bool]) -> Self { + let mut v = 0u8; + for (idx, set) in bits.iter().take(8).map(|v| *v as u8).enumerate() { + let (_, bit_idx) = RoBitSet::compute_byte_bit_index(idx); + v |= set << bit_idx + } + + Self(v) + } +} + /// A basic implementation supporting read/write. #[derive(Debug, Default, Clone)] pub struct BitSet { @@ -172,6 +190,7 @@ mod tests { use std::assert_eq; use super::BitSet; + use crate::row::bitset::OneByteBitSet; #[test] fn test_set_op() { @@ -228,4 +247,21 @@ mod tests { assert!(BitSet::try_from_raw(raw_bytes.clone(), 40).is_some()); assert!(BitSet::try_from_raw(raw_bytes, 1).is_some()); } + + #[test] + fn test_one_byte() { + let bits = [false, false, false, false, false, false]; + assert_eq!(0, OneByteBitSet::from_slice(&bits).0); + + let bits = [true, false, false, false, false, false]; + assert_eq!(1, OneByteBitSet::from_slice(&bits).0); + + let bits = [false, false, false, true, false, false, true, true]; + assert_eq!(128 + 64 + 8, OneByteBitSet::from_slice(&bits).0); + + let bits = [ + false, false, false, false, false, false, true, true, true, true, + ]; + assert_eq!(128 + 64, OneByteBitSet::from_slice(&bits).0); + } } diff --git a/components/codec/src/columnar/bool.rs b/components/codec/src/columnar/bool.rs new file mode 100644 index 0000000000..77aa93f368 --- /dev/null +++ b/components/codec/src/columnar/bool.rs @@ -0,0 +1,245 @@ +// Copyright 2023 The CeresDB Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes_ext::{Buf, BufMut}; +use common_types::row::bitset::{BitSet, OneByteBitSet, RoBitSet}; +use snafu::{ensure, OptionExt}; + +use super::{ + DecodeContext, InvalidBooleanValue, InvalidCompression, Result, ValuesDecoder, + ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, +}; +use crate::columnar::{InvalidBitSetBuf, InvalidVersion, NotEnoughBytes}; + +/// The layout for the boolean columnar encoding: +/// ```plaintext +/// +-------------+-----------------+------------+-----------------+ +/// | version(u8) | num_values(u32) | data_block | compression(u8) | +/// +-------------+-----------------+------------+-----------------+ +/// ``` +/// Notes: +/// - If the data_block is too long, it will be compressed as bit set. +/// - The `num_values` field is optional, and it is only needed when compression +/// is enabled. +struct Encoding; + +/// The compression for [`Encoding`]. +/// +/// It is not allowed to be modified and only allowed to be appended with a new +/// variant. +#[derive(Clone, Copy, Default)] +#[repr(C)] +enum Compression { + #[default] + None = 0, + BitSet = 1, +} + +impl Encoding { + const COMPRESSION_SIZE: usize = 1; + /// The overhead for compression is 4B, so it is not good to always enable + /// the compression. + const COMPRESS_THRESHOLD: usize = 10; + const NUM_VALUES_SIZE: usize = 4; + const VERSION: u8 = 0; + const VERSION_SIZE: usize = 1; + + fn need_compress(num_values: usize) -> bool { + num_values > Self::COMPRESS_THRESHOLD + } + + fn decode_compression(flag: u8) -> Result { + let compression = match flag { + 0 => Compression::None, + 1 => Compression::BitSet, + _ => InvalidCompression { flag }.fail()?, + }; + + Ok(compression) + } + + fn encode(&self, buf: &mut B, values: I) -> Result<()> + where + B: BufMut, + I: Iterator + Clone, + { + buf.put_u8(Self::VERSION); + + let num_values = values.clone().count(); + if Self::need_compress(num_values) { + Self::encode_with_compression(buf, num_values, values) + } else { + Self::encode_without_compression(buf, values) + } + } + + fn estimated_encoded_size(&self, values: I) -> usize + where + I: Iterator, + { + let num_values = values.count(); + if Self::need_compress(num_values) { + BitSet::num_bytes(num_values) + + Self::COMPRESSION_SIZE + + Self::NUM_VALUES_SIZE + + Self::VERSION_SIZE + } else { + num_values + Self::VERSION_SIZE + Self::COMPRESSION_SIZE + } + } + + fn decode(&self, buf: &mut B, f: F) -> Result<()> + where + B: Buf, + F: FnMut(bool) -> Result<()>, + { + let buf = buf.chunk(); + ensure!( + buf.len() > Self::VERSION_SIZE + Self::COMPRESSION_SIZE, + NotEnoughBytes { len: buf.len() } + ); + + // Decode the version. + let version = buf[0]; + ensure!(version == Self::VERSION, InvalidVersion { version }); + + // Decode the compression. + let compression_index = buf.len() - 1; + match Self::decode_compression(buf[compression_index])? { + Compression::None => Self::decode_without_compression(buf, f)?, + Compression::BitSet => Self::decode_with_compression(buf, f)?, + } + + Ok(()) + } + + fn encode_without_compression(buf: &mut B, values: I) -> Result<()> + where + B: BufMut, + I: Iterator, + { + for v in values { + buf.put_u8(v as u8); + } + + buf.put_u8(Compression::None as u8); + + Ok(()) + } + + fn decode_without_compression(buf: &[u8], mut f: F) -> Result<()> + where + F: FnMut(bool) -> Result<()>, + { + let data_block_start = Self::VERSION_SIZE; + let data_block_end = buf.len() - Self::COMPRESSION_SIZE; + let data_block = &buf[data_block_start..data_block_end]; + for v in data_block { + match *v { + 0 => f(false), + 1 => f(true), + _ => InvalidBooleanValue { value: *v }.fail(), + }? + } + + Ok(()) + } + + fn encode_with_compression(buf: &mut B, num_values: usize, values: I) -> Result<()> + where + B: BufMut, + I: Iterator, + { + buf.put_u32(num_values as u32); + + let mut one_byte_bits = [false; 8]; + let mut offset = 0; + for v in values { + one_byte_bits[offset] = v; + offset += 1; + if offset == 8 { + let bit_set = OneByteBitSet::from_slice(&one_byte_bits); + buf.put_u8(bit_set.0); + + // Reset the offset and the bits buf. + offset = 0; + one_byte_bits = [false; 8]; + } + } + + // Put the remaining bits. + if offset > 0 { + let bit_set = OneByteBitSet::from_slice(&one_byte_bits); + buf.put_u8(bit_set.0); + } + + buf.put_u8(Compression::BitSet as u8); + Ok(()) + } + + fn decode_with_compression(buf: &[u8], mut f: F) -> Result<()> + where + F: FnMut(bool) -> Result<()>, + { + let expected_len = Self::VERSION_SIZE + Self::NUM_VALUES_SIZE + Self::COMPRESSION_SIZE; + ensure!(buf.len() >= expected_len, NotEnoughBytes { len: buf.len() }); + + let bit_set_start = Self::VERSION_SIZE + Self::NUM_VALUES_SIZE; + let num_values = { + let mut num_buf = &buf[Self::VERSION_SIZE..bit_set_start]; + num_buf.get_u32() as usize + }; + + let bit_set_end = buf.len() - Self::COMPRESSION_SIZE; + let bit_set_buf = &buf[bit_set_start..bit_set_end]; + let bit_set = RoBitSet::try_new(bit_set_buf, num_values).context(InvalidBitSetBuf)?; + + for i in 0..num_values { + if bit_set.is_set(i).context(InvalidBitSetBuf)? { + f(true) + } else { + f(false) + }? + } + + Ok(()) + } +} + +impl ValuesEncoder for ValuesEncoderImpl { + fn encode(&self, buf: &mut B, values: I) -> Result<()> + where + B: BufMut, + I: Iterator + Clone, + { + Encoding.encode(buf, values) + } + + fn estimated_encoded_size(&self, values: I) -> usize + where + I: Iterator, + { + Encoding.estimated_encoded_size(values) + } +} + +impl ValuesDecoder for ValuesDecoderImpl { + fn decode(&self, _ctx: DecodeContext<'_>, buf: &mut B, f: F) -> Result<()> + where + B: Buf, + F: FnMut(bool) -> Result<()>, + { + Encoding.decode(buf, f) + } +} diff --git a/components/codec/src/columnar/bytes.rs b/components/codec/src/columnar/bytes.rs index 126c38e64b..fe553c8123 100644 --- a/components/codec/src/columnar/bytes.rs +++ b/components/codec/src/columnar/bytes.rs @@ -52,18 +52,18 @@ impl Encoding { if data_block_len > threshold { Compression::Lz4 } else { - Compression::NoCompression + Compression::None } } fn decode_compression(&self, v: u8) -> Result { - let version = match v { - 0 => Compression::NoCompression, + let compression = match v { + 0 => Compression::None, 1 => Compression::Lz4, _ => InvalidCompression { flag: v }.fail()?, }; - Ok(version) + Ok(compression) } fn encode<'a, B, I>( @@ -92,7 +92,7 @@ impl Encoding { // Encode the `data_block`. let compression = Self::decide_compression(data_block_len, data_block_compress_threshold); match compression { - Compression::NoCompression => { + Compression::None => { for v in values { buf.put_slice(v); } @@ -157,9 +157,7 @@ impl Encoding { let data_block = &chunk[length_block_end..length_block_len_offset]; match compression { - Compression::NoCompression => { - self.decode_without_compression(&mut length_block, data_block, f) - } + Compression::None => self.decode_without_compression(&mut length_block, data_block, f), Compression::Lz4 => self.decode_with_compression(length_block, data_block, ctx.buf, f), } } @@ -227,7 +225,7 @@ impl Encoding { #[repr(C)] enum Compression { #[default] - NoCompression = 0, + None = 0, Lz4 = 1, } diff --git a/components/codec/src/columnar/mod.rs b/components/codec/src/columnar/mod.rs index dd0e661875..5f23a198c3 100644 --- a/components/codec/src/columnar/mod.rs +++ b/components/codec/src/columnar/mod.rs @@ -30,6 +30,7 @@ use snafu::{self, ensure, Backtrace, OptionExt, ResultExt, Snafu}; use crate::varint; +mod bool; mod bytes; mod float; mod int; @@ -44,11 +45,14 @@ pub enum Error { #[snafu(display("Invalid compression flag:{flag}.\nBacktrace:\n{backtrace}"))] InvalidCompression { flag: u8, backtrace: Backtrace }, + #[snafu(display("Invalid boolean value:{value}.\nBacktrace:\n{backtrace}"))] + InvalidBooleanValue { value: u8, backtrace: Backtrace }, + #[snafu(display("Invalid datum kind, err:{source}"))] InvalidDatumKind { source: common_types::datum::Error }, - #[snafu(display("No enough bytes to compose the nulls bit set.\nBacktrace:\n{backtrace}"))] - InvalidNullsBitSet { backtrace: Backtrace }, + #[snafu(display("No enough bytes to compose the bit set.\nBacktrace:\n{backtrace}"))] + InvalidBitSetBuf { backtrace: Backtrace }, #[snafu(display( "Datums is not enough, expect:{expect}, found:{found}.\nBacktrace:\n{backtrace}" @@ -294,7 +298,9 @@ impl ColumnarEncoder { enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_i16())) } DatumKind::Int8 => enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_i8())), - DatumKind::Boolean => todo!(), + DatumKind::Boolean => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_bool())) + } DatumKind::Date => todo!(), DatumKind::Time => todo!(), }; @@ -331,7 +337,7 @@ impl ColumnarEncoder { DatumKind::Int32 => enc.encode(buf, datums.filter_map(|v| v.as_i32())), DatumKind::Int16 => enc.encode(buf, datums.filter_map(|v| v.as_i16())), DatumKind::Int8 => enc.encode(buf, datums.filter_map(|v| v.as_i8())), - DatumKind::Boolean => todo!(), + DatumKind::Boolean => enc.encode(buf, datums.filter_map(|v| v.as_bool())), DatumKind::Date => todo!(), DatumKind::Time => todo!(), } @@ -388,12 +394,12 @@ impl ColumnarDecoder { datum_kind: DatumKind, ) -> Result> { let chunk = buf.chunk(); - let bit_set = RoBitSet::try_new(chunk, num_datums).context(InvalidNullsBitSet)?; + let bit_set = RoBitSet::try_new(chunk, num_datums).context(InvalidBitSetBuf)?; let mut datums = Vec::with_capacity(num_datums); let with_datum = |datum: Datum| { let idx = datums.len(); - let null = bit_set.is_unset(idx).context(InvalidNullsBitSet)?; + let null = bit_set.is_unset(idx).context(InvalidBitSetBuf)?; if null { datums.push(Datum::Null); } @@ -508,7 +514,10 @@ impl ColumnarDecoder { }; ValuesDecoderImpl.decode(ctx, buf, with_i8) } - DatumKind::Boolean => todo!(), + DatumKind::Boolean => { + let with_bool = |v: bool| f(Datum::from(v)); + ValuesDecoderImpl.decode(ctx, buf, with_bool) + } DatumKind::Date => todo!(), DatumKind::Time => todo!(), } @@ -663,6 +672,26 @@ mod tests { check_encode_end_decode(10, datums, DatumKind::String); } + #[test] + fn test_boolean() { + let datums = vec![ + Datum::from(false), + Datum::from(false), + Datum::from(true), + Datum::Null, + Datum::from(false), + ]; + + check_encode_end_decode(10, datums.clone(), DatumKind::Boolean); + + let mut massive_datums = Vec::with_capacity(10 * datums.len()); + for _ in 0..10 { + massive_datums.append(&mut datums.clone()); + } + + check_encode_end_decode(10, massive_datums, DatumKind::Boolean); + } + #[test] fn test_massive_string() { let sample_datums = vec![