Skip to content

Commit

Permalink
Add additional metadata to avro output file
Browse files Browse the repository at this point in the history
  • Loading branch information
maxdebayser committed Jul 4, 2023
1 parent 011d3c0 commit fd3a7d0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
6 changes: 4 additions & 2 deletions python/pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,13 @@ class AvroOutputFile(Generic[D]):
sync_bytes: bytes
writer: Writer

def __init__(self, output_file: OutputFile, schema: Schema, schema_name: str) -> None:
def __init__(self, output_file: OutputFile, schema: Schema, schema_name: str, metadata: Dict[str, str] = EMPTY_DICT) -> None:
self.output_file = output_file
self.schema = schema
self.schema_name = schema_name
self.sync_bytes = os.urandom(SYNC_SIZE)
self.writer = construct_writer(self.schema)
self.metadata = metadata

def __enter__(self) -> AvroOutputFile[D]:
"""
Expand All @@ -255,7 +256,8 @@ def __exit__(

def _write_header(self) -> None:
json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.schema, schema_name=self.schema_name))
header = AvroFileHeader(magic=MAGIC, meta={_SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}, sync=self.sync_bytes)
meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes)
construct_writer(META_SCHEMA).write(self.encoder, header)

def write_block(self, objects: List[D]) -> None:
Expand Down
9 changes: 8 additions & 1 deletion python/tests/avro/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,23 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None:
data_file=data_file,
)

additional_metadata = {"foo": "bar"}

with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/manifest_entry.avro"

with avro.AvroOutputFile[ManifestEntry](
PyArrowFileIO().new_output(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, "manifest_entry"
PyArrowFileIO().new_output(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, "manifest_entry", additional_metadata
) as out:
out.write_block([entry])

with open(tmp_avro_file, "rb") as fo:
r = reader(fo=fo)

for k, v in additional_metadata.items():
assert k in r.metadata
assert v == r.metadata[k]

it = iter(r)

fa_entry = next(it)
Expand Down

0 comments on commit fd3a7d0

Please sign in to comment.