Description
Problem description
I have a use case that I imagine isn't too out of the ordinary: I have many files in a format that doesn't already have a reader, and I would like to convert them to a Parquet file in a streaming fashion. They don't all fit in memory at once, so it's important that they are read individually and appended to the Parquet file. I tried writing a lazy reader using `AnonymousScan`, but I get the error `sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'` with the following minimal reproduction:
```rust
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use polars::prelude::*;

#[derive(Clone)]
struct LazyReader {
    path: PathBuf,
    rechunk: bool,
    row_count: Option<RowCount>,
    n_rows: Option<usize>,
}

impl AnonymousScan for LazyReader {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn scan(&self, _scan_opts: AnonymousScanOptions) -> PolarsResult<DataFrame> {
        df!["test" => ["testValue1", "testValue2"]]
    }

    fn schema(&self, _infer_schema_length: Option<usize>) -> PolarsResult<Schema> {
        Ok(Schema::from(
            [Field::new("test", DataType::Utf8)].into_iter(),
        ))
    }
}

impl LazyFileListReader for LazyReader {
    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        let options = ScanArgsAnonymous::default();
        LazyFrame::anonymous_scan(Arc::new(self), options)
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn with_path(mut self, path: PathBuf) -> Self {
        self.path = path;
        self
    }

    fn rechunk(&self) -> bool {
        self.rechunk
    }

    #[must_use]
    fn with_rechunk(mut self, toggle: bool) -> Self {
        self.rechunk = toggle;
        self
    }

    fn n_rows(&self) -> Option<usize> {
        self.n_rows
    }

    fn row_count(&self) -> Option<&RowCount> {
        self.row_count.as_ref()
    }
}

fn main() -> PolarsResult<()> {
    let args: Vec<String> = env::args().collect();
    LazyReader {
        path: "test".into(),
        rechunk: false,
        row_count: None,
        n_rows: None,
    }
    .finish()?
    .sink_parquet(
        args[2].clone().into(),
        ParquetWriteOptions {
            compression: ParquetCompression::Zstd(None),
            ..ParquetWriteOptions::default()
        },
    )?;
    Ok(())
}
```
I found barely any examples of using `AnonymousScan`, so it's possible I missed something, but I don't know what that might be based on the example in the polars repo, which uses `.collect().write_parquet()` and won't work for me.