feat: add --dir option for the cli #82

Merged
merged 2 commits on Jun 6, 2025
9 changes: 6 additions & 3 deletions README.md
@@ -53,11 +53,11 @@ can connect using psql or language drivers to execute `SELECT` queries against
them.

```
datafusion-postgres 0.1.0
A postgres interface for datatfusion. Serve any CSV/JSON/Arrow files as tables.
datafusion-postgres 0.4.0
A postgres interface for datafusion. Serve any CSV/JSON/Arrow files as tables.

USAGE:
    datafusion-postgres [OPTIONS]
    datafusion-postgres-cli [OPTIONS]

FLAGS:
    -h, --help       Prints help information
@@ -67,8 +67,11 @@ OPTIONS:
        --arrow <arrow-tables>...        Arrow files to register as table, using syntax `table_name:file_path`
        --avro <avro-tables>...          Avro files to register as table, using syntax `table_name:file_path`
        --csv <csv-tables>...            CSV files to register as table, using syntax `table_name:file_path`
    -d, --dir <directory>                Directory to serve, all supported files will be registered as tables
        --host <host>                    Host address the server listens to, default to 127.0.0.1 [default: 127.0.0.1]
        --json <json-tables>...          JSON files to register as table, using syntax `table_name:file_path`
        --parquet <parquet-tables>...    Parquet files to register as table, using syntax `table_name:file_path`
    -p <port>                            Port the server listens to, default to 5432 [default: 5432]
```
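A hedged usage sketch of the new flag (the `./data` path is illustrative, not taken from this PR): pointing the server at a directory registers every supported file under its file stem as the table name.

```
datafusion-postgres-cli --dir ./data
```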

For example, we use this command to host the `ETTm1.csv` dataset as table `ettm1`.
89 changes: 85 additions & 4 deletions datafusion-postgres-cli/src/main.rs
@@ -1,3 +1,6 @@
use std::ffi::OsStr;
use std::fs;

use datafusion::execution::options::{
ArrowReadOptions, AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
@@ -26,6 +29,9 @@ struct Opt {
    /// Avro files to register as table, using syntax `table_name:file_path`
    #[structopt(long("avro"))]
    avro_tables: Vec<String>,
    /// Directory to serve, all supported files will be registered as tables
    #[structopt(long("dir"), short("d"))]
    directory: Option<String>,
    /// Port the server listens to, default to 5432
    #[structopt(short, default_value = "5432")]
    port: u16,
@@ -40,12 +46,75 @@ fn parse_table_def(table_def: &str) -> (&str, &str) {
        .expect("Use this pattern to register table: table_name:file_path")
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let opts = Opt::from_args();
impl Opt {
    fn include_directory_files(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        if let Some(directory) = &self.directory {
            match fs::read_dir(directory) {
                Ok(entries) => {
                    for entry in entries.flatten() {
                        let path = entry.path();
                        if !path.is_file() {
                            continue;
                        }

                        if let Some(ext) = path.extension().and_then(OsStr::to_str) {
                            let ext_lower = ext.to_lowercase();
                            if let Some(base_name) = path.file_stem().and_then(|s| s.to_str()) {
                                match ext_lower.as_ref() {
                                    "json" => {
                                        self.json_tables.push(format!(
                                            "{}:{}",
                                            base_name,
                                            path.to_string_lossy()
                                        ));
                                    }
                                    "avro" => {
                                        self.avro_tables.push(format!(
                                            "{}:{}",
                                            base_name,
                                            path.to_string_lossy()
                                        ));
                                    }
                                    "parquet" => {
                                        self.parquet_tables.push(format!(
                                            "{}:{}",
                                            base_name,
                                            path.to_string_lossy()
                                        ));
                                    }
                                    "csv" => {
                                        self.csv_tables.push(format!(
                                            "{}:{}",
                                            base_name,
                                            path.to_string_lossy()
                                        ));
                                    }
                                    "arrow" => {
                                        self.arrow_tables.push(format!(
                                            "{}:{}",
                                            base_name,
                                            path.to_string_lossy()
                                        ));
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }
                }
                Err(e) => {
                    return Err(format!("Failed to load directory {}: {}", directory, e).into());
                }
            }
        }
        Ok(())
    }
}

async fn setup_session_context(
    session_context: &SessionContext,
    opts: &Opt,
) -> Result<(), Box<dyn std::error::Error>> {
    // Register CSV tables
    for (table_name, table_path) in opts.csv_tables.iter().map(|s| parse_table_def(s.as_ref())) {
        session_context
@@ -99,6 +168,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
        println!("Loaded {} as table {}", table_path, table_name);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut opts = Opt::from_args();
    opts.include_directory_files()?;

    let session_context = SessionContext::new();

    setup_session_context(&session_context, &opts).await?;

    let server_options = ServerOptions::new()
        .with_host(opts.host)
        .with_port(opts.port);
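To make the registration rule easier to follow outside the diff, here is a minimal, self-contained sketch of the directory-scanning idea used by `include_directory_files`; the helper name `scan_dir` and its return shape are illustrative, not part of the PR:

```
use std::ffi::OsStr;
use std::fs;
use std::path::Path;

/// Collect `(extension, "table_name:file_path")` pairs for every supported file
/// directly inside `dir`, mirroring what the `--dir` flag feeds into the
/// per-format table lists.
fn scan_dir(dir: &Path) -> std::io::Result<Vec<(String, String)>> {
    let mut tables = Vec::new();
    for entry in fs::read_dir(dir)?.flatten() {
        let path = entry.path();
        if !path.is_file() {
            continue;
        }
        let ext = path
            .extension()
            .and_then(OsStr::to_str)
            .map(|e| e.to_lowercase());
        let stem = path.file_stem().and_then(OsStr::to_str);
        if let (Some(ext), Some(stem)) = (ext, stem) {
            if matches!(ext.as_str(), "csv" | "json" | "avro" | "parquet" | "arrow") {
                // e.g. `/data/ettm1.csv` becomes the table definition `ettm1:/data/ettm1.csv`
                tables.push((ext, format!("{}:{}", stem, path.to_string_lossy())));
            }
        }
    }
    Ok(tables)
}
```

Each pair could then be appended to the matching `*_tables` list before `setup_session_context` runs, which is the `table_name:file_path` shape that `parse_table_def` expects.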
5 changes: 3 additions & 2 deletions datafusion-postgres/src/lib.rs
@@ -3,15 +3,16 @@ mod encoder;
mod handlers;
mod information_schema;

pub use handlers::{DfSessionService, HandlerFactory, Parser};

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use getset::{Getters, Setters, WithSetters};
use pgwire::tokio::process_socket;
use tokio::net::TcpListener;

use handlers::HandlerFactory;
pub use handlers::{DfSessionService, Parser};

#[derive(Getters, Setters, WithSetters)]
#[getset(get = "pub", set = "pub", set_with = "pub")]
pub struct ServerOptions {