Skip to content

Commit a40d4e4

Browse files
author
Brent Gardner
committed
Custom table provider factories
1 parent b175f9a commit a40d4e4

File tree

17 files changed

+193
-145
lines changed

17 files changed

+193
-145
lines changed

.github/workflows/rust.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,9 @@ jobs:
280280
rustup default stable
281281
rustup component add rustfmt
282282
- name: Run
283-
run: ci/scripts/rust_fmt.sh
283+
run: |
284+
echo '' > datafusion/proto/src/generated/datafusion.rs
285+
ci/scripts/rust_fmt.sh
284286
285287
coverage:
286288
name: coverage

datafusion/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ url = "2.2"
9393
uuid = { version = "1.0", features = ["v4"] }
9494

9595
[dev-dependencies]
96+
async-trait = "0.1.53"
9697
criterion = "0.3"
9798
csv = "1.1.6"
9899
ctor = "0.1.22"

datafusion/core/src/datasource/datasource.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,12 @@ pub trait TableProvider: Sync + Send {
7272
Ok(TableProviderFilterPushDown::Unsupported)
7373
}
7474
}
75+
76+
/// A factory which creates [`TableProvider`]s at runtime given a URL.
77+
///
78+
/// For example, this can be used to create a table "on the fly"
79+
/// from a directory of files only when that name is referenced.
80+
pub trait TableProviderFactory: Sync + Send {
81+
/// Create a TableProvider given name and url
82+
fn create(&self, name: &str, url: &str) -> Arc<dyn TableProvider>;
83+
}

datafusion/core/src/logical_plan/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ pub use datafusion_expr::{
4545
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
4646
},
4747
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
48-
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint,
49-
JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
50-
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
51-
UserDefinedLogicalNode, Values,
48+
CreateView, CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit,
49+
LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
50+
Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
5251
},
5352
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif,
5453
octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace,

datafusion/core/src/logical_plan/plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ pub use datafusion_expr::{
2323
display::{GraphvizVisitor, IndentVisitor},
2424
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
2525
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
26-
Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
27-
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
26+
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
27+
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
2828
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
2929
UserDefinedLogicalNode, Values, Window,
3030
},

datafusion/core/tests/sql/create_drop.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use async_trait::async_trait;
19+
use std::any::Any;
1820
use std::io::Write;
1921

22+
use datafusion::datasource::datasource::TableProviderFactory;
23+
use datafusion::execution::context::SessionState;
24+
use datafusion_expr::TableType;
2025
use tempfile::TempDir;
2126

2227
use super::*;
@@ -360,6 +365,74 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
360365
Ok(())
361366
}
362367

368+
struct TestTableProvider {}
369+
370+
impl TestTableProvider {}
371+
372+
#[async_trait]
373+
impl TableProvider for TestTableProvider {
374+
fn as_any(&self) -> &dyn Any {
375+
unimplemented!("TestTableProvider is a stub for testing.")
376+
}
377+
378+
fn schema(&self) -> SchemaRef {
379+
unimplemented!("TestTableProvider is a stub for testing.")
380+
}
381+
382+
fn table_type(&self) -> TableType {
383+
unimplemented!("TestTableProvider is a stub for testing.")
384+
}
385+
386+
async fn scan(
387+
&self,
388+
_ctx: &SessionState,
389+
_projection: &Option<Vec<usize>>,
390+
_filters: &[Expr],
391+
_limit: Option<usize>,
392+
) -> Result<Arc<dyn ExecutionPlan>> {
393+
unimplemented!("TestTableProvider is a stub for testing.")
394+
}
395+
}
396+
397+
struct TestTableFactory {}
398+
399+
impl TableProviderFactory for TestTableFactory {
400+
fn create(&self, _name: &str, _path: &str) -> Arc<dyn TableProvider> {
401+
Arc::new(TestTableProvider {})
402+
}
403+
}
404+
405+
#[tokio::test]
406+
async fn create_custom_table() -> Result<()> {
407+
let mut ctx = SessionContext::new();
408+
ctx.register_table_factory("DELTATABLE", Arc::new(TestTableFactory {}));
409+
410+
let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
411+
ctx.sql(sql).await.unwrap();
412+
413+
let cat = ctx.catalog("datafusion").unwrap();
414+
let schema = cat.schema("public").unwrap();
415+
let exists = schema.table_exist("dt");
416+
assert!(exists, "Table should have been created!");
417+
418+
Ok(())
419+
}
420+
421+
#[tokio::test]
422+
async fn create_bad_custom_table() {
423+
let ctx = SessionContext::new();
424+
425+
let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
426+
let res = ctx.sql(sql).await;
427+
match res {
428+
Ok(_) => panic!("Registration of tables without factories should fail"),
429+
Err(e) => {
430+
assert!(e.to_string().contains("Unable to find factory for"), "Registration of tables without factories should throw correct error")
431+
}
432+
}
433+
}
434+
}
435+
363436
#[tokio::test]
364437
async fn create_csv_table_empty_file() -> Result<()> {
365438
let ctx =

datafusion/core/tests/sql/timestamp.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,6 +1398,7 @@ async fn timestamp_sub_interval_days() -> Result<()> {
13981398
}
13991399

14001400
#[tokio::test]
1401+
#[ignore] // https://github.com/apache/arrow-datafusion/issues/3327
14011402
async fn timestamp_add_interval_months() -> Result<()> {
14021403
let ctx = SessionContext::new();
14031404

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ pub use builder::{table_scan, LogicalPlanBuilder};
2424
pub use plan::{
2525
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
2626
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
27-
EmptyRelation, Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType,
28-
Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
29-
Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
30-
Values, Window,
27+
EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
28+
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, StringifiedPlan,
29+
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, Window,
3130
};
3231

3332
pub use display::display_schema;

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,19 +1236,6 @@ pub struct CreateView {
12361236
pub definition: Option<String>,
12371237
}
12381238

1239-
/// Types of files to parse as DataFrames
1240-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1241-
pub enum FileType {
1242-
/// Newline-delimited JSON
1243-
NdJson,
1244-
/// Apache Parquet columnar storage
1245-
Parquet,
1246-
/// Comma separated values
1247-
CSV,
1248-
/// Avro binary records
1249-
Avro,
1250-
}
1251-
12521239
/// Creates an external table.
12531240
#[derive(Clone)]
12541241
pub struct CreateExternalTable {
@@ -1259,7 +1246,7 @@ pub struct CreateExternalTable {
12591246
/// The physical location
12601247
pub location: String,
12611248
/// The file type of physical file
1262-
pub file_type: FileType,
1249+
pub file_type: String,
12631250
/// Whether the CSV file contains a header
12641251
pub has_header: bool,
12651252
/// Delimiter for CSV

datafusion/proto/build.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ fn main() -> Result<(), String> {
3030

3131
#[cfg(feature = "json")]
3232
fn build() -> Result<(), String> {
33-
let descriptor_path = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap())
34-
.join("proto_descriptor.bin");
33+
use std::io::Write;
34+
35+
let out = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
36+
let descriptor_path = out.join("proto_descriptor.bin");
3537

3638
prost_build::Config::new()
3739
.file_descriptor_set_path(&descriptor_path)
@@ -47,12 +49,24 @@ fn build() -> Result<(), String> {
4749
.build(&[".datafusion"])
4850
.map_err(|e| format!("pbjson compilation failed: {}", e))?;
4951

52+
// .serde.rs is not a valid package name, so append to datafusion.rs so we can treat it normally
53+
let proto = std::fs::read_to_string(out.join("datafusion.rs")).unwrap();
54+
let json = std::fs::read_to_string(out.join("datafusion.serde.rs")).unwrap();
55+
let mut file = std::fs::OpenOptions::new()
56+
.write(true)
57+
.create(true)
58+
.open("src/generated/datafusion.rs")
59+
.unwrap();
60+
file.write(proto.as_str().as_ref()).unwrap();
61+
file.write(json.as_str().as_ref()).unwrap();
62+
5063
Ok(())
5164
}
5265

5366
#[cfg(not(feature = "json"))]
5467
fn build() -> Result<(), String> {
5568
prost_build::Config::new()
69+
.out_dir("src/generated")
5670
.compile_protos(&["proto/datafusion.proto"], &["proto"])
5771
.map_err(|e| format!("protobuf compilation failed: {}", e))
5872
}

0 commit comments

Comments
 (0)