Address review comment
desmondcheongzx committed Nov 5, 2024
1 parent 18b9994 commit 2d191ef
Showing 2 changed files with 27 additions and 26 deletions.
41 changes: 22 additions & 19 deletions src/daft-scan/src/glob.rs
@@ -18,7 +18,7 @@ use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use snafu::Snafu;
 
 use crate::{
-    hive::{hive_partitions_to_schema, hive_partitions_to_series, parse_hive_partitioning},
+    hive::{hive_partitions_to_fields, hive_partitions_to_series, parse_hive_partitioning},
     storage_config::StorageConfig,
     ChunkSpec, DataSource, PartitionField, Pushdowns, ScanOperator, ScanTask, ScanTaskRef,
 };
@@ -176,30 +176,33 @@ impl GlobScanOperator {
             }
             .into()),
         }?;
-        // If hive partitioning is set, create partition keys from the hive partition keys.
-        let (mut partitioning_keys, mut generated_fields) = if hive_partitioning {
+        // If hive partitioning is set, create partition fields from the hive partitions.
+        let mut partition_fields = if hive_partitioning {
             let hive_partitions = parse_hive_partitioning(&first_filepath)?;
-            let hive_partition_schema = hive_partitions_to_schema(&hive_partitions)?;
-            let hive_partition_schema = match user_provided_schema.clone() {
-                Some(hint) => hive_partition_schema.apply_hints(&hint)?,
-                None => hive_partition_schema,
+            hive_partitions_to_fields(&hive_partitions)
+        } else {
+            vec![]
+        };
+        // If file path column is set, extend the partition fields.
+        if let Some(fp_col) = &file_path_column {
+            let fp_field = Field::new(fp_col, DataType::Utf8);
+            partition_fields.push(fp_field);
+        }
+        let (partitioning_keys, generated_fields) = if partition_fields.is_empty() {
+            (vec![], Schema::empty())
+        } else {
+            let generated_fields = Schema::new(partition_fields)?;
+            let generated_fields = match user_provided_schema.clone() {
+                Some(hint) => generated_fields.apply_hints(&hint)?,
+                None => generated_fields,
             };
-            let hive_partitioning_keys = (&hive_partition_schema.fields)
+            // Extract partitioning keys only after the user's schema hints have been applied.
+            let partitioning_keys = (&generated_fields.fields)
                 .into_iter()
                 .map(|(_, field)| PartitionField::new(field.clone(), None, None))
                 .collect::<Result<Vec<_>, _>>()?;
-            (hive_partitioning_keys, hive_partition_schema)
-        } else {
-            (vec![], Schema::empty())
+            (partitioning_keys, generated_fields)
         };
-        // If file path column is set, extend the partition keys.
-        if let Some(fp_col) = &file_path_column {
-            let partition_field =
-                PartitionField::new(Field::new(fp_col, DataType::Utf8), None, None)?;
-            generated_fields = generated_fields
-                .non_distinct_union(&Schema::new(vec![partition_field.field.clone()])?);
-            partitioning_keys.push(partition_field);
-        }
 
         let schema = match infer_schema {
             true => {
12 changes: 5 additions & 7 deletions src/daft-scan/src/hive.rs
@@ -81,11 +81,10 @@ pub fn hive_partitions_to_series(
         .collect::<DaftResult<Vec<_>>>()
 }
 
-/// Turns hive partition key-value pairs into a schema with the partitions' keys as field names, and
-/// inferring field types from the partitions' values. We don't do schema type inference here as the
-/// user is expected to provide the schema for hive-partitioned fields.
-pub fn hive_partitions_to_schema(partitions: &IndexMap<String, String>) -> DaftResult<Schema> {
-    let partition_fields: Vec<Field> = partitions
+/// Turns hive partition key-value pairs into a vector of fields with the partitions' keys as field names, and
+/// inferring field types from the partitions' values.
+pub fn hive_partitions_to_fields(partitions: &IndexMap<String, String>) -> Vec<Field> {
+    partitions
         .iter()
         .map(|(key, value)| {
             let inferred_type = infer(value.as_bytes());
@@ -98,8 +97,7 @@ pub fn hive_partitions_to_schema(partitions: &IndexMap<String, String>) -> DaftR
             };
             Field::new(key, DaftDataType::from(&inferred_type))
         })
-        .collect();
-    Schema::new(partition_fields)
+        .collect()
 }
 
 #[cfg(test)]
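For readers skimming the diff, the standalone sketch below mirrors the reworked control flow in glob.rs: gather hive partition fields first, optionally append the file path column, and only build a schema (and the partitioning keys derived from it) when there is something to put in it. This is an illustration, not daft-scan's code: parse_hive_partitioning, hive_partitions_to_fields, Field, and Schema here are simplified stand-ins for the real versions (which use IndexMap, DataType, DaftResult, and daft's value-type inference), and type inference is reduced to an integer check.

// Illustration only -- simplified stand-ins for daft-scan's types and helpers.
use std::collections::BTreeMap;

struct Field {
    name: String,
    dtype: String, // daft uses DataType; a plain string keeps this sketch dependency-free
}

struct Schema {
    fields: Vec<Field>,
}

/// Collect `key=value` path segments, e.g. ".../year=2024/month=11/part-0.parquet".
fn parse_hive_partitioning(path: &str) -> BTreeMap<String, String> {
    path.split('/')
        .filter_map(|segment| segment.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Turn partition key-value pairs into fields, inferring a type from each value.
fn hive_partitions_to_fields(partitions: &BTreeMap<String, String>) -> Vec<Field> {
    partitions
        .iter()
        .map(|(key, value)| {
            let dtype = if value.parse::<i64>().is_ok() { "Int64" } else { "Utf8" };
            Field { name: key.clone(), dtype: dtype.to_string() }
        })
        .collect()
}

fn main() {
    let first_filepath = "s3://bucket/table/year=2024/month=11/part-0.parquet";
    let hive_partitioning = true;
    let file_path_column: Option<&str> = Some("path");

    // Gather partition fields from the hive-style path first...
    let mut partition_fields = if hive_partitioning {
        hive_partitions_to_fields(&parse_hive_partitioning(first_filepath))
    } else {
        vec![]
    };
    // ...then extend them with the file path column if one was requested...
    if let Some(fp_col) = file_path_column {
        partition_fields.push(Field { name: fp_col.to_string(), dtype: "Utf8".to_string() });
    }
    // ...and only build a schema when the field list is non-empty.
    let generated_fields = if partition_fields.is_empty() {
        Schema { fields: vec![] }
    } else {
        Schema { fields: partition_fields }
    };
    for field in &generated_fields.fields {
        println!("{}: {}", field.name, field.dtype); // year: Int64, month: Int64, path: Utf8
    }
}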
