Skip to content

Commit

Permalink
rewrite array_append/array_prepend to remove deplicate codes (#8108)
Browse files Browse the repository at this point in the history
* rewrite `array_append/array_prepend` to remove deplicate codes

Signed-off-by: veeupup <code@tanweime.com>

* reimplemented array_append with MutableArrayData

Signed-off-by: veeupup <code@tanweime.com>

* reimplemented array_prepend with MutableArrayData

Signed-off-by: veeupup <code@tanweime.com>

---------

Signed-off-by: veeupup <code@tanweime.com>
  • Loading branch information
Veeupup authored Nov 10, 2023
1 parent e727bbf commit 4e8777d
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 124 deletions.
20 changes: 8 additions & 12 deletions datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1536,12 +1536,10 @@ mod test {
.unwrap()
.resolve(&schema)
.unwrap();
let r4 = apache_avro::to_value(serde_json::json!({
"col1": null
}))
.unwrap()
.resolve(&schema)
.unwrap();
let r4 = apache_avro::to_value(serde_json::json!({ "col1": null }))
.unwrap()
.resolve(&schema)
.unwrap();

let mut w = apache_avro::Writer::new(&schema, vec![]);
w.append(r1).unwrap();
Expand Down Expand Up @@ -1600,12 +1598,10 @@ mod test {
}"#,
)
.unwrap();
let r1 = apache_avro::to_value(serde_json::json!({
"col1": null
}))
.unwrap()
.resolve(&schema)
.unwrap();
let r1 = apache_avro::to_value(serde_json::json!({ "col1": null }))
.unwrap()
.resolve(&schema)
.unwrap();
let r2 = apache_avro::to_value(serde_json::json!({
"col1": {
"col2": "hello"
Expand Down
190 changes: 78 additions & 112 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,58 +577,6 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
)
}

macro_rules! append {
($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{
let mut offsets: Vec<i32> = vec![0];
let mut values =
downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone();

let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
for (arr, el) in $ARRAY.iter().zip(element.iter()) {
let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
DataFusionError::Internal(format!("offsets should not be empty"))
})?;
match arr {
Some(arr) => {
let child_array = downcast_arg!(arr, $ARRAY_TYPE);
values = downcast_arg!(
compute::concat(&[
&values,
child_array,
&$ARRAY_TYPE::from(vec![el])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + child_array.len() as i32 + 1i32);
}
None => {
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el.clone()])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + 1i32);
}
}
}

let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true));

Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(values),
None,
)?)
}};
}

/// Array_append SQL function
pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
let arr = as_list_array(&args[0])?;
Expand All @@ -639,68 +587,51 @@ pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
append!(arr, element, $ARRAY_TYPE)
let mut new_values = vec![];
let mut offsets = vec![0];

let elem_data = element.to_data();
for (row_index, arr) in arr.iter().enumerate() {
let new_array = if let Some(arr) = arr {
let original_data = arr.to_data();
let capacity = Capacities::Array(original_data.len() + 1);
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &elem_data],
false,
capacity,
);
mutable.extend(0, 0, original_data.len());
mutable.extend(1, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
} else {
let capacity = Capacities::Array(1);
let mut mutable = MutableArrayData::with_capacities(
vec![&elem_data],
false,
capacity,
);
mutable.extend(0, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
};
offsets.push(offsets[row_index] + new_array.len() as i32);
new_values.push(new_array);
}
call_array_function!(data_type, false)
}
};

Ok(res)
}

macro_rules! prepend {
($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{
let mut offsets: Vec<i32> = vec![0];
let mut values =
downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone();
let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
let values = arrow::compute::concat(&new_values)?;

let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
for (arr, el) in $ARRAY.iter().zip(element.iter()) {
let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
DataFusionError::Internal(format!("offsets should not be empty"))
})?;
match arr {
Some(arr) => {
let child_array = downcast_arg!(arr, $ARRAY_TYPE);
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el]),
child_array
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + child_array.len() as i32 + 1i32);
}
None => {
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el.clone()])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + 1i32);
}
}
Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
values,
None,
)?)
}
};

let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true));

Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(values),
None,
)?)
}};
Ok(res)
}

/// Array_prepend SQL function
Expand All @@ -713,12 +644,47 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
prepend!(arr, element, $ARRAY_TYPE)
let mut new_values = vec![];
let mut offsets = vec![0];

let elem_data = element.to_data();
for (row_index, arr) in arr.iter().enumerate() {
let new_array = if let Some(arr) = arr {
let original_data = arr.to_data();
let capacity = Capacities::Array(original_data.len() + 1);
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &elem_data],
false,
capacity,
);
mutable.extend(1, row_index, row_index + 1);
mutable.extend(0, 0, original_data.len());
let data = mutable.freeze();
arrow_array::make_array(data)
} else {
let capacity = Capacities::Array(1);
let mut mutable = MutableArrayData::with_capacities(
vec![&elem_data],
false,
capacity,
);
mutable.extend(0, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
};
offsets.push(offsets[row_index] + new_array.len() as i32);
new_values.push(new_array);
}
call_array_function!(data_type, false)

let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
let values = arrow::compute::concat(&new_values)?;

Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
values,
None,
)?)
}
};

Expand Down

0 comments on commit 4e8777d

Please sign in to comment.