Skip to content

Commit

Permalink
feat: implement boolean columnar encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Aug 23, 2023
1 parent 080c824 commit 658884a
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 16 deletions.
7 changes: 7 additions & 0 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,13 @@ impl<'a> DatumView<'a> {
}
}

/// Returns the wrapped value if this view is a `Boolean`, or `None` for any
/// other kind of datum.
pub fn as_bool(&self) -> Option<bool> {
    if let DatumView::Boolean(flag) = self {
        Some(*flag)
    } else {
        None
    }
}

pub fn as_timestamp(&self) -> Option<Timestamp> {
match self {
DatumView::Timestamp(v) => Some(*v),
Expand Down
30 changes: 30 additions & 0 deletions common_types/src/row/bitset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ impl BitSet {
}
}

/// Pack up to eight booleans into a single byte.
///
/// The bit controlled by `bits[i]` is the one reported by
/// `RoBitSet::compute_byte_bit_index(i)`, so the result uses the same bit
/// layout as the bit sets in this module.
///
/// Only the first 8 entries of `bits` are used: any entry whose index is 8 or
/// greater is ignored. If `bits` holds fewer than 8 entries, the bits that have
/// no corresponding entry are left unset.
pub fn one_byte(bits: &[bool]) -> u8 {
    bits.iter()
        .take(8)
        .copied()
        .enumerate()
        .fold(0u8, |byte, (idx, set)| {
            let (_, bit_idx) = RoBitSet::compute_byte_bit_index(idx);
            byte | (u8::from(set) << bit_idx)
        })
}

/// Initialize a [`BitSet`] with all bits set.
pub fn all_set(num_bits: usize) -> Self {
Self {
Expand Down Expand Up @@ -228,4 +241,21 @@ mod tests {
assert!(BitSet::try_from_raw(raw_bytes.clone(), 40).is_some());
assert!(BitSet::try_from_raw(raw_bytes, 1).is_some());
}

#[test]
fn test_one_byte() {
    // Each case is (input bits, expected byte). The inputs cover a short
    // all-unset slice, a single set bit, a full 8-bit slice and a slice that
    // is longer than 8 entries (the extra entries must be ignored).
    let cases: [(&[bool], u8); 4] = [
        (&[false, false, false, false, false, false], 0),
        (&[true, false, false, false, false, false], 1),
        (
            &[false, false, false, true, false, false, true, true],
            128 + 64 + 8,
        ),
        (
            &[
                false, false, false, false, false, false, true, true, true, true,
            ],
            128 + 64,
        ),
    ];

    for (bits, expected) in cases.iter() {
        assert_eq!(*expected, BitSet::one_byte(bits));
    }
}
}
247 changes: 247 additions & 0 deletions components/codec/src/columnar/bool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes_ext::{Buf, BufMut};
use common_types::row::bitset::{BitSet, RoBitSet};
use snafu::{ensure, OptionExt};

use super::{
DecodeContext, InvalidBooleanValue, InvalidCompression, Result, ValuesDecoder,
ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl,
};
use crate::columnar::{InvalidBitSetBuf, InvalidVersion, NotEnoughBytes};

/// The layout for the boolean columnar encoding:
/// ```plaintext
/// +-------------+-----------------+------------+-----------------+
/// | version(u8) | num_values(u32) | data_block | compression(u8) |
/// +-------------+-----------------+------------+-----------------+
/// ```
/// Notes:
/// - The `compression` flag is always the last byte of the encoded block.
/// - If the number of values exceeds `Encoding::COMPRESS_THRESHOLD`, the
///   encoder stores the data_block as a bit set (one bit per value);
///   otherwise it stores one byte (`0` or `1`) per value.
/// - The `num_values` field is optional: the encoder only writes it when the
///   bit set compression is used, and omits it for uncompressed blocks.
struct Encoding;

/// The compression for [`Encoding`].
///
/// The discriminant of the chosen variant is written as the trailing flag byte
/// of an encoded block and is mapped back by `Encoding::decode_compression`.
///
/// It is not allowed to be modified and only allowed to be appended with a new
/// variant.
#[derive(Clone, Copy, Default)]
#[repr(C)]
enum Compression {
/// The data block stores one byte per boolean value.
#[default]
None = 0,
/// The data block stores the boolean values packed as a bit set.
BitSet = 1,
}

impl Encoding {
    /// Size in bytes of the trailing compression flag.
    const COMPRESSION_SIZE: usize = 1;
    /// Blocks holding more values than this are stored as a bit set.
    ///
    /// The compression adds a fixed overhead (the `num_values` field), so it is
    /// not good to always enable the compression.
    const COMPRESS_THRESHOLD: usize = 10;
    /// Size in bytes of the `num_values` field (a `u32`).
    const NUM_VALUES_SIZE: usize = 4;
    /// The only version written by the encoder and accepted by the decoder.
    const VERSION: u8 = 0;
    /// Size in bytes of the leading version field.
    const VERSION_SIZE: usize = 1;

    /// Whether a block of `num_values` booleans should be stored as a bit set.
    fn need_compress(num_values: usize) -> bool {
        num_values > Self::COMPRESS_THRESHOLD
    }

    /// Map the trailing flag byte back to a [`Compression`].
    ///
    /// # Errors
    /// Fails with `InvalidCompression` if `flag` is not a known discriminant.
    fn decode_compression(flag: u8) -> Result<Compression> {
        match flag {
            0 => Ok(Compression::None),
            1 => Ok(Compression::BitSet),
            _ => InvalidCompression { flag }.fail(),
        }
    }

    /// Encode `values` into `buf`.
    ///
    /// The version byte is written first, then either the compressed or the
    /// uncompressed body (each of which ends with its compression flag). The
    /// iterator is cloned once in order to count the values up front.
    fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
    where
        B: BufMut,
        I: Iterator<Item = bool> + Clone,
    {
        buf.put_u8(Self::VERSION);

        let num_values = values.clone().count();
        if Self::need_compress(num_values) {
            Self::encode_with_compression(buf, num_values, values)
        } else {
            Self::encode_without_compression(buf, values)
        }
    }

    /// Estimate the number of bytes [`Self::encode`] writes for `values`.
    fn estimated_encoded_size<I>(&self, values: I) -> usize
    where
        I: Iterator<Item = bool>,
    {
        let num_values = values.count();
        if Self::need_compress(num_values) {
            BitSet::num_bytes(num_values)
                + Self::COMPRESSION_SIZE
                + Self::NUM_VALUES_SIZE
                + Self::VERSION_SIZE
        } else {
            num_values + Self::VERSION_SIZE + Self::COMPRESSION_SIZE
        }
    }

    /// Decode the block held by `buf`, calling `f` once per value in order.
    ///
    /// Only the slice returned by `buf.chunk()` is inspected, and `buf` is not
    /// advanced.
    ///
    /// # Errors
    /// - `NotEnoughBytes` if the block cannot hold the version and the
    ///   compression flag.
    /// - `InvalidVersion` if the version byte is not [`Self::VERSION`].
    /// - `InvalidCompression` if the trailing flag is unknown.
    /// - Any error produced while decoding the body or returned by `f`.
    fn decode<B, F>(&self, buf: &mut B, f: F) -> Result<()>
    where
        B: Buf,
        F: FnMut(bool) -> Result<()>,
    {
        let buf = buf.chunk();
        // An empty sequence is encoded as exactly `version + compression`, so a
        // block of this minimal length must be accepted.
        ensure!(
            buf.len() >= Self::VERSION_SIZE + Self::COMPRESSION_SIZE,
            NotEnoughBytes { len: buf.len() }
        );

        // Decode the version.
        let version = buf[0];
        ensure!(version == Self::VERSION, InvalidVersion { version });

        // The compression flag is always the last byte.
        let flag = buf[buf.len() - Self::COMPRESSION_SIZE];
        match Self::decode_compression(flag)? {
            Compression::None => Self::decode_without_compression(buf, f),
            Compression::BitSet => Self::decode_with_compression(buf, f),
        }
    }

    /// Write one byte (`0` or `1`) per value, followed by the
    /// `Compression::None` flag.
    fn encode_without_compression<B, I>(buf: &mut B, values: I) -> Result<()>
    where
        B: BufMut,
        I: Iterator<Item = bool>,
    {
        for v in values {
            buf.put_u8(u8::from(v));
        }

        buf.put_u8(Compression::None as u8);

        Ok(())
    }

    /// Decode an uncompressed block (`version | one byte per value | flag`).
    ///
    /// `buf` is the whole block, including the version and the flag.
    ///
    /// # Errors
    /// Fails with `InvalidBooleanValue` on a data byte other than `0` or `1`,
    /// or with whatever error `f` returns.
    fn decode_without_compression<F>(buf: &[u8], mut f: F) -> Result<()>
    where
        F: FnMut(bool) -> Result<()>,
    {
        let data_block = &buf[Self::VERSION_SIZE..buf.len() - Self::COMPRESSION_SIZE];
        for &value in data_block {
            match value {
                0 => f(false)?,
                1 => f(true)?,
                _ => return InvalidBooleanValue { value }.fail(),
            }
        }

        Ok(())
    }

    /// Write `num_values` as a `u32`, then the values packed eight per byte,
    /// followed by the `Compression::BitSet` flag.
    fn encode_with_compression<B, I>(buf: &mut B, num_values: usize, values: I) -> Result<()>
    where
        B: BufMut,
        I: Iterator<Item = bool>,
    {
        // The header stores the count in 4 bytes; a larger count would be
        // silently truncated by the cast below and make the block undecodable.
        debug_assert!(
            u32::try_from(num_values).is_ok(),
            "too many boolean values for a u32 num_values field"
        );
        buf.put_u32(num_values as u32);

        let mut one_byte_bits = [false; 8];
        let mut offset = 0;
        for v in values {
            one_byte_bits[offset] = v;
            offset += 1;
            if offset == one_byte_bits.len() {
                buf.put_u8(BitSet::one_byte(&one_byte_bits));

                // Reset the offset and the bits buf.
                offset = 0;
                one_byte_bits = [false; 8];
            }
        }

        // Put the remaining bits; the unused high positions stay unset.
        if offset > 0 {
            buf.put_u8(BitSet::one_byte(&one_byte_bits));
        }

        buf.put_u8(Compression::BitSet as u8);
        Ok(())
    }

    /// Decode a bit set block (`version | num_values(u32) | bit set | flag`).
    ///
    /// `buf` is the whole block, including the version and the flag. The field
    /// positions mirror [`Self::encode_with_compression`]: `num_values` comes
    /// right after the version and the bit set fills the space up to the flag.
    ///
    /// # Errors
    /// - `NotEnoughBytes` if the block cannot hold the fixed-size fields.
    /// - `InvalidBitSetBuf` if the bit set bytes cannot hold `num_values` bits.
    /// - Any error returned by `f`.
    fn decode_with_compression<F>(buf: &[u8], mut f: F) -> Result<()>
    where
        F: FnMut(bool) -> Result<()>,
    {
        let min_len = Self::VERSION_SIZE + Self::NUM_VALUES_SIZE + Self::COMPRESSION_SIZE;
        ensure!(buf.len() >= min_len, NotEnoughBytes { len: buf.len() });

        let bit_set_start = Self::VERSION_SIZE + Self::NUM_VALUES_SIZE;
        let bit_set_end = buf.len() - Self::COMPRESSION_SIZE;

        let num_values = {
            let mut num_buf = &buf[Self::VERSION_SIZE..bit_set_start];
            num_buf.get_u32() as usize
        };

        let bit_set_buf = &buf[bit_set_start..bit_set_end];
        let bit_set = RoBitSet::try_new(bit_set_buf, num_values).context(InvalidBitSetBuf)?;

        for i in 0..num_values {
            f(bit_set.is_set(i).context(InvalidBitSetBuf)?)?;
        }

        Ok(())
    }
}

/// Boolean values are encoded with the boolean columnar [`Encoding`].
impl ValuesEncoder<bool> for ValuesEncoderImpl {
/// Encode `values` into `buf` by delegating to `Encoding::encode`.
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = bool> + Clone,
{
Encoding.encode(buf, values)
}

/// Estimate the encoded size of `values` by delegating to
/// `Encoding::estimated_encoded_size`.
fn estimated_encoded_size<I>(&self, values: I) -> usize
where
I: Iterator<Item = bool>,
{
Encoding.estimated_encoded_size(values)
}
}

/// Boolean values are decoded with the boolean columnar [`Encoding`].
impl ValuesDecoder<bool> for ValuesDecoderImpl {
/// Decode the block in `buf` by delegating to `Encoding::decode`, which calls
/// `f` for each decoded value. The decode context is not used for booleans.
fn decode<B, F>(&self, _ctx: DecodeContext<'_>, buf: &mut B, f: F) -> Result<()>
where
B: Buf,
F: FnMut(bool) -> Result<()>,
{
Encoding.decode(buf, f)
}
}
16 changes: 7 additions & 9 deletions components/codec/src/columnar/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ impl Encoding {
if data_block_len > threshold {
Compression::Lz4
} else {
Compression::NoCompression
Compression::None
}
}

fn decode_compression(&self, v: u8) -> Result<Compression> {
let version = match v {
0 => Compression::NoCompression,
let compression = match v {
0 => Compression::None,
1 => Compression::Lz4,
_ => InvalidCompression { flag: v }.fail()?,
};

Ok(version)
Ok(compression)
}

fn encode<'a, B, I>(
Expand Down Expand Up @@ -92,7 +92,7 @@ impl Encoding {
// Encode the `data_block`.
let compression = Self::decide_compression(data_block_len, data_block_compress_threshold);
match compression {
Compression::NoCompression => {
Compression::None => {
for v in values {
buf.put_slice(v);
}
Expand Down Expand Up @@ -157,9 +157,7 @@ impl Encoding {
let data_block = &chunk[length_block_end..length_block_len_offset];

match compression {
Compression::NoCompression => {
self.decode_without_compression(&mut length_block, data_block, f)
}
Compression::None => self.decode_without_compression(&mut length_block, data_block, f),
Compression::Lz4 => self.decode_with_compression(length_block, data_block, ctx.buf, f),
}
}
Expand Down Expand Up @@ -227,7 +225,7 @@ impl Encoding {
#[repr(C)]
enum Compression {
#[default]
NoCompression = 0,
None = 0,
Lz4 = 1,
}

Expand Down
Loading

0 comments on commit 658884a

Please sign in to comment.