[PEP] Apache Iceberg Ingestion Plugin #17694

@udaysagar2177

Description

Motivation

Apache Iceberg is becoming the de facto standard table format for data lakes, widely adopted across modern data platforms. Many organizations store their analytical data in Iceberg tables with frequent incremental updates. Currently, ingesting this data into Pinot requires one of the following approaches:

  1. Deploy Spark jobs to read Iceberg tables and write to Pinot, requiring additional infrastructure and operational overhead.
  2. Use batch ingestion through minions, which adds latency and operational complexity.
  3. Extract data file paths from committed file information or from Iceberg manifests, publish them to Kafka, and use the new Kafka microbatch plugin proposed in #17331.

This proposal explores the opportunity for native, near real-time ingestion from Iceberg tables by treating Iceberg snapshots as the source of file metadata, eliminating the need for Spark, additional orchestration, or Kafka as an intermediary.

Design Overview

The design reuses roughly 95% of the Kafka microbatch infrastructure created in #17331, replacing only the metadata source: Iceberg snapshots instead of Kafka messages.

Architecture Comparison

Kafka Microbatch:

```
Kafka Topic → Protocol Messages → MicroBatchQueueManager → Download Files → Ingest
```

Iceberg Microbatch:

```
Iceberg Snapshots → Manifest Files → MicroBatchQueueManager → Download Files → Ingest
                    ↑ NEW            ↑ REUSED (100%)
```

Key Components

1. IcebergMicroBatchConsumer (NEW)

Replaces KafkaPartitionLevelMicroBatchConsumer: polls Iceberg snapshots for new data files and submits them to the existing MicroBatchQueueManager for download and processing.

2. Offset Format (NEW)

Composite offset tracking snapshot ID, file path, and record position:

```json
{
  "sid": 8723456789012345,
  "dfp": "s3://bucket/warehouse/db/table/data/file-001.parquet",
  "rof": 1500
}
```

  • sid: Snapshot ID (equivalent to a Kafka offset)
  • dfp: Data file path (identifies which file within the snapshot)
  • rof: Record offset within the file (enables mid-file resume after a segment commit)
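
As a concrete illustration, here is a minimal round-trip sketch of the composite offset in Python (the actual plugin would live in Pinot's Java codebase; the function names are hypothetical, only the sid/dfp/rof field names come from this proposal):

```python
import json

# Hypothetical helpers illustrating the proposed composite offset.
# Only the field names sid/dfp/rof come from the proposal itself.

def encode_offset(snapshot_id, data_file_path, record_offset):
    """Serialize the composite offset to the proposed JSON form."""
    return json.dumps({"sid": snapshot_id, "dfp": data_file_path, "rof": record_offset})

def decode_offset(raw):
    """Parse the JSON offset back into its three components."""
    o = json.loads(raw)
    return (o["sid"], o["dfp"], o["rof"])

raw = encode_offset(8723456789012345, "s3://bucket/warehouse/db/table/data/file-001.parquet", 1500)
sid, dfp, rof = decode_offset(raw)
print(sid, rof)  # 8723456789012345 1500
```

Because the offset carries the file path and record position in addition to the snapshot ID, a restarted consumer can resume mid-file rather than re-reading the whole snapshot.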

How It Works

1. Consumer polls Iceberg table → table.refresh()
2. Compare current snapshot ID with last processed → detect new data
3. Use IncrementalAppendScan → get list of new DataFiles since last snapshot
4. For each DataFile:
   - Extract: file path, format, record count from manifest
   - Create MicroBatch object with file metadata
   - Submit to MicroBatchQueueManager (existing code!)
5. MicroBatchQueueManager:
   - Downloads file from PinotFS (S3, HDFS, etc.)
   - Converts to MessageBatch using existing readers
   - Returns to consumer
6. Consumer updates offset with new snapshot ID + file + record position
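
The polling loop above can be sketched with an in-memory stand-in for the Iceberg table. FakeTable and poll_new_files are hypothetical illustrations of steps 1–3 and 6; the real consumer would use table.refresh() and IncrementalAppendScan from the Iceberg API instead:

```python
# In-memory simulation of the snapshot polling loop. All names here are
# illustrative stand-ins, not actual Iceberg or Pinot APIs.

class FakeTable:
    """Stands in for an Iceberg table: each snapshot appends data files."""
    def __init__(self):
        self.snapshots = []  # list of (snapshot_id, [appended file paths])

    def commit(self, snapshot_id, files):
        self.snapshots.append((snapshot_id, files))

    def current_snapshot_id(self):
        return self.snapshots[-1][0] if self.snapshots else None

    def incremental_append_scan(self, after_snapshot_id):
        """Files appended by snapshots newer than after_snapshot_id."""
        new_files = []
        for sid, files in self.snapshots:
            if after_snapshot_id is None or sid > after_snapshot_id:
                new_files.extend(files)
        return new_files

def poll_new_files(table, last_snapshot_id):
    current = table.current_snapshot_id()      # steps 1-2: refresh + compare
    if current == last_snapshot_id:
        return last_snapshot_id, []            # no new snapshot, nothing to do
    files = table.incremental_append_scan(last_snapshot_id)  # step 3
    return current, files                      # step 6: advance the offset

table = FakeTable()
table.commit(100, ["s3://b/t/data/f-001.parquet"])
table.commit(101, ["s3://b/t/data/f-002.parquet", "s3://b/t/data/f-003.parquet"])

offset, files = poll_new_files(table, last_snapshot_id=100)
print(offset, files)  # 101 ['s3://b/t/data/f-002.parquet', 's3://b/t/data/f-003.parquet']
```

In the real plugin, each returned file would become a MicroBatch object handed to the existing MicroBatchQueueManager (steps 4–5), which is unchanged from the Kafka path.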

Open Questions

  1. Partition Mapping: Should each Iceberg partition correspond to a Pinot segment, or can multiple partitions be merged? What is the recommended mapping strategy? Additionally, how should Iceberg partitions map to Kafka topic partitions (or virtual partitions) expected by the consumption path?
  2. Compaction Handling: How should the ingestion system handle Iceberg compactions that replace or remove existing data files?
  3. Schema Evolution: How should schema changes in Iceberg tables be reflected in Pinot?

Labels: PEP-Request (Pinot Enhancement Proposal request to be reviewed), ingestion
