Support sink_parquet for anonymous scan #8719

Open
@sid-6581

Description

Problem description

I have a use case that I imagine isn't too unusual. I have many files in a format that polars doesn't have a reader for, and I would like to convert them to a single parquet file in a streaming fashion. They don't all fit in memory at once, so each file must be read individually and appended to the parquet file. I tried writing a lazy reader using AnonymousScan, but the following minimal reproduction fails with the error "sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'":

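// Requires the polars "lazy" and "parquet" features.
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use polars::prelude::*;

#[derive(Clone)]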
struct LazyReader {
    path: PathBuf,
    rechunk: bool,
    row_count: Option<RowCount>,
    n_rows: Option<usize>,
}

impl AnonymousScan for LazyReader {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn scan(&self, _scan_opts: AnonymousScanOptions) -> PolarsResult<DataFrame> {
        // Placeholder data; a real reader would parse the file at self.path.
        df!["test" => ["testValue1", "testValue2"]]
    }

    fn schema(&self, _infer_schema_length: Option<usize>) -> PolarsResult<Schema> {
        Ok(Schema::from(
            [Field::new("test", DataType::Utf8)].into_iter(),
        ))
    }
}

impl LazyFileListReader for LazyReader {
    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        let options = ScanArgsAnonymous::default();
        LazyFrame::anonymous_scan(Arc::new(self), options)
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn with_path(mut self, path: PathBuf) -> Self {
        self.path = path;
        self
    }

    fn rechunk(&self) -> bool {
        self.rechunk
    }

    #[must_use]
    fn with_rechunk(mut self, toggle: bool) -> Self {
        self.rechunk = toggle;
        self
    }

    fn n_rows(&self) -> Option<usize> {
        self.n_rows
    }

    fn row_count(&self) -> Option<&RowCount> {
        self.row_count.as_ref()
    }
}

fn main() -> PolarsResult<()> {
    // The output parquet path is taken from the second command-line argument.
    let args: Vec<String> = env::args().collect();

    LazyReader {
        path: "test".into(),
        rechunk: false,
        row_count: None,
        n_rows: None,
    }
    .finish()?
    // This call fails with "sink_parquet not yet supported in standard engine".
    .sink_parquet(
        args[2].clone().into(),
        ParquetWriteOptions {
            compression: ParquetCompression::Zstd(None),
            ..ParquetWriteOptions::default()
        },
    )?;

    Ok(())
}

I found very few examples of AnonymousScan, so I may have missed something, but nothing in the example in the polars repo suggests what. That example uses .collect().write_parquet(), which won't work for me because it materializes the entire DataFrame in memory.
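The conversion described above (read each file on its own, append it to one output, then drop it) can be sketched independently of polars. This is a minimal illustration under stated assumptions, not a polars API: Batch, BatchSink, LineSink and convert_streaming are hypothetical names, and the toy sink writes text lines instead of Parquet. In a real converter, Batch would be a DataFrame and the sink a batched Parquet writer; recent polars versions appear to offer one via ParquetWriter::batched (a BatchedWriter with write_batch and finish), but check the documentation for your version.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for one file's worth of data
/// (a polars DataFrame in a real converter).
struct Batch {
    rows: Vec<String>,
}

/// Hypothetical append-only sink, in the spirit of a batched Parquet writer.
trait BatchSink {
    fn write_batch(&mut self, batch: &Batch) -> io::Result<()>;
    /// Flushes the output and returns the total number of rows written.
    fn finish(&mut self) -> io::Result<u64>;
}

/// Toy sink: writes one text line per row to any `Write` and counts rows.
struct LineSink<W: Write> {
    out: W,
    rows_written: u64,
}

impl<W: Write> BatchSink for LineSink<W> {
    fn write_batch(&mut self, batch: &Batch) -> io::Result<()> {
        for row in &batch.rows {
            writeln!(self.out, "{row}")?;
        }
        self.rows_written += batch.rows.len() as u64;
        Ok(())
    }

    fn finish(&mut self) -> io::Result<u64> {
        self.out.flush()?;
        Ok(self.rows_written)
    }
}

/// Reads each input independently and appends it to the sink. `batch` is
/// dropped at the end of every iteration, so only one input's data is held
/// by this loop at a time.
fn convert_streaming<I, R, S>(inputs: I, mut read_one: R, sink: &mut S) -> io::Result<u64>
where
    I: IntoIterator<Item = String>,
    R: FnMut(&str) -> io::Result<Batch>,
    S: BatchSink,
{
    for input in inputs {
        let batch = read_one(&input)?; // parse one file in the custom format
        sink.write_batch(&batch)?; // append it to the single output
    }
    sink.finish()
}

fn main() -> io::Result<()> {
    let inputs = vec!["a.dat".to_string(), "b.dat".to_string()];
    // Hypothetical reader for the custom format: fabricates two rows per file.
    let read_one = |path: &str| -> io::Result<Batch> {
        Ok(Batch {
            rows: vec![format!("{path}:row1"), format!("{path}:row2")],
        })
    };
    // The demo sink buffers output in a Vec; a real sink would write to a file.
    let mut sink = LineSink { out: Vec::new(), rows_written: 0 };
    let total = convert_streaming(inputs, read_one, &mut sink)?;
    println!("wrote {total} rows");
    Ok(())
}
```

Running it prints "wrote 4 rows". Keeping the per-format parsing in a closure separates "how to read one file" from "how to append to the output", which is roughly the split that an AnonymousScan plus a streaming sink_parquet would provide.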

Labels: enhancement (New feature or an improvement of an existing feature)
