internal.DeltaError: Generic DeltaTable error: Internal error: Invalid HashJoinExec partition count mismatch 1!=2 #2188

Description

@t1g0rz

Environment

Delta-rs version:
rust-v0.17.0

Binding:
python
deltalake-0.15.3

Environment:

  • Cloud provider: AWS
  • OS: Ubuntu 22.04.1
  • Other:
    • RAM: 949Mi
    • 1 CPU
    • t2.micro
    • ami-03f4878755434977f

Bug

What happened:

When I run a simple script a few times (twice, in my case) to simulate updating and partitioning, I encounter the following error.

Traceback (most recent call last):
  File "/home/ubuntu/test.py", line 52, in <module>
    er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/deltalake/table.py", line 1713, in execute
    metrics = self.table._table.merge_execute(
_internal.DeltaError: Generic DeltaTable error: Internal error: Invalid HashJoinExec, partition count mismatch 1!=2,consider using RepartitionExec.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

What you expected to happen:
The table to be updated without any issues.

How to reproduce it:
Here is the script. You can run it on a t2.micro AWS instance.
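
The script below assumes that the DynamoDB lock table named delta_log (referenced in storage_options) already exists. A minimal one-time setup sketch, assuming the tablePath/fileName key schema that the delta-rs S3 locking provider expects:

import boto3

# Create the DynamoDB table used by AWS_S3_LOCKING_PROVIDER='dynamodb'
# (key schema assumed here: tablePath as hash key, fileName as range key)
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
    TableName="delta_log",
    KeySchema=[
        {"AttributeName": "tablePath", "KeyType": "HASH"},
        {"AttributeName": "fileName", "KeyType": "RANGE"},
    ],
    AttributeDefinitions=[
        {"AttributeName": "tablePath", "AttributeType": "S"},
        {"AttributeName": "fileName", "AttributeType": "S"},
    ],
    BillingMode="PAY_PER_REQUEST",
)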

from deltalake import DeltaTable, write_deltalake
from sklearn.datasets import load_iris
import pyarrow as pa
import pandas as pd
from time import time, sleep
import boto3


# Pull AWS credentials from the default boto3 session
session = boto3.session.Session()
credentials = session.get_credentials()

S3_BUCKET_RAW_DATA = "<S3_BUCKET>"  # placeholder: replace with your bucket name
DELTA_TABLE_NAME = 'iris'

# S3 storage options using the DynamoDB locking provider for safe concurrent writes
storage_options = {
    'AWS_S3_LOCKING_PROVIDER': 'dynamodb',
    'DELTA_DYNAMO_TABLE_NAME': 'delta_log',
    'AWS_REGION': 'us-east-1',
    'DELTA_DYNAMO_REGION': 'us-east-1',
    'AWS_ACCESS_KEY_ID': credentials.access_key,
    'AWS_SECRET_ACCESS_KEY': credentials.secret_key,
}

if credentials.token:
    storage_options['AWS_SESSION_TOKEN'] = credentials.token

# Load the iris data and add an explicit integer key column (idx) for the merge predicate
df = load_iris(as_frame=True)['data']
df.columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
df.reset_index(inplace=True, names=['idx'])

schema = pa.schema([
    pa.field("idx", pa.int64()),
    pa.field("sepal_length", pa.float64()),
    pa.field("sepal_width", pa.float64()),
    pa.field("petal_length", pa.float64()),
    pa.field("petal_width", pa.float64()),
    pa.field("updated_at", pa.float64())
])

DELTA_PATH = f"s3a://{S3_BUCKET_RAW_DATA}/bronze/{DELTA_TABLE_NAME}"

# Create the table if it does not exist yet (mode="ignore"), partitioned by updated_at
dt = DeltaTable.create(DELTA_PATH, schema=schema, mode="ignore",
                       partition_by=["updated_at"], storage_options=storage_options)

# Merge growing slices of the data; each iteration stamps a new updated_at partition value
for i in range(50, 151, 50):
    tmp_df = df.iloc[:i].copy()
    tmp_df['updated_at'] = time()
    print(i)
    dt = DeltaTable(DELTA_PATH, storage_options=storage_options)
    er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
    print(er)
    sleep(2)
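
When the merge succeeds, reading the table back is a quick sanity check; a minimal sketch, reusing DELTA_PATH and storage_options from the script above:

dt = DeltaTable(DELTA_PATH, storage_options=storage_options)
print(dt.version())                       # latest table version
print(dt.to_pandas().sort_values("idx"))  # merged rows; each idx should appear exactly once
print(dt.files())                         # data files under the updated_at=... partition directories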

More details:

How it looked:

ubuntu@ubuntu1:~$ python3 test.py 
50
{'num_source_rows': 50, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 50, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 168, 'scan_time_ms': 0, 'rewrite_time_ms': 106}
100
{'num_source_rows': 100, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 50, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 100, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': 219, 'scan_time_ms': 0, 'rewrite_time_ms': 105}
150
{'num_source_rows': 150, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 100, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 150, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': 173, 'scan_time_ms': 0, 'rewrite_time_ms': 65}

ubuntu@ubuntu1:~$ python3 test3.py 
50
{'num_source_rows': 50, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 50, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 100, 'num_output_rows': 150, 'num_target_files_added': 2, 'num_target_files_removed': 1, 'execution_time_ms': 196, 'scan_time_ms': 0, 'rewrite_time_ms': 104}
100
Traceback (most recent call last):
  File "/home/ubuntu/test3.py", line 52, in <module>
    er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/deltalake/table.py", line 1713, in execute
    metrics = self.table._table.merge_execute(
_internal.DeltaError: Generic DeltaTable error: Internal error: Invalid HashJoinExec, partition count mismatch 1!=2,consider using RepartitionExec.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

However, if I update the same Delta table from my laptop, there are no errors at all.
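
One difference between the two environments that may matter is the number of available cores (the VM is a single-CPU t2.micro); a quick way to compare, as a sketch:

import os
from importlib.metadata import version

print("cpu cores:", os.cpu_count())        # 1 on the t2.micro
print("deltalake:", version("deltalake"))  # 0.15.3 in this report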

Memory consumption from ubuntu VM:

Top AVERAGE memory consumption, by line:
(1)    43:    10 MB
Top PEAK memory consumption, by line:
(1)     2:    30 MB
(2)     1:    20 MB
(3)    43:    10 MB
(4)     6:    10 MB

Labels

bug (Something isn't working), wontfix (This will not be worked on)
