
When normalizing a load package using pyarrow, the schema seems to be ignored #2229

@neuromantik33

Description

dlt version

1.4.1

Describe the problem

On rare occasions, my dlt pipeline fails when trying to add the _dlt_load_id column to a pyarrow table. This is the exception I'm getting:

The above exception was caused by the following exception:
pyarrow.lib.ArrowInvalid: Schema at index 1 was different: 
id: int32 not null
user_id: int32
begin_at: timestamp[us, tz=UTC]
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32
team_id: int32
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
vs
id: int32 not null
user_id: int32 not null
begin_at: timestamp[us, tz=UTC] not null
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32 not null
team_id: int32 not null
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32 not null
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null

Stack Trace:
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 471, in extract
    self._extract_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 1239, in _extract_source
    load_id = extract.extract(
              ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 416, in extract
    self._extract_single_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 330, in _extract_single_source
    with self.manage_writers(load_id, source):
  File "/usr/local/lib/python3.12/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 368, in manage_writers
    self.extract_storage.close_writers(load_id)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/storage.py", line 78, in close_writers
    storage.close_writers(load_id, skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/storages/data_item_storage.py", line 85, in close_writers
    writer.close(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 176, in close
    self._flush_and_close_file(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 260, in _flush_and_close_file
    self._flush_items(allow_empty_file)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 250, in _flush_items
    self._writer.write_data(self._buffered_items)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/writers.py", line 471, in write_data
    table = concat_batches_and_tables_in_order(items)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/common/libs/pyarrow.py", line 572, in concat_batches_and_tables_in_order
    return pyarrow.concat_tables(tables, promote_options="none")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 6106, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status

However, when I look at the schema.json in the load package at ./normalize/c85674d3774aa0bd/1737392430.155199/schema.json, this is what I see:

$ cat ./normalize/c85674d3774aa0bd/1737392430.155199/schema.json | jq .tables.scale_teams 
{
  "columns": {
    "id": {
      "name": "id",
      "nullable": false,
      "data_type": "bigint",
      "precision": 32
    },
    "user_id": {
      "name": "user_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "begin_at": {
      "name": "begin_at",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "created_at": {
      "name": "created_at",
      "nullable": false,
      "data_type": "timestamp",
      "precision": 6
    },
    "updated_at": {
      "name": "updated_at",
      "nullable": false,
      "data_type": "timestamp",
      "precision": 6
    },
    "scale_id": {
      "name": "scale_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "team_id": {
      "name": "team_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "comment": {
      "name": "comment",
      "nullable": true,
      "data_type": "text"
    },
    "old_feedback": {
      "name": "old_feedback",
      "nullable": true,
      "data_type": "text"
    },
    "feedback_rating": {
      "name": "feedback_rating",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "final_mark": {
      "name": "final_mark",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "truant_id": {
      "name": "truant_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "flag_id": {
      "name": "flag_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "token": {
      "name": "token",
      "nullable": true,
      "data_type": "text"
    },
    "ip": {
      "name": "ip",
      "nullable": true,
      "data_type": "text"
    },
    "internship_id": {
      "name": "internship_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "filled_at": {
      "name": "filled_at",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "_pg_lsn": {
      "name": "_pg_lsn",
      "nullable": true,
      "data_type": "bigint"
    },
    "_pg_deleted_ts": {
      "name": "_pg_deleted_ts",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "_pg_commit_ts": {
      "name": "_pg_commit_ts",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6,
      "x-bigquery-partition": true
    },
    "_pg_tx_id": {
      "name": "_pg_tx_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "_dlt_load_id": {
      "name": "_dlt_load_id",
      "data_type": "text",
      "nullable": false
    }
  },
  "write_disposition": "append",
  "file_format": "parquet",
  "name": "scale_teams",
  "resource": "scale_teams",
  "x-normalizer": {
    "seen-data": true
  }
}

Here is the diff between the two schemas:

2,3c2,3
< user_id: int32
< begin_at: timestamp[us, tz=UTC]
---
> user_id: int32 not null
> begin_at: timestamp[us, tz=UTC] not null
6,7c6,7
< scale_id: int32
< team_id: int32
---
> scale_id: int32 not null
> team_id: int32 not null
13c13
< flag_id: int32
---
> flag_id: int32 not null

Expected behavior

I would expect the schema to be respected during normalization. However, I'm not sure whether this is an issue with pyarrow itself or with dlt's use of it.
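
For now I'm considering working around this in my own resource by relaxing nullability before yielding, so that every Arrow table in a buffer carries an identical schema. A minimal sketch, assuming pyarrow tables are yielded directly (align_nullability is a hypothetical helper of mine, not a dlt API):

import pyarrow as pa

def align_nullability(table: pa.Table) -> pa.Table:
    # Relax every field to nullable so that batches from different WAL chunks
    # always share a byte-identical schema when dlt concatenates them.
    target = pa.schema(
        [pa.field(f.name, f.type, nullable=True) for f in table.schema]
    )
    return table.cast(target)

# inside the resource: yield align_nullability(arrow_table)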

Steps to reproduce

This is difficult to reproduce end-to-end since it uses a custom source. I'll see if I can work out a full example; a minimal Arrow-level sketch of what I think is happening is below.
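
In the meantime, here is a minimal pyarrow-only sketch of what I believe is the underlying failure mode (it bypasses dlt entirely): two tables whose schemas differ only in field nullability cannot be concatenated with promote_options="none", which is the call made in concat_batches_and_tables_in_order in the stack trace above.

import pyarrow as pa

# Identical columns, but user_id is nullable in one schema and required in the
# other, mirroring the diff shown above.
nullable_schema = pa.schema([pa.field("user_id", pa.int32(), nullable=True)])
required_schema = pa.schema([pa.field("user_id", pa.int32(), nullable=False)])

t1 = pa.Table.from_pydict({"user_id": [1, 2]}, schema=nullable_schema)
t2 = pa.Table.from_pydict({"user_id": [3, 4]}, schema=required_schema)

# Raises pyarrow.lib.ArrowInvalid: "Schema at index 1 was different: ..."
pa.concat_tables([t1, t2], promote_options="none")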

Operating system

Linux

Runtime environment

Kubernetes

Python version

3.11

dlt data source

pg_legacy_replication :)

dlt destination

Google BigQuery

Other deployment details

Are failed normalize jobs ever loaded when the pipeline is run again later, or are they never retried? This is a bit problematic for me, since I'm using a custom source that reads from a WAL and commits offsets after the load phase.

Additional information

No response

Metadata

Labels

bug (Something isn't working)

Status

Done