Skip to content

Commit

Permalink
enable dataframe streaming across FFI
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Oct 29, 2024
1 parent 3ac27a4 commit fcf0869
Showing 1 changed file with 8 additions and 24 deletions.
32 changes: 8 additions & 24 deletions rerun_py/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use re_chunk_store::{
ComponentColumnDescriptor, ComponentColumnSelector, QueryExpression, SparseFillStrategy,
TimeColumnDescriptor, TimeColumnSelector, VersionPolicy, ViewContentsSelector,
};
use re_dataframe::QueryEngine;
use re_dataframe::{external::re_query::StorageEngine, QueryEngine};
use re_log_types::{EntityPathFilter, ResolvedTimeRange, TimeType};
use re_sdk::{ComponentName, EntityPath, StoreId, StoreKind};

Expand Down Expand Up @@ -737,21 +737,12 @@ impl PyRecordingView {
let metadata = schema.metadata.clone().into_iter().collect();
let schema = arrow::datatypes::Schema::new(fields).with_metadata(metadata);

// TODO(jleibs): Need to keep the engine alive
/*
let reader = RecordBatchIterator::new(
query_handle
.into_batch_iter()
.map(|batch| batch.try_to_arrow_record_batch()),
std::sync::Arc::new(schema),
);
*/
let batches = query_handle
.into_batch_iter()
.map(|batch| batch.try_to_arrow_record_batch())
.collect::<Vec<_>>();

let reader = RecordBatchIterator::new(batches.into_iter(), std::sync::Arc::new(schema));

Ok(PyArrowType(Box::new(reader)))
}
Expand Down Expand Up @@ -829,21 +820,12 @@ impl PyRecordingView {
let metadata = schema.metadata.clone().into_iter().collect();
let schema = arrow::datatypes::Schema::new(fields).with_metadata(metadata);

// TODO(jleibs): Need to keep the engine alive
/*
let reader = RecordBatchIterator::new(
query_handle
.into_batch_iter()
.map(|batch| batch.try_to_arrow_record_batch()),
std::sync::Arc::new(schema),
);
*/
let batches = query_handle
.into_batch_iter()
.map(|batch| batch.try_to_arrow_record_batch())
.collect::<Vec<_>>();

let reader = RecordBatchIterator::new(batches.into_iter(), std::sync::Arc::new(schema));

Ok(PyArrowType(Box::new(reader)))
}
Expand Down Expand Up @@ -1140,11 +1122,13 @@ impl PyRecordingView {
}

impl PyRecording {
fn engine(&self) -> QueryEngine {
QueryEngine {
store: self.store.clone(),
cache: self.cache.clone(),
}
/// Builds a [`QueryEngine`] over this recording's store and query cache.
fn engine(&self) -> QueryEngine<StorageEngine> {
    // SAFETY: this is only ever reached from the Python dataframe API, where
    // there is no opportunity for storage-handle leakage of any kind.
    #[allow(unsafe_code)]
    let storage_engine = unsafe { StorageEngine::new(self.store.clone(), self.cache.clone()) };

    QueryEngine {
        engine: storage_engine,
    }
}

fn find_best_component(&self, entity_path: &EntityPath, component_name: &str) -> ComponentName {
Expand Down

0 comments on commit fcf0869

Please sign in to comment.