
Merge execution time grows exponentially with the number of columns #2107

Closed
hmilbi opened this issue Jan 22, 2024 · 19 comments
Labels
bug Something isn't working

Comments

@hmilbi

hmilbi commented Jan 22, 2024

Environment

Python 3.11

Delta-rs version: 0.15.1

Environment:

  • OS: Windows 11

Bug

What happened:
I have several tables with more than 100 columns. Merging into them takes too much time.

What you expected to happen:
Merging 1 row with 100 columns into a table with 1 row and 100 columns should not take 22 seconds.

How to reproduce it:

import deltalake as dl
import polars as pl
import shutil
import timeit

times = []

for i in range(15):
	shutil.rmtree("c:/temp/test", ignore_errors=True)

	n = (i+1) * 10

	df = pl.DataFrame({'col_' + str(i): [i] for i in range(n)}).to_arrow()

	dl.write_deltalake("c:/temp/test", df, mode="overwrite")

	table = dl.DeltaTable("c:/temp/test")

	df = pl.DataFrame({'col_' + str(i): [i] for i in range(n)}).to_arrow()

	execution_time = timeit.timeit(
		lambda: table.merge(df, "s.col_0 = t.col_0", "s", "t").when_matched_update_all().when_not_matched_insert_all().execute(),
		number=1
	)

	print(f"n={n:3d} : {execution_time:.2f} seconds")

	times.append(execution_time)

# print chart
max_time = max(times)
for i, t in enumerate(times):
	bar = '|' * int((t / max_time) * 80)
	print(f'n={(i+1)*10:3d} : {bar}')

Output

n= 10 : 0.13 seconds
n= 20 : 0.43 seconds
n= 30 : 1.00 seconds
n= 40 : 1.91 seconds
n= 50 : 3.27 seconds
n= 60 : 5.16 seconds
n= 70 : 7.94 seconds
n= 80 : 11.55 seconds
n= 90 : 16.16 seconds
n=100 : 22.09 seconds

n= 10 :
n= 20 : |
n= 30 : |||
n= 40 : ||||||
n= 50 : |||||||||||
n= 60 : ||||||||||||||||||
n= 70 : ||||||||||||||||||||||||||||
n= 80 : |||||||||||||||||||||||||||||||||||||||||
n= 90 : ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
n=100 : ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

More details:

@hmilbi added the bug label Jan 22, 2024
@Blajda
Collaborator

Blajda commented Jan 22, 2024

Thanks @hmilbi for reporting the issue.
As you observed, there is definitely some exponential time growth when building the merge plan.
If you kept the number of columns fixed at N=100 and changed the number of rows (M) being inserted, is there also exponential growth?
That would help narrow the root cause down to the planner.
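Something like this row-scaling variant of the reproduction script above would exercise that case (a minimal sketch; the path and row counts are illustrative):

import shutil
import timeit

import deltalake as dl
import numpy as np
import polars as pl

N_COLS = 100  # keep the column count fixed

for m in (100_000, 500_000, 1_000_000):  # vary the number of rows instead
    shutil.rmtree("c:/temp/test", ignore_errors=True)

    # m rows x N_COLS columns; col_0 serves as a unique merge key
    df = pl.DataFrame({f"col_{c}": np.arange(m) for c in range(N_COLS)}).to_arrow()
    dl.write_deltalake("c:/temp/test", df, mode="overwrite")
    table = dl.DeltaTable("c:/temp/test")

    execution_time = timeit.timeit(
        lambda: table.merge(df, "s.col_0 = t.col_0", "s", "t")
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute(),
        number=1,
    )
    print(f"{m:>9,} rows x {N_COLS} cols : {execution_time:.2f} seconds")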

@hmilbi
Author

hmilbi commented Jan 23, 2024

Hey @Blajda,

the number of rows does not seem to matter:

100000 x 100 : 23.10 seconds
200000 x 100 : 22.60 seconds
300000 x 100 : 22.87 seconds
400000 x 100 : 23.13 seconds
500000 x 100 : 23.37 seconds
600000 x 100 : 23.92 seconds
700000 x 100 : 24.18 seconds
800000 x 100 : 24.67 seconds
900000 x 100 : 24.90 seconds
1000000 x 100 : 25.85 seconds

@ion-elgreco
Collaborator

@Blajda maybe this is related: apache/datafusion#7698?

@Blajda
Collaborator

Blajda commented Jan 26, 2024

Yes, the issue in the DataFusion repo aligns with the observations here. If I get some free time, I think it would still be worth checking whether any optimizations can be made on our end.

@pfwnicks

pfwnicks commented Feb 8, 2024

I can confirm this as well: a drastic slowdown in merge times using Azure Blob Storage:


       write_times  merge_times  write_times_long  merge_times_long
1      1.315523     0.319775          1.573440          0.461228
2      1.200063     0.322942          1.595484          0.397051
3      1.232149     0.363974          1.656145          0.424877
4      1.265115     0.366493          1.699405          0.423009
5      1.223527     0.363278          1.689739          0.434520
6      1.287108     0.399268          1.676032          0.455089
7      1.259679     0.422059          1.647931          0.496785
8      1.264607     0.440369          1.635233          0.486893
9      1.312716     0.454957          1.739328          0.519054
10     1.343817     0.498257          1.723349          0.497675
11     1.601838     0.546965          1.692456          0.510667
12     1.357384     0.642838          1.770617          0.540901
13     1.391560     0.589546          1.744749          0.562739
14     1.352916     0.647591          1.740748          0.560324
15     1.686491     0.666263          1.728198          0.638664
16     1.344106     0.710800          1.838365          0.593921
17     1.462181     0.816697          1.862849          0.609602
18     1.384713     0.821440          1.856153          0.610648
19     1.363077     0.884522          1.856833          0.625019
20     1.473088     0.938717          2.330158          0.667030
21     1.366137     1.001634          1.856153          0.698924
22     1.383928     1.046432          1.812380          0.708997
23     1.470542     1.183623          1.920206          0.675724
24     1.443298     1.247846          1.881745          0.703955
25     1.436146     1.302510          1.923425          0.705724
26     1.528344     1.367521          1.888656          0.710758
27     1.520355     1.448664          2.229822          0.749015
28     1.496258     1.557173          1.872347          0.776579
29     1.421938     1.643564          1.923053          0.826197
30     1.434297     1.753025          1.937335          0.770912
31     1.533521     1.865498          1.934067          0.797195
32     1.522463     1.984033          1.960573          0.758156
33     1.464018     2.095758          1.989295          0.823269
34     1.518422     2.220998          1.974942          0.830530
35     1.512976     2.396796          1.975505          0.842135
36     1.551044     2.752355          1.975289          0.827682
37     1.590812     2.685907          2.031577          0.827763
38     1.570815     2.788936          1.941982          0.873138
39     1.549042     3.064402          2.050466          0.885497
40     1.589878     3.175862          1.972667          0.877674
41     1.564959     3.601097          2.639654          0.917430
42     1.534066     3.571279          2.005958          0.930729
43     1.625666     3.772152          2.036915          0.901131
44     1.595741     3.951559          1.982727          0.928625
45     2.159082     4.306948          2.071615          0.940605
46     1.963092     4.384528          2.122818          1.056828
47     1.637023     4.607500          2.207811          0.938387
48     1.639931     4.869770          2.184478          1.018744
49     1.635735     5.126989          2.273826          1.024544
50     2.047995     5.372921          2.201243          0.973529
51     1.651135     5.670030          2.238150          0.989654
52     1.690923     5.941398          2.225132          0.992352
53     1.692163     6.250862          2.213540          1.013054
54     1.755166     6.577121          2.300071          1.024944
55     1.683691     6.928820          2.369658          1.106561
56     1.675477     7.206279          2.359760          1.035895
57     1.679940     7.579419          2.223100          1.080192
58     1.660199     7.955031          2.372170          0.910749
59     1.681843     8.376170          2.272307          1.275752

The even more interesting thing is that if you melt your columns into long format instead of keeping them wide, merging becomes more efficient, but writing does not.

In the first two columns above, write and merge use the classic wide (tabular) layout:

df_og = <your_df_for_testing>
df = df_og[df_og.columns[:cur_num]]  # first cur_num columns for this iteration

and the *_long columns come from melting that same data:

long_df = pd.melt(df, id_vars=["index"], var_name="metric", value_name="value")

So the extra rows from melting affect the write process, but not nearly as much the update/merge:

The normal (wide) merge looks like this:

dt = <your_delta_table_for_testing>

pred_str = """ target.index = source.index """
dt.merge(source=df_dub, predicate=pred_str, source_alias="source", target_alias="target").when_matched_update_all().execute()

vs the long merge:

pred_str_long = """ target.index = source.index and target.metric = source.metric """
dt.merge(source=df_dub, predicate=pred_str_long, source_alias="source", target_alias="target").when_matched_update_all().execute()

I would expect both layouts to behave more like the long case, if not a bit better for the tabular one, since matching on fewer predicate columns should be easier.
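Putting the two cases side by side, a minimal self-contained sketch of that comparison could look like this (the local paths, sizes, and column names are illustrative, not the original benchmark):

import shutil
import time

import numpy as np
import pandas as pd
from deltalake import DeltaTable, write_deltalake

n_rows, n_cols = 1_000, 100

# wide layout: one row per index, one column per metric
wide = pd.DataFrame(
    np.random.random((n_rows, n_cols)),
    columns=[f"metric_{c}" for c in range(n_cols)],
)
wide.insert(0, "index", range(n_rows))

# long layout: one row per (index, metric) pair
long = pd.melt(wide, id_vars=["index"], var_name="metric", value_name="value")

cases = [
    ("wide", wide, "target.index = source.index"),
    ("long", long, "target.index = source.index and target.metric = source.metric"),
]
for name, frame, predicate in cases:
    path = f"/tmp/merge_{name}"
    shutil.rmtree(path, ignore_errors=True)
    write_deltalake(path, frame, mode="overwrite")
    dt = DeltaTable(path)

    start = time.perf_counter()
    dt.merge(
        source=frame,
        predicate=predicate,
        source_alias="source",
        target_alias="target",
    ).when_matched_update_all().execute()
    print(f"{name}: merge took {time.perf_counter() - start:.2f} s")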

@pfwnicks

pfwnicks commented Feb 8, 2024

Any updates on DataFusion's progress on this, or are there some ways to improve it in delta-rs?

If data is kept in pure tabular form (many columns, single index), it very quickly becomes more efficient to just do the operation in Python instead of using .merge + .when_matched_update_all (see the sketch below).

And there are some other performance drawbacks to using the melted versions of these datasets.
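The kind of Python-side upsert meant here is roughly the following (a minimal sketch; the function name, key column, and read-then-overwrite strategy are illustrative assumptions, not a drop-in replacement for merge):

import pandas as pd
from deltalake import DeltaTable, write_deltalake


def manual_upsert(table_path: str, updates: pd.DataFrame, key: str = "index") -> None:
    # Read the current table, drop the rows whose key is being replaced,
    # append the new rows, and overwrite the table in a single write.
    current = DeltaTable(table_path).to_pandas()
    combined = pd.concat(
        [current[~current[key].isin(updates[key])], updates],
        ignore_index=True,
    )
    write_deltalake(table_path, combined, mode="overwrite")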

@ion-elgreco
Collaborator

@pfwnicks for that you need to follow the DataFusion issue I mentioned above. Most of the slowness is in the plan building; I don't think much can be done on our end to improve it, as it's caused by how DataFusion works at the moment.

@pfwnicks

pfwnicks commented Feb 9, 2024

@ion-elgreco Thanks for the reply. Do we know which part of it specifically is causing the slowness? Is it possible that some of the subtasks below will help?

apache/datafusion#5637 (comment)

Also, how does it look in terms of adopting these DataFusion changes once they are finished?

@ldacey
Contributor

ldacey commented Feb 24, 2024

Ran into this as well. 178 columns turned my insert merge statement into a 288-second task, even when the table was completely empty (first write) and there were only 75,000 rows. Appending or overwriting the partition takes < 2 seconds.

@Blajda
Collaborator

Blajda commented Feb 27, 2024

There should be some improvements with #2222 merged in.

@t1g0rz
Contributor

t1g0rz commented Mar 13, 2024

Is that pull request stalled? There have been no improvements in 0.16.0 as far as I can see.

@ion-elgreco
Collaborator

@t1g0rz those changes are in 0.16.0.

If you don't see an impact, could you then share some numbers before and after?

@hmilbi
Author

hmilbi commented Mar 13, 2024

Compared to my first test, it is now much faster:

n= 10 : 0.03 seconds
n= 20 : 0.06 seconds
n= 30 : 0.09 seconds
n= 40 : 0.13 seconds
n= 50 : 0.20 seconds
n= 60 : 0.22 seconds
n= 70 : 0.28 seconds
n= 80 : 0.34 seconds
n= 90 : 0.41 seconds
n=100 : 0.50 seconds

n= 10 : |||||
n= 20 : |||||||||
n= 30 : ||||||||||||||
n= 40 : ||||||||||||||||||||
n= 50 : ||||||||||||||||||||||||||||||||
n= 60 : ||||||||||||||||||||||||||||||||||
n= 70 : ||||||||||||||||||||||||||||||||||||||||||||
n= 80 : |||||||||||||||||||||||||||||||||||||||||||||||||||||||
n= 90 : |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
n=100 : ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

@ion-elgreco
Collaborator

@emcake amazing work 🤗🎉

@t1g0rz
Contributor

t1g0rz commented Mar 13, 2024

@ion-elgreco
No, it's probably another issue. I'm trying to merge just 5 rows which are already present in the Delta table, and each time the elapsed time increases by roughly 4x. Also, please pay attention to num_target_rows_updated. After the third merge, I cannot execute dt.to_pandas(); it crashes the VM due to out-of-memory (OOM) errors. It's an r6a.2xlarge instance with 8 vCPUs and 64 GiB of RAM.

# CPU times: user 39.8 ms, sys: 25.5 ms, total: 65.3 ms
# Wall time: 455 ms

import numpy as np
import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable

N_COL = 300
N_ROWS = 200_000

storage_options = {
                "AWS_S3_LOCKING_PROVIDER": "dynamodb",
                "DELTA_DYNAMO_TABLE_NAME": "delta_log_dev",
                ...}

dt = DeltaTable.create("s3a://lake/test_rand/",
                  schema=pa.schema(
                      [
                          pa.field("feature", pa.string(), nullable=True),
                          pa.field("ts", pa.timestamp("us"), nullable=True)
                      ] + [pa.field(f"col_{i}", pa.float64(), nullable=True) for i in range(N_COL)]
                      ),
                    mode="overwrite",
                    partition_by=["feature"],
                    storage_options=storage_options
                  )


# CPU times: user 2min 35s, sys: 32.8 s, total: 3min 8s
# Wall time: 1min 34s
for i in range(10):
    df = pd.DataFrame(np.round(np.random.random((N_ROWS, N_COL)),3))
    df.columns = [f"col_{i}" for i in df.columns]
    df.insert(0, "feature", f"feature_{i}")
    df.insert(1, "ts", pd.date_range(start="2021-01-01", periods=N_ROWS, freq="s"))

    dt.merge(df,
         "s.feature = t.feature and s.ts = t.ts",
         source_alias='s',
         target_alias='t').when_matched_update_all().when_not_matched_insert_all().execute()


# CPU times: user 5.48 s, sys: 7.66 s, total: 13.1 s
# Wall time: 3.64 s
dt.to_pandas()

# CPU times: user 587 ms, sys: 734 ms, total: 1.32 s
# Wall time: 1.05 s
x = dt.to_pandas(filters=[('feature', '=', 'feature_1')]).head()

# FIRST MERGE
# CPU times: user 30.5 s, sys: 5.83 s, total: 36.3 s
# Wall time: 31.4 s
dt.merge(
    x,
    predicate="s.feature = t.feature and s.ts = s.ts",
    source_alias='s',
    target_alias='t',
).when_matched_update_all().when_not_matched_insert_all().execute()

"""
{'num_source_rows': 5,
 'num_target_rows_inserted': 0,
 'num_target_rows_updated': 1000000,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 1000000,
 'num_target_files_added': 2,
 'num_target_files_removed': 1,
 'execution_time_ms': 30941,
 'scan_time_ms': 0,
 'rewrite_time_ms': 27918}
"""

# SECOND MERGE
# CPU times: user 2min 6s, sys: 8.72 s, total: 2min 15s
# Wall time: 2min 3s
dt.merge(
    x,
    predicate="s.feature = t.feature and s.ts = s.ts",
    source_alias='s',
    target_alias='t',
).when_matched_update_all().when_not_matched_insert_all().execute()

"""
{'num_source_rows': 5,
 'num_target_rows_inserted': 0,
 'num_target_rows_updated': 5000000,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 5000000,
 'num_target_files_added': 8,
 'num_target_files_removed': 2,
 'execution_time_ms': 122965,
 'scan_time_ms': 0,
 'rewrite_time_ms': 120026}
 """"


# THIRD MERGE
# CPU times: user 10min 26s, sys: 26 s, total: 10min 52s
# Wall time: 8min
dt.merge(
    x,
    predicate="s.feature = t.feature and s.ts = s.ts",
    source_alias='s',
    target_alias='t',
).when_matched_update_all().when_not_matched_insert_all().execute()

"""
{'num_source_rows': 5,
 'num_target_rows_inserted': 0,
 'num_target_rows_updated': 25000000,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 25000000,
 'num_target_files_added': 36,
 'num_target_files_removed': 8,
 'execution_time_ms': 477999,
 'scan_time_ms': 0,
 'rewrite_time_ms': 475107}
"""

@ion-elgreco
Collaborator

ion-elgreco commented Mar 13, 2024

@t1g0rz that's likely because you're doing s.ts = s.ts instead of s.ts = t.ts in the last 3 merges.
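That is, only the predicate changes; the corrected call from the earlier snippet would read:

dt.merge(
    x,
    predicate="s.feature = t.feature and s.ts = t.ts",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()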

@t1g0rz
Contributor

t1g0rz commented Mar 13, 2024

@ion-elgreco
That was stupid, sorry(

@ion-elgreco
Collaborator

@t1g0rz no worries!!

@ion-elgreco
Collaborator

@Blajda shall we close this issue then? It seems that performance is now quite good, and any upstream changes to DataFusion will improve it even more.
