-
Notifications
You must be signed in to change notification settings - Fork 302
Add Avro compression #1976
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Avro compression #1976
Changes from all commits
8befdbd
9fb30da
20a138e
7615c87
518a76d
4f529e5
5030d49
4c3f22d
4668e68
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
|
||
from sortedcontainers import SortedList | ||
|
||
from pyiceberg.avro.codecs import AvroCompressionCodec | ||
from pyiceberg.expressions import ( | ||
AlwaysFalse, | ||
BooleanExpression, | ||
|
@@ -104,6 +105,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): | |
_added_data_files: List[DataFile] | ||
_manifest_num_counter: itertools.count[int] | ||
_deleted_data_files: Set[DataFile] | ||
_compression: AvroCompressionCodec | ||
|
||
def __init__( | ||
self, | ||
|
@@ -126,6 +128,11 @@ def __init__( | |
self._deleted_data_files = set() | ||
self.snapshot_properties = snapshot_properties | ||
self._manifest_num_counter = itertools.count(0) | ||
from pyiceberg.table import TableProperties | ||
|
||
self._compression = self._transaction.table_metadata.properties.get( # type: ignore | ||
TableProperties.WRITE_AVRO_COMPRESSION, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT | ||
) | ||
Comment on lines +133 to +135
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think little things like table property parsing have caught us unexpectedly in the past - would it be possible to add a few cases that demonstrate:
I think this would help us add coverage for the new parameter, both in property parsing and in the simple logic here in […]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's a good point @sungwy, and indeed that caught us in the past. I've added the test that you suggested, let me know what you think 👍 |
||
|
||
def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: | ||
self._added_data_files.append(data_file) | ||
|
@@ -154,6 +161,7 @@ def _write_added_manifest() -> List[ManifestFile]: | |
schema=self._transaction.table_metadata.schema(), | ||
output_file=self.new_manifest_output(), | ||
snapshot_id=self._snapshot_id, | ||
avro_compression=self._compression, | ||
) as writer: | ||
for data_file in self._added_data_files: | ||
writer.add( | ||
|
@@ -184,6 +192,7 @@ def _write_delete_manifest() -> List[ManifestFile]: | |
schema=self._transaction.table_metadata.schema(), | ||
output_file=self.new_manifest_output(), | ||
snapshot_id=self._snapshot_id, | ||
avro_compression=self._compression, | ||
) as writer: | ||
for entry in entries: | ||
writer.add_entry(entry) | ||
|
@@ -249,12 +258,14 @@ def _commit(self) -> UpdatesAndRequirements: | |
) | ||
location_provider = self._transaction._table.location_provider() | ||
manifest_list_file_path = location_provider.new_metadata_location(file_name) | ||
|
||
with write_manifest_list( | ||
format_version=self._transaction.table_metadata.format_version, | ||
output_file=self._io.new_output(manifest_list_file_path), | ||
snapshot_id=self._snapshot_id, | ||
parent_snapshot_id=self._parent_snapshot_id, | ||
sequence_number=next_sequence_number, | ||
avro_compression=self._compression, | ||
) as writer: | ||
writer.add_manifests(new_manifests) | ||
|
||
|
@@ -291,6 +302,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: | |
schema=self._transaction.table_metadata.schema(), | ||
output_file=self.new_manifest_output(), | ||
snapshot_id=self._snapshot_id, | ||
avro_compression=self._compression, | ||
) | ||
|
||
def new_manifest_output(self) -> OutputFile: | ||
|
@@ -416,6 +428,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> | |
schema=self._transaction.table_metadata.schema(), | ||
output_file=self.new_manifest_output(), | ||
snapshot_id=self._snapshot_id, | ||
avro_compression=self._compression, | ||
) as writer: | ||
for existing_entry in existing_entries: | ||
writer.add_entry(existing_entry) | ||
|
@@ -550,6 +563,7 @@ def _existing_manifests(self) -> List[ManifestFile]: | |
schema=self._transaction.table_metadata.schema(), | ||
output_file=self.new_manifest_output(), | ||
snapshot_id=self._snapshot_id, | ||
avro_compression=self._compression, | ||
) as writer: | ||
[ | ||
writer.add_entry( | ||
|
Uh oh!
There was an error while loading. Please reload this page.