Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support dict encoding #668

Merged
merged 5 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions proto/src/proto/rowset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ message BlockIndex {
RleNullable = 6;
RleFixedChar = 7;
RleVarchar = 8;
Dictionary = 9;
DictNullable = 10;
DictFixedChar = 11;
DictVarchar = 12;
}

// Block offset (in bytes) in the `.col` file.
Expand Down
4 changes: 4 additions & 0 deletions src/storage/secondary/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
mod blob_block_builder;
mod blob_block_iterator;
mod char_block_builder;
mod dict_block_builder;
mod dict_block_iterator;
mod fake_block_iterator;
mod primitive_block_builder;
mod primitive_block_iterator;
Expand All @@ -25,6 +27,8 @@ pub use primitive_nullable_block_builder::*;
use risinglight_proto::rowset::BlockStatistics;
mod char_block_iterator;
pub use char_block_iterator::*;
pub use dict_block_builder::*;
pub use dict_block_iterator::*;
pub use primitive_nullable_block_iterator::*;
pub use rle_block_builder::*;
pub use rle_block_iterator::*;
Expand Down
250 changes: 250 additions & 0 deletions src/storage/secondary/block/dict_block_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
// Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0.

use std::collections::HashMap;
use std::hash::Hash;

use bytes::BufMut;
use risinglight_proto::rowset::BlockStatistics;

use super::PlainPrimitiveBlockBuilder;
use crate::array::{Array, I32Array};
use crate::storage::secondary::block::{BlockBuilder, RleBlockBuilder};

pub(crate) const DICT_NULL_VALUE_KEY: i32 = i32::MIN;
/// Encodes fixed-width data into a block with dict encoding. The layout is
/// ```plain
/// | rle_block_length(u64) | dict_count_sum (u32) | rle_block | dict_block |
/// ```
pub struct DictBlockBuilder<A, B>
where
A: Array,
B: BlockBuilder<A>,
A::Item: PartialEq,
<A::Item as ToOwned>::Owned: Eq + Hash,
{
dict_map: HashMap<<A::Item as ToOwned>::Owned, i32>,
data_builder: B,
rle_builder: RleBlockBuilder<I32Array, PlainPrimitiveBlockBuilder<i32>>,
Kikkon marked this conversation as resolved.
Show resolved Hide resolved
cur_index: i32,
}

impl<A, B> DictBlockBuilder<A, B>
where
A: Array,
B: BlockBuilder<A>,
A::Item: PartialEq,
<A::Item as ToOwned>::Owned: Eq + Hash,
{
pub fn new(block_builder: B) -> Self {
let builder = PlainPrimitiveBlockBuilder::new(block_builder.estimated_size());
// create rle_builder to help record dictionary values and compress them using run-length
// encoding
let rle_builder =
RleBlockBuilder::<I32Array, PlainPrimitiveBlockBuilder<i32>>::new(builder);
Self {
dict_map: HashMap::new(),
data_builder: block_builder,
rle_builder,
cur_index: DICT_NULL_VALUE_KEY,
}
}
}

impl<A, B> BlockBuilder<A> for DictBlockBuilder<A, B>
where
A: Array,
B: BlockBuilder<A>,
A::Item: PartialEq + Eq + Hash,
<A::Item as ToOwned>::Owned: Eq + Hash,
{
fn append(&mut self, item: Option<&A::Item>) {
let mut key = DICT_NULL_VALUE_KEY;
if let Some(item) = item {
if let Some(value) = self.dict_map.get(item) {
key = value.to_owned();
} else {
self.cur_index += 1;
key = self.cur_index;
self.data_builder.append(Some(item));
self.dict_map.insert(item.to_owned(), key);
}
}
self.rle_builder.append(Some(&key));
}

fn estimated_size(&self) -> usize {
2 * std::mem::size_of::<u32>()
+ self.rle_builder.estimated_size()
+ self.data_builder.estimated_size()
}

fn get_statistics(&self) -> Vec<BlockStatistics> {
// Tracking issue: https://github.com/risinglightdb/risinglight/issues/674
vec![]
}

fn should_finish(&self, next_item: &Option<&A::Item>) -> bool {
self.data_builder.should_finish(next_item)
|| self
.rle_builder
.should_finish(&Some(&(DICT_NULL_VALUE_KEY)))
}

fn finish(self) -> Vec<u8> {
let mut encoded_data: Vec<u8> = vec![];
let rle_block = self.rle_builder.finish();
encoded_data.put_u64(rle_block.len() as u64);
encoded_data.put_u32(self.dict_map.len() as u32);
encoded_data.extend(rle_block);
encoded_data.extend(self.data_builder.finish());
encoded_data
}
}

#[cfg(test)]
mod tests {
use itertools::Itertools;

use crate::array::{I64Array, Utf8Array};
use crate::storage::secondary::block::dict_block_builder::DictBlockBuilder;
use crate::storage::secondary::block::{
BlockBuilder, PlainBlobBlockBuilder, PlainCharBlockBuilder, PlainPrimitiveBlockBuilder,
PlainPrimitiveNullableBlockBuilder,
};

#[test]
fn test_build_dict_primitive_i64() {
let builder = PlainPrimitiveBlockBuilder::<i64>::new(13);
let mut dict_builder =
DictBlockBuilder::<I64Array, PlainPrimitiveBlockBuilder<i64>>::new(builder);
for item in [Some(&(1))].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&(2))].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&(3))].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
// rle_counts_num (u32) | rle_count (u16) | rle_count | data
assert_eq!(
dict_builder.estimated_size(),
4 * 2 + (4 + 2 * 3 + 4 * 3) + 8 * 3
);
assert!(dict_builder.should_finish(&Some(&3)));
assert!(dict_builder.should_finish(&Some(&4)));
dict_builder.finish();
}

#[test]
fn test_build_dict_primitive_nullable_i64() {
let builder = PlainPrimitiveNullableBlockBuilder::new(48);
let mut dict_builder =
DictBlockBuilder::<I64Array, PlainPrimitiveNullableBlockBuilder<i64>>::new(builder);
for item in [None].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&1)].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [None].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&2)].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [None].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&3)].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [None].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&4)].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [None].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&5)]
.iter()
.cycle()
.cloned()
.take(u16::MAX as usize * 2)
{
dict_builder.append(item);
}
assert_eq!(dict_builder.estimated_size(), 4 * 2 + (64) + 8 * 5 + 1);
assert!(dict_builder.should_finish(&Some(&5)));
dict_builder.finish();
}

#[test]
fn test_build_dict_char() {
let builder = PlainCharBlockBuilder::new(120, 160);
let mut dict_builder = DictBlockBuilder::new(builder);

let width_40_char = ["2333"].iter().cycle().take(40).join("");

for item in [Some("233")].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&width_40_char[..])].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some("2333")].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&width_40_char[..])].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
assert_eq!(
dict_builder.estimated_size(),
4 * 2 + (4 + 4 * 2 + 4 * 4) + 4 * 40 * 3
);
assert!(dict_builder.should_finish(&Some(&width_40_char[..])));
assert!(dict_builder.should_finish(&Some("2333333")));
dict_builder.finish();
}

#[test]
fn test_build_dict_varchar() {
let builder = PlainBlobBlockBuilder::new(30);
let mut dict_builder =
DictBlockBuilder::<Utf8Array, PlainBlobBlockBuilder<str>>::new(builder);
let width_40_char = ["2333"].iter().cycle().take(40).join("");
for item in [Some("233")].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some("23333")].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some("2333333")].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&width_40_char[..])].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some("233")].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some("23333")].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some("2333333")].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
for item in [Some(&width_40_char[..])].iter().cycle().cloned().take(30) {
dict_builder.append(item);
}
assert_eq!(
dict_builder.estimated_size(),
4 * 2 + (4 + 8 * 2 + 8 * 4) + 7 + 9 + 11 + 164
); // 37
assert!(dict_builder.should_finish(&Some("2333333")));
assert!(dict_builder.should_finish(&Some("23333333")));
dict_builder.finish();
}
}
Loading