Description
dlt version
1.4.1
Describe the problem
On rare occasions, my dlt pipeline fails when trying to add the _dlt_load_id column to a pyarrow table. This is the exception I'm getting:
The above exception was caused by the following exception:
pyarrow.lib.ArrowInvalid: Schema at index 1 was different: 
id: int32 not null
user_id: int32
begin_at: timestamp[us, tz=UTC]
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32
team_id: int32
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
vs
id: int32 not null
user_id: int32 not null
begin_at: timestamp[us, tz=UTC] not null
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32 not null
team_id: int32 not null
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32 not null
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
Stack Trace:
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 471, in extract
    self._extract_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 1239, in _extract_source
    load_id = extract.extract(
              ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 416, in extract
    self._extract_single_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 330, in _extract_single_source
    with self.manage_writers(load_id, source):
  File "/usr/local/lib/python3.12/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 368, in manage_writers
    self.extract_storage.close_writers(load_id)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/storage.py", line 78, in close_writers
    storage.close_writers(load_id, skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/storages/data_item_storage.py", line 85, in close_writers
    writer.close(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 176, in close
    self._flush_and_close_file(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 260, in _flush_and_close_file
    self._flush_items(allow_empty_file)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 250, in _flush_items
    self._writer.write_data(self._buffered_items)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/writers.py", line 471, in write_data
    table = concat_batches_and_tables_in_order(items)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/common/libs/pyarrow.py", line 572, in concat_batches_and_tables_in_order
    return pyarrow.concat_tables(tables, promote_options="none")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 6106, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
However, when looking at the schema.json in the load package at ./normalize/c85674d3774aa0bd/1737392430.155199/schema.json, this is what I see:
$ cat ./normalize/c85674d3774aa0bd/1737392430.155199/schema.json | jq .tables.scale_teams 
{
  "columns": {
    "id": {
      "name": "id",
      "nullable": false,
      "data_type": "bigint",
      "precision": 32
    },
    "user_id": {
      "name": "user_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "begin_at": {
      "name": "begin_at",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "created_at": {
      "name": "created_at",
      "nullable": false,
      "data_type": "timestamp",
      "precision": 6
    },
    "updated_at": {
      "name": "updated_at",
      "nullable": false,
      "data_type": "timestamp",
      "precision": 6
    },
    "scale_id": {
      "name": "scale_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "team_id": {
      "name": "team_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "comment": {
      "name": "comment",
      "nullable": true,
      "data_type": "text"
    },
    "old_feedback": {
      "name": "old_feedback",
      "nullable": true,
      "data_type": "text"
    },
    "feedback_rating": {
      "name": "feedback_rating",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "final_mark": {
      "name": "final_mark",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "truant_id": {
      "name": "truant_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "flag_id": {
      "name": "flag_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "token": {
      "name": "token",
      "nullable": true,
      "data_type": "text"
    },
    "ip": {
      "name": "ip",
      "nullable": true,
      "data_type": "text"
    },
    "internship_id": {
      "name": "internship_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "filled_at": {
      "name": "filled_at",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "_pg_lsn": {
      "name": "_pg_lsn",
      "nullable": true,
      "data_type": "bigint"
    },
    "_pg_deleted_ts": {
      "name": "_pg_deleted_ts",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "_pg_commit_ts": {
      "name": "_pg_commit_ts",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6,
      "x-bigquery-partition": true
    },
    "_pg_tx_id": {
      "name": "_pg_tx_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "_dlt_load_id": {
      "name": "_dlt_load_id",
      "data_type": "text",
      "nullable": false
    }
  },
  "write_disposition": "append",
  "file_format": "parquet",
  "name": "scale_teams",
  "resource": "scale_teams",
  "x-normalizer": {
    "seen-data": true
  }
}

Here is the diff between the two schemas:
2,3c2,3
< user_id: int32
< begin_at: timestamp[us, tz=UTC]
---
> user_id: int32 not null
> begin_at: timestamp[us, tz=UTC] not null
6,7c6,7
< scale_id: int32
< team_id: int32
---
> scale_id: int32 not null
> team_id: int32 not null
13c13
< flag_id: int32
---
> flag_id: int32 not null
Expected behavior
I would expect the schema to be respected during normalization. However, I'm not sure whether this is an issue with pyarrow itself or with dlt's use of it.
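As a possible workaround (just a sketch, not verified against this pipeline), the arrow tables could be cast to one unified schema before they reach the buffered writer, so that concat_tables always sees identical schemas. The helper name below is hypothetical:

import pyarrow as pa

def align_schemas(tables):
    # unify_schemas merges fields with the same name and type; a field that is
    # nullable in any input should come out nullable in the unified schema
    unified = pa.unify_schemas([t.schema for t in tables])
    # casting only relaxes the nullability flag here, the data itself is unchanged
    return [t.cast(unified) for t in tables]

In my case I could apply something like this in the resource before yielding the tables, but it feels like something the normalization step should handle.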
Steps to reproduce
This is difficult to reproduce since it involves a custom source. I'll see if I can work out a minimal example.
Operating system
Linux
Runtime environment
Kubernetes
Python version
3.11
dlt data source
pg_legacy_replication :)
dlt destination
Google BigQuery
Other deployment details
Are failed normalize jobs ever loaded when the pipeline is run again later? This is a bit problematic for me, since I'm using a custom source that reads from a WAL and commits offsets after the load phase.
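For what it's worth, my working assumption (which I have not been able to confirm) is that leftover packages from a failed run are picked up again when the normalize and load steps are invoked on a later run, along these lines (pipeline name and dataset are illustrative):

import dlt

pipeline = dlt.pipeline(
    pipeline_name="pg_replication",   # illustrative
    destination="bigquery",
    dataset_name="replication_data",  # illustrative
)

# normalize() processes any extracted packages left over from a previous run,
# and load() pushes the normalized packages to the destination
pipeline.normalize()
pipeline.load()

If that does not hold for packages whose normalize step failed, I would need to rethink how offsets are committed.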
Additional information
No response