|
17 | 17 |
|
18 | 18 | use std::collections::HashMap; |
19 | 19 | use std::ffi::CString; |
20 | | -use std::sync::Arc; |
| 20 | +use std::sync::{Arc, Mutex}; |
21 | 21 |
|
22 | 22 | use arrow::array::{new_null_array, RecordBatch, RecordBatchIterator, RecordBatchReader}; |
23 | 23 | use arrow::compute::can_cast_types; |
@@ -284,31 +284,31 @@ impl PyParquetColumnOptions { |
284 | 284 | /// A PyDataFrame is a representation of a logical plan and an API to compose statements. |
285 | 285 | /// Use it to build a plan and `.collect()` to execute the plan and collect the result. |
286 | 286 | /// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment. |
287 | | -// TODO: Not frozen because batches don't currently handle interior mutability |
288 | | -#[pyclass(name = "DataFrame", module = "datafusion", subclass)] |
| 287 | +#[pyclass(frozen, name = "DataFrame", module = "datafusion", subclass)] |
289 | 288 | #[derive(Clone)] |
290 | 289 | pub struct PyDataFrame { |
291 | 290 | df: Arc<DataFrame>, |
292 | 291 |
|
293 | 292 | // In IPython environment cache batches between __repr__ and _repr_html_ calls. |
294 | | - batches: Option<(Vec<RecordBatch>, bool)>, |
| 293 | + #[allow(clippy::type_complexity)] // Currently only used once |
| 294 | + batches: Arc<Mutex<Option<(Vec<RecordBatch>, bool)>>>, |
295 | 295 | } |
296 | 296 |
|
297 | 297 | impl PyDataFrame { |
298 | 298 | /// creates a new PyDataFrame |
299 | 299 | pub fn new(df: DataFrame) -> Self { |
300 | 300 | Self { |
301 | 301 | df: Arc::new(df), |
302 | | - batches: None, |
| 302 | + batches: Arc::new(Mutex::new(None)), |
303 | 303 | } |
304 | 304 | } |
305 | 305 |
|
306 | | - fn prepare_repr_string(&mut self, py: Python, as_html: bool) -> PyDataFusionResult<String> { |
| 306 | + fn prepare_repr_string(&self, py: Python, as_html: bool) -> PyDataFusionResult<String> { |
307 | 307 | // Get the Python formatter and config |
308 | 308 | let PythonFormatter { formatter, config } = get_python_formatter_with_config(py)?; |
309 | 309 |
|
310 | | - let should_cache = *is_ipython_env(py) && self.batches.is_none(); |
311 | | - let (batches, has_more) = match self.batches.take() { |
| 310 | + let should_cache = *is_ipython_env(py) && self.batches.lock().unwrap().is_none(); |
| 311 | + let (batches, has_more) = match self.batches.lock().unwrap().take() { |
312 | 312 | Some(b) => b, |
313 | 313 | None => wait_for_future( |
314 | 314 | py, |
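
The hunk above is the core of the change: the `#[pyclass]` gains the `frozen` attribute, so every `#[pymethods]` receiver must be `&self`, and the cached batches move behind `Arc<Mutex<…>>` for interior mutability. Below is a minimal sketch of that pattern, assuming pyo3's `frozen` pyclass attribute; `CachedRepr` and the rendered string are hypothetical stand-ins for illustration, not code from this PR:

```rust
use std::sync::{Arc, Mutex};

use pyo3::prelude::*;

// A frozen pyclass exposes no `&mut self` methods, so mutable state
// (here a lazily computed repr) lives behind a lock instead.
#[pyclass(frozen)]
#[derive(Clone)]
struct CachedRepr {
    // `Arc` keeps `Clone` cheap; `Mutex` provides the interior mutability
    // that the frozen (immutable from Python) wrapper no longer allows.
    cache: Arc<Mutex<Option<String>>>,
}

#[pymethods]
impl CachedRepr {
    #[new]
    fn new() -> Self {
        Self {
            cache: Arc::new(Mutex::new(None)),
        }
    }

    fn __repr__(&self) -> String {
        // Compute once, then serve the cached value on later calls.
        self.cache
            .lock()
            .unwrap()
            .get_or_insert_with(|| "CachedRepr(...)".to_string())
            .clone()
    }
}
```

This mirrors the `PyDataFrame` change: the struct stays `Clone`, Python sees an immutable object, and concurrent `__repr__` calls simply serialize on the lock.
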
@@ -347,7 +347,7 @@ impl PyDataFrame { |
347 | 347 | let html_str: String = html_result.extract()?; |
348 | 348 |
|
349 | 349 | if should_cache { |
350 | | - self.batches = Some((batches, has_more)); |
| 350 | + *self.batches.lock().unwrap() = Some((batches, has_more)); |
351 | 351 | } |
352 | 352 |
|
353 | 353 | Ok(html_str) |
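
The re-store above completes a take-then-restore flow: the cached batches are moved out of the lock (or collected fresh), rendered, and written back when `should_cache` is set. A generic sketch of that flow under the same `Arc<Mutex<Option<…>>>` layout; `cached_or_compute` is a hypothetical helper, not part of the PR:

```rust
use std::sync::{Arc, Mutex};

// Take the cached value out of the lock (or compute a fresh one), then put it
// back when caching is enabled so the next call reuses it. `T` stands in for
// the `(Vec<RecordBatch>, bool)` tuple used by `prepare_repr_string`.
fn cached_or_compute<T: Clone>(
    cache: &Arc<Mutex<Option<T>>>,
    should_cache: bool,
    compute: impl FnOnce() -> T,
) -> T {
    // Move any cached value out; the guard is dropped before `compute` runs,
    // so the lock is not held while the expensive work happens.
    let cached = cache.lock().unwrap().take();
    let value = cached.unwrap_or_else(compute);

    // Write the value back so the next call (e.g. `_repr_html_` right after
    // `__repr__` in IPython) reuses it instead of collecting again.
    if should_cache {
        *cache.lock().unwrap() = Some(value.clone());
    }

    value
}
```
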
@@ -377,7 +377,7 @@ impl PyDataFrame { |
377 | 377 | } |
378 | 378 | } |
379 | 379 |
|
380 | | - fn __repr__(&mut self, py: Python) -> PyDataFusionResult<String> { |
| 380 | + fn __repr__(&self, py: Python) -> PyDataFusionResult<String> { |
381 | 381 | self.prepare_repr_string(py, false) |
382 | 382 | } |
383 | 383 |
|
@@ -412,7 +412,7 @@ impl PyDataFrame { |
412 | 412 | Ok(format!("DataFrame()\n{batches_as_displ}{additional_str}")) |
413 | 413 | } |
414 | 414 |
|
415 | | - fn _repr_html_(&mut self, py: Python) -> PyDataFusionResult<String> { |
| 415 | + fn _repr_html_(&self, py: Python) -> PyDataFusionResult<String> { |
416 | 416 | self.prepare_repr_string(py, true) |
417 | 417 | } |
418 | 418 |
|
@@ -875,7 +875,7 @@ impl PyDataFrame { |
875 | 875 |
|
876 | 876 | #[pyo3(signature = (requested_schema=None))] |
877 | 877 | fn __arrow_c_stream__<'py>( |
878 | | - &'py mut self, |
| 878 | + &'py self, |
879 | 879 | py: Python<'py>, |
880 | 880 | requested_schema: Option<Bound<'py, PyCapsule>>, |
881 | 881 | ) -> PyDataFusionResult<Bound<'py, PyCapsule>> { |
|