Summary
The native columnar-to-row conversion (CometNativeColumnarToRowExec) returns UnsafeRow objects that point directly into a Rust-owned buffer via a raw pointer. This buffer is cleared and reused on each convert() call, which can cause a SIGSEGV if any row reference outlives the current batch.
Root Cause
In NativeColumnarToRowConverter.scala, the NativeRowIterator does:
```scala
unsafeRow.pointTo(null, rowAddress, rowSize)
```
This points the UnsafeRow at off-heap native memory owned by ColumnarToRowContext.buffer on the Rust side. The Rust convert() method calls self.buffer.clear() at the start of each invocation, invalidating all previously returned row pointers.
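A standalone sketch of the hazard (the struct name mirrors the real ColumnarToRowContext, but the implementation here is hypothetical): convert() hands out raw addresses into its buffer, and because clear() keeps the allocation, a stale address silently reads the next batch's bytes instead of faulting.

```rust
// Hypothetical sketch of the buffer-reuse pattern described above.
struct ColumnarToRowContext {
    buffer: Vec<u8>,
}

impl ColumnarToRowContext {
    // Mimics the Rust-side convert(): clears and refills the shared buffer,
    // returning (address, len) pairs the JVM would wrap via UnsafeRow.pointTo.
    fn convert(&mut self, batch: &[&[u8]]) -> Vec<(usize, usize)> {
        self.buffer.clear(); // invalidates every address handed out previously
        let mut spans = Vec::new();
        for row in batch {
            spans.push((self.buffer.len(), row.len()));
            self.buffer.extend_from_slice(row);
        }
        // Read the base pointer only after all appends, so no realloc follows.
        let base = self.buffer.as_ptr() as usize;
        spans.into_iter().map(|(start, len)| (base + start, len)).collect()
    }
}

fn main() {
    let mut ctx = ColumnarToRowContext { buffer: Vec::with_capacity(64) };
    let (addr1, len1) = ctx.convert(&[b"row-a".as_slice()])[0];
    // The second convert() reuses the same allocation (capacity suffices),
    // so batch 1's address now points at batch 2's bytes.
    let batch2 = ctx.convert(&[b"row-b".as_slice()]);
    let stale = unsafe { std::slice::from_raw_parts(addr1 as *const u8, len1) };
    assert_eq!(stale, b"row-b"); // batch 1's "row" silently reads batch 2 data
    assert_eq!(batch2[0].0, addr1); // same underlying memory was reused
}
```

In the real code the read happens on the JVM side through UnsafeRow, and a freed or moved allocation produces the SIGSEGV rather than this quiet corruption.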
Affected Code Paths
Broadcast path (high risk)
In CometNativeColumnarToRowExec.scala (line ~110-130), multiple batches are lazily converted via flatMap and passed to mode.transform(rows, ...). When the second batch's convert() runs, any UnsafeRow from the first batch that has not yet been copied points to overwritten or freed memory. Whether this is safe depends on mode.transform() eagerly copying each row before the iterator advances, which is a fragile assumption.
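A minimal sketch of this failure mode (hypothetical code, not the actual operator): if the consumer first collects raw (address, len) pairs across all lazily converted batches and only reads them afterwards, the way a deferred mode.transform() might, every saved address ends up pointing into the last batch.

```rust
fn main() {
    // Two single-row "batches" flattened lazily over one shared buffer.
    let batches: Vec<Vec<&[u8]>> = vec![
        vec![b"batch1-row".as_slice()],
        vec![b"batch2-row".as_slice()],
    ];
    let mut buffer: Vec<u8> = Vec::with_capacity(128);
    let mut addrs: Vec<(usize, usize)> = Vec::new();

    // Simulates a consumer that defers copying: it records raw addresses
    // for all batches first and dereferences them later.
    for batch in &batches {
        buffer.clear(); // each convert() invalidates earlier addresses
        for row in batch {
            let start = buffer.len();
            buffer.extend_from_slice(row);
            addrs.push((buffer.as_ptr() as usize + start, row.len()));
        }
    }
    // The address saved for batch 1 now reads batch 2's bytes.
    let first = unsafe { std::slice::from_raw_parts(addrs[0].0 as *const u8, addrs[0].1) };
    assert_eq!(first, b"batch2-row");
}
```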
Streaming path (lower risk)
The doExecute() path uses flatMap which naturally consumes one batch's iterator before requesting the next, so rows don't span batches. However, downstream Spark operators that retain InternalRow references (e.g., sort with spill, certain joins, caching) could still hold stale pointers.
Symptoms
- SIGSEGV in CI (intermittent, timing-dependent)
- Likely became more visible after enabling native columnar-to-row by default (feat: Enable native columnar-to-row by default #3299)
Suggested Fix
Return unsafeRow.copy() from NativeRowIterator.next() to copy each row to Java heap memory. This eliminates dangling pointer risk while preserving the performance benefit of native conversion (the copy is just a memcpy of already-serialized row bytes).
A more targeted fix would be to copy only in the broadcast path, or to change the Rust side to allocate a new buffer per batch instead of reusing.
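The copy-based fix can be sketched as follows (hypothetical helper, standing in for unsafeRow.copy()): each row's serialized bytes are copied into their own heap allocation before the shared buffer is reused, so later clears cannot affect rows already handed out.

```rust
// Sketch of the suggested fix: materialize each row before buffer reuse.
fn next_row_copied(buffer: &[u8], offset: usize, len: usize) -> Vec<u8> {
    // A plain memcpy of the already-serialized row bytes, as the fix describes.
    buffer[offset..offset + len].to_vec()
}

fn main() {
    let mut buffer: Vec<u8> = b"batch1-row".to_vec();
    let copied = next_row_copied(&buffer, 0, buffer.len());

    // Reusing the buffer for the next batch no longer affects `copied`.
    buffer.clear();
    buffer.extend_from_slice(b"batch2-row");
    assert_eq!(copied, b"batch1-row");
}
```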
Related
- feat: Enable native columnar-to-row by default #3299
- native/core/src/execution/columnar_to_row.rs
- spark/src/main/scala/org/apache/comet/NativeColumnarToRowConverter.scala
- spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala