Skip to content

Add Avro compression #1976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pyiceberg/avro/codecs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,27 @@

from __future__ import annotations

from typing import Dict, Optional, Type
from typing import Dict, Literal, Optional, Type

from typing_extensions import TypeAlias

from pyiceberg.avro.codecs.bzip2 import BZip2Codec
from pyiceberg.avro.codecs.codec import Codec
from pyiceberg.avro.codecs.deflate import DeflateCodec
from pyiceberg.avro.codecs.snappy_codec import SnappyCodec
from pyiceberg.avro.codecs.zstandard_codec import ZStandardCodec

KNOWN_CODECS: Dict[str, Optional[Type[Codec]]] = {
AvroCompressionCodec: TypeAlias = Literal["null", "bzip2", "snappy", "zstandard", "deflate"]

AVRO_CODEC_KEY = "avro.codec"

KNOWN_CODECS: Dict[AvroCompressionCodec, Optional[Type[Codec]]] = {
"null": None,
"bzip2": BZip2Codec,
"snappy": SnappyCodec,
"zstandard": ZStandardCodec,
"deflate": DeflateCodec,
}

# Map to convert the naming from Iceberg to Avro
CODEC_MAPPING_ICEBERG_TO_AVRO: Dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"}
47 changes: 40 additions & 7 deletions pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
TypeVar,
)

from pyiceberg.avro.codecs import KNOWN_CODECS
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
from pyiceberg.avro.codecs.codec import Codec
from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
from pyiceberg.avro.encoder import BinaryEncoder
Expand Down Expand Up @@ -69,7 +69,6 @@
NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True),
)

_CODEC_KEY = "avro.codec"
_SCHEMA_KEY = "avro.schema"


Expand All @@ -92,11 +91,13 @@ def compression_codec(self) -> Optional[Type[Codec]]:
In the case of a null codec, we return a None indicating that we
don't need to compress/decompress.
"""
codec_name = self.meta.get(_CODEC_KEY, "null")
from pyiceberg.table import TableProperties

codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
if codec_name not in KNOWN_CODECS:
raise ValueError(f"Unsupported codec: {codec_name}")

return KNOWN_CODECS[codec_name]
return KNOWN_CODECS[codec_name] # type: ignore

def get_schema(self) -> Schema:
if _SCHEMA_KEY in self.meta:
Expand Down Expand Up @@ -276,11 +277,36 @@ def __exit__(
self.output_stream.close()

def _write_header(self) -> None:
from pyiceberg.table import TableProperties

codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
codec_name = avro_codec_name

json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}

meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec_name}
header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
construct_writer(META_SCHEMA).write(self.encoder, header)

def compression_codec(self) -> Optional[Type[Codec]]:
"""Get the file's compression codec algorithm from the file's metadata.

In the case of a null codec, we return a None indicating that we
don't need to compress/decompress.
"""
from pyiceberg.table import TableProperties

codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)

if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
codec_name = avro_codec_name

if codec_name not in KNOWN_CODECS:
raise ValueError(f"Unsupported codec: {codec_name}")

return KNOWN_CODECS[codec_name] # type: ignore

def write_block(self, objects: List[D]) -> None:
in_memory = io.BytesIO()
block_content_encoder = BinaryEncoder(output_stream=in_memory)
Expand All @@ -289,6 +315,13 @@ def write_block(self, objects: List[D]) -> None:
block_content = in_memory.getvalue()

self.encoder.write_int(len(objects))
self.encoder.write_int(len(block_content))
self.encoder.write(block_content)

if codec := self.compression_codec():
content, content_length = codec.compress(block_content)
self.encoder.write_int(content_length)
self.encoder.write(content)
else:
self.encoder.write_int(len(block_content))
self.encoder.write(block_content)

self.encoder.write(self.sync_bytes)
76 changes: 58 additions & 18 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from cachetools.keys import hashkey
from pydantic_core import to_json

from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
from pyiceberg.avro.file import AvroFile, AvroOutputFile
from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ValidationError
Expand Down Expand Up @@ -798,9 +799,16 @@ class ManifestWriter(ABC):
_deleted_rows: int
_min_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry
_compression: AvroCompressionCodec

def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
def __init__(
self,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
) -> None:
self.closed = False
self._spec = spec
self._schema = schema
Expand All @@ -815,6 +823,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
self._deleted_rows = 0
self._min_sequence_number = None
self._partitions = []
self._compression = avro_compression

def __enter__(self) -> ManifestWriter:
"""Open the writer."""
Expand Down Expand Up @@ -850,6 +859,7 @@ def _meta(self) -> Dict[str, str]:
"partition-spec": to_json(self._spec.fields).decode("utf-8"),
"partition-spec-id": str(self._spec.spec_id),
"format-version": str(self.version),
AVRO_CODEC_KEY: self._compression,
}

def _with_partition(self, format_version: TableVersion) -> Schema:
Expand Down Expand Up @@ -961,13 +971,15 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:


class ManifestWriterV1(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
super().__init__(
spec,
schema,
output_file,
snapshot_id,
)
def __init__(
self,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
):
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)

def content(self) -> ManifestContent:
return ManifestContent.DATA
Expand All @@ -981,8 +993,15 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:


class ManifestWriterV2(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
super().__init__(spec, schema, output_file, snapshot_id)
def __init__(
self,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
):
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)

def content(self) -> ManifestContent:
return ManifestContent.DATA
Expand All @@ -1008,12 +1027,17 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:


def write_manifest(
format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
format_version: TableVersion,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
) -> ManifestWriter:
if format_version == 1:
return ManifestWriterV1(spec, schema, output_file, snapshot_id)
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
elif format_version == 2:
return ManifestWriterV2(spec, schema, output_file, snapshot_id)
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
else:
raise ValueError(f"Cannot write manifest for table version: {format_version}")

Expand Down Expand Up @@ -1063,14 +1087,21 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite


class ManifestListWriterV1(ManifestListWriter):
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
def __init__(
self,
output_file: OutputFile,
snapshot_id: int,
parent_snapshot_id: Optional[int],
compression: AvroCompressionCodec,
):
super().__init__(
format_version=1,
output_file=output_file,
meta={
"snapshot-id": str(snapshot_id),
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"format-version": "1",
AVRO_CODEC_KEY: compression,
},
)

Expand All @@ -1084,7 +1115,14 @@ class ManifestListWriterV2(ManifestListWriter):
_commit_snapshot_id: int
_sequence_number: int

def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
def __init__(
self,
output_file: OutputFile,
snapshot_id: int,
parent_snapshot_id: Optional[int],
sequence_number: int,
compression: AvroCompressionCodec,
):
super().__init__(
format_version=2,
output_file=output_file,
Expand All @@ -1093,6 +1131,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"sequence-number": str(sequence_number),
"format-version": "2",
AVRO_CODEC_KEY: compression,
},
)
self._commit_snapshot_id = snapshot_id
Expand Down Expand Up @@ -1127,12 +1166,13 @@ def write_manifest_list(
snapshot_id: int,
parent_snapshot_id: Optional[int],
sequence_number: Optional[int],
avro_compression: AvroCompressionCodec,
) -> ManifestListWriter:
if format_version == 1:
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
elif format_version == 2:
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
else:
raise ValueError(f"Cannot write manifest list for table version: {format_version}")
3 changes: 3 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ class TableProperties:
WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024 # 512 MB

WRITE_AVRO_COMPRESSION = "write.avro.compression-codec"
WRITE_AVRO_COMPRESSION_DEFAULT = "gzip"

DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"

Expand Down
14 changes: 14 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from sortedcontainers import SortedList

from pyiceberg.avro.codecs import AvroCompressionCodec
from pyiceberg.expressions import (
AlwaysFalse,
BooleanExpression,
Expand Down Expand Up @@ -104,6 +105,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: Set[DataFile]
_compression: AvroCompressionCodec

def __init__(
self,
Expand All @@ -126,6 +128,11 @@ def __init__(
self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)
from pyiceberg.table import TableProperties

self._compression = self._transaction.table_metadata.properties.get( # type: ignore
TableProperties.WRITE_AVRO_COMPRESSION, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT
)
Comment on lines +133 to +135
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think little things like table property parsing have gotten us unexpectedly in the past - would it be possible to add a few cases that demonstrate:

  • that the newly written manifests now have the new default compression when a new snapshot is committed?
  • and that newly written manifests respect a compression codec value when the property is set, when a new snapshot is committed?

I think this would help us add coverage for the new parameter both in property parsing and in the simple logic here in the update.snapshots module in each function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point @sungwy, and indeed that caught us in the past. I've added the test that you suggested, let me know what you think 👍


def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._added_data_files.append(data_file)
Expand Down Expand Up @@ -154,6 +161,7 @@ def _write_added_manifest() -> List[ManifestFile]:
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
) as writer:
for data_file in self._added_data_files:
writer.add(
Expand Down Expand Up @@ -184,6 +192,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
) as writer:
for entry in entries:
writer.add_entry(entry)
Expand Down Expand Up @@ -249,12 +258,14 @@ def _commit(self) -> UpdatesAndRequirements:
)
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)

with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
sequence_number=next_sequence_number,
avro_compression=self._compression,
) as writer:
writer.add_manifests(new_manifests)

Expand Down Expand Up @@ -291,6 +302,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
)

def new_manifest_output(self) -> OutputFile:
Expand Down Expand Up @@ -416,6 +428,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
) as writer:
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
Expand Down Expand Up @@ -550,6 +563,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
) as writer:
[
writer.add_entry(
Expand Down
Loading