From eed2461a3bfce3b155e778653aab4bc84c50395f Mon Sep 17 00:00:00 2001 From: desmondcheongzx Date: Thu, 10 Oct 2024 17:36:34 -0700 Subject: [PATCH] Implement hive partitioned reads --- Cargo.lock | 13 +++ Cargo.toml | 29 +++--- daft/daft/__init__.pyi | 1 + daft/io/_csv.py | 3 + daft/io/_json.py | 3 + daft/io/_parquet.py | 3 + daft/io/common.py | 2 + src/daft-hive/Cargo.toml | 15 +++ src/daft-hive/src/lib.rs | 96 +++++++++++++++++++ src/daft-plan/src/builder.rs | 86 +++++++++-------- src/daft-scan/Cargo.toml | 1 + src/daft-scan/src/glob.rs | 86 ++++++++++++----- src/daft-scan/src/lib.rs | 60 +++++------- src/daft-scan/src/python.rs | 3 + .../src/table_provider/read_parquet.rs | 2 + 15 files changed, 289 insertions(+), 114 deletions(-) create mode 100644 src/daft-hive/Cargo.toml create mode 100644 src/daft-hive/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 0dc52e53bd..1b97beeafe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1856,6 +1856,18 @@ dependencies = [ "typetag", ] +[[package]] +name = "daft-hive" +version = "0.3.0-dev0" +dependencies = [ + "arrow2", + "common-error", + "daft-core", + "daft-decoding", + "daft-schema", + "daft-table", +] + [[package]] name = "daft-image" version = "0.3.0-dev0" @@ -2101,6 +2113,7 @@ dependencies = [ "daft-core", "daft-csv", "daft-dsl", + "daft-hive", "daft-io", "daft-json", "daft-parquet", diff --git a/Cargo.toml b/Cargo.toml index dde3543e70..8eff035275 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,32 +112,33 @@ tikv-jemallocator = {version = "0.5.4", features = [ [workspace] members = [ "src/arrow2", - "src/parquet2", + "src/common/daft-config", "src/common/display", "src/common/error", "src/common/io-config", - "src/common/treenode", - "src/common/daft-config", "src/common/system-info", + "src/common/treenode", "src/daft-core", - "src/daft-local-execution", - "src/daft-io", - "src/daft-image", - "src/daft-parquet", "src/daft-csv", - "src/daft-json", "src/daft-dsl", - "src/daft-table", - "src/daft-plan", - "src/daft-physical-plan", + "src/daft-functions", + "src/daft-functions-json", + "src/daft-hive", + "src/daft-image", + "src/daft-io", + "src/daft-json", + "src/daft-local-execution", "src/daft-micropartition", + "src/daft-parquet", + "src/daft-physical-plan", + "src/daft-plan", "src/daft-scan", "src/daft-scheduler", "src/daft-sketch", - "src/daft-functions", - "src/daft-functions-json", "src/daft-sql", - "src/hyperloglog" + "src/daft-table", + "src/hyperloglog", + "src/parquet2" ] [workspace.dependencies] diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 798632d51f..966e3f934e 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -780,6 +780,7 @@ class ScanOperatorHandle: glob_path: list[str], file_format_config: FileFormatConfig, storage_config: StorageConfig, + hive_partitioning: bool, infer_schema: bool, schema: PySchema | None = None, file_path_column: str | None = None, diff --git a/daft/io/_csv.py b/daft/io/_csv.py index 2ef2227ed8..855f027d4f 100644 --- a/daft/io/_csv.py +++ b/daft/io/_csv.py @@ -31,6 +31,7 @@ def read_csv( allow_variable_columns: bool = False, io_config: Optional["IOConfig"] = None, file_path_column: Optional[str] = None, + hive_partitioning: bool = False, use_native_downloader: bool = True, schema_hints: Optional[Dict[str, DataType]] = None, _buffer_size: Optional[int] = None, @@ -56,6 +57,7 @@ def read_csv( allow_variable_columns (bool): Whether to allow for variable number of columns in the CSV, defaults to False. 
If set to True, Daft will append nulls to rows with less columns than the schema, and ignore extra columns in rows with more columns io_config (IOConfig): Config to be used with the native downloader file_path_column: Include the source path(s) as a column with this name. Defaults to None. + hive_partitioning: Whether to use hive-style partitioning when reading glob files. Defaults to False. use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This is currently experimental. @@ -100,5 +102,6 @@ def read_csv( file_format_config=file_format_config, storage_config=storage_config, file_path_column=file_path_column, + hive_partitioning=hive_partitioning, ) return DataFrame(builder) diff --git a/daft/io/_json.py b/daft/io/_json.py index 5dd3e8b3a5..a7064e0dcc 100644 --- a/daft/io/_json.py +++ b/daft/io/_json.py @@ -24,6 +24,7 @@ def read_json( schema: Optional[Dict[str, DataType]] = None, io_config: Optional["IOConfig"] = None, file_path_column: Optional[str] = None, + hive_partitioning: bool = False, use_native_downloader: bool = True, schema_hints: Optional[Dict[str, DataType]] = None, _buffer_size: Optional[int] = None, @@ -43,6 +44,7 @@ def read_json( schema (dict[str, DataType]): A schema that is used as the definitive schema for the JSON if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred. io_config (IOConfig): Config to be used with the native downloader file_path_column: Include the source path(s) as a column with this name. Defaults to None. + hive_partitioning: Whether to use hive-style partitioning when reading glob files. Defaults to False. use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This is currently experimental. @@ -77,5 +79,6 @@ def read_json( file_format_config=file_format_config, storage_config=storage_config, file_path_column=file_path_column, + hive_partitioning=hive_partitioning, ) return DataFrame(builder) diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index 3bb8d3730f..dcb0389a11 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -25,6 +25,7 @@ def read_parquet( schema: Optional[Dict[str, DataType]] = None, io_config: Optional["IOConfig"] = None, file_path_column: Optional[str] = None, + hive_partitioning: bool = False, use_native_downloader: bool = True, coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None, schema_hints: Optional[Dict[str, DataType]] = None, @@ -47,6 +48,7 @@ def read_parquet( schema (dict[str, DataType]): A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred. io_config (IOConfig): Config to be used with the native downloader file_path_column: Include the source path(s) as a column with this name. Defaults to None. + hive_partitioning: Whether to use hive-style partitioning when reading glob files. Defaults to False. use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None. _multithreaded_io: Whether to use multithreading for IO threads. 
Setting this to False can be helpful in reducing @@ -96,5 +98,6 @@ def read_parquet( file_format_config=file_format_config, storage_config=storage_config, file_path_column=file_path_column, + hive_partitioning=hive_partitioning, ) return DataFrame(builder) diff --git a/daft/io/common.py b/daft/io/common.py index 981042004f..d32f600dce 100644 --- a/daft/io/common.py +++ b/daft/io/common.py @@ -24,6 +24,7 @@ def get_tabular_files_scan( file_format_config: FileFormatConfig, storage_config: StorageConfig, file_path_column: str | None = None, + hive_partitioning: bool = False, ) -> LogicalPlanBuilder: """Returns a TabularFilesScan LogicalPlan for a given glob filepath.""" # Glob the path using the Runner @@ -42,6 +43,7 @@ def get_tabular_files_scan( infer_schema=infer_schema, schema=_get_schema_from_dict(schema)._schema if schema is not None else None, file_path_column=file_path_column, + hive_partitioning=hive_partitioning, ) builder = LogicalPlanBuilder.from_tabular_scan( diff --git a/src/daft-hive/Cargo.toml b/src/daft-hive/Cargo.toml new file mode 100644 index 0000000000..ea2193b5d6 --- /dev/null +++ b/src/daft-hive/Cargo.toml @@ -0,0 +1,15 @@ +[dependencies] +arrow2 = {workspace = true, features = ["io_parquet", "io_parquet_compression"]} +common-error = {path = "../common/error", default-features = false} +daft-core = {path = "../daft-core", default-features = false} +daft-decoding = {path = "../daft-decoding", default-features = false} +daft-schema = {path = "../daft-schema", default-features = false} +daft-table = {path = "../daft-table", default-features = false} + +[lints] +workspace = true + +[package] +edition = {workspace = true} +name = "daft-hive" +version = {workspace = true} diff --git a/src/daft-hive/src/lib.rs b/src/daft-hive/src/lib.rs new file mode 100644 index 0000000000..fd66739d5f --- /dev/null +++ b/src/daft-hive/src/lib.rs @@ -0,0 +1,96 @@ +use std::collections::HashMap; + +use common_error::DaftResult; +use daft_core::{datatypes::Utf8Array, series::IntoSeries}; +use daft_decoding::inference::infer; +use daft_schema::{dtype::DaftDataType, field::Field, schema::Schema}; +use daft_table::Table; + +/// Parses hive-style /key=value/ components from a uri. +pub fn parse_hive_partitioning(uri: &str) -> HashMap<&str, &str> { + let mut equality_pos = 0; + let mut partition_start = 0; + let mut valid_partition = true; + let mut partitions = HashMap::new(); + // Loops through the uri looking for valid partitions. Although we consume partitions only when + // encountering a slash separator, we never need to grab a partition key-value pair from the end + // of the uri because uri's are expected to end in either a filename, or in GET parameters. + for (idx, c) in uri.char_indices() { + match c { + // A '?' char denotes the start of GET parameters, so stop parsing hive partitions. + // We also ban '\n' for hive partitions, given all the edge cases that can arise. + '?' | '\n' => break, + // A '=' char denotes that we've finished reading the partition's key, and we're now + // reading the partition's value. + '=' => { + // If we see more than one '=' in the partition, it is not a valid partition. + if equality_pos > partition_start { + valid_partition = false; + } + equality_pos = idx; + } + // A separator char denotes the start of a new partition. 
+ '\\' | '/' => {
+ if valid_partition && equality_pos > partition_start {
+ let key = &uri[partition_start..equality_pos];
+ let value = &uri[equality_pos + 1..idx];
+ partitions.insert(key, value);
+ }
+ partition_start = idx + 1;
+ valid_partition = true;
+ }
+ _ => (),
+ }
+ }
+ partitions
+}
+
+/// Takes hive partition key-value pairs as `partitions`, and the schema of the containing table as
+/// `table_schema`, and returns a 1-dimensional table containing the partition keys as columns, and
+/// their partition values as the singular row of values.
+pub fn hive_partitions_to_1d_table(
+ partitions: &HashMap<&str, &str>,
+ table_schema: &Schema,
+) -> DaftResult<Table> {
+ // Take the partition keys and values, and convert them into a 1D table with UTF8 fields.
+ let uncasted_fields = partitions
+ .keys()
+ .map(|&key| Field::new(key, daft_schema::dtype::DataType::Utf8))
+ .collect();
+ let uncasted_schema = Schema::new(uncasted_fields)?;
+ let uncasted_series = partitions
+ .iter()
+ .map(|(&key, &value)| {
+ let arrow_array = arrow2::array::Utf8Array::from_iter_values(std::iter::once(&value));
+ let daft_array = Utf8Array::from((key, Box::new(arrow_array)));
+ daft_array.into_series()
+ })
+ .collect::<Vec<_>>();
+ let uncast_table = Table::new_unchecked(uncasted_schema, uncasted_series, /*num_rows=*/ 1);
+
+ // Using the given table schema, get the expected schema for the partition fields, and cast the
+ // table of UTF8 partition values accordingly into their expected types.
+ let partition_fields = table_schema
+ .fields
+ .clone()
+ .into_iter()
+ .map(|(_, field)| field)
+ .filter(|field| partitions.contains_key(&field.name.as_str()))
+ .collect();
+ let partition_schema = Schema::new(partition_fields)?;
+ // TODO(desmond): There's probably a better way to do this, instead of creating a UTF8 table
+ // then casting it. We should be able to create this table directly.
+ let casted_table = uncast_table.cast_to_schema(&partition_schema)?;
+ Ok(casted_table)
+}
+
+/// Turns hive partition key-value pairs into a schema with the partitions' keys as field names,
+/// inferring field types from the partitions' values. We don't do schema type inference here as the
+/// user is expected to provide the schema for hive-partitioned fields.
+pub fn hive_partitions_to_schema(partitions: &HashMap<&str, &str>) -> DaftResult<Schema> {
+ let partition_fields: Vec<Field> = partitions
+ .iter()
+ .map(|(&key, &value)| Field::new(key, DaftDataType::from(&infer(value.as_bytes()))))
+ .collect();
+ Schema::new(partition_fields)
+}
diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs
index 1a6fbb8503..3d3403d3d4 100644
--- a/src/daft-plan/src/builder.rs
+++ b/src/daft-plan/src/builder.rs
@@ -5,7 +5,7 @@ use std::{
 use common_daft_config::DaftPlanningConfig;
 use common_display::mermaid::MermaidDisplayOptions;
-use common_error::DaftResult;
+use common_error::{DaftError, DaftResult};
 use common_file_formats::{FileFormat, FileFormatConfig, ParquetSourceConfig};
 use common_io_config::IOConfig;
 use daft_core::{
@@ -19,7 +19,6 @@ use daft_scan::{
 PhysicalScanInfo, Pushdowns, ScanOperatorRef,
 };
 use daft_schema::{
- dtype::DataType,
 field::Field,
 schema::{Schema, SchemaRef},
 };
@@ -194,46 +193,42 @@ impl LogicalPlanBuilder {
 partitioning_keys.into(),
 pushdowns.clone().unwrap_or_default(),
 ));
- // If column selection (projection) pushdown is specified, prune unselected columns from the schema.
- // If file path column is specified, add it to the schema.
- let output_schema = match (&pushdowns, &scan_operator.0.file_path_column()) {
- (
- Some(Pushdowns {
- columns: Some(columns),
- ..
- }),
- file_path_column_opt,
- ) if columns.len() < schema.fields.len() => {
- let pruned_fields = schema
- .fields
- .iter()
- .filter(|(name, _)| columns.contains(name))
- .map(|(_, field)| field.clone());
-
- let finalized_fields = match file_path_column_opt {
- Some(file_path_column) => pruned_fields
- .chain(std::iter::once(Field::new(
- (*file_path_column).to_string(),
- DataType::Utf8,
- )))
- .collect::<Vec<_>>(),
- None => pruned_fields.collect::<Vec<_>>(),
- };
- Arc::new(Schema::new(finalized_fields)?)
- }
- (None, Some(file_path_column)) => {
- let schema_with_file_path = schema
- .fields
- .values()
- .cloned()
- .chain(std::iter::once(Field::new(
- (*file_path_column).to_string(),
- DataType::Utf8,
- )))
- .collect::<Vec<_>>();
- Arc::new(Schema::new(schema_with_file_path)?)
+ // If file path column is specified, check that it doesn't conflict with any column names in the schema.
+ if let Some(file_path_column) = &scan_operator.0.file_path_column() {
+ if schema.names().contains(&(*file_path_column).to_string()) {
+ return Err(DaftError::ValueError(format!(
+ "Attempting to make a Schema with a file path column name that already exists: {}",
+ file_path_column
+ )));
 }
- _ => schema,
+ }
+ // Add partitioning keys to the schema.
+ let schema_and_partitions = {
+ let partitioning_fields = partitioning_keys
+ .iter()
+ .map(|partition_field| partition_field.clone_field())
+ .collect();
+ let partitioning_schema = Schema::new(partitioning_fields)?;
+ // We use the non-distinct union here because some scan operators have table schema information that
+ // already contains partitioned fields. For example, the deltalake scan operator takes the table schema.
+ Arc::new(schema.non_distinct_union(&partitioning_schema))
+ };
+ // If column selection (projection) pushdown is specified, prune unselected columns from the schema.
+ let output_schema = if let Some(Pushdowns {
+ columns: Some(columns),
+ ..
+ }) = &pushdowns
+ && columns.len() < schema_and_partitions.fields.len()
+ {
+ let pruned_upstream_schema = schema_and_partitions
+ .fields
+ .iter()
+ .filter(|&(name, _)| columns.contains(name))
+ .map(|(_, field)| field.clone())
+ .collect::<Vec<_>>();
+ Arc::new(Schema::new(pruned_upstream_schema)?)
+ } else { + schema_and_partitions }; let logical_plan: LogicalPlan = logical_ops::Source::new(output_schema, source_info.into()).into(); @@ -611,6 +606,7 @@ pub struct ParquetScanBuilder { pub multithreaded: bool, pub schema: Option, pub file_path_column: Option, + pub hive_partitioning: bool, } impl ParquetScanBuilder { @@ -632,6 +628,7 @@ impl ParquetScanBuilder { schema: None, io_config: None, file_path_column: None, + hive_partitioning: false, } } pub fn infer_schema(mut self, infer_schema: bool) -> Self { @@ -664,6 +661,7 @@ impl ParquetScanBuilder { self.multithreaded = multithreaded; self } + pub fn schema(mut self, schema: SchemaRef) -> Self { self.schema = Some(schema); self @@ -674,6 +672,11 @@ impl ParquetScanBuilder { self } + pub fn hive_partitioning(mut self, hive_partitioning: bool) -> Self { + self.hive_partitioning = hive_partitioning; + self + } + pub fn finish(self) -> DaftResult { let cfg = ParquetSourceConfig { coerce_int96_timestamp_unit: self.coerce_int96_timestamp_unit, @@ -691,6 +694,7 @@ impl ParquetScanBuilder { self.infer_schema, self.schema, self.file_path_column, + self.hive_partitioning, )?); LogicalPlanBuilder::table_scan(ScanOperatorRef(operator), None) diff --git a/src/daft-scan/Cargo.toml b/src/daft-scan/Cargo.toml index d4c5e5a230..ba1402849f 100644 --- a/src/daft-scan/Cargo.toml +++ b/src/daft-scan/Cargo.toml @@ -8,6 +8,7 @@ common-py-serde = {path = "../common/py-serde", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-csv = {path = "../daft-csv", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} +daft-hive = {path = "../daft-hive", default-features = false} daft-io = {path = "../daft-io", default-features = false} daft-json = {path = "../daft-json", default-features = false} daft-parquet = {path = "../daft-parquet", default-features = false} diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 72f6307184..1dd7611a89 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -4,9 +4,14 @@ use common_error::{DaftError, DaftResult}; use common_file_formats::{CsvSourceConfig, FileFormat, FileFormatConfig, ParquetSourceConfig}; use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::CsvParseOptions; +use daft_hive::{hive_partitions_to_1d_table, hive_partitions_to_schema, parse_hive_partitioning}; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef, RuntimeRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; -use daft_schema::{dtype::DataType, field::Field, schema::SchemaRef}; +use daft_schema::{ + dtype::DataType, + field::Field, + schema::{Schema, SchemaRef}, +}; use daft_stats::PartitionSpec; use daft_table::Table; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; @@ -23,6 +28,7 @@ pub struct GlobScanOperator { schema: SchemaRef, storage_config: Arc, file_path_column: Option, + hive_partitioning: bool, partitioning_keys: Vec, } @@ -136,6 +142,7 @@ impl GlobScanOperator { infer_schema: bool, schema: Option, file_path_column: Option, + hive_partitioning: bool, ) -> DaftResult { let first_glob_path = match glob_paths.first() { None => Err(DaftError::ValueError( @@ -168,13 +175,28 @@ impl GlobScanOperator { } .into()), }?; - let partitioning_keys = if let Some(fp_col) = &file_path_column { + let mut partitioning_keys = if let Some(fp_col) = &file_path_column { let partition_field = PartitionField::new(Field::new(fp_col, DataType::Utf8), None, None)?; vec![partition_field; 1] } else { 
vec![] }; + // If hive partitioning is set, extend the partition keys with hive partition keys. + if hive_partitioning { + let hive_partitions = parse_hive_partitioning(&first_filepath); + let hive_partition_schema = hive_partitions_to_schema(&hive_partitions)?; + let hive_partition_schema = match schema.clone() { + Some(hint) => hive_partition_schema.apply_hints(&hint)?, + None => hive_partition_schema, + }; + let hive_partitioning_keys = hive_partition_schema + .fields + .into_iter() + .map(|(_, field)| PartitionField::new(field, None, None)) + .collect::, _>>()?; + partitioning_keys.extend(hive_partitioning_keys); + } let schema = match infer_schema { true => { @@ -261,6 +283,7 @@ impl GlobScanOperator { schema, storage_config, file_path_column, + hive_partitioning, partitioning_keys, }) } @@ -352,7 +375,14 @@ impl ScanOperator for GlobScanOperator { None }; let file_path_column = self.file_path_column.clone(); - // Create one ScanTask per file + let hive_partitioning = self.hive_partitioning; + let partition_fields = self + .partitioning_keys + .iter() + .map(|partition_spec| partition_spec.clone_field()) + .collect(); + let partition_schema = Schema::new(partition_fields)?; + // Create one ScanTask per file. Ok(Box::new(files.enumerate().filter_map(move |(idx, f)| { let scan_task_result = (|| { let FileMetadata { @@ -360,33 +390,39 @@ impl ScanOperator for GlobScanOperator { size: size_bytes, .. } = f?; - let partition_spec = if let Some(fp_col) = &file_path_column { + // Create a table of partition values based on whether a file_path_column is set + // (this column is inherently a partition). + let mut partition_values = if let Some(fp_col) = &file_path_column { let trimmed = path.trim_start_matches("file://"); let file_paths_column_series = Utf8Array::from_iter(fp_col, std::iter::once(Some(trimmed))).into_series(); - let file_paths_table = - Table::from_nonempty_columns(vec![file_paths_column_series; 1])?; - - if let Some(ref partition_filters) = pushdowns.partition_filters { - let eval_pred = - file_paths_table.eval_expression_list(&[partition_filters.clone()])?; - assert_eq!(eval_pred.num_columns(), 1); - let series = eval_pred.get_column_by_index(0)?; - assert_eq!(series.data_type(), &daft_core::datatypes::DataType::Boolean); - let boolean = series.bool()?; - assert_eq!(boolean.len(), 1); - let value = boolean.get(0); - match value { - None | Some(false) => return Ok(None), - Some(true) => {} - } - } - Some(PartitionSpec { - keys: file_paths_table, - }) + Table::from_nonempty_columns(vec![file_paths_column_series; 1])? } else { - None + Table::empty(None)? }; + // Extend the partition values with hive partitions. + if hive_partitioning { + let hive_partitions = parse_hive_partitioning(&path); + let hive_partition_values = + hive_partitions_to_1d_table(&hive_partitions, &partition_schema)?; + partition_values = if partition_values.is_empty() { + hive_partition_values + } else { + partition_values.union(&hive_partition_values)? + }; + } + // Check if the partition values satisfy the partition filters, if any. + if let Some(partition_filters) = pushdowns.clone().partition_filters { + let filter_result = partition_values.filter(&[partition_filters])?; + if filter_result.is_empty() { + // Skip the current file since it does not satisfy the partition filters. 
+ return Ok(None); + } + } + let partition_spec = Some(PartitionSpec { + keys: partition_values, + }); + let row_group = row_groups .as_ref() .and_then(|rgs| rgs.get(idx).cloned()) diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 7fce95da7f..38dc7b720b 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -12,7 +12,6 @@ use common_error::{DaftError, DaftResult}; use common_file_formats::FileFormatConfig; use daft_dsl::ExprRef; use daft_schema::{ - dtype::DataType, field::Field, schema::{Schema, SchemaRef}, }; @@ -487,41 +486,29 @@ impl ScanTask { #[must_use] pub fn materialized_schema(&self) -> SchemaRef { - match (&self.pushdowns.columns, &self.file_path_column) { - (None, None) => self.schema.clone(), - (Some(columns), file_path_column_opt) => { - let filtered_fields = self - .schema - .fields - .clone() - .into_iter() - .filter(|(name, _)| columns.contains(name)); - - let fields = match file_path_column_opt { - Some(file_path_column) => filtered_fields - .chain(std::iter::once(( - file_path_column.to_string(), - Field::new(file_path_column.to_string(), DataType::Utf8), - ))) - .collect(), - None => filtered_fields.collect(), - }; - - Arc::new(Schema { fields }) + let mut fields = self.schema.fields.clone(); + // Extend the schema with the partition spec. + if let Some(source) = self.sources.first() { + if let Some(partition_spec) = source.get_partition_spec() { + fields.extend( + partition_spec + .keys + .schema + .fields + .iter() + .map(|(name, field)| (name.clone(), field.clone())), + ); } - (None, Some(file_path_column)) => Arc::new(Schema { - fields: self - .schema - .fields - .clone() - .into_iter() - .chain(std::iter::once(( - file_path_column.to_string(), - Field::new(file_path_column.to_string(), DataType::Utf8), - ))) - .collect(), - }), } + // Filter the schema based on the pushdown column filters. 
+ fields = match &self.pushdowns.columns {
+ None => fields,
+ Some(columns) => fields
+ .into_iter()
+ .filter(|(name, _)| columns.contains(name))
+ .collect(),
+ };
+ Arc::new(Schema { fields })
 }
 /// Obtain an accurate, exact num_rows from the ScanTask, or `None` if this is not possible
@@ -755,6 +742,10 @@ impl PartitionField {
 }),
 }
 }
+
+ pub fn clone_field(&self) -> Field {
+ self.field.clone()
+ }
 }
 impl Display for PartitionField {
@@ -1102,6 +1093,7 @@ mod test {
 false,
 Some(Arc::new(Schema::empty())),
 None,
+ false,
 )
 .unwrap();
diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs
index 79db895ec1..22f9b97f06 100644
--- a/src/daft-scan/src/python.rs
+++ b/src/daft-scan/src/python.rs
@@ -101,11 +101,13 @@ pub mod pylib {
 }
 #[staticmethod]
+ #[allow(clippy::too_many_arguments)]
 pub fn glob_scan(
 py: Python,
 glob_path: Vec<String>,
 file_format_config: PyFileFormatConfig,
 storage_config: PyStorageConfig,
+ hive_partitioning: bool,
 infer_schema: bool,
 schema: Option<PySchema>,
 file_path_column: Option<String>,
@@ -118,6 +120,7 @@ pub mod pylib {
 infer_schema,
 schema.map(|s| s.schema),
 file_path_column,
+ hive_partitioning,
 )?);
 Ok(Self {
 scan_op: ScanOperatorRef(operator),
diff --git a/src/daft-sql/src/table_provider/read_parquet.rs b/src/daft-sql/src/table_provider/read_parquet.rs
index 7bdc191536..38138b5c0c 100644
--- a/src/daft-sql/src/table_provider/read_parquet.rs
+++ b/src/daft-sql/src/table_provider/read_parquet.rs
@@ -32,6 +32,7 @@ impl TryFrom for ParquetScanBuilder {
 let chunk_size = args.try_get_named("chunk_size")?;
 let file_path_column = args.try_get_named("file_path_column")?;
 let multithreaded = args.try_get_named("multithreaded")?.unwrap_or(true);
+ let hive_partitioning = args.try_get_named("hive_partitioning")?.unwrap_or(false);
 let field_id_mapping = None; // TODO
 let row_groups = None; // TODO
@@ -49,6 +50,7 @@ impl TryFrom for ParquetScanBuilder {
 multithreaded,
 schema,
 file_path_column,
+ hive_partitioning,
 })
 }
}
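
Usage sketch (editor's note, not part of the patch): the user-facing entry point added here is the `hive_partitioning` flag on `daft.read_parquet` / `read_csv` / `read_json`. The snippet below is a minimal illustration of how a hive-partitioned read would look from Python; the bucket, path layout, and partition column names (`year`, `month`) are hypothetical, and the partition columns' types are either inferred from the path values or taken from a user-supplied schema hint.

    import daft

    # Hypothetical layout: s3://my-bucket/events/year=2024/month=01/part-0000.parquet
    df = daft.read_parquet(
        "s3://my-bucket/events/**/*.parquet",
        hive_partitioning=True,  # parse /key=value/ path components into columns
    )

    # Partition keys surface as ordinary columns, so a filter on them can be
    # pushed down as a partition filter and skip non-matching files entirely.
    df = df.where(daft.col("year") == 2024)
    df.show()

Filtering on a partition column exercises the partition-filter check added in glob.rs: files whose parsed path values fail the filter are skipped before any data is read.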