Skip to content

Commit

Permalink
feat: support columar encoding for datums
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Aug 17, 2023
1 parent b59e07e commit 6ec5606
Show file tree
Hide file tree
Showing 12 changed files with 681 additions and 24 deletions.
42 changes: 27 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ bytes = "1.1.0"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.10"
ceresdbproto = { git = "https://github.com/ShiKaiWi/ceresdbproto.git", rev = "776dc2ee44bcc8217742268fef78a31a215235da" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ impl<'a> Writer<'a> {
// mismatch during replaying
schema: Some(schema_pb::TableSchema::from(&self.table_data.schema())),
rows: encoded_rows,
cols: vec![],
};

// Encode payload
Expand Down
89 changes: 82 additions & 7 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,11 @@ pub enum DatumView<'a> {
}

impl<'a> DatumView<'a> {
#[inline]
pub fn is_null(&self) -> bool {
matches!(self, DatumView::Null)
}

/// Return the kind of datum
pub fn kind(&self) -> DatumKind {
match self {
Expand Down Expand Up @@ -1170,13 +1175,6 @@ impl<'a> DatumView<'a> {
}
}

pub fn as_str(&self) -> Option<&str> {
match self {
DatumView::String(v) => Some(v),
_ => None,
}
}

pub fn to_datum(&self) -> Datum {
match self {
DatumView::Null => Datum::Null,
Expand All @@ -1198,6 +1196,83 @@ impl<'a> DatumView<'a> {
DatumView::Time(v) => Datum::Time(*v),
}
}

pub fn as_i8(&self) -> Option<i8> {
match self {
DatumView::Int8(v) => Some(*v),
_ => None,
}
}

pub fn as_i16(&self) -> Option<i16> {
match self {
DatumView::Int16(v) => Some(*v),
_ => None,
}
}

pub fn as_i32(&self) -> Option<i32> {
match self {
DatumView::Int32(v) => Some(*v),
_ => None,
}
}

pub fn as_i64(&self) -> Option<i64> {
match self {
DatumView::Int64(v) => Some(*v),
_ => None,
}
}

pub fn as_u8(&self) -> Option<u8> {
match self {
DatumView::UInt8(v) => Some(*v),
_ => None,
}
}

pub fn as_u16(&self) -> Option<u16> {
match self {
DatumView::UInt16(v) => Some(*v),
_ => None,
}
}

pub fn as_u32(&self) -> Option<u32> {
match self {
DatumView::UInt32(v) => Some(*v),
_ => None,
}
}

pub fn as_u64(&self) -> Option<u64> {
match self {
DatumView::UInt64(v) => Some(*v),
_ => None,
}
}

pub fn as_timestamp(&self) -> Option<Timestamp> {
match self {
DatumView::Timestamp(v) => Some(*v),
_ => None,
}
}

pub fn as_bytes(&self) -> Option<&[u8]> {
match self {
DatumView::Varbinary(v) => Some(*v),
_ => None,
}
}

pub fn as_str(&self) -> Option<&str> {
match self {
DatumView::String(v) => Some(*v),
_ => None,
}
}
}

impl<'a> std::hash::Hash for DatumView<'a> {
Expand Down
2 changes: 1 addition & 1 deletion common_types/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
time::Timestamp,
};

pub(crate) mod bitset;
pub mod bitset;
pub mod contiguous;

#[derive(Debug, Snafu)]
Expand Down
13 changes: 13 additions & 0 deletions components/codec/src/columnar/float.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
63 changes: 63 additions & 0 deletions components/codec/src/columnar/int.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes_ext::{Buf, BufMut};

use crate::columnar::{Result, ValuesDecoder, ValuesEncoder};

pub struct I32ValuesEncoder;

impl ValuesEncoder for I32ValuesEncoder {
type ValueType = i32;

fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = i32>,
{
for v in values {
buf.put_i32(v);
}

Ok(())
}

fn estimated_encoded_size<I>(&self, values: I) -> usize
where
I: Iterator<Item = i32>,
{
let (lower, higher) = values.size_hint();
let num = lower.max(higher.unwrap_or_default());
num * std::mem::size_of::<i32>()
}
}

pub struct I32ValuesDecoder;

impl ValuesDecoder for I32ValuesDecoder {
type ValueType = i32;

fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(i32) -> Result<()>,
{
while buf.remaining() > 0 {
let v = buf.get_i32();
f(v)?;
}

Ok(())
}
}
Loading

0 comments on commit 6ec5606

Please sign in to comment.