The full streaming engine pipeline scan > join with outer_coalesce > sink does not work #15472

Open
avlonder opened this issue Apr 4, 2024 · 2 comments
Labels
bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), python (Related to Python Polars)


avlonder commented Apr 4, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import shutil
import os
from itertools import product
from random import shuffle
from unittest import TestCase
import polars as pl

class DataFrameTest(TestCase):

    def setUp(self) -> None:
        self.__temp_folder = f'temp-{self.id()}'
        os.makedirs(self.__temp_folder)

    def tearDown(self) -> None:
        shutil.rmtree(self.__temp_folder)

    @pl.StringCache()
    def test_sum_counts_out_of_core_parquet(self) -> None:
        schema = [
            ('a', pl.Categorical(ordering='physical')),
            ('b', pl.Categorical(ordering='physical')),
            ('c', pl.Categorical(ordering='physical')),
            ('d', pl.Categorical(ordering='physical')),
            ('e', pl.Categorical(ordering='physical')),
            ('counts', pl.UInt32)
        ]

        # create two DataFrames with 6 columns and 100_000 rows each;
        # for each row, the categorical columns contain stringified integers
        # and the counts column is the sum of these integers
        size = 10
        overlap_size = 5
        lf_list = []
        for n, offset in enumerate([0, overlap_size]):
            tuples = [tuple([str(x) for x in t] + [sum(t)]) for t in product(*[list(range(offset, size + offset))] * 5)]
            shuffle(tuples)
            data = {column_name: [t[i] for t in tuples] for i, (column_name, _) in enumerate(schema)}
            df = pl.from_dict(data, schema=schema)
            # write to a parquet file
            file_path = f'{self.__temp_folder}/df{n}.parquet'
            df.write_parquet(file_path)
            # scan the parquet file
            lf_list.append(pl.scan_parquet(file_path, cache=False, low_memory=True))

        # merge, summing counts
        [lf1, lf2] = lf_list
        all_columns_except_counts = [c for c in lf1.columns if c != 'counts']
        merged_lf = lf1.join(lf2, on=all_columns_except_counts, how='outer_coalesce', join_nulls=True)
        merged_lf = merged_lf.with_columns(pl.sum_horizontal('counts', 'counts_right').alias('counts'))
        merged_lf = merged_lf.drop('counts_right')

        # check that the streaming engine supports the plan
        self.assertIn('STREAMING', merged_lf.explain(streaming=True))

        # check that writing to an ipc file with the streaming engine creates a file
        file_path = f'{self.__temp_folder}/df_merged'
        merged_lf.sink_ipc(file_path, compression=None)
        self.assertTrue(os.path.exists(file_path))

        # check that the created ipc file contains data
        merged_df = pl.read_ipc(file_path)
        self.assertFalse(merged_df.is_empty())

        # check that the created ipc file contains the right data types
        self.assertCountEqual(list(merged_df.schema.items()), schema)

        # check that the created ipc file has the right shape
        n_cols = len(schema)
        n_rows = 2 * size**(n_cols-1) - overlap_size**(n_cols-1)
        self.assertEqual(merged_df.shape, (n_rows, n_cols))

        # check that the created ipc file contains the correct data:
        #   the overlap has the right shape
        #   the counts of the overlapping rows equal the sum of the categorical columns times 2
        #   the counts of the non-overlapping rows equal the sum of the categorical columns times 1
        overlap_range = [str(x) for x in range(overlap_size, size)]
        overlap_expr = pl.all_horizontal(pl.exclude('counts').is_in(overlap_range))
        self.assertEqual(merged_df.filter(overlap_expr).shape, (overlap_size**(n_cols-1), n_cols))
        self.assertTrue(merged_df.filter(overlap_expr).with_columns(pl.col('counts').eq(
            2*pl.sum_horizontal(pl.exclude('counts').cast(str).cast(int))))['counts'].all())
        self.assertTrue(merged_df.filter(~overlap_expr).with_columns(pl.col('counts').eq(
            1*pl.sum_horizontal(pl.exclude('counts').cast(str).cast(int))))['counts'].all())

Log output

AssertionError: Tuples differ: (100000, 6) != (196875, 6)
First differing element 0:
100000
196875
- (100000, 6)
+ (196875, 6)
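
For reference, a quick derivation of the two numbers in the assertion (not part of the test output): the expected row count follows from the test parameters, while the count actually written equals the size of a single input frame.

# size = 10, overlap_size = 5, five key columns
expected_rows = 2 * 10**5 - 5**5   # 200_000 - 3_125 = 196_875 rows after the outer join
actual_rows = 10**5                # 100_000 rows, i.e. exactly one input frame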

Issue description

Observations (a sketch of the two workarounds follows this list):

  • when materializing before the sink (merged_lf.collect().lazy()), the test succeeds
  • it does not matter which type of sink is used (sink_ipc, sink_parquet, sink_csv); they all fail
  • when merging and summing the counts with another method that is not fully out-of-core, such as a concat + group_by + sum, the test succeeds
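
A minimal sketch of the two workarounds above, reusing lf_list, merged_lf, and file_path from the reproducible example; the exact phrasing of the concat + group_by + sum variant is an assumption, not the code originally used.

# Workaround 1: materialize before sinking, so the sink no longer runs the
# full streaming pipeline scan > join > sink end to end.
merged_lf.collect().lazy().sink_ipc(file_path, compression=None)

# Workaround 2 (assumed phrasing): concat + group_by + sum instead of the
# outer_coalesce join; this path produces the expected result.
key_columns = ['a', 'b', 'c', 'd', 'e']
merged_alt = (
    pl.concat(lf_list)
    .group_by(key_columns)
    .agg(pl.col('counts').sum())
)
merged_alt.sink_ipc(file_path, compression=None)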

Expected behavior

I expect the test to succeed instead of failing with the assertion error above.

Installed versions

--------Version info---------
Polars:               0.20.18
Index type:           UInt32
Platform:             Linux-6.1.0-1036-oem-x86_64-with-glibc2.35
Python:               3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               <not installed>
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         1.6.0
numpy:                1.26.4
openpyxl:             <not installed>
pandas:               2.2.1
pyarrow:              15.0.2
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
avlonder added the bug, needs triage, and python labels on Apr 4, 2024
proever commented May 15, 2024

I think this is the same issue I am seeing right now.

I am creating two large LazyFrames (bulk and update), each of which is a concatenation (pl.concat) of many smaller parquet files. I then run combined = bulk.update(update, on=["date", "ticker"], how="outer") (this is stock data).

If I run combined.collect().write_parquet(out_path) I get different results than with combined.sink_parquet(out_path). The files are of quite different sizes on disk, and some tickers aren't even present in the sunk version.
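
For what it's worth, a minimal sketch of the comparison described above; bulk, update, and the join keys come from the comment, while the file names and the anti-join check are assumptions added for illustration.

combined = bulk.update(update, on=["date", "ticker"], how="outer")

# Materializing first and writing gives one result...
combined.collect().write_parquet("collected.parquet")
# ...while sinking the same plan through the streaming engine gives another,
# with some tickers missing entirely from the sunk version.
combined.sink_parquet("sunk.parquet")

# Assumed check to surface the discrepancy: read both files back and look for
# tickers present in the collected output but absent from the sunk output.
collected = pl.read_parquet("collected.parquet")
sunk = pl.read_parquet("sunk.parquet")
print(collected.shape, sunk.shape)
missing = (
    collected.select("ticker").unique()
    .join(sunk.select("ticker").unique(), on="ticker", how="anti")
)
print(missing)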

sclamons commented
Related to #15157?
