Skip to content
Merged
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
dcfc189
add round robin shuffle
Dec 10, 2024
e333366
update roundrobin
Dec 10, 2024
6b48a2c
update roundrobin
Dec 10, 2024
689b9c6
update roundrobin
Dec 10, 2024
f0d8737
update roundrobin
Dec 10, 2024
bef0c92
update roundrobin log info
Dec 11, 2024
d44d24a
update roundrobin
Dec 11, 2024
67f994a
update roundrobin
Dec 11, 2024
3bb4dc8
update roundrobin
Dec 12, 2024
cd22bf6
update roundrobin
Dec 12, 2024
de626b4
update roundrobin
Dec 12, 2024
b062173
update roundrobin
Dec 12, 2024
b9af073
update partition proto
Dec 13, 2024
b83338f
update partition proto
Dec 13, 2024
59b0eda
update sort before round robin
Dec 16, 2024
f452a5b
update sort before round robin
Dec 16, 2024
180802f
update round robin
Dec 16, 2024
4763172
update round robin
Dec 16, 2024
bfae9a8
update sort_batch_by_partition_id test
Dec 16, 2024
7ff18e1
update sort_batch_by_partition_id test
Dec 16, 2024
01108e3
update range partition
Dec 25, 2024
dc9bc6e
merge master
Dec 25, 2024
d34f901
merge master
Dec 25, 2024
e7c6a23
update range partition
Dec 26, 2024
8bd4e06
Merge branch 'master' into blaze-repartitioning
Dec 26, 2024
d802c6c
update range partition
Dec 26, 2024
f155fcd
Merge branch 'master' into blaze-repartitioning
Dec 26, 2024
5508034
fix range partition import
Dec 26, 2024
d8a598e
fix merge
Dec 26, 2024
d53d1b3
update range partition
Dec 30, 2024
e37547f
update range partition
Dec 30, 2024
5a942c1
update range partition converter
Dec 30, 2024
7e59a18
update range partition
Dec 31, 2024
b7d6baf
update range partition
Jan 2, 2025
2a56d42
Merge branch 'master' into blaze-repartitioning
Jan 2, 2025
5a00013
fix name
Jan 8, 2025
40c6ff6
cargo fix
Jan 8, 2025
2e9851f
merge master
Jan 8, 2025
1e911d9
fix proto deserialization issue
Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 51 additions & 43 deletions native-engine/blaze-serde/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1168,53 +1168,61 @@ pub fn parse_protobuf_partitioning(
}

RepartitionType::RangeRepartition(range_part) => {
let sort = range_part.sort_expr.clone().unwrap();
let exprs = try_parse_physical_sort_expr(&input, &sort).unwrap();
if range_part.partition_count == 1 {
Ok(Some(Partitioning::SinglePartitioning()))
} else {
let sort = range_part.sort_expr.clone().unwrap();
let exprs = try_parse_physical_sort_expr(&input, &sort).unwrap();

let value_list = &range_part.list_value;

let value_list = &range_part.list_value;
let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new(
exprs
.iter()
.map(|expr: &PhysicalSortExpr| {
Ok(SortField::new_with_options(
expr.expr.data_type(&input.schema())?,
expr.options,
))
})
.collect::<Result<Vec<SortField>>>()?,
)?));

let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new(
exprs
let bound_cols: Vec<ArrayRef> = value_list
.iter()
.map(|expr: &PhysicalSortExpr| {
Ok(SortField::new_with_options(
expr.expr.data_type(&input.schema())?,
expr.options,
))
.map(|x| {
let xx = x.clone().value.unwrap();
let values_ref = match xx {
protobuf::scalar_value::Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
values,
datatype: _opt_scalar_type,
} = scalar_list;
let value_vec: Vec<ScalarValue> = values
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()
.map_err(|_| {
proto_error("partition::from_proto() error")
})?;
ScalarValue::iter_to_array(value_vec)
.map_err(|_| proto_error("partition::from_proto() error"))
}
_ => Err(proto_error(
"partition::from_proto() bound_list type error",
)),
};
values_ref
})
.collect::<Result<Vec<SortField>>>()?,
)?));

let bound_cols: Vec<ArrayRef> = value_list
.iter()
.map(|x| {
let xx = x.clone().value.unwrap();
let values_ref = match xx {
protobuf::scalar_value::Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
values,
datatype: _opt_scalar_type,
} = scalar_list;
let value_vec: Vec<ScalarValue> = values
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()
.map_err(|_| proto_error("partition::from_proto() error"))?;
ScalarValue::iter_to_array(value_vec)
.map_err(|_| proto_error("partition::from_proto() error"))
}
_ => Err(proto_error("partition::from_proto() bound_list type error")),
};
values_ref
})
.collect::<Result<Vec<ArrayRef>, _>>()?;

let bound_rows = sort_row_converter.lock().convert_columns(&bound_cols)?;
Ok(Some(Partitioning::RangePartitioning(
exprs,
range_part.partition_count.try_into().unwrap(),
Arc::new(bound_rows),
)))
.collect::<Result<Vec<ArrayRef>, _>>()?;

let bound_rows = sort_row_converter.lock().convert_columns(&bound_cols)?;
Ok(Some(Partitioning::RangePartitioning(
exprs,
range_part.partition_count.try_into().unwrap(),
Arc::new(bound_rows),
)))
}
}
}
})
Expand Down