Skip to content

Commit a14e12c

Browse files
committed
update pyo3 deprecate
1 parent fe56502 commit a14e12c

File tree

4 files changed

+103
-116
lines changed

4 files changed

+103
-116
lines changed

python/src/functions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::sync::Arc;
2525

2626
/// Expression representing a column on the existing plan.
2727
#[pyfunction]
28-
#[text_signature = "(name)"]
28+
#[pyo3(text_signature = "(name)")]
2929
fn col(name: &str) -> expression::Expression {
3030
expression::Expression {
3131
expr: logical_plan::col(name),
@@ -34,7 +34,7 @@ fn col(name: &str) -> expression::Expression {
3434

3535
/// Expression representing a constant value
3636
#[pyfunction]
37-
#[text_signature = "(value)"]
37+
#[pyo3(text_signature = "(value)")]
3838
fn lit(value: i32) -> expression::Expression {
3939
expression::Expression {
4040
expr: logical_plan::lit(value),

python/src/to_py.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion::arrow::array::ArrayRef;
19+
use datafusion::arrow::record_batch::RecordBatch;
1820
use libc::uintptr_t;
1921
use pyo3::prelude::*;
22+
use pyo3::types::PyList;
2023
use pyo3::PyErr;
21-
2224
use std::convert::From;
2325

24-
use datafusion::arrow::array::ArrayRef;
25-
use datafusion::arrow::record_batch::RecordBatch;
26-
2726
use crate::errors;
2827

2928
pub fn to_py_array(array: &ArrayRef, py: Python) -> PyResult<PyObject> {
@@ -64,15 +63,13 @@ fn to_py_batch<'a>(
6463

6564
/// Converts a &[RecordBatch] into a Vec<RecordBatch> represented in PyArrow
6665
pub fn to_py(batches: &[RecordBatch]) -> PyResult<PyObject> {
67-
let gil = pyo3::Python::acquire_gil();
68-
let py = gil.python();
69-
let pyarrow = PyModule::import(py, "pyarrow")?;
70-
let builtins = PyModule::import(py, "builtins")?;
71-
72-
let mut py_batches = vec![];
73-
for batch in batches {
74-
py_batches.push(to_py_batch(batch, py, pyarrow)?);
75-
}
76-
let result = builtins.call1("list", (py_batches,))?;
77-
Ok(PyObject::from(result))
66+
Python::with_gil(|py| {
67+
let pyarrow = PyModule::import(py, "pyarrow")?;
68+
let mut py_batches = vec![];
69+
for batch in batches {
70+
py_batches.push(to_py_batch(batch, py, pyarrow)?);
71+
}
72+
let list = PyList::new(py, py_batches);
73+
Ok(PyObject::from(list))
74+
})
7875
}

python/src/udaf.rs

Lines changed: 64 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,17 @@ impl PyAccumulator {
4444

4545
impl Accumulator for PyAccumulator {
4646
fn state(&self) -> Result<Vec<datafusion::scalar::ScalarValue>> {
47-
let gil = pyo3::Python::acquire_gil();
48-
let py = gil.python();
49-
50-
let state = self
51-
.accum
52-
.as_ref(py)
53-
.call_method0("to_scalars")
54-
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?
55-
.extract::<Vec<Scalar>>()
56-
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
57-
58-
Ok(state.into_iter().map(|v| v.scalar).collect::<Vec<_>>())
47+
Python::with_gil(|py| {
48+
let state = self
49+
.accum
50+
.as_ref(py)
51+
.call_method0("to_scalars")
52+
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?
53+
.extract::<Vec<Scalar>>()
54+
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
55+
56+
Ok(state.into_iter().map(|v| v.scalar).collect::<Vec<_>>())
57+
})
5958
}
6059

6160
fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {
@@ -69,79 +68,72 @@ impl Accumulator for PyAccumulator {
6968
}
7069

7170
fn evaluate(&self) -> Result<datafusion::scalar::ScalarValue> {
72-
// get GIL
73-
let gil = pyo3::Python::acquire_gil();
74-
let py = gil.python();
75-
76-
let value = self
77-
.accum
78-
.as_ref(py)
79-
.call_method0("evaluate")
80-
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
81-
82-
to_rust_scalar(value)
83-
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
71+
Python::with_gil(|py| {
72+
let value = self
73+
.accum
74+
.as_ref(py)
75+
.call_method0("evaluate")
76+
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
77+
78+
to_rust_scalar(value)
79+
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
80+
})
8481
}
8582

8683
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
87-
// get GIL
88-
let gil = pyo3::Python::acquire_gil();
89-
let py = gil.python();
90-
91-
// 1. cast args to Pyarrow array
92-
// 2. call function
93-
94-
// 1.
95-
let py_args = values
96-
.iter()
97-
.map(|arg| {
98-
// remove unwrap
99-
to_py_array(arg, py).unwrap()
100-
})
101-
.collect::<Vec<_>>();
102-
let py_args = PyTuple::new(py, py_args);
103-
104-
// update accumulator
105-
self.accum
106-
.as_ref(py)
107-
.call_method1("update", py_args)
108-
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
109-
110-
Ok(())
84+
Python::with_gil(|py| {
85+
// 1. cast args to Pyarrow array
86+
// 2. call function
87+
88+
// 1.
89+
let py_args = values
90+
.iter()
91+
.map(|arg| {
92+
// remove unwrap
93+
to_py_array(arg, py).unwrap()
94+
})
95+
.collect::<Vec<_>>();
96+
let py_args = PyTuple::new(py, py_args);
97+
98+
// update accumulator
99+
self.accum
100+
.as_ref(py)
101+
.call_method1("update", py_args)
102+
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
103+
104+
Ok(())
105+
})
111106
}
112107

113108
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
114-
// get GIL
115-
let gil = pyo3::Python::acquire_gil();
116-
let py = gil.python();
117-
118-
// 1. cast states to Pyarrow array
119-
// 2. merge
120-
let state = &states[0];
121-
122-
let state = to_py_array(state, py)
123-
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
124-
125-
// 2.
126-
self.accum
127-
.as_ref(py)
128-
.call_method1("merge", (state,))
129-
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
130-
131-
Ok(())
109+
Python::with_gil(|py| {
110+
// 1. cast states to Pyarrow array
111+
// 2. merge
112+
let state = &states[0];
113+
114+
let state = to_py_array(state, py)
115+
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
116+
117+
// 2.
118+
self.accum
119+
.as_ref(py)
120+
.call_method1("merge", (state,))
121+
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
122+
123+
Ok(())
124+
})
132125
}
133126
}
134127

135128
pub fn array_udaf(
136129
accumulator: PyObject,
137130
) -> Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync> {
138131
Arc::new(move || -> Result<Box<dyn Accumulator>> {
139-
let gil = pyo3::Python::acquire_gil();
140-
let py = gil.python();
141-
142-
let accumulator = accumulator
143-
.call0(py)
144-
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
132+
let accumulator = Python::with_gil(|py| {
133+
accumulator
134+
.call0(py)
135+
.map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
136+
})?;
145137
Ok(Box::new(PyAccumulator::new(accumulator)))
146138
})
147139
}

python/src/udf.rs

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,33 +30,31 @@ use crate::to_rust::to_rust;
3030
pub fn array_udf(func: PyObject) -> ScalarFunctionImplementation {
3131
make_scalar_function(
3232
move |args: &[array::ArrayRef]| -> Result<array::ArrayRef, DataFusionError> {
33-
// get GIL
34-
let gil = pyo3::Python::acquire_gil();
35-
let py = gil.python();
36-
37-
// 1. cast args to Pyarrow arrays
38-
// 2. call function
39-
// 3. cast to arrow::array::Array
40-
41-
// 1.
42-
let py_args = args
43-
.iter()
44-
.map(|arg| {
45-
// remove unwrap
46-
to_py_array(arg, py).unwrap()
47-
})
48-
.collect::<Vec<_>>();
49-
let py_args = PyTuple::new(py, py_args);
50-
51-
// 2.
52-
let value = func.as_ref(py).call(py_args, None);
53-
let value = match value {
54-
Ok(n) => Ok(n),
55-
Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
56-
}?;
57-
58-
let array = to_rust(value).unwrap();
59-
Ok(array)
33+
Python::with_gil(|py| {
34+
// 1. cast args to Pyarrow arrays
35+
// 2. call function
36+
// 3. cast to arrow::array::Array
37+
38+
// 1.
39+
let py_args = args
40+
.iter()
41+
.map(|arg| {
42+
// remove unwrap
43+
to_py_array(arg, py).unwrap()
44+
})
45+
.collect::<Vec<_>>();
46+
let py_args = PyTuple::new(py, py_args);
47+
48+
// 2.
49+
let value = func.as_ref(py).call(py_args, None);
50+
let value = match value {
51+
Ok(n) => Ok(n),
52+
Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
53+
}?;
54+
55+
let array = to_rust(value).unwrap();
56+
Ok(array)
57+
})
6058
},
6159
)
6260
}

0 commit comments

Comments
 (0)