Labels: bug, wontfix
Description
Environment
Delta-rs version: rust-v0.17.0
Binding: Python (deltalake 0.15.3)
Environment:
- Cloud provider: AWS
- OS: Ubuntu 22.04.1
- Other:
  - RAM: 949Mi
  - 1 CPU (see the check below)
  - Instance type: t2.micro
  - AMI: ami-03f4878755434977f
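For what it's worth, the CPU count the Python process actually sees on the instance can be confirmed with a quick check like this:
import os
import multiprocessing

# Expected to report 1 on a t2.micro (single vCPU)
print(os.cpu_count())
print(multiprocessing.cpu_count())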
Bug
What happened:
When I run a simple script a few times (twice, in my case) to simulate repeated updates to a partitioned table, I hit the following error.
Traceback (most recent call last):
File "/home/ubuntu/test.py", line 52, in <module>
er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
File "/home/ubuntu/.local/lib/python3.10/site-packages/deltalake/table.py", line 1713, in execute
metrics = self.table._table.merge_execute(
_internal.DeltaError: Generic DeltaTable error: Internal error: Invalid HashJoinExec, partition count mismatch 1!=2,consider using RepartitionExec.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
What you expected to happen:
The table is updated without any issues.
How to reproduce it:
Here is the script; you can run it on an AWS t2.micro.
from deltalake import DeltaTable, write_deltalake
from sklearn.datasets import load_iris
import pyarrow as pa
import pandas as pd
from time import time, sleep
import boto3
session = boto3.session.Session()
credentials = session.get_credentials()
S3_BUCKET_RAW_DATA = "<S3_BUCKET>"
DELTA_TABLE_NAME = 'iris'
storage_options = {
    'AWS_S3_LOCKING_PROVIDER': 'dynamodb',
    'DELTA_DYNAMO_TABLE_NAME': 'delta_log',
    'AWS_REGION': 'us-east-1',
    'DELTA_DYNAMO_REGION': 'us-east-1',
    'AWS_ACCESS_KEY_ID': credentials.access_key,
    'AWS_SECRET_ACCESS_KEY': credentials.secret_key,
}
if credentials.token:
    storage_options['AWS_SESSION_TOKEN'] = credentials.token
df = load_iris(as_frame=True)['data']
df.columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
df.reset_index(inplace=True, names=['idx'])
schema = pa.schema([
    pa.field("idx", pa.int64()),
    pa.field("sepal_length", pa.float64()),
    pa.field("sepal_width", pa.float64()),
    pa.field("petal_length", pa.float64()),
    pa.field("petal_width", pa.float64()),
    pa.field("updated_at", pa.float64()),
])
DELTA_PATH = f"s3a://{S3_BUCKET_RAW_DATA}/bronze/{DELTA_TABLE_NAME}"
dt = DeltaTable.create(DELTA_PATH, schema=schema, mode="ignore",
                       partition_by=["updated_at"], storage_options=storage_options)
for i in range(50, 151, 50):
    tmp_df = df.iloc[:i].copy()
    tmp_df['updated_at'] = time()
    print(i)
    dt = DeltaTable(DELTA_PATH, storage_options=storage_options)
    er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
    print(er)
    sleep(2)
More details:
How it looked:
ubuntu@ubuntu1:~$ python3 test.py
50
{'num_source_rows': 50, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 50, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 168, 'scan_time_ms': 0, 'rewrite_time_ms': 106}
100
{'num_source_rows': 100, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 50, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 100, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': 219, 'scan_time_ms': 0, 'rewrite_time_ms': 105}
150
{'num_source_rows': 150, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 100, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 150, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': 173, 'scan_time_ms': 0, 'rewrite_time_ms': 65}
ubuntu@ubuntu1:~$ python3 test3.py
50
{'num_source_rows': 50, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 50, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 100, 'num_output_rows': 150, 'num_target_files_added': 2, 'num_target_files_removed': 1, 'execution_time_ms': 196, 'scan_time_ms': 0, 'rewrite_time_ms': 104}
100
Traceback (most recent call last):
File "/home/ubuntu/test3.py", line 52, in <module>
er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
File "/home/ubuntu/.local/lib/python3.10/site-packages/deltalake/table.py", line 1713, in execute
metrics = self.table._table.merge_execute(
_internal.DeltaError: Generic DeltaTable error: Internal error: Invalid HashJoinExec, partition count mismatch 1!=2,consider using RepartitionExec.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
But if I try to update the same Delta table from my laptop, there are no errors at all.
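To check whether the S3/DynamoDB locking setup plays any role, the same merge loop can be pointed at a local path on the VM; a minimal sketch, assuming a writable /tmp/iris_delta directory (the path is just an example):
from time import time
from deltalake import DeltaTable
from sklearn.datasets import load_iris
import pyarrow as pa

# Same iris frame and schema as above, but the table lives on local disk,
# so S3 and the DynamoDB locking provider are out of the picture.
df = load_iris(as_frame=True)['data']
df.columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
df.reset_index(inplace=True, names=['idx'])
schema = pa.schema([
    pa.field("idx", pa.int64()),
    pa.field("sepal_length", pa.float64()),
    pa.field("sepal_width", pa.float64()),
    pa.field("petal_length", pa.float64()),
    pa.field("petal_width", pa.float64()),
    pa.field("updated_at", pa.float64()),
])
LOCAL_PATH = "/tmp/iris_delta"  # example path, adjust as needed
DeltaTable.create(LOCAL_PATH, schema=schema, mode="ignore", partition_by=["updated_at"])
for i in range(50, 151, 50):
    tmp_df = df.iloc[:i].copy()
    tmp_df['updated_at'] = time()
    dt = DeltaTable(LOCAL_PATH)
    er = (
        dt.merge(source=tmp_df, predicate="target.idx = source.idx",
                 source_alias='source', target_alias='target')
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )
    print(i, er)
If this local variant hits the same HashJoinExec partition-count error on the t2.micro but not on the laptop, that would point at the single-CPU environment rather than the object-store setup.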
Memory consumption on the Ubuntu VM:
Top AVERAGE memory consumption, by line:
(1) 43: 10 MB
Top PEAK memory consumption, by line:
(1) 2: 30 MB
(2) 1: 20 MB
(3) 43: 10 MB
(4) 6: 10 MB