fix: Scanning hive partitioned files where hive columns are partially included in the file #18626

Merged: 5 commits, Sep 10, 2024
71 changes: 52 additions & 19 deletions crates/polars-io/src/hive.rs
@@ -5,6 +5,8 @@ use polars_core::series::Series;
 /// We have a special num_rows arg, as df can be empty when a projection contains
 /// only hive partition columns.
 ///
+/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`
+///
 /// # Safety
 ///
 /// num_rows equals the height of the df when the df height is non-zero.
@@ -15,27 +17,58 @@ pub(crate) fn materialize_hive_partitions<D>(
     num_rows: usize,
 ) {
     if let Some(hive_columns) = hive_partition_columns {
-        let Some(first) = hive_columns.first() else {
+        // Insert these hive columns in the order they are stored in the file.
+        if hive_columns.is_empty() {
             return;
-        };
+        }
 
-        if reader_schema.index_of(first.name()).is_some() {
-            // Insert these hive columns in the order they are stored in the file.
-            for s in hive_columns {
-                let i = match df.get_columns().binary_search_by_key(
-                    &reader_schema.index_of(s.name()).unwrap_or(usize::MAX),
-                    |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN),
-                ) {
-                    Ok(i) => i,
-                    Err(i) => i,
-                };
-
-                df.insert_column(i, s.new_from_index(0, num_rows)).unwrap();
-            }
-        } else {
-            for s in hive_columns {
-                unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
-            }
+        let hive_columns_iter = hive_columns.iter().map(|s| s.new_from_index(0, num_rows));
+
+        if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
+            // Fast-path - all hive columns are at the end
+            unsafe { df.get_columns_mut() }.extend(hive_columns_iter);
+            return;
         }
+
+        let out_width: usize = df.width() + hive_columns.len();
+        let df_columns = df.get_columns();
+        let mut out_columns = Vec::with_capacity(out_width);
+
+        // We have a slightly involved algorithm here because `reader_schema` may contain extra
+        // columns that were excluded from a projection pushdown.
+
+        let hive_columns = hive_columns_iter.collect::<Vec<_>>();
+        // Safety: These are both non-empty at the start
+        let mut series_arr = [df_columns, hive_columns.as_slice()];
+        let mut schema_idx_arr = [
+            reader_schema.index_of(series_arr[0][0].name()).unwrap(),
+            reader_schema.index_of(series_arr[1][0].name()).unwrap(),
+        ];
+
+        loop {
+            let arg_min = if schema_idx_arr[0] < schema_idx_arr[1] {
+                0
+            } else {
+                1
+            };
+
+            out_columns.push(series_arr[arg_min][0].clone());
+            series_arr[arg_min] = &series_arr[arg_min][1..];
+
+            if series_arr[arg_min].is_empty() {
+                break;
+            }
+
+            let Some(i) = reader_schema.index_of(series_arr[arg_min][0].name()) else {
+                break;
+            };
+
+            schema_idx_arr[arg_min] = i;
+        }
+
+        out_columns.extend_from_slice(series_arr[0]);
+        out_columns.extend_from_slice(series_arr[1]);
+
+        *unsafe { df.get_columns_mut() } = out_columns;
     }
 }
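
Note on the new loop: it is a two-run merge keyed on position in the reader schema, since both `df_columns` and the hive columns arrive pre-sorted by that key. Below is a standalone Python sketch of the same idea, using plain name lists in place of Series slices (hypothetical `merge_by_schema_order` helper, not the polars source; names missing from the schema sort last, mirroring the usize::MAX fallback).

def merge_by_schema_order(file_cols, hive_cols, reader_schema):
    pos = {name: i for i, name in enumerate(reader_schema)}
    out, runs = [], [list(file_cols), list(hive_cols)]
    while runs[0] and runs[1]:
        # Take from whichever run's head column comes first in the schema.
        arg_min = 0 if pos.get(runs[0][0], len(pos)) < pos.get(runs[1][0], len(pos)) else 1
        out.append(runs[arg_min].pop(0))
    # One run is exhausted; both remainders are already in order.
    return out + runs[0] + runs[1]

# File physically stores x and y; the hive path supplies b (schema position 1) and a.
print(merge_by_schema_order(["x", "y"], ["b", "a"], ["x", "b", "y"]))
# -> ['x', 'b', 'y', 'a']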
41 changes: 20 additions & 21 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -1068,27 +1068,26 @@ pub(crate) fn maybe_init_projection_excluding_hive(
     // Update `with_columns` with a projection so that hive columns aren't loaded from the
     // file
     let hive_parts = hive_parts?;
-
     let hive_schema = hive_parts.schema();
 
-    let (first_hive_name, _) = hive_schema.get_at_index(0)?;
-
-    // TODO: Optimize this
-    let names = match reader_schema {
-        Either::Left(ref v) => v
-            .contains(first_hive_name.as_str())
-            .then(|| v.iter_names_cloned().collect::<Vec<_>>()),
-        Either::Right(ref v) => v
-            .contains(first_hive_name.as_str())
-            .then(|| v.iter_names_cloned().collect()),
-    };
-
-    let names = names?;
-
-    Some(
-        names
-            .into_iter()
-            .filter(|x| !hive_schema.contains(x))
-            .collect::<Arc<[_]>>(),
-    )
+    match &reader_schema {
+        Either::Left(reader_schema) => hive_schema
+            .iter_names()
+            .any(|x| reader_schema.contains(x))
+            .then(|| {
+                reader_schema
+                    .iter_names_cloned()
+                    .filter(|x| !hive_schema.contains(x))
+                    .collect::<Arc<[_]>>()
+            }),
+        Either::Right(reader_schema) => hive_schema
+            .iter_names()
+            .any(|x| reader_schema.contains(x))
+            .then(|| {
+                reader_schema
+                    .iter_names_cloned()
+                    .filter(|x| !hive_schema.contains(x))
+                    .collect::<Arc<[_]>>()
+            }),
+    }
 }
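
The behavioral fix in this hunk: the old code keyed the decision on whether the first hive column appeared in the file, so files containing only some later hive column were mishandled; the new code prunes whenever any hive column is physically present. A rough Python rendering of the new rule (hypothetical function name, not the polars source):

def projection_excluding_hive(reader_schema, hive_schema):
    # Only build a pruning projection when at least one hive column actually
    # appears in the file; otherwise return None and read the file as-is.
    if not any(name in reader_schema for name in hive_schema):
        return None
    return [name for name in reader_schema if name not in hive_schema]

# `b` is both a hive column and a file column, so it is pruned from the file
# read; `x` and `y` are still loaded, and `a` comes from the path alone.
print(projection_excluding_hive(["x", "b", "y"], ["a", "b"]))  # ['x', 'y']
print(projection_excluding_hive(["x", "y"], ["a", "b"]))       # None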
5 changes: 4 additions & 1 deletion crates/polars-plan/src/plans/hive.rs
@@ -57,6 +57,8 @@ impl HivePartitions {
     }
 }
 
+/// Note: Returned hive partitions are ordered by their position in the `reader_schema`
+///
 /// # Safety
 /// `hive_start_idx <= [min path length]`
 pub fn hive_partitions_from_paths(
@@ -198,10 +200,11 @@
     }
 
     let mut hive_partitions = Vec::with_capacity(paths.len());
-    let buffers = buffers
+    let mut buffers = buffers
         .into_iter()
         .map(|x| x.into_series())
        .collect::<PolarsResult<Vec<_>>>()?;
+    buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX));
 
     #[allow(clippy::needless_range_loop)]
     for i in 0..paths.len() {
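
The added sort matters because partition values are parsed in path order, while materialize_hive_partitions now requires reader-schema order with path-only columns last. A small Python illustration with hypothetical column names:

path_order = ["a", "b"]          # parsed from a path like "a=1/b=2/..."
reader_schema = ["x", "b", "y"]  # physical file schema
pos = {name: i for i, name in enumerate(reader_schema)}
# Mirrors sort_by_key with the usize::MAX fallback for path-only columns.
hive_sorted = sorted(path_order, key=lambda name: pos.get(name, float("inf")))
print(hive_sorted)  # ['b', 'a']: `b` is in the file, `a` is path-only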
36 changes: 36 additions & 0 deletions py-polars/tests/unit/io/test_hive.py
@@ -554,6 +554,42 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
     )
     assert_with_projections(lf, rhs)
 
+    # partial cols in file
+    partial_path = tmp_path / "a=1/b=2/partial_data.bin"
+    df = pl.DataFrame(
+        {"x": 1, "b": 2, "y": 1},
+        schema={"x": pl.Int32, "b": pl.Int16, "y": pl.Int32},
+    )
+    write_func(df, partial_path)
+
+    rhs = rhs.select(
+        pl.col("x").cast(pl.Int32),
+        pl.col("b").cast(pl.Int16),
+        pl.col("y").cast(pl.Int32),
+        pl.col("a").cast(pl.Int64),
+    )
+
+    lf = scan_func(partial_path, hive_partitioning=True)  # type: ignore[call-arg]
+    assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs)
+    assert_with_projections(lf, rhs)
+
+    lf = scan_func(  # type: ignore[call-arg]
+        partial_path,
+        hive_schema={"a": pl.String, "b": pl.String},
+        hive_partitioning=True,
+    )
+    rhs = rhs.select(
+        pl.col("x").cast(pl.Int32),
+        pl.col("b").cast(pl.String),
+        pl.col("y").cast(pl.Int32),
+        pl.col("a").cast(pl.String),
+    )
+    assert_frame_equal(
+        lf.collect(projection_pushdown=projection_pushdown),
+        rhs,
+    )
+    assert_with_projections(lf, rhs)
+
 
 @pytest.mark.write_disk
 def test_hive_partition_dates(tmp_path: Path) -> None:
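
Putting it together, a minimal end-to-end repro of the scenario this PR fixes, sketched from the test above (hypothetical paths, parquet chosen for illustration): the file under `a=1/b=2/` physically stores `b`, so `b` should land at its position in the file schema while the path-only column `a` is appended at the end.

from pathlib import Path

import polars as pl

# `b` is both a hive partition key and a physical column of the file.
root = Path("/tmp/hive_demo/a=1/b=2")
root.mkdir(parents=True, exist_ok=True)
pl.DataFrame(
    {"x": 1, "b": 2, "y": 1},
    schema={"x": pl.Int32, "b": pl.Int16, "y": pl.Int32},
).write_parquet(root / "data.parquet")

out = pl.scan_parquet(root / "data.parquet", hive_partitioning=True).collect()
print(out.columns)  # per the test above: ['x', 'b', 'y', 'a']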