
feat: Parquet modular encryption #16351


Open — wants to merge 38 commits into main

Commits (38):
e668b99
Initial commit to form PR for datafusion encryption support
corwinjoy May 30, 2025
d38dba4
Add tests for encryption configuration
corwinjoy May 30, 2025
5a2b456
Apply cargo fmt
corwinjoy May 30, 2025
c972676
Add a roundtrip encryption test to the parquet tests.
corwinjoy May 30, 2025
ec3f828
cargo fmt
corwinjoy May 30, 2025
3538a27
Update test to add decryption parameter to called functions.
corwinjoy May 31, 2025
a754992
Try to get DataFrame.write_parquet to work with encryption. Doesn't q…
corwinjoy Jun 3, 2025
e430672
Update datafusion/datasource-parquet/src/opener.rs
corwinjoy Jun 4, 2025
7fcba70
Update datafusion/datasource-parquet/src/source.rs
corwinjoy Jun 4, 2025
d6b1fca
Fix write test in parquet.rs
corwinjoy Jun 4, 2025
3353186
Simplify encryption test. Remove unused imports.
corwinjoy Jun 4, 2025
e4bc0e3
Run cargo fmt.
corwinjoy Jun 4, 2025
f52e79c
Further streamline roundtrip test.
corwinjoy Jun 4, 2025
5615ac8
Change From methods for FileEncryptionProperties and FileDecryptionPr…
corwinjoy Jun 4, 2025
61bc78e
Change encryption config to directly hold column keys using custom co…
corwinjoy Jun 5, 2025
a81855f
Fix generated field names in visit for encryptor and decryptor to use…
corwinjoy Jun 5, 2025
4cf12b3
1. Disable parallel writes with encryption.
corwinjoy Jun 6, 2025
f29bec3
cargo fmt
corwinjoy Jun 6, 2025
86fe04b
Update datafusion/common/src/file_options/parquet_writer.rs
corwinjoy Jun 6, 2025
d4ea63f
fix variables shown in information schema test.
corwinjoy Jun 6, 2025
0fcc4a5
Merge remote-tracking branch 'origin/parquet_encryption' into parquet…
corwinjoy Jun 6, 2025
86db3a5
Backout bad suggestion from copilot
corwinjoy Jun 6, 2025
b34441a
Remove unused serde reference
corwinjoy Jun 6, 2025
668d728
cargo fmt
corwinjoy Jun 6, 2025
ec1e8da
change file_format.rs to use global encryption options in struct.
corwinjoy Jun 9, 2025
e233408
Turn off page_index for encrypted example. Get encrypted example work…
corwinjoy Jun 9, 2025
9ffaae4
Tidy up example output.
corwinjoy Jun 9, 2025
8e244e9
Add missing license. Run taplo format
corwinjoy Jun 9, 2025
2871d51
Update configs.md by running dev/update_config_docs.sh
corwinjoy Jun 9, 2025
c405167
Cargo fmt + clippy changes.
corwinjoy Jun 9, 2025
506801e
Add filter test for encrypted files.
corwinjoy Jun 9, 2025
3058a90
Cargo clippy changes.
corwinjoy Jun 10, 2025
e7e521a
Merge remote-tracking branch 'origin/main' into parquet_encryption
corwinjoy Jun 10, 2025
bbeecfe
Fix link in README.md
corwinjoy Jun 10, 2025
4ceb072
Add issue tag for parallel writes.
corwinjoy Jun 10, 2025
c998378
Move file encryption and decryption properties out of global options
adamreeve Jun 16, 2025
7780b33
Use config_namespace_with_hashmap for column encryption/decryption props
adamreeve Jun 16, 2025
219d0b3
Merge pull request #5 from adamreeve/crypto_config_namespace
corwinjoy Jun 18, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not shown.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -155,6 +155,7 @@ parquet = { version = "55.1.0", default-features = false, features = [
"arrow",
"async",
"object_store",
"encryption",
] }
pbjson = { version = "0.7.0" }
pbjson-types = "0.7"
4 changes: 2 additions & 2 deletions benchmarks/src/bin/dfbench.rs
@@ -60,11 +60,11 @@ pub async fn main() -> Result<()> {
Options::Cancellation(opt) => opt.run().await,
Options::Clickbench(opt) => opt.run().await,
Options::H2o(opt) => opt.run().await,
Options::Imdb(opt) => opt.run().await,
Options::Imdb(opt) => Box::pin(opt.run()).await,
Author comment: requested by clippy

Options::ParquetFilter(opt) => opt.run().await,
Options::Sort(opt) => opt.run().await,
Options::SortTpch(opt) => opt.run().await,
Options::Tpch(opt) => opt.run().await,
Options::Tpch(opt) => Box::pin(opt.run()).await,
Options::TpchConvert(opt) => opt.run().await,
}
}
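The `Box::pin` changes above follow from clippy's `large_futures` lint: an async fn's state machine stores every local held across an `.await`, so a future can occupy a lot of stack, and boxing it leaves only a pointer-sized handle inline. A minimal, self-contained sketch (not from the PR) of the effect:

```rust
// Sketch of why clippy's `large_futures` lint suggests `Box::pin`.
// `buf` is held across the `.await`, so it becomes part of the
// future's state machine and inflates its size.
async fn big_future() -> usize {
    let buf = [0u8; 4096];
    std::future::ready(()).await;
    buf.len()
}

fn main() {
    let inline = big_future();
    let boxed = Box::pin(big_future());
    // The inline future embeds the 4 KiB buffer; the boxed handle
    // on the stack is just one pointer, with the state machine on the heap.
    assert!(std::mem::size_of_val(&inline) >= 4096);
    assert!(std::mem::size_of_val(&boxed) <= std::mem::size_of::<usize>() * 2);
    println!(
        "inline: {} bytes, boxed handle: {} bytes",
        std::mem::size_of_val(&inline),
        std::mem::size_of_val(&boxed)
    );
}
```

Awaiting `Box::pin(opt.run())` behaves the same as awaiting `opt.run()` directly; only the storage location of the future changes.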
2 changes: 1 addition & 1 deletion benchmarks/src/bin/imdb.rs
@@ -53,7 +53,7 @@ pub async fn main() -> Result<()> {
env_logger::init();
match ImdbOpt::from_args() {
ImdbOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => {
opt.run().await
Box::pin(opt.run()).await
}
ImdbOpt::Convert(opt) => opt.run().await,
}
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
@@ -58,7 +58,7 @@ async fn main() -> Result<()> {
env_logger::init();
match TpchOpt::from_args() {
TpchOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => {
opt.run().await
Box::pin(opt.run()).await
}
TpchOpt::Convert(opt) => opt.run().await,
}
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -65,6 +65,7 @@ cargo run --example dataframe
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
- [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
- [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
118 changes: 118 additions & 0 deletions datafusion-examples/examples/parquet_encrypted.rs
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::common::DataFusionError;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::logical_expr::{col, lit};
use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
use datafusion::parquet::encryption::encrypt::FileEncryptionProperties;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use tempfile::TempDir;

#[tokio::main]
async fn main() -> datafusion::common::Result<()> {
    // The SessionContext is the main high level API for interacting with DataFusion
    let ctx = SessionContext::new();

    // Find the local path of "alltypes_plain.parquet"
    let testdata = datafusion::test_util::parquet_test_data();
    let filename = &format!("{testdata}/alltypes_plain.parquet");

    // Read the sample parquet file
    let parquet_df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?;

    // Show information from the dataframe
    println!(
        "==============================================================================="
    );
    println!("Original Parquet DataFrame:");
    query_dataframe(&parquet_df).await?;

    // Set up encryption and decryption properties
    let (encrypt, decrypt) = setup_encryption(&parquet_df)?;

    // Create a temporary file location for the encrypted parquet file
    let tmp_dir = TempDir::new()?;
    let tempfile = tmp_dir.path().join("alltypes_plain-encrypted.parquet");
    let tempfile_str = tempfile.into_os_string().into_string().unwrap();

    // Write encrypted parquet
    let mut options = TableParquetOptions::default();
    options.crypto.file_encryption = Some((&encrypt).into());
    parquet_df
        .write_parquet(
            tempfile_str.as_str(),
            DataFrameWriteOptions::new().with_single_file_output(true),
            Some(options),
        )
        .await?;

    // Read encrypted parquet
    let ctx: SessionContext = SessionContext::new();
    let read_options =
        ParquetReadOptions::default().file_decryption_properties(decrypt);

    let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?;

    // Show information from the dataframe
    println!("\n\n===============================================================================");
    println!("Encrypted Parquet DataFrame:");
    query_dataframe(&encrypted_parquet_df).await?;

    Ok(())
}

// Show information from the dataframe
async fn query_dataframe(df: &DataFrame) -> Result<(), DataFusionError> {
    // Show its schema using 'describe'
    println!("Schema:");
    df.clone().describe().await?.show().await?;

    // Select three columns and filter the results
    // so that only rows where id > 5 are returned
    println!("\nSelected rows and columns:");
    df.clone()
        .select_columns(&["id", "bool_col", "timestamp_col"])?
        .filter(col("id").gt(lit(5)))?
        .show()
        .await?;

    Ok(())
}

// Set up encryption and decryption properties
fn setup_encryption(
    parquet_df: &DataFrame,
) -> Result<(FileEncryptionProperties, FileDecryptionProperties), DataFusionError> {
    let schema = parquet_df.schema();
    let footer_key = b"0123456789012345".to_vec(); // 128-bit (16-byte) key
    let column_key = b"1234567890123450".to_vec(); // 128-bit (16-byte) key

    let mut encrypt = FileEncryptionProperties::builder(footer_key.clone());
    let mut decrypt = FileDecryptionProperties::builder(footer_key.clone());

    for field in schema.fields().iter() {
        encrypt = encrypt.with_column_key(field.name().as_str(), column_key.clone());
        decrypt = decrypt.with_column_key(field.name().as_str(), column_key.clone());
    }

    let encrypt = encrypt.build()?;
    let decrypt = decrypt.build()?;
    Ok((encrypt, decrypt))
}
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
@@ -57,6 +57,7 @@ arrow-ipc = { workspace = true }
base64 = "0.22.1"
half = { workspace = true }
hashbrown = { workspace = true }
hex = "0.4.3"
indexmap = { workspace = true }
libc = "0.2.172"
log = { workspace = true }
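The new `hex` dependency in `datafusion-common` suggests that encryption keys cross the string-based config layer hex-encoded (an inference from this diff alone, not confirmed here). A std-only sketch of the encode/decode round trip such a config path needs — the PR itself would use the `hex` crate:

```rust
// Sketch (assumption): a 16-byte AES key encoded to/from a hex string,
// as a string-based config layer would require. ASCII hex input assumed.
fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

fn from_hex(s: &str) -> Option<Vec<u8>> {
    if s.len() % 2 != 0 {
        return None; // hex strings must have an even number of digits
    }
    (0..s.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
        .collect()
}

fn main() {
    let key = b"0123456789012345"; // 16-byte key, as in the example above
    let encoded = to_hex(key);
    assert_eq!(encoded.len(), 32); // two hex digits per byte
    assert_eq!(from_hex(&encoded).as_deref(), Some(&key[..]));
    println!("hex-encoded key: {encoded}");
}
```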