Skip to content
Open
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[workspace]
resolver = "2"
members = ["crates/paimon"]
members = ["crates/paimon", "crates/example", "crates/integrations/datafusion"]

[workspace.package]
version = "0.0.0"
Expand All @@ -26,3 +26,7 @@ homepage = "https://paimon.apache.org/"
repository = "https://github.com/apache/paimon-rust"
license = "Apache-2.0"
rust-version = "1.86.0"

[workspace.dependencies]
paimon = { version = "0.0.0", path = "./crates/paimon" }

35 changes: 35 additions & 0 deletions crates/example/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "paimon-examples"
repository.workspace = true
edition.workspace = true
license.workspace = true
version.workspace = true

[dependencies]
paimon = { workspace = true }
tokio = { version = "1", features = ["full"] }
# Match paimon-datafusion's datafusion to avoid duplicate/conflicting versions.
datafusion = { version = "49", default-features = false, features = [
"nested_expressions",
"datetime_expressions",
"regex_expressions",
"string_expressions",
"unicode_expressions",
"recursive_protection",
] }
paimon-datafusion = { path = "../integrations/datafusion" }
futures-util = "0.3.31"

[[example]]
name = "paimon-read"
path = "src/paimon_read.rs"


[[example]]
name = "paimon-read-dv"
path = "src/paimon_dv_read.rs"


[[example]]
name = "paimon-read-dv-datafusion"
path = "src/paimon_datafusion_read.rs"
60 changes: 60 additions & 0 deletions crates/example/src/paimon_datafusion_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use paimon::catalog::{Catalog, FileSystemCatalog, Identifier};
use paimon::io::FileIOBuilder;
use paimon_datafusion::PaimonTableProvider;

/// Example: read a Paimon table via DataFusion.
/// Set env PAIMON_WAREHOUSE to your warehouse path (default: ./paimon-warehouse).
#[tokio::main]
async fn main() -> paimon::Result<()> {
let warehouse_path = "/Users/yuxia/Projects/paimon/paimon-rust-demo/warehouse";

let file_io = FileIOBuilder::new("file").build()?;
let catalog = FileSystemCatalog::new(file_io, warehouse_path);

let table_id = Identifier::new("default", "T");
let table = catalog.get_table(&table_id).await?;

let provider = PaimonTableProvider::try_new(table).map_err(|e| {
paimon::Error::DataInvalid {
message: e.to_string(),
source: None,
}
})?;

let ctx = SessionContext::new();
ctx.register_table("my_table", Arc::new(provider))
.unwrap();

let df = ctx.sql("SELECT * FROM my_table").await.map_err(|e| {
paimon::Error::DataInvalid {
message: e.to_string(),
source: None,
}
})?;
df.show().await.map_err(|e| paimon::Error::DataInvalid {
message: e.to_string(),
source: None,
})?;

Ok(())
}
44 changes: 44 additions & 0 deletions crates/example/src/paimon_dv_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use futures_util::{StreamExt, TryStreamExt};
use paimon::catalog::{Catalog, FileSystemCatalog, Identifier};
use paimon::io::FileIOBuilder;

#[tokio::main]
async fn main() -> paimon::Result<()> {
let warehouse_path = "/Users/yuxia/Projects/paimon/paimon-rust-demo/warehouse";
let file_io = FileIOBuilder::new("file").build()?;
let catalog = FileSystemCatalog::new(file_io, warehouse_path);

let table_id = Identifier::new("default", "T");

let table = catalog.get_table(&table_id).await?;

let plan = table.new_read_builder().new_scan().plan().await?;

let mut arow_batch_stream = table
.new_read_builder()
.new_read()
.to_arrow(plan.splits())?;

while let Some(Ok(record_batch)) = arow_batch_stream.next().await {
println!("RecordBatch: {record_batch:#?}");
}

Ok(())
}
33 changes: 33 additions & 0 deletions crates/example/src/paimon_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
mod paimon_dv_read;
mod paimon_datafusion_read;

use futures_util::StreamExt;
use paimon::catalog::Identifier;
use paimon::io::FileIOBuilder;
use paimon::{Catalog, FileSystemCatalog};

use paimon::Result;

#[tokio::main]
async fn main() -> Result<()> {
let warehouse_path = "/Users/yuxia/Desktop/paimon-warehouse";
let file_io = FileIOBuilder::new("file").build()?;
let catalog = FileSystemCatalog::new(file_io, warehouse_path);

let table_id = Identifier::new("default", "T");

let table = catalog.get_table(&table_id).await?;

let plan = table.new_read_builder().new_scan().plan().await?;

let mut arow_batch_stream = table
.new_read_builder()
.new_read()
.to_arrow(plan.splits())?;

while let Some(Ok(record_batch)) = arow_batch_stream.next().await {
println!("RecordBatch: {record_batch:#?}");
}

Ok(())
}
42 changes: 42 additions & 0 deletions crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "paimon-datafusion"
edition.workspace = true
version.workspace = true
license.workspace = true
description = "Apache Paimon DataFusion Integration (read-only)"
categories = ["database"]
keywords = ["paimon", "datafusion", "integrations"]

[dependencies]
async-trait = "0.1"
# Disable default features to avoid pulling in compression (xz2/lzma), which can
# conflict with other crates that link to liblzma.
datafusion = { version = "49", default-features = false, features = [
"nested_expressions",
"datetime_expressions",
"regex_expressions",
"string_expressions",
"unicode_expressions",
"recursive_protection",
] }
paimon = { path = "../../paimon" }
futures = "0.3"
tokio = { version = "1", features = ["macros"] }

21 changes: 21 additions & 0 deletions crates/integrations/datafusion/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Converts a Paimon error into a DataFusion error.
pub fn to_datafusion_error(error: paimon::Error) -> datafusion::error::DataFusionError {
datafusion::error::DataFusionError::Plan(error.to_string())
}
46 changes: 46 additions & 0 deletions crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Apache Paimon DataFusion Integration (read-only).
//!
//! Register a Paimon table as a DataFusion table provider to query it with SQL or DataFrame API.
//!
//! # Example
//!
//! ```ignore
//! use std::sync::Arc;
//! use datafusion::prelude::SessionContext;
//! use paimon_datafusion::PaimonTableProvider;
//!
//! // Obtain a Paimon Table (e.g. from your catalog), then:
//! let provider = PaimonTableProvider::try_new(table)?;
//! let ctx = SessionContext::new();
//! ctx.register_table("my_table", Arc::new(provider))?;
//! let df = ctx.sql("SELECT * FROM my_table").await?;
//! ```
//!
//! This version does not support write, column projection, or predicate pushdown.

mod error;
mod physical_plan;
mod schema;
mod table;

pub use error::to_datafusion_error;
pub use physical_plan::PaimonTableScan;
pub use schema::paimon_schema_to_arrow;
pub use table::PaimonTableProvider;
20 changes: 20 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub(crate) mod scan;

pub use scan::PaimonTableScan;
Loading
Loading