(feat) Implement Iceberg REST Catalog Metrics API for Compute Audit Correlation #3337

@obelix74

Description

Is your feature request related to a problem? Please describe.

Implement the Apache Iceberg REST Catalog Metrics API (POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics) so that compute engines (Spark, Trino, Flink) can report query-level metrics back to the catalog. This allows catalog operations to be correlated with compute-side query execution, producing comprehensive audit trails.

Motivation:

Currently, the Polaris catalog provides audit logging for catalog-level operations (table metadata access, namespace management, credential vending). However, critical query-level metrics are not visible at the catalog layer:

Data Point          Available at Catalog?   Why Not?
Rows read/written   No                      Data flows directly from compute to storage
Query text (SQL)    No                      Queries are composed in compute engines
Bytes processed     No                      File scanning happens in compute engines
Query duration      No                      Catalog only handles metadata lookups

This gap prevents:

  • Fine-grained audit trails (table → actual data access)
  • Cost allocation by catalog/namespace/table based on actual usage
  • Security forensics for data access patterns
  • Compliance reporting with complete access records

Describe the solution you'd like

Proposed Solution:

Implement the Iceberg REST Catalog Metrics endpoint as defined in the Apache Iceberg REST Catalog OpenAPI Specification (line 1316):

POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics

The endpoint accepts ReportMetricsRequest payloads containing either:

  • ScanReport: For read operations (snapshot-id, filter, projected fields, scan metrics)
  • CommitReport: For write operations (snapshot-id, sequence-number, operation, commit metrics)

Correlation with catalog events is achieved via a trace-id passed in the metadata field of the metrics report.
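
As a rough sketch of the server side, the handler could parse the request body with Iceberg's ReportMetricsRequestParser (from iceberg-core) and hand the contained report to the audit pipeline. This is a minimal illustration assuming a JAX-RS resource; the class name and the auditPipeline hook are hypothetical, not Polaris's actual wiring:

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequestParser;

// Hypothetical resource class; Polaris's real endpoint wiring may differ.
@Path("/v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics")
public class TableMetricsResource {

  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  public Response reportMetrics(
      @PathParam("prefix") String prefix,
      @PathParam("namespace") String namespace,
      @PathParam("table") String table,
      String body) {
    // Iceberg's parser handles the report-type discriminator and
    // yields a request wrapping a ScanReport or CommitReport.
    ReportMetricsRequest request = ReportMetricsRequestParser.fromJson(body);

    // auditPipeline.accept(namespace, table, request.report()); // hypothetical hook

    // Per the Iceberg spec, success is 204 No Content with an empty body.
    return Response.noContent().build();
  }
}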

Acceptance Criteria:

  • Implement POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics endpoint
  • Accept ReportMetricsRequest schema (ScanReport and CommitReport)
  • Validate request against Iceberg OpenAPI specification
  • Return HTTP 204 No Content on success (per Iceberg spec)
  • Return standard IcebergErrorResponse on errors (400, 401, 403, 404, 5XX)
  • Extract trace-id from metadata field for event correlation (see the extraction sketch after this list)
  • Emit audit event with metrics data for downstream correlation
  • Support OAuth2 and Bearer token authentication (per Iceberg security schemes)
  • Documentation updated with endpoint usage and examples
  • Unit and integration tests added
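
For the trace-id extraction criterion, a minimal sketch, assuming the ScanReport and CommitReport value classes from org.apache.iceberg.metrics (both carry a free-form metadata map in current Iceberg releases); the helper class name is illustrative:

import java.util.Map;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.ScanReport;

// Illustrative helper: pull the correlation id out of a report's metadata.
final class MetricsAuditCorrelator {

  static String traceId(MetricsReport report) {
    Map<String, String> metadata = Map.of();
    if (report instanceof ScanReport scan) {
      metadata = scan.metadata();
    } else if (report instanceof CommitReport commit) {
      metadata = commit.metadata();
    }
    // An absent trace-id yields null; the audit event would then be
    // emitted without a correlation id rather than rejected.
    return metadata.get("trace-id");
  }
}

The emitted audit event would carry this id alongside the catalog's own request identifiers, so downstream systems can join catalog events and compute metrics deterministically instead of by time window.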

Describe alternatives you've considered

1. Custom audit reporting endpoint

Pros: Full control over schema and behavior
Cons: Non-standard; requires custom client implementations; not compatible with existing Iceberg clients

2. Extend existing catalog events with compute metrics

Pros: Single event stream
Cons: Catalog doesn't have access to compute-side metrics; would require invasive changes to compute engines

3. External correlation via timestamp

Current approach: Join audit logs and compute logs by time window
Cons: Non-deterministic; fails with concurrent requests; complex queries; no guaranteed correlation

4. Use AWS CloudTrail/S3 access logs for correlation

Pros: Captures actual S3 access
Cons: Requires AWS STS session tags (see #3325); doesn't capture Iceberg-specific metrics like snapshot-id, filter expressions

Additional context

Dependencies:

  • Requires compute engines (Spark, Trino, Flink) to implement metrics reporting via their respective listener interfaces
  • Compute engines must propagate trace-id from catalog responses to metrics reports (see the reporter sketch below)
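
On the engine side, Iceberg already exposes the org.apache.iceberg.metrics.MetricsReporter interface, and a custom implementation can be registered through the standard metrics-reporter-impl catalog property; when the catalog is Iceberg's RESTCatalog client, reports are shipped to the /metrics endpoint for you. A minimal sketch (the class name and logging are illustrative, not a prescribed integration):

import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;

// Illustrative reporter; registered via the catalog property
// "metrics-reporter-impl", e.g. in Spark:
//   spark.sql.catalog.polaris.metrics-reporter-impl=com.example.TraceAwareReporter
public class TraceAwareReporter implements MetricsReporter {

  @Override
  public void report(MetricsReport report) {
    // Engine-side hook: attach cluster context, forward to monitoring,
    // or verify that the trace-id received from the catalog response
    // made it into the report's metadata before it is sent.
    System.out.println("iceberg metrics report: " + report);
  }
}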

Related:

Example ScanReport Payload:

{
  "report-type": "scan-report",
  "table-name": "analytics.user_events",
  "snapshot-id": 3497810964824022504,
  "filter": { "type": "eq", "term": "event_date", "value": "2025-12-22" },
  "schema-id": 1,
  "projected-field-ids": [1, 2, 3, 5],
  "projected-field-names": ["id", "user_id", "event_type", "timestamp"],
  "metrics": {
    "total-planning-duration": { "count": 1, "time-unit": "nanoseconds", "total-duration": 2644235116 },
    "result-data-files": { "unit": "count", "value": 47 },
    "total-file-size-bytes": { "unit": "bytes", "value": 5368709120 }
  },
  "metadata": {
    "trace-id": "abc123def456789012345678901234ab",
    "compute-engine": "spark-3.5.0",
    "cluster-id": "emr-cluster-abc123"
  }
}

Example CommitReport Payload:

{
  "report-type": "commit-report",
  "table-name": "analytics.user_events",
  "snapshot-id": 3497810964824022505,
  "sequence-number": 42,
  "operation": "append",
  "metrics": {
    "total-duration": { "count": 1, "time-unit": "nanoseconds", "total-duration": 1523456789 },
    "added-data-files": { "unit": "count", "value": 12 },
    "added-records": { "unit": "count", "value": 1500000 }
  },
  "metadata": {
    "trace-id": "abc123def456789012345678901234ab",
    "compute-engine": "spark-3.5.0"
  }
}
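
Posting either payload is a single authenticated HTTP call. A minimal client-side sketch with java.net.http, where the host, prefix, and token are placeholders and scan-report.json is assumed to hold the ScanReport payload above:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class PostMetricsExample {
  public static void main(String[] args) throws Exception {
    // Placeholder host, prefix, and token; the path follows the Iceberg spec.
    String json = Files.readString(Path.of("scan-report.json"));
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("https://polaris.example.com/v1/my-catalog/namespaces/analytics/tables/user_events/metrics"))
        .header("Authorization", "Bearer <token>")
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(json))
        .build();

    HttpResponse<Void> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding());

    System.out.println("status: " + response.statusCode());
  }
}

A 204 with an empty body indicates success; any 400/401/403/404/5XX response carries a standard IcebergErrorResponse body.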
