What happened?
Summary:
Using WriteToParquet with a PyArrow schema in which every field is explicitly marked nullable fails with a KeyError when an input dictionary omits one of the schema's fields (here, email is missing from the first row):
```
  File "/site-packages/apache_beam/io/parquetio.py", line 119, in process
    self._buffer[i].append(row[n])
                           ~~~^^^
KeyError: "email [while running 'Write to Parquet/ParDo(_RowDictionariesToArrowTable)']"
```
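The failing line buffers each row column by column, indexing the row dictionary by field name. The following is a minimal pure-Python sketch of that pattern, not Beam's actual code; `buffer_rows` and the field list are hypothetical. It shows why a row without an `email` key raises KeyError even though the schema allows nulls:

```python
def buffer_rows(rows, field_names):
    """Buffer dict rows column-wise, indexing each row by field name.

    Hypothetical simplification of the pattern in the traceback:
    row[n] raises KeyError as soon as a row lacks one of the fields.
    """
    buffer = [[] for _ in field_names]
    for row in rows:
        for i, n in enumerate(field_names):
            buffer[i].append(row[n])  # no fallback for a missing key
    return buffer


field_names = ["id", "name", "age", "email"]
rows = [{"id": 1, "name": "Alice", "age": 30}]  # "email" is missing

try:
    buffer_rows(rows, field_names)
except KeyError as e:
    print("KeyError:", e)  # KeyError: 'email'
```

Nullability never enters the lookup, so marking the field nullable in the schema cannot prevent the error.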
Expected Behavior
If a field is marked nullable=True in the schema and is missing from an input dictionary, Beam should write a null value for that field to the Parquet file instead of raising a KeyError.
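A sketch of the requested behavior, assuming the writer consults each field's nullability when a key is absent. This is not Beam's implementation; `buffer_rows_nullable` and the `Field` namedtuple are hypothetical stand-ins (only the `.name` and `.nullable` attributes are used, which `pyarrow.Field` also provides). The example marks `id` as non-nullable purely to show that missing required fields would still be rejected:

```python
from collections import namedtuple

# Hypothetical stand-in for pyarrow.Field: only .name and .nullable are read.
Field = namedtuple("Field", ["name", "nullable"])


def buffer_rows_nullable(rows, fields):
    """Buffer dict rows column-wise, writing None for missing nullable fields.

    Missing non-nullable fields still raise, so invalid rows are not
    silently accepted.
    """
    buffer = [[] for _ in fields]
    for row in rows:
        for i, field in enumerate(fields):
            if field.name in row:
                buffer[i].append(row[field.name])
            elif field.nullable:
                buffer[i].append(None)  # absent but allowed to be null
            else:
                raise KeyError(
                    f"non-nullable field {field.name!r} missing from row {row!r}")
    return buffer


fields = [Field("id", False), Field("email", True)]
rows = [{"id": 1}, {"id": 2, "email": "bob@example.com"}]
print(buffer_rows_nullable(rows, fields))
# [[1, 2], [None, 'bob@example.com']]
```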
Reproducible Code
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.parquetio import WriteToParquet
import pyarrow as pa
import os

# Define a PyArrow schema in which every field is nullable
schema = pa.schema([
    pa.field("id", pa.int64(), nullable=True),
    pa.field("name", pa.string(), nullable=True),
    pa.field("age", pa.int64(), nullable=True),
    pa.field("email", pa.string(), nullable=True),
])

# Sample data: the first row omits the "email" key entirely
data = [
    {'id': 1, 'name': 'Alice', 'age': 30},
    {'id': 2, 'name': 'Bob', 'age': 25, 'email': 'bob@example.com'},
    {'id': 3, 'name': 'Charlie', 'age': None, 'email': None},
]


def run():
    output_prefix = 'output/data'
    output_dir = os.path.dirname(output_prefix)
    # exist_ok=True lets the script be re-run without failing on mkdir
    os.makedirs(output_dir, exist_ok=True)

    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Create Data" >> beam.Create(data)
            | "Write to Parquet" >> WriteToParquet(
                file_path_prefix=output_prefix,
                schema=schema,
                file_name_suffix='.parquet'
            )
        )


if __name__ == "__main__":
    run()
```
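Until WriteToParquet handles missing nullable fields itself, one possible workaround (an assumption on my part, not something documented by Beam) is to pad every row with None before the write step. The helper name `fill_missing_fields` is hypothetical:

```python
def fill_missing_fields(row, field_names):
    """Return a copy of row that contains every schema field.

    Keys absent from row are set to None. Keys that are not in
    field_names are dropped. Nullability is not checked here.
    """
    return {name: row.get(name) for name in field_names}


# Intended use in the pipeline, just before the write step:
#   | "Fill missing fields" >> beam.Map(fill_missing_fields,
#                                       field_names=schema.names)
#   | "Write to Parquet" >> WriteToParquet(...)

field_names = ["id", "name", "age", "email"]
print(fill_missing_fields({"id": 1, "name": "Alice", "age": 30}, field_names))
# {'id': 1, 'name': 'Alice', 'age': 30, 'email': None}
```

Because this pads unconditionally, it is only safe when every field in the schema is nullable, as in the reproduction above.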
Questions
Is this expected behavior, or should WriteToParquet default to null for missing fields if the schema allows nulls?
Environment
Apache Beam version: 2.66.0
Python version: 3.13
Runner: DirectRunner
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner