8 changes: 3 additions & 5 deletions native/core/src/execution/datafusion/planner.rs
@@ -1095,11 +1095,9 @@ impl PhysicalPlanner {
         table_parquet_options.global.pushdown_filters = true;
         table_parquet_options.global.reorder_filters = true;
 
-        let mut builder = ParquetExecBuilder::new(file_scan_config)
-            .with_table_parquet_options(table_parquet_options)
-            .with_schema_adapter_factory(
-                Arc::new(CometSchemaAdapterFactory::default()),
-            );
+        let mut builder = ParquetExecBuilder::new(file_scan_config)
+            .with_table_parquet_options(table_parquet_options)
+            .with_schema_adapter_factory(Arc::new(CometSchemaAdapterFactory::default()));
 
         if let Some(filter) = test_data_filters {
             builder = builder.with_predicate(filter);
9 changes: 5 additions & 4 deletions native/core/src/execution/datafusion/schema_adapter.rs
@@ -120,9 +120,9 @@ impl SchemaAdapter for CometSchemaAdapter {

         Ok((
             Arc::new(SchemaMapping {
-                projected_table_schema: self.projected_table_schema.clone(),
+                projected_table_schema: Arc::<Schema>::clone(&self.projected_table_schema),
                 field_mappings,
-                table_schema: self.table_schema.clone(),
+                table_schema: Arc::<Schema>::clone(&self.table_schema),
             }),
             projection,
         ))
@@ -218,7 +218,7 @@ impl SchemaMapper for SchemaMapping {
         // Necessary to handle empty batches
         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
 
-        let schema = self.projected_table_schema.clone();
+        let schema = Arc::<Schema>::clone(&self.projected_table_schema);
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
@@ -259,7 +259,8 @@ impl SchemaMapper for SchemaMapping {
                     EvalMode::Legacy,
                     "UTC",
                     false,
-                )?.into_array(batch_col.len())
+                )?
+                .into_array(batch_col.len())
                 // and if that works, return the field and column.
                 .map(|new_col| (new_col, table_field.clone()))
             })
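Note: the `Arc::<Schema>::clone(&...)` form used above is the fully qualified alternative to calling `.clone()` directly on an `Arc` field; it makes explicit that only the reference count is bumped, never the underlying `Schema`, and it is the form clippy's `clone_on_ref_ptr` lint asks for. A minimal standalone sketch of the idiom (the `Schema` struct here is a stand-in, not Arrow's):

    use std::sync::Arc;

    // Stand-in for arrow's Schema; any type behaves the same under Arc.
    struct Schema {
        fields: Vec<String>,
    }

    fn main() {
        let schema = Arc::new(Schema {
            fields: vec!["a".into(), "b".into()],
        });

        // `schema.clone()` also compiles, but hides whether the Arc or the
        // Schema itself gets cloned. The qualified form only increments the
        // reference count, and fails to compile if the field ever stops
        // being an Arc.
        let shared = Arc::<Schema>::clone(&schema);

        assert_eq!(Arc::strong_count(&schema), 2);
        assert_eq!(shared.fields.len(), 2);
    }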
94 changes: 50 additions & 44 deletions native/core/src/parquet/mod.rs
@@ -621,8 +621,10 @@ fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut ParquetRecordBatchReader
     Ok(&mut get_batch_context(handle)?.batch_reader)
 }
 
+/// # Safety
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
+pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
     e: JNIEnv,
     _jclass: JClass,
     file_path: jstring,
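The `/// # Safety` section added above follows the standard Rust convention (enforced by clippy's `missing_safety_doc` lint) that a public `unsafe fn` documents the contract its callers must uphold. A minimal sketch of the same pattern on a toy JNI-style export (the name and the `Vec<i64>` payload are illustrative, not Comet's actual types):

    /// # Safety
    /// `handle` must be a pointer previously produced by `Box::into_raw` on a
    /// `Vec<i64>` and not yet freed; the JVM side must ensure the handle is
    /// not used after release.
    #[no_mangle]
    pub unsafe extern "system" fn Java_example_Native_rowCount(handle: i64) -> i64 {
        // Dereferencing the raw pointer is only sound because the caller
        // upholds the contract documented above.
        let ctx = &*(handle as *const Vec<i64>);
        ctx.len() as i64
    }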
@@ -646,62 +648,66 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
         .unwrap()
         .with_batch_size(8192); // TODO: (ARROW NATIVE) Use batch size configured in JVM
 
+    let num_row_groups;
+    let mut total_rows: i64 = 0;
     //TODO: (ARROW NATIVE) if we can get the ParquetMetadata serialized, we need not do this.
-    let metadata = builder.metadata().clone();
-
-    let mut columns_to_read: Vec<usize> = Vec::new();
-    let columns_to_read_array = JObjectArray::from_raw(required_columns);
-    let array_len = env.get_array_length(&columns_to_read_array)?;
-    let mut required_columns: Vec<String> = Vec::new();
-    for i in 0..array_len {
-        let p: JString = env
-            .get_object_array_element(&columns_to_read_array, i)?
-            .into();
-        required_columns.push(env.get_string(&p)?.into());
-    }
-    for (i, col) in metadata
-        .file_metadata()
-        .schema_descr()
-        .columns()
-        .iter()
-        .enumerate()
-    {
-        for (_, required) in required_columns.iter().enumerate() {
-            if col.name().to_uppercase().eq(&required.to_uppercase()) {
-                columns_to_read.push(i);
-                break;
-            }
-        }
-    }
-    //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs)
-    let mask = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), columns_to_read);
-    // Set projection mask to read only root columns 1 and 2.
-    builder = builder.with_projection(mask);
-
-    let mut row_groups_to_read: Vec<usize> = Vec::new();
-    let mut total_rows: i64 = 0;
-    // get row groups -
-    for (i, rg) in metadata.row_groups().into_iter().enumerate() {
-        let rg_start = rg.file_offset().unwrap();
-        let rg_end = rg_start + rg.compressed_size();
-        if rg_start >= start && rg_end <= start + length {
-            row_groups_to_read.push(i);
-            total_rows += rg.num_rows();
-        }
-    }
+    {
+        let metadata = builder.metadata();
+
+        let mut columns_to_read: Vec<usize> = Vec::new();
+        let columns_to_read_array = JObjectArray::from_raw(required_columns);
+        let array_len = env.get_array_length(&columns_to_read_array)?;
+        let mut required_columns: Vec<String> = Vec::new();
+        for i in 0..array_len {
+            let p: JString = env
+                .get_object_array_element(&columns_to_read_array, i)?
+                .into();
+            required_columns.push(env.get_string(&p)?.into());
+        }
+        for (i, col) in metadata
+            .file_metadata()
+            .schema_descr()
+            .columns()
+            .iter()
+            .enumerate()
+        {
+            for required in required_columns.iter() {
+                if col.name().to_uppercase().eq(&required.to_uppercase()) {
+                    columns_to_read.push(i);
+                    break;
+                }
+            }
+        }
+        //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs)
+        let mask =
+            ProjectionMask::leaves(metadata.file_metadata().schema_descr(), columns_to_read);
+        // Set projection mask to read only root columns 1 and 2.
+
+        let mut row_groups_to_read: Vec<usize> = Vec::new();
+        // get row groups -
+        for (i, rg) in metadata.row_groups().iter().enumerate() {
+            let rg_start = rg.file_offset().unwrap();
+            let rg_end = rg_start + rg.compressed_size();
+            if rg_start >= start && rg_end <= start + length {
+                row_groups_to_read.push(i);
+                total_rows += rg.num_rows();
+            }
+        }
+        num_row_groups = row_groups_to_read.len();
+        builder = builder
+            .with_projection(mask)
+            .with_row_groups(row_groups_to_read.clone())
+    }
 
     // Build a sync parquet reader.
-    let batch_reader = builder
-        .with_row_groups(row_groups_to_read.clone())
-        .build()
-        .unwrap();
+    let batch_reader = builder.build().unwrap();
 
     let ctx = BatchContext {
         batch_reader,
         current_batch: None,
         reader_state: ParquetReaderState::Init,
-        num_row_groups: row_groups_to_read.len() as i32,
-        total_rows: total_rows,
+        num_row_groups: num_row_groups as i32,
+        total_rows,
     };
     let res = Box::new(ctx);
     Ok(Box::into_raw(res) as i64)
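Two things in this last hunk are worth spelling out. First, the old code took an owned clone of the metadata handle up front so that `builder` stayed unborrowed; the new code instead borrows `builder.metadata()` inside a `{ ... }` block, and because that borrow ends at `metadata`'s last use, `builder` can be consumed and rebound by `with_projection`/`with_row_groups` at the end of the same block (`num_row_groups` is declared before the block so the count survives it). Second, a row group is selected only when its compressed byte range lies entirely within the split's `[start, start + length)` range. A self-contained sketch of both points, using a toy `Builder`/`Metadata` in place of the parquet crate's reader builder (all names here are illustrative):

    // (file_offset, compressed_size) pairs stand in for row-group metadata.
    struct Metadata {
        row_groups: Vec<(i64, i64)>,
    }

    struct Builder {
        metadata: Metadata,
        row_groups: Vec<usize>,
    }

    impl Builder {
        fn metadata(&self) -> &Metadata {
            &self.metadata
        }
        fn with_row_groups(mut self, rg: Vec<usize>) -> Self {
            self.row_groups = rg;
            self
        }
    }

    fn select_row_groups(mut builder: Builder, start: i64, length: i64) -> Builder {
        let num_row_groups;
        {
            let metadata = builder.metadata(); // immutable borrow of `builder`
            let mut row_groups_to_read = Vec::new();
            for (i, (rg_start, size)) in metadata.row_groups.iter().enumerate() {
                // keep the group only if it sits entirely in [start, start + length)
                if *rg_start >= start && rg_start + size <= start + length {
                    row_groups_to_read.push(i);
                }
            }
            num_row_groups = row_groups_to_read.len();
            // The borrow through `metadata` ended at its last use above, so
            // `builder` may now be moved and rebound, as in the diff.
            builder = builder.with_row_groups(row_groups_to_read)
        }
        assert_eq!(builder.row_groups.len(), num_row_groups);
        builder
    }

    fn main() {
        let builder = Builder {
            metadata: Metadata {
                row_groups: vec![(0, 100), (100, 100)],
            },
            row_groups: Vec::new(),
        };
        // A split covering bytes [0, 150) fully contains only the first group.
        let builder = select_row_groups(builder, 0, 150);
        assert_eq!(builder.row_groups, vec![0]);
    }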