Skip to content

Commit cfbb14d

Browse files
saikrishna1-bidgelyluckylsk34alamb
authored
Allow SessionContext::read_csv, etc to read multiple files (#4908)
* Added a traitDataFilePaths to convert strings and vector of strings to a vector of URLs. * Added docs and tests. Updated DataFilePaths to accept any vector containing AsRef<str> trait. * Added docs to read_ methods and extended the SessionContext doc. * Ran Cargo fmt * removed CallReadTrait methods * Update read_csv example Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * removed addition to SessionContext example --------- Co-authored-by: Lakkam Sai Krishna Reddy <lakkam.saikrishnareddy@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent ae89960 commit cfbb14d

File tree

1 file changed

+78
-23
lines changed

1 file changed

+78
-23
lines changed

datafusion/core/src/execution/context.rs

Lines changed: 78 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,43 @@ use super::options::{
106106
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, ReadOptions,
107107
};
108108

109+
/// DataFilePaths adds a method to convert strings and vector of strings to vector of [`ListingTableUrl`] URLs.
110+
/// This allows methods such [`SessionContext::read_csv`] and `[`SessionContext::read_avro`]
111+
/// to take either a single file or multiple files.
112+
pub trait DataFilePaths {
113+
/// Parse to a vector of [`ListingTableUrl`] URLs.
114+
fn to_urls(self) -> Result<Vec<ListingTableUrl>>;
115+
}
116+
117+
impl DataFilePaths for &str {
118+
fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
119+
Ok(vec![ListingTableUrl::parse(self)?])
120+
}
121+
}
122+
123+
impl DataFilePaths for String {
124+
fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
125+
Ok(vec![ListingTableUrl::parse(self)?])
126+
}
127+
}
128+
129+
impl DataFilePaths for &String {
130+
fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
131+
Ok(vec![ListingTableUrl::parse(self)?])
132+
}
133+
}
134+
135+
impl<P> DataFilePaths for Vec<P>
136+
where
137+
P: AsRef<str>,
138+
{
139+
fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
140+
self.iter()
141+
.map(ListingTableUrl::parse)
142+
.collect::<Result<Vec<ListingTableUrl>>>()
143+
}
144+
}
145+
109146
/// SessionContext is the main interface for executing queries with DataFusion. It stands for
110147
/// the connection between user and DataFusion/Ballista cluster.
111148
/// The context provides the following functionality
@@ -627,22 +664,18 @@ impl SessionContext {
627664
///
628665
/// For more control such as reading multiple files, you can use
629666
/// [`read_table`](Self::read_table) with a [`ListingTable`].
630-
async fn _read_type<'a>(
667+
async fn _read_type<'a, P: DataFilePaths>(
631668
&self,
632-
table_path: impl AsRef<str>,
669+
table_paths: P,
633670
options: impl ReadOptions<'a>,
634671
) -> Result<DataFrame> {
635-
let table_path = ListingTableUrl::parse(table_path)?;
672+
let table_paths = table_paths.to_urls()?;
636673
let session_config = self.copied_config();
637674
let listing_options = options.to_listing_options(&session_config);
638-
let resolved_schema = match options
639-
.get_resolved_schema(&session_config, self.state(), table_path.clone())
640-
.await
641-
{
642-
Ok(resolved_schema) => resolved_schema,
643-
Err(e) => return Err(e),
644-
};
645-
let config = ListingTableConfig::new(table_path)
675+
let resolved_schema = options
676+
.get_resolved_schema(&session_config, self.state(), table_paths[0].clone())
677+
.await?;
678+
let config = ListingTableConfig::new_with_multi_paths(table_paths)
646679
.with_listing_options(listing_options)
647680
.with_schema(resolved_schema);
648681
let provider = ListingTable::try_new(config)?;
@@ -653,24 +686,28 @@ impl SessionContext {
653686
///
654687
/// For more control such as reading multiple files, you can use
655688
/// [`read_table`](Self::read_table) with a [`ListingTable`].
656-
pub async fn read_avro(
689+
///
690+
/// For an example, see [`read_csv`](Self::read_csv)
691+
pub async fn read_avro<P: DataFilePaths>(
657692
&self,
658-
table_path: impl AsRef<str>,
693+
table_paths: P,
659694
options: AvroReadOptions<'_>,
660695
) -> Result<DataFrame> {
661-
self._read_type(table_path, options).await
696+
self._read_type(table_paths, options).await
662697
}
663698

664699
/// Creates a [`DataFrame`] for reading an JSON data source.
665700
///
666701
/// For more control such as reading multiple files, you can use
667702
/// [`read_table`](Self::read_table) with a [`ListingTable`].
668-
pub async fn read_json(
703+
///
704+
/// For an example, see [`read_csv`](Self::read_csv)
705+
pub async fn read_json<P: DataFilePaths>(
669706
&self,
670-
table_path: impl AsRef<str>,
707+
table_paths: P,
671708
options: NdJsonReadOptions<'_>,
672709
) -> Result<DataFrame> {
673-
self._read_type(table_path, options).await
710+
self._read_type(table_paths, options).await
674711
}
675712

676713
/// Creates an empty DataFrame.
@@ -685,24 +722,42 @@ impl SessionContext {
685722
///
686723
/// For more control such as reading multiple files, you can use
687724
/// [`read_table`](Self::read_table) with a [`ListingTable`].
688-
pub async fn read_csv(
725+
///
726+
/// Example usage is given below:
727+
///
728+
/// ```
729+
/// use datafusion::prelude::*;
730+
/// # use datafusion::error::Result;
731+
/// # #[tokio::main]
732+
/// # async fn main() -> Result<()> {
733+
/// let ctx = SessionContext::new();
734+
/// // You can read a single file using `read_csv`
735+
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
736+
/// // you can also read multiple files:
737+
/// let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?;
738+
/// # Ok(())
739+
/// # }
740+
/// ```
741+
pub async fn read_csv<P: DataFilePaths>(
689742
&self,
690-
table_path: impl AsRef<str>,
743+
table_paths: P,
691744
options: CsvReadOptions<'_>,
692745
) -> Result<DataFrame> {
693-
self._read_type(table_path, options).await
746+
self._read_type(table_paths, options).await
694747
}
695748

696749
/// Creates a [`DataFrame`] for reading a Parquet data source.
697750
///
698751
/// For more control such as reading multiple files, you can use
699752
/// [`read_table`](Self::read_table) with a [`ListingTable`].
700-
pub async fn read_parquet(
753+
///
754+
/// For an example, see [`read_csv`](Self::read_csv)
755+
pub async fn read_parquet<P: DataFilePaths>(
701756
&self,
702-
table_path: impl AsRef<str>,
757+
table_paths: P,
703758
options: ParquetReadOptions<'_>,
704759
) -> Result<DataFrame> {
705-
self._read_type(table_path, options).await
760+
self._read_type(table_paths, options).await
706761
}
707762

708763
/// Creates a [`DataFrame`] for a [`TableProvider`] such as a

0 commit comments

Comments
 (0)