Skip to content

Commit

Permalink
feat: make bloom filter optional on column level (apache#615)
Browse files Browse the repository at this point in the history
* feat: make bloom filter optional in both row-group level and column level

* make bloom filter required at the row-group level

* fix relative importing

* add UT for conversion of BloomFilter

* fix ut of conversion for bloom filter
  • Loading branch information
ShiKaiWi authored Feb 7, 2023
1 parent a7c91b4 commit 575ec98
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 39 deletions.
2 changes: 1 addition & 1 deletion analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl<'a> Reader<'a> {
let filter = RowGroupFilter::try_new(
&schema,
row_groups,
bloom_filter.map(|v| v.filters()),
bloom_filter.map(|v| v.row_group_filters()),
self.predicate.exprs(),
)?;

Expand Down
161 changes: 137 additions & 24 deletions analytic_engine/src/sst/parquet/meta_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use std::{fmt, sync::Arc};
use bytes::Bytes;
use common_types::{schema::Schema, time::TimeRange, SequenceNumber};
use common_util::define_result;
use ethbloom::{Bloom, Input};
use proto::{common as common_pb, sst as sst_pb};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

use crate::sst::writer::MetaData;

Expand All @@ -29,6 +29,13 @@ pub enum Error {
))]
InvalidBloomFilterSize { size: usize, backtrace: Backtrace },

#[snafu(display(
    // Fixed: the original string was "\nBacktrace\n:{}" — the colon was
    // misplaced after the newline, producing "Backtrace\n:<trace>" output.
    "Unsupported bloom filter version, version:{}.\nBacktrace:\n{}",
    version,
    backtrace
))]
UnsupportedBloomFilter { version: u32, backtrace: Backtrace },

#[snafu(display("Failed to convert time range, err:{}", source))]
ConvertTimeRange { source: common_types::time::Error },

Expand All @@ -38,68 +45,119 @@ pub enum Error {

define_result!(Error);

const DEFAULT_BLOOM_FILTER_VERSION: u32 = 0;

/// Bloom filter for one parquet row group, holding one (optional) filter per
/// column.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RowGroupBloomFilter {
    /// The column filter can be None if the column is not indexed.
    column_filters: Vec<Option<Bloom>>,
}

impl RowGroupBloomFilter {
    /// Create a filter with `num_columns` slots, all initially unindexed
    /// (`None`).
    pub fn with_num_columns(num_columns: usize) -> Self {
        Self {
            column_filters: (0..num_columns).map(|_| None).collect(),
        }
    }

    /// Accrue the data belonging to one column.
    ///
    /// Caller should ensure the `column_idx` is in the range.
    pub fn accrue_column_data(&mut self, column_idx: usize, data: &[u8]) {
        // Lazily create the column filter on first accrual.
        self.column_filters[column_idx]
            .get_or_insert_with(Bloom::default)
            .accrue(Input::Raw(data));
    }

    /// Check whether `data` may be present in the column.
    ///
    /// Return None if the column is not indexed.
    pub fn contains_column_data(&self, column_idx: usize, data: &[u8]) -> Option<bool> {
        self.column_filters[column_idx]
            .as_ref()
            .map(|filter| filter.contains_input(Input::Raw(data)))
    }
}

// NOTE(review): the diff rendering left both the pre-change field
// (`filters: Vec<Vec<Bloom>>`) and the post-change field in this struct;
// only the post-change definition is kept here.
/// Bloom filter for a whole sst file.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BloomFilter {
    /// Every filter is a row group filter consists of column filters.
    row_group_filters: Vec<RowGroupBloomFilter>,
}

impl BloomFilter {
pub fn new(filters: Vec<Vec<Bloom>>) -> Self {
Self { filters }
pub fn new(row_group_filters: Vec<RowGroupBloomFilter>) -> Self {
Self { row_group_filters }
}

#[inline]
pub fn filters(&self) -> &[Vec<Bloom>] {
&self.filters
pub fn row_group_filters(&self) -> &[RowGroupBloomFilter] {
&self.row_group_filters
}
}

// NOTE(review): reconstructed from a diff rendering that interleaved the old
// (`.filters`, no version field) and new implementations; this is the
// post-change version.
impl From<BloomFilter> for sst_pb::SstBloomFilter {
    fn from(bloom_filter: BloomFilter) -> Self {
        let row_group_filters = bloom_filter
            .row_group_filters
            .iter()
            .map(|row_group_filter| {
                let column_filters = row_group_filter
                    .column_filters
                    .iter()
                    .map(|column_filter| {
                        column_filter
                            .map(|v| v.data().to_vec())
                            // If the column filter does not exist, use an empty vector for it.
                            .unwrap_or_default()
                    })
                    .collect::<Vec<_>>();
                sst_pb::sst_bloom_filter::RowGroupFilter { column_filters }
            })
            .collect::<Vec<_>>();

        sst_pb::SstBloomFilter {
            version: DEFAULT_BLOOM_FILTER_VERSION,
            row_group_filters,
        }
    }
}

// NOTE(review): reconstructed from a diff rendering that interleaved the old
// and new implementations; this is the post-change version with the version
// check and optional column filters.
impl TryFrom<sst_pb::SstBloomFilter> for BloomFilter {
    type Error = Error;

    fn try_from(src: sst_pb::SstBloomFilter) -> Result<Self> {
        // Reject payloads written by a newer, unknown encoding.
        ensure!(
            src.version == DEFAULT_BLOOM_FILTER_VERSION,
            UnsupportedBloomFilter {
                version: src.version
            }
        );

        let row_group_filters = src
            .row_group_filters
            .into_iter()
            .map(|row_group_filter| {
                let column_filters = row_group_filter
                    .column_filters
                    .into_iter()
                    .map(|encoded_bytes| {
                        // An empty byte vector marks an unindexed column (see
                        // the `From` impl above).
                        if encoded_bytes.is_empty() {
                            Ok(None)
                        } else {
                            let size = encoded_bytes.len();
                            let bs: [u8; 256] = encoded_bytes
                                .try_into()
                                .ok()
                                .context(InvalidBloomFilterSize { size })?;

                            Ok(Some(Bloom::from(bs)))
                        }
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(RowGroupBloomFilter { column_filters })
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(BloomFilter { row_group_filters })
    }
}

Expand Down Expand Up @@ -199,3 +257,58 @@ impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
})
}
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_conversion_sst_bloom_filter() {
        // Two row groups, each with one indexed and one unindexed column,
        // arranged symmetrically.
        let original = BloomFilter {
            row_group_filters: vec![
                RowGroupBloomFilter {
                    column_filters: vec![None, Some(Bloom::default())],
                },
                RowGroupBloomFilter {
                    column_filters: vec![Some(Bloom::default()), None],
                },
            ],
        };

        // Encode: version is stamped and shapes are preserved.
        let encoded: sst_pb::SstBloomFilter = original.clone().into();
        assert_eq!(encoded.version, DEFAULT_BLOOM_FILTER_VERSION);
        assert_eq!(encoded.row_group_filters.len(), 2);
        assert_eq!(encoded.row_group_filters[0].column_filters.len(), 2);
        assert_eq!(encoded.row_group_filters[1].column_filters.len(), 2);

        // Unindexed columns become empty byte vectors; indexed ones carry the
        // fixed 256-byte bloom payload.
        assert!(encoded.row_group_filters[0].column_filters[0].is_empty());
        assert_eq!(encoded.row_group_filters[0].column_filters[1].len(), 256);
        assert_eq!(encoded.row_group_filters[1].column_filters[0].len(), 256);
        assert!(encoded.row_group_filters[1].column_filters[1].is_empty());

        // Decode: the round trip must restore the original filters.
        let decoded = BloomFilter::try_from(encoded).unwrap();
        assert_eq!(
            decoded.row_group_filters.len(),
            original.row_group_filters().len(),
        );
        assert_eq!(
            decoded.row_group_filters[0].column_filters,
            original.row_group_filters()[0].column_filters
        );
        assert_eq!(
            decoded.row_group_filters[1],
            original.row_group_filters()[1],
        );
    }
}
17 changes: 10 additions & 7 deletions analytic_engine/src/sst/parquet/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::cmp::Ordering;
use arrow::datatypes::SchemaRef;
use common_types::datum::Datum;
use datafusion::{prelude::Expr, scalar::ScalarValue};
use ethbloom::{Bloom, Input};
use log::debug;
use parquet::file::metadata::RowGroupMetaData;
use parquet_ext::prune::{
Expand All @@ -16,7 +15,10 @@ use parquet_ext::prune::{
};
use snafu::ensure;

use crate::sst::reader::error::{OtherNoCause, Result};
use crate::sst::{
parquet::meta_data::RowGroupBloomFilter,
reader::error::{OtherNoCause, Result},
};

/// A filter to prune row groups according to the provided predicates.
///
Expand All @@ -25,15 +27,15 @@ use crate::sst::reader::error::{OtherNoCause, Result};
pub struct RowGroupFilter<'a> {
schema: &'a SchemaRef,
row_groups: &'a [RowGroupMetaData],
blooms: Option<&'a [Vec<Bloom>]>,
blooms: Option<&'a [RowGroupBloomFilter]>,
predicates: &'a [Expr],
}

impl<'a> RowGroupFilter<'a> {
pub fn try_new(
schema: &'a SchemaRef,
row_groups: &'a [RowGroupMetaData],
blooms: Option<&'a [Vec<Bloom>]>,
blooms: Option<&'a [RowGroupBloomFilter]>,
predicates: &'a [Expr],
) -> Result<Self> {
if let Some(blooms) = blooms {
Expand Down Expand Up @@ -92,12 +94,13 @@ impl<'a> RowGroupFilter<'a> {
}

/// Filter row groups according to the bloom filter.
fn filter_by_bloom(&self, blooms: &[Vec<Bloom>]) -> Vec<usize> {
fn filter_by_bloom(&self, row_group_bloom_filters: &[RowGroupBloomFilter]) -> Vec<usize> {
let is_equal =
|col_pos: ColumnPosition, val: &ScalarValue, negated: bool| -> Option<bool> {
let datum = Datum::from_scalar_value(val)?;
let col_bloom = blooms.get(col_pos.row_group_idx)?.get(col_pos.column_idx)?;
let exist = col_bloom.contains_input(Input::Raw(&datum.to_bytes()));
let exist = row_group_bloom_filters
.get(col_pos.row_group_idx)?
.contains_column_data(col_pos.column_idx, &datum.to_bytes())?;
if exist {
// bloom filter has false positivity, that is to say we are unsure whether this
// value exists even if the bloom filter says it exists.
Expand Down
11 changes: 5 additions & 6 deletions analytic_engine/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::sync::{
use async_trait::async_trait;
use common_types::{record_batch::RecordBatchWithKey, request_id::RequestId};
use datafusion::parquet::basic::Compression;
use ethbloom::{Bloom, Input};
use futures::StreamExt;
use log::debug;
use object_store::{ObjectStoreRef, Path};
Expand All @@ -21,7 +20,7 @@ use crate::{
factory::{ObjectStorePickerRef, SstWriteOptions},
parquet::{
encoding::ParquetEncoder,
meta_data::{BloomFilter, ParquetMetaData},
meta_data::{BloomFilter, ParquetMetaData, RowGroupBloomFilter},
},
writer::{
self, EncodeRecordBatch, MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo,
Expand Down Expand Up @@ -158,20 +157,20 @@ impl RecordBytesReader {
.partitioned_record_batch
.iter()
.map(|row_group_batch| {
let mut row_group_filters =
vec![Bloom::default(); row_group_batch[0].num_columns()];
let mut row_group_filter =
RowGroupBloomFilter::with_num_columns(row_group_batch[0].num_columns());

for partial_batch in row_group_batch {
for (col_idx, column) in partial_batch.columns().iter().enumerate() {
for row in 0..column.num_rows() {
let datum = column.datum(row);
let bytes = datum.to_bytes();
row_group_filters[col_idx].accrue(Input::Raw(&bytes));
row_group_filter.accrue_column_data(col_idx, &bytes);
}
}
}

row_group_filters
row_group_filter
})
.collect::<Vec<_>>();

Expand Down
3 changes: 2 additions & 1 deletion proto/protos/sst.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ message SstBloomFilter {
repeated bytes column_filters = 1;
};

repeated RowGroupFilter row_group_filters = 1;
uint32 version = 1;
repeated RowGroupFilter row_group_filters = 2;
}

message SstMetaData {
Expand Down

0 comments on commit 575ec98

Please sign in to comment.