[Core] Extend Ray Data with IcebergDataSink for Distributed Writes to Iceberg Tables #49032
Description
Currently, Ray supports distributed reads from Iceberg tables but lacks support for distributed writes. To close this gap, we propose extending the datasink module within Ray Data with an IcebergDataSink, allowing users to perform distributed writes to Iceberg tables and significantly improving Ray's utility and flexibility in big data processing workflows.
Because Ray's Iceberg integration currently supports only read operations, it cannot be used in scenarios that require real-time or batch updates to Iceberg tables. In addition, PyIceberg's existing implementation tightly couples file generation with transaction commitment, which does not suit a distributed write operation coordinated by Ray.
Proposed Solution
To achieve distributed writes to Iceberg tables, we propose the following steps:
- Modify PyIceberg Transaction Handling (see the first sketch after this list):
  - Decouple transaction management from the append operation.
  - Introduce a method generate_data_files that prepares Parquet files for writing without committing them immediately.
  - Provide a mechanism to commit the transaction manually after all nodes have completed their writes.
- Develop IcebergDataSink in Ray (second sketch below):
  - Implement the IcebergDataSink class within Ray Data to handle distributed writes.
  - Utilize Ray's distributed computing capabilities to write Parquet files across multiple nodes.
  - Coordinate the commit process to maintain data consistency and integrity.
- Handle Failed Writes (third sketch below):
  - Implement a cleanup mechanism that removes partially written or uncommitted data files (DataFile entries), so that orphaned "zombie" data does not clutter the storage system.
  - Ensure this cleanup process is idempotent and can be safely retried if necessary.
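A minimal sketch of the decoupled PyIceberg flow described in the first step. generate_data_files is the method this proposal would add (it does not exist in PyIceberg today), append_data_files is a hypothetical commit hook standing in for whatever API the decoupling exposes, and the catalog/table names are placeholders:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")          # placeholder catalog name
table = catalog.load_table("db.events")    # placeholder table identifier

batch = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Step 1 (would run on each Ray worker): write Parquet data files for this
# batch without committing them. Returns the DataFile metadata describing
# the files just written. `generate_data_files` is the proposed new method.
data_files = table.generate_data_files(batch)

# Step 2 (would run once on the driver, after all workers finish): commit
# every worker's DataFiles in a single snapshot. `append_data_files` is a
# hypothetical hook for appending pre-written DataFiles inside a transaction.
with table.transaction() as tx:
    tx.append_data_files(data_files)
```

The key point is that step 1 can run independently on many nodes, while step 2 is a single driver-side commit that either publishes all files or none of them.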
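Building on that, a rough sketch of the IcebergDataSink itself, assuming Ray's Datasink interface with write / on_write_complete / on_write_failed hooks (exact signatures have changed across Ray versions) and the decoupled PyIceberg methods sketched above:

```python
from typing import Any, Iterable, List

from pyiceberg.catalog import load_catalog
from ray.data import Datasink
from ray.data.block import Block, BlockAccessor


class IcebergDataSink(Datasink):
    """Distributed Iceberg writer: workers produce data files, driver commits."""

    def __init__(self, catalog_name: str, table_identifier: str):
        self._catalog_name = catalog_name
        self._table_identifier = table_identifier

    def _load_table(self):
        # Load lazily so the sink stays cheap to serialize to worker tasks.
        return load_catalog(self._catalog_name).load_table(self._table_identifier)

    def write(self, blocks: Iterable[Block], ctx) -> List[Any]:
        # Runs in each write task: convert blocks to Arrow, write Parquet
        # data files without committing, and return their DataFile metadata.
        table = self._load_table()
        data_files: List[Any] = []
        for block in blocks:
            arrow_table = BlockAccessor.for_block(block).to_arrow()
            # Proposed (not yet existing) PyIceberg method from step 1.
            data_files.extend(table.generate_data_files(arrow_table))
        return data_files

    def on_write_complete(self, write_results: List[List[Any]]) -> None:
        # Runs once on the driver: commit every task's files in one snapshot.
        table = self._load_table()
        all_files = [f for task_files in write_results for f in task_files]
        with table.transaction() as tx:
            tx.append_data_files(all_files)  # hypothetical commit hook

    def on_write_failed(self, error: Exception) -> None:
        # Delete any data files that were written but never committed,
        # e.g. using the cleanup helper sketched below.
        pass
```

Usage would then look like ds.write_datasink(IcebergDataSink("default", "db.events")), with Ray fanning out the write tasks and the sink committing once they all succeed.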
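Finally, a hypothetical cleanup helper for the failed-write path, assuming the uncommitted DataFile objects are still available to the driver and that the table's FileIO (table.io) exposes new_input / exists / delete; the exact FileIO behaviour differs between implementations:

```python
def cleanup_data_files(table, data_files) -> None:
    """Delete uncommitted data files; safe to call more than once."""
    for data_file in data_files:
        path = data_file.file_path
        # Skipping files that are already gone keeps the cleanup idempotent,
        # so a retried cleanup pass does not fail on missing files.
        if table.io.new_input(path).exists():
            table.io.delete(path)
```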
Use case
No response