Skip to content

Commit

Permalink
update to use partition-aware changes from master
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 10, 2019
1 parent c2de281 commit c11962e
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

//! Parquet Data source

use std::cell::RefCell;
use std::fs::File;
use std::rc::Rc;
use std::string::String;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use arrow::array::Array;
use arrow::datatypes::{DataType, Field, Schema};
Expand All @@ -33,7 +31,7 @@ use parquet::data_type::ByteArray;
use parquet::file::reader::*;
use parquet::schema::types::Type;

use crate::datasource::{RecordBatchIterator, Table};
use crate::datasource::{RecordBatchIterator, ScanResult, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::builder::BooleanBuilder;
use arrow::builder::Int64Builder;
Expand Down Expand Up @@ -66,10 +64,10 @@ impl Table for ParquetTable {
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
) -> Result<Rc<RefCell<RecordBatchIterator>>> {
) -> Result<Vec<ScanResult>> {
let file = File::open(self.filename.clone()).unwrap();
let parquet_file = ParquetFile::open(file, projection.clone()).unwrap();
Ok(Rc::new(RefCell::new(parquet_file)))
Ok(vec![Arc::new(Mutex::new(parquet_file))])
}
}

Expand Down Expand Up @@ -482,7 +480,7 @@ mod tests {

let projection = None;
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let mut it = scan[0].lock().unwrap();
let batch = it.next().unwrap().unwrap();

assert_eq!(11, batch.num_columns());
Expand All @@ -495,7 +493,7 @@ mod tests {

let projection = Some(vec![1]);
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let mut it = scan[0].lock().unwrap();
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
Expand Down Expand Up @@ -523,7 +521,7 @@ mod tests {

let projection = Some(vec![0]);
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let mut it = scan[0].lock().unwrap();
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
Expand All @@ -548,7 +546,7 @@ mod tests {

let projection = Some(vec![10]);
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let mut it = scan[0].lock().unwrap();
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
Expand All @@ -573,7 +571,7 @@ mod tests {

let projection = Some(vec![6]);
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let mut it = scan[0].lock().unwrap();
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
Expand Down Expand Up @@ -601,7 +599,7 @@ mod tests {

let projection = Some(vec![7]);
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let mut it = scan[0].lock().unwrap();
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
Expand Down Expand Up @@ -629,7 +627,7 @@ mod tests {

let projection = Some(vec![9]);
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let mut it = scan[0].lock().unwrap();
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
Expand Down Expand Up @@ -658,7 +656,7 @@ mod tests {

let projection = Some(vec![0]);
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let mut it = scan[0].lock().unwrap();
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
Expand Down

0 comments on commit c11962e

Please sign in to comment.