Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support TPC-H Q9 #761

Merged
merged 10 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix decimal scale
Signed-off-by: Runji Wang <wangrunji0408@163.com>
  • Loading branch information
wangrunji0408 committed Jan 9, 2023
commit 927f46fd0a6aabc0fb9b0177e89604f7c2243c9d
5 changes: 5 additions & 0 deletions src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ impl DataChunk {
&self.arrays[idx]
}

/// Get the mutable reference of array by index.
///
/// # Panics
///
/// Panics if `idx` is out of bounds, or if the underlying `Arc` of arrays
/// is currently shared (another `Arc` clone of `self.arrays` is alive), since
/// `Arc::get_mut` only yields a mutable borrow for a uniquely-owned value.
pub fn array_mut_at(&mut self, idx: usize) -> &mut ArrayImpl {
    // `expect` documents the invariant: callers must hold the only reference
    // to this chunk's arrays before mutating them in place.
    &mut Arc::get_mut(&mut self.arrays)
        .expect("arrays are shared and cannot be mutated in place")[idx]
}

/// Get all arrays.
pub fn arrays(&self) -> &[ArrayImpl] {
&self.arrays
Expand Down
29 changes: 9 additions & 20 deletions src/array/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,30 +287,19 @@ impl ArrayImpl {
},
Self::Float64(a) => match data_type {
Type::Bool => Self::new_bool(unary_op(a.as_ref(), |&f| f != 0.0)),
Type::Int32 => Self::new_int32(try_unary_op(a.as_ref(), |&b| match b.to_i32() {
Some(d) => Ok(d),
None => Err(ConvertError::Overflow(DataValue::Float64(b), Type::Int32)),
Type::Int32 => Self::new_int32(try_unary_op(a.as_ref(), |&b| {
b.to_i32()
.ok_or(ConvertError::Overflow(DataValue::Float64(b), Type::Int32))
})?),
Type::Int64 => Self::new_int64(try_unary_op(a.as_ref(), |&b| match b.to_i64() {
Some(d) => Ok(d),
None => Err(ConvertError::Overflow(DataValue::Float64(b), Type::Int64)),
Type::Int64 => Self::new_int64(try_unary_op(a.as_ref(), |&b| {
b.to_i64()
.ok_or(ConvertError::Overflow(DataValue::Float64(b), Type::Int64))
})?),
Type::Float64 => Self::Float64(a.clone()),
Type::String => Self::new_utf8(Utf8Array::from_iter_display(a.iter())),
Type::Decimal(_, scale) => {
Self::new_decimal(try_unary_op(
a.as_ref(),
|&f| match Decimal::from_f64_retain(f.0) {
Some(mut d) => {
if let Some(s) = scale {
d.rescale(*s as u32);
}
Ok(d)
}
None => Err(ConvertError::ToDecimalError(DataValue::Float64(f))),
},
)?)
}
Type::Decimal(_, _) => Self::new_decimal(unary_op(a.as_ref(), |&f| {
Decimal::from_f64_retain(f.0).unwrap()
})),
Type::Null | Type::Date | Type::Interval | Type::Blob | Type::Struct(_) => {
return Err(ConvertError::NoCast("DOUBLE", data_type.clone()));
}
Expand Down
10 changes: 10 additions & 0 deletions src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::iter::FromIterator;
use std::mem;

use bitvec::vec::BitVec;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};

use super::ops::BitVecExt;
Expand Down Expand Up @@ -188,6 +189,15 @@ impl PrimitiveArray<bool> {
}
}

impl PrimitiveArray<Decimal> {
    /// Rescale every decimal in the array to the given `scale`
    /// (number of fractional digits), mutating the values in place.
    pub fn rescale(&mut self, scale: u8) {
        let target = u32::from(scale);
        self.data.iter_mut().for_each(|d| d.rescale(target));
    }
}

pub fn clear_null(mut array: BoolArray) -> BoolArray {
use std::simd::ToBitMask;
let mut valid = Vec::with_capacity(array.valid.as_raw_slice().len() * 64);
Expand Down
30 changes: 18 additions & 12 deletions src/executor_v2/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::fs::File;
use std::io::BufReader;

use indicatif::{ProgressBar, ProgressStyle};
use itertools::izip;
use tokio::sync::mpsc::Sender;

use super::*;
use crate::array::DataChunkBuilder;
use crate::array::{ArrayImpl, DataChunkBuilder};
use crate::binder_v2::copy::{ExtSource, FileFormat};
use crate::types::DataTypeKind;

/// The executor of loading file data.
pub struct CopyFromFileExecutor {
Expand All @@ -23,12 +23,21 @@ const IMPORT_PROGRESS_BAR_LIMIT: u64 = 1024 * 1024;
impl CopyFromFileExecutor {
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self) {
let types = self.types.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
// # Cancellation
// When this stream is dropped, the `rx` is dropped, the spawned task will fail to send to
// `tx`, then the task will finish.
let handle = tokio::task::spawn_blocking(|| self.read_file_blocking(tx));
while let Some(chunk) = rx.recv().await {
while let Some(mut chunk) = rx.recv().await {
// rescale decimals
for (i, ty) in types.iter().enumerate() {
if let (ArrayImpl::Decimal(a), DataTypeKind::Decimal(_, Some(scale))) =
(chunk.array_mut_at(i), ty.kind())
{
Arc::get_mut(a).unwrap().rescale(scale);
}
}
yield chunk;
}
handle.await.unwrap()?;
Expand Down Expand Up @@ -88,18 +97,15 @@ impl CopyFromFileExecutor {
});
}

let str_row_data: Result<Vec<&str>, _> = izip!(record.iter(), &self.types)
.map(|(v, ty)| {
if !ty.nullable && v.is_empty() {
return Err(ExecutorError::NotNullable);
}
Ok(v)
})
.collect();
for (v, ty) in record.iter().zip(&self.types) {
if !ty.nullable && v.is_empty() {
return Err(ExecutorError::NotNullable);
}
}
size_count += record.as_slice().as_bytes().len();

// push a raw str row and send it if necessary
if let Some(chunk) = chunk_builder.push_str_row(str_row_data?)? {
if let Some(chunk) = chunk_builder.push_str_row(record.iter())? {
bar.set_position(size_count as u64);
tx.blocking_send(chunk).map_err(|_| ExecutorError::Abort)?;
}
Expand Down
11 changes: 11 additions & 0 deletions src/planner/rules/type_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ pub fn analyze_type(enode: &Expr, x: impl Fn(&Id) -> Type, catalog: &RootCatalog
merge(enode, [x(a)?, x(b)?], |[a, b]| {
match if a > b { (b, a) } else { (a, b) } {
(Kind::Null, _) => Some(Kind::Null),
(Kind::Decimal(Some(p1), Some(s1)), Kind::Decimal(Some(p2), Some(s2))) => {
match enode {
Add(_) | Sub(_) => Some(Kind::Decimal(
Some((p1 - s1).max(p2 - s2) + s1.max(s2) + 1),
Some(s1.max(s2)),
)),
Mul(_) => Some(Kind::Decimal(Some(p1 + p2), Some(s1 + s2))),
Div(_) | Mod(_) => Some(Kind::Decimal(None, None)),
_ => unreachable!(),
}
}
(a, b) if a.is_number() && b.is_number() => Some(b),
(Kind::Date, Kind::Interval) => Some(Kind::Date),
_ => None,
Expand Down
14 changes: 12 additions & 2 deletions src/v1/executor/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use itertools::izip;
use tokio::sync::mpsc::Sender;

use super::*;
use crate::array::DataChunkBuilder;
use crate::array::{ArrayImpl, DataChunkBuilder};
use crate::types::DataTypeKind;
use crate::v1::binder::FileFormat;
use crate::v1::optimizer::plan_nodes::PhysicalCopyFromFile;

Expand All @@ -23,12 +24,21 @@ const IMPORT_PROGRESS_BAR_LIMIT: u64 = 1024 * 1024;
impl CopyFromFileExecutor {
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self) {
let types = self.plan.logical().column_types().to_vec();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
// # Cancellation
// When this stream is dropped, the `rx` is dropped, the spawned task will fail to send to
// `tx`, then the task will finish.
let handle = tokio::task::spawn_blocking(|| self.read_file_blocking(tx));
while let Some(chunk) = rx.recv().await {
while let Some(mut chunk) = rx.recv().await {
// rescale decimals
for (i, ty) in types.iter().enumerate() {
if let (ArrayImpl::Decimal(a), DataTypeKind::Decimal(_, Some(scale))) =
(chunk.array_mut_at(i), ty.kind())
{
Arc::get_mut(a).unwrap().rescale(scale);
}
}
yield chunk;
}
handle.await.unwrap()?;
Expand Down
8 changes: 4 additions & 4 deletions tests/sql/tpch/_q1.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ order by
l_returnflag,
l_linestatus;
----
A F 37474 37569624.64 35676192.0970 37101416.222424 25.354533152909336941813261164 25419.231826792963464140730717 0.0508660351826792963464140731 1478
N F 1041 1041301.07 999060.8980 1036450.802280 27.394736842105263157894736842 27402.659736842105263157894737 0.0428947368421052631578947368 38
N O 75823 76040604.76 72270477.1588 75140545.284463 25.564059339177343223196223871 25637.425745111260957518543493 0.0498246797033041132838840189 2966
R F 36511 36570841.24 34738472.8758 36169060.112193 25.059025394646533973919011668 25100.096938915579958819492107 0.0500274536719286204529855868 1457
A F 37474.00 37569624.64 35676192.0970 37101416.222424 25.354533152909336941813261164 25419.231826792963464140730717 0.0508660351826792963464140731 1478
N F 1041.00 1041301.07 999060.8980 1036450.802280 27.394736842105263157894736842 27402.659736842105263157894737 0.0428947368421052631578947368 38
N O 75823.00 76040604.76 72270477.1588 75140545.284463 25.564059339177343223196223871 25637.425745111260957518543493 0.0498246797033041132838840189 2966
R F 36511.00 36570841.24 34738472.8758 36169060.112193 25.059025394646533973919011668 25100.096938915579958819492107 0.0500274536719286204529855868 1457