Problem with join operations followed by sink_csv on LazyFrame #15157

Open
2 tasks done
PhilippJunk opened this issue Mar 19, 2024 · 7 comments
Labels
bug Something isn't working invalid A bug report that is not actually a bug python Related to Python Polars

Comments

@PhilippJunk

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import os

import numpy as np
import polars as pl

rng = np.random.default_rng()

test_dir = "./test_sink"
os.makedirs(test_dir, exist_ok=True)

DATA_SIZE = 10_000


def joins_only(df1, df2):
    overlap_df = df1.join(df2, on=["a", "b"], how="inner")
    size_overlap = overlap_df.select(pl.len()).collect().item()
    print(f"The size of overlap is: {size_overlap}")
    df1 = df1.join(overlap_df, on=["a", "b"], how="anti")
    return df1



dummy_df_1 = pl.DataFrame(
    {
        "a": rng.normal(size=DATA_SIZE),
        "b": rng.normal(size=DATA_SIZE),
        "c": np.full(DATA_SIZE, "a"),
    }
).lazy()

dummy_df_2 = pl.DataFrame(
    {
        "a": rng.normal(size=DATA_SIZE),
        "b": rng.normal(size=DATA_SIZE),
        "c": np.full(DATA_SIZE, "b"),
    }
).lazy()


removed_overlaps = joins_only(dummy_df_1, dummy_df_2)

removed_overlaps.collect().write_csv(os.path.join(test_dir, "write.csv"))
removed_overlaps.sink_csv(os.path.join(test_dir, "sink.csv")) # <- This line raises an error



The same problem exists if the overlap is not empty:

# does it matter that overlap is empty?
dummy_df_3 = pl.concat([dummy_df_1, dummy_df_2])
removed_overlaps = joins_only(dummy_df_1, dummy_df_3)

removed_overlaps.collect().write_csv(os.path.join(test_dir, "write.csv"))
removed_overlaps.sink_csv(os.path.join(test_dir, "sink.csv")) # <- This line raises an error as well

I originally noticed this after an additional concat operation, which does not error, but silently omits some of the data:

def joins_and_concat(df1, df2):
    overlap_df = df1.join(df2, on=["a", "b"], how="inner")
    df1 = df1.join(overlap_df, on=["a", "b"], how="anti")
    return pl.concat([df1, df2])

concat = joins_and_concat(dummy_df_1, dummy_df_2)
concat.sink_csv(os.path.join(test_dir, "sink.csv"))  # <- no error, but part of output is missing
concat.collect().write_csv(os.path.join(test_dir, "write.csv"))

Log output

join parallel: true
INNER join dataframes finished
join parallel: false
join parallel: false
INNER join dataframes finished
ANTI join dataframes finished
join parallel: false
INNER join dataframes finished
join parallel: false
join parallel: false
INNER join dataframes finished
ANTI join dataframes finished
Traceback (most recent call last):
  File "/home/philipp/work/others/2024-03-19_polars_concat_bug/report.py", line 50, in <module>
    removed_overlaps.sink_csv(os.path.join(test_dir, "join_sink.csv"))
  File "/home/philipp/miniforge3/envs/polars/lib/python3.12/site-packages/polars/_utils/deprecation.py", line 134, in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/philipp/miniforge3/envs/polars/lib/python3.12/site-packages/polars/_utils/deprecation.py", line 134, in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/philipp/miniforge3/envs/polars/lib/python3.12/site-packages/polars/_utils/unstable.py", line 59, in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/philipp/miniforge3/envs/polars/lib/python3.12/site-packages/polars/lazyframe/frame.py", line 2404, in sink_csv
    return lf.sink_csv(
           ^^^^^^^^^^^^
polars.exceptions.InvalidOperationError: sink_Csv(CsvWriterOptions { include_bom: false, include_header: true, batch_size: 1024, maintain_order: true, serialize_options: SerializeOptions { date_format: None, time_format: None, datetime_format: None, float_precision: None, separator: 44, quote_char: 34, null: "", line_terminator: "\n", quote_style: Necessary } }) not yet supported in standard engine. Use 'collect().write_parquet()'

Issue description

sink_csv does not behave as expected after join operations on a LazyFrame. In some cases it raises an error; in other cases, it silently produces different results than collect().write_csv().

Expected behavior

df.sink_csv(file) and df.collect().write_csv(file) should produce identical output.

Installed versions

--------Version info---------
Polars:               0.20.16
Index type:           UInt32
Platform:             Linux-6.5.0-25-generic-x86_64-with-glibc2.35
Python:               3.12.2 | packaged by conda-forge | (main, Feb 16 2024, 20:50:58) [GCC 12.3.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               <not installed>
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
numpy:                1.26.4
openpyxl:             <not installed>
pandas:               <not installed>
pyarrow:              <not installed>
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>

@PhilippJunk PhilippJunk added bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars labels Mar 19, 2024
@ritchie46
Member

As the error states, we don't support streaming the full query yet, so sink_csv raises that error.

This is expected behavior. In the future we might resolve the collect() -> write ourselves.

We are working on supporting more of our queries on the streaming engine; it is an ongoing process.

@ritchie46 ritchie46 added invalid A bug report that is not actually a bug and removed needs triage Awaiting prioritization by a maintainer labels Mar 19, 2024
@PhilippJunk
Author

Thanks for clarifying. Is it also expected behavior that the third example runs without an error, but silently omits everything from the join?

@alkment

alkment commented Jun 20, 2024

It would be great if an error could be raised when trying to sink a query that isn't fully supported, instead of silently generating an incorrect result.

Unsurprisingly, this affects sink_parquet too.

@sclamons

sclamons commented Jul 11, 2024

EDIT: This might be different as there's not even a join here, just a concat. Should this be its own issue? Or is it the same underlying problem?

Also ran across this problem, and worked up a minimal example. The streaming engine is probably the biggest draw of polars for me, so I'd really love to see this fixed.


Minimal example

df1 = pl.LazyFrame({"Name": ["A"], "X": [1]})
df2 = pl.LazyFrame({"Name": ["B"], "X": [2]})
merged_df = pl.concat([df1, df2], how="align")
merged_df.sink_csv("sunk.csv")
merged_df.collect().write_csv("written.csv")

Contents of sunk.csv:

Name,X
B,2

Contents of written.csv:

Name,X
A,1
B,2

Interestingly, this bug goes away if you omit how="align" from the concat. In that case, sunk.csv is identical to written.csv.

@sclamons

sclamons commented Jul 11, 2024

EDIT: This example doesn't take the same logical path—it's actually equivalent (I think) to my above example without how="align", so no surprise it works.

One more detail -- the Rust interface doesn't replicate the issue with the same logical setup:

use polars::prelude::*;

fn main() {
    // Make sure the output directory exists before writing.
    std::fs::create_dir_all("out").unwrap();

    let sink_df = make_concat_lazy_example();
    sink_df
        .sink_csv("out/sunk.csv", CsvWriterOptions::default())
        .unwrap();

    let write_df = make_concat_lazy_example();
    let mut write_file = std::fs::File::create("out/written.csv").unwrap();
    CsvWriter::new(&mut write_file)
        .finish(&mut write_df.collect().unwrap())
        .unwrap();
}

fn make_concat_lazy_example() -> LazyFrame {
    let df1 = df![
        "Name" => ["A"],
        "X"    => [1]
    ].unwrap().lazy();
    let df2 = df![
        "Name" => ["B"],
        "X"    => [2]
    ].unwrap().lazy();

    let merged_df = concat(
        [df1, df2], 
        UnionArgs::default()
    ).unwrap();
    merged_df
}

This produces identical CSVs containing both rows.

@cmdlineluser
Contributor

cmdlineluser commented Jul 11, 2024

@sclamons align is a wrapper around full joins:

x.join(y, how="full", on=common_cols, suffix="_PL_CONCAT_RIGHT")

It looks like coalesce() is causing the issue on the streaming engine; without it, both rows are present.

(df1.join(df2, how="full", on=["Name", "X"], suffix="_PL_CONCAT_RIGHT")
    .with_columns(
       pl.coalesce([name, f"{name}_PL_CONCAT_RIGHT"])
       for name in ["Name", "X"]
    )
    .collect(streaming=True)
)

# shape: (1, 4)
# ┌──────┬─────┬──────────────────────┬───────────────────┐
# │ Name ┆ X   ┆ Name_PL_CONCAT_RIGHT ┆ X_PL_CONCAT_RIGHT │
# │ ---  ┆ --- ┆ ---                  ┆ ---               │
# │ str  ┆ i64 ┆ str                  ┆ i64               │
# ╞══════╪═════╪══════════════════════╪═══════════════════╡
# │ B    ┆ 2   ┆ B                    ┆ 2                 │
# └──────┴─────┴──────────────────────┴───────────────────┘

I think your Rust example is just doing a default vertical concat, so it's not equivalent to the Python repro?

@sclamons

@cmdlineluser Yes, you're right—the Rust example isn't taking the how="align" path, so no surprise it works.
