[Core] Extend Ray Data with IcebergDataSink for Distributed Writes to Iceberg Tables #49032

Open
@zszheng

Description

Currently, Ray Data supports distributed reads from Iceberg tables but not distributed writes. This issue proposes adding an IcebergDataSink to the datasink module in Ray Data so that users can write to Iceberg tables in a distributed fashion, which would make Ray more useful in big data processing workflows.

Because the Iceberg integration is read-only, Ray cannot be used in scenarios that require real-time or batch updates to Iceberg tables. In addition, PyIceberg's existing implementation tightly couples data file generation and transaction commit, which does not suit a distributed write managed by Ray, where many workers produce files and the commit has to happen once, after all of them finish.

Proposed Solution
To achieve distributed writes to Iceberg tables, we propose the following steps:

  1. Modify PyIceberg Transaction Handling:
  • Decouple transaction management from the append operation.
  • Introduce a method, generate_data_files, that writes Parquet files without committing them immediately.
  • Provide a mechanism to commit the transaction manually after all nodes have completed their writes.
  2. Develop IcebergDataSink in Ray:
  • Implement the IcebergDataSink class within Ray Data to handle distributed writes.
  • Use Ray's distributed computing capabilities to write Parquet files across multiple nodes.
  • Coordinate the commit process to maintain data consistency and integrity.
  3. Handle Failed Writes:
  • Implement a cleanup mechanism that removes partially written or failed data files (data_file) so that zombie data does not clutter the storage system.
  • Ensure the cleanup process is idempotent so that it can be safely retried if necessary.
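
To make the intended flow concrete, here is a minimal, library-free sketch of the write-then-commit pattern described in the steps above. It does not use the PyIceberg or Ray APIs: ToyTable, generate_data_file, cleanup, and distributed_write are hypothetical names, plain text files stand in for Parquet files, and a thread pool stands in for Ray workers. It only illustrates the ordering (parallel file generation, a single commit, idempotent cleanup on failure), not how the real implementation would look.

```python
import os
import tempfile
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    """Hypothetical stand-in for an Iceberg table: only committed files count."""
    location: str
    committed: list = field(default_factory=list)

    def commit(self, data_files):
        # One metadata update publishes every file of the job at once.
        self.committed = self.committed + list(data_files)


def generate_data_file(table, job_id, index, rows):
    """Phase 1 (per worker): write a data file without committing it."""
    path = os.path.join(table.location, f"{job_id}-{index}.data")
    with open(path, "w") as f:
        f.writelines(f"{row}\n" for row in rows)
    return path


def cleanup(table, job_id):
    """Remove every file written by this job; safe to call repeatedly."""
    for name in os.listdir(table.location):
        if name.startswith(job_id):
            try:
                os.remove(os.path.join(table.location, name))
            except FileNotFoundError:
                pass  # already deleted by an earlier cleanup attempt


def distributed_write(table, blocks, fail_on=None):
    """Write all blocks in parallel, then commit once; clean up on failure."""
    job_id = uuid.uuid4().hex  # prefix that identifies this job's files

    def task(item):
        index, rows = item
        if index == fail_on:  # hook for simulating a failed worker
            raise RuntimeError(f"simulated failure in block {index}")
        return generate_data_file(table, job_id, index, rows)

    try:
        with ThreadPoolExecutor(max_workers=4) as pool:
            data_files = list(pool.map(task, enumerate(blocks)))
    except Exception:
        # The pool has shut down here, so no task is still writing files.
        cleanup(table, job_id)
        raise
    table.commit(data_files)  # Phase 2: single commit on the driver
    return data_files


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as location:
        table = ToyTable(location)
        distributed_write(table, [[1, 2], [3], [4, 5, 6]])
        print(len(table.committed), "files committed")
```

Tagging every file with a per-job prefix is one possible design choice: it lets the cleanup step find leftovers by listing storage, even when the driver never received the file paths from a failed worker.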

    Labels

    P2 (Important issue, but not time-critical), data (Ray Data-related issues), enhancement (Request for new feature and/or capability)