Skip to content

Commit 0c51fdb

Browse files
committed
feat: add CsvExecBuilder, deprecate CsvExec::new
This adds the `CsvExecBuilder` struct for building a `CsvExec` instance, and deprecates the `CsvExec::new` method which has grown too large. There are some `TODO`s related to the duplication of formatting options and their defaults coming from multiple places. Uses of the deprecated `new` method have not been updated yet.
1 parent c8ef545 commit 0c51fdb

File tree

1 file changed

+164
-14
lines changed
  • datafusion/core/src/datasource/physical_plan

1 file changed

+164
-14
lines changed

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 164 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,27 @@ use object_store::{GetOptions, GetResultPayload, ObjectStore};
4949
use tokio::io::AsyncWriteExt;
5050
use tokio::task::JoinSet;
5151

52-
/// Execution plan for scanning a CSV file
52+
/// Execution plan for scanning a CSV file.
53+
///
54+
/// # Example: create a `CsvExec`
55+
/// ```
56+
/// # use std::sync::Arc;
57+
/// # use arrow::datatypes::Schema;
58+
/// # use datafusion::datasource::{
59+
/// # physical_plan::{CsvExec, FileScanConfig},
60+
/// # listing::PartitionedFile,
61+
/// # };
62+
/// # use datafusion_execution::object_store::ObjectStoreUrl;
63+
/// # let object_store_url = ObjectStoreUrl::local_filesystem();
64+
/// # let file_schema = Arc::new(Schema::empty());
65+
/// // Create a CsvExec for reading the first 100MB of `file1.csv`
66+
/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema)
67+
/// .with_file(PartitionedFile::new("file1.csv", 100*1024*1024));
68+
/// let exec = CsvExec::builder(file_scan_config)
69+
/// .with_has_header(true) // The file has a header row
70+
/// .with_newlines_in_values(true) // The file contains newlines in values
71+
/// .build();
72+
/// ```
5373
#[derive(Debug, Clone)]
5474
pub struct CsvExec {
5575
base_config: FileScanConfig,
@@ -67,27 +87,124 @@ pub struct CsvExec {
6787
cache: PlanProperties,
6888
}
6989

70-
impl CsvExec {
71-
/// Create a new CSV reader execution plan provided base and specific configurations
72-
#[allow(clippy::too_many_arguments)]
73-
pub fn new(
74-
base_config: FileScanConfig,
75-
has_header: bool,
76-
delimiter: u8,
77-
quote: u8,
78-
escape: Option<u8>,
79-
comment: Option<u8>,
80-
newlines_in_values: bool,
90+
/// Builder for [`CsvExec`].
91+
///
92+
/// See example on [`CsvExec`].
93+
#[derive(Debug, Clone)]
94+
pub struct CsvExecBuilder {
95+
file_scan_config: FileScanConfig,
96+
file_compression_type: FileCompressionType,
97+
// TODO: it seems like these format options could be reused across all the various CSV config
98+
has_header: bool,
99+
delimiter: u8,
100+
quote: u8,
101+
escape: Option<u8>,
102+
comment: Option<u8>,
103+
newlines_in_values: bool,
104+
}
105+
106+
impl CsvExecBuilder {
107+
/// Create a new builder to read the provided file scan configuration.
108+
pub fn new(file_scan_config: FileScanConfig) -> Self {
109+
Self {
110+
file_scan_config,
111+
// TODO: these defaults are duplicated from `CsvOptions` - should they be computed?
112+
has_header: false,
113+
delimiter: b',',
114+
quote: b'"',
115+
escape: None,
116+
comment: None,
117+
newlines_in_values: false,
118+
file_compression_type: FileCompressionType::UNCOMPRESSED,
119+
}
120+
}
121+
122+
/// Set whether the first row defines the column names.
123+
///
124+
/// The default value is `false`.
125+
pub fn with_has_header(mut self, has_header: bool) -> Self {
126+
self.has_header = has_header;
127+
self
128+
}
129+
130+
/// Set the column delimeter.
131+
///
132+
/// The default is `,`.
133+
pub fn with_delimeter(mut self, delimiter: u8) -> Self {
134+
self.delimiter = delimiter;
135+
self
136+
}
137+
138+
/// Set the quote character.
139+
///
140+
/// The default is `"`.
141+
pub fn with_quote(mut self, quote: u8) -> Self {
142+
self.quote = quote;
143+
self
144+
}
145+
146+
/// Set the escape character.
147+
///
148+
/// The default is `None` (i.e. quotes cannot be escaped).
149+
pub fn with_escape(mut self, escape: Option<u8>) -> Self {
150+
self.escape = escape;
151+
self
152+
}
153+
154+
/// Set the comment character.
155+
///
156+
/// The default is `None` (i.e. comments are not supported).
157+
pub fn with_comment(mut self, comment: Option<u8>) -> Self {
158+
self.comment = comment;
159+
self
160+
}
161+
162+
/// Set whether newlines in (quoted) values are supported.
163+
///
164+
/// Parsing newlines in quoted values may be affected by execution behaviour such as
165+
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
166+
/// parsed successfully, which may reduce performance.
167+
///
168+
/// The default value is `false`.
169+
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
170+
self.newlines_in_values = newlines_in_values;
171+
self
172+
}
173+
174+
/// Set the file compression type.
175+
///
176+
/// The default is [`FileCompressionType::UNCOMPRESSED`].
177+
pub fn with_file_compression_type(
178+
mut self,
81179
file_compression_type: FileCompressionType,
82180
) -> Self {
181+
self.file_compression_type = file_compression_type;
182+
self
183+
}
184+
185+
/// Build a [`CsvExec`].
186+
#[must_use]
187+
pub fn build(self) -> CsvExec {
188+
let Self {
189+
file_scan_config: base_config,
190+
file_compression_type,
191+
has_header,
192+
delimiter,
193+
quote,
194+
escape,
195+
comment,
196+
newlines_in_values,
197+
} = self;
198+
83199
let (projected_schema, projected_statistics, projected_output_ordering) =
84200
base_config.project();
85-
let cache = Self::compute_properties(
201+
let cache = CsvExec::compute_properties(
86202
projected_schema,
87203
&projected_output_ordering,
88204
&base_config,
89205
);
90-
Self {
206+
207+
CsvExec {
91208
base_config,
92209
projected_statistics,
93210
has_header,
@@ -101,6 +218,39 @@ impl CsvExec {
101218
comment,
102219
}
103220
}
221+
}
222+
223+
impl CsvExec {
224+
/// Create a new CSV reader execution plan provided base and specific configurations
225+
#[deprecated(since = "41.0.0", note = "use `CsvExec::builder` or `CsvExecBuilder`")]
226+
#[allow(clippy::too_many_arguments)]
227+
pub fn new(
228+
base_config: FileScanConfig,
229+
has_header: bool,
230+
delimiter: u8,
231+
quote: u8,
232+
escape: Option<u8>,
233+
comment: Option<u8>,
234+
newlines_in_values: bool,
235+
file_compression_type: FileCompressionType,
236+
) -> Self {
237+
CsvExecBuilder::new(base_config)
238+
.with_has_header(has_header)
239+
.with_delimeter(delimiter)
240+
.with_quote(quote)
241+
.with_escape(escape)
242+
.with_comment(comment)
243+
.with_newlines_in_values(newlines_in_values)
244+
.with_file_compression_type(file_compression_type)
245+
.build()
246+
}
247+
248+
/// Return a [`CsvExecBuilder`].
249+
///
250+
/// See example on [`CsvExec`] and [`CsvExecBuilder`] for specifying CSV table options.
251+
pub fn builder(file_scan_config: FileScanConfig) -> CsvExecBuilder {
252+
CsvExecBuilder::new(file_scan_config)
253+
}
104254

105255
/// Ref to the base configs
106256
pub fn base_config(&self) -> &FileScanConfig {

0 commit comments

Comments
 (0)