#### DataSource
Primary abstract base class for custom data sources supporting read/write operations.

**Key Methods:**
- `__init__(self, options: Dict[str, str])` - Initialize with user options (Optional; base class provides default)
- `name() -> str` - Return format name (Optional to override; defaults to class name)
- `schema() -> StructType` - Define data source schema (Required)
- `reader(schema: StructType) -> DataSourceReader` - Create batch reader (Required if batch read is supported)
- `writer(schema: StructType, overwrite: bool) -> DataSourceWriter` - Create batch writer (Required if batch write is supported)
- `streamReader(schema: StructType) -> DataSourceStreamReader` - Create streaming reader (Required if streaming read is supported and `simpleStreamReader` is not implemented)
- `streamWriter(schema: StructType, overwrite: bool) -> DataSourceStreamWriter` - Create streaming writer (Required if streaming write is supported)
- `simpleStreamReader(schema: StructType) -> SimpleDataSourceStreamReader` - Create simple streaming reader (Required if streaming read is supported and `streamReader` is not implemented)
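
A minimal sketch of how these methods fit together (the `fake` format name and both class names are illustrative, not part of the API):

```python
from pyspark.sql.datasource import DataSource, DataSourceReader

class FakeDataSource(DataSource):
    # The base __init__ already stores user options on self.options;
    # overriding it is only needed for extra validation or setup.
    @classmethod
    def name(cls):
        return "fake"  # short name used as .format("fake")

    def schema(self):
        return "name string, age int"  # a DDL string or a StructType

    def reader(self, schema):
        return FakeReader()

class FakeReader(DataSourceReader):
    def read(self, partition):
        # Yield tuples matching the declared schema.
        yield ("Alice", 30)
        yield ("Bob", 25)

# Usage:
# spark.dataSource.register(FakeDataSource)
# df = spark.read.format("fake").load()
```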

#### DataSourceReader
Abstract base class for reading data from sources.

**Key Methods:**
- `read(partition) -> Iterator` - Read data from partition, returns tuples/Rows/pyarrow.RecordBatch (Required)
- `partitions() -> List[InputPartition]` - Return input partitions for parallel reading (Optional; defaults to a single partition)
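
A sketch of partitioned reading; the range-splitting scheme is illustrative:

```python
from pyspark.sql.datasource import DataSourceReader, InputPartition

class RangeReader(DataSourceReader):
    def partitions(self):
        # Four InputPartitions; Spark calls read() once per partition,
        # possibly on different executors, so read() must be self-contained.
        return [InputPartition(i) for i in range(4)]

    def read(self, partition):
        start = partition.value * 10
        for i in range(start, start + 10):
            yield (i,)  # one-column tuples for a schema like "id int"
```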

#### DataSourceStreamReader
Abstract base class for streaming data sources with offset management.

**Key Methods:**
- `initialOffset() -> dict` - Return starting offset (Required)
- `latestOffset() -> dict` - Return latest available offset (Required)
- `partitions(start: dict, end: dict) -> List[InputPartition]` - Get partitions for offset range (Required)
- `read(partition) -> Iterator` - Read data from partition (Required)
- `commit(end: dict) -> None` - Mark offsets as processed (Optional)
- `stop() -> None` - Clean up resources (Optional)
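
A sketch of the offset lifecycle, assuming a hypothetical source that produces consecutive integers:

```python
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition

class CounterStreamReader(DataSourceStreamReader):
    def __init__(self):
        self.current = 0

    def initialOffset(self):
        return {"offset": 0}  # offsets are JSON-serializable dicts

    def latestOffset(self):
        self.current += 10    # pretend 10 new rows arrived since last call
        return {"offset": self.current}

    def partitions(self, start, end):
        # One partition covering the whole range; a real source would split it.
        return [InputPartition((start["offset"], end["offset"]))]

    def read(self, partition):
        lo, hi = partition.value
        for i in range(lo, hi):
            yield (i,)

    def commit(self, end):
        pass  # e.g. delete source data at offsets below `end`

    def stop(self):
        pass  # release connections or file handles
```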

#### SimpleDataSourceStreamReader
Simplified streaming reader interface without partition planning.

**Key Methods:**
- `initialOffset() -> dict` - Return starting offset (Required)
- `read(start: dict) -> Tuple[Iterator, dict]` - Read from start offset; return an iterator and the next start offset (Required)
- `readBetweenOffsets(start: dict, end: dict) -> Iterator` - Deterministic replay between offsets for recovery (Optional; recommended for reliable recovery)
- `commit(end: dict) -> None` - Mark offsets as processed (Optional)
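
The same counter source sketched against the simple interface, which trades partition planning for a single `read` call per batch:

```python
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class SimpleCounterReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        lo = start["offset"]
        hi = lo + 10  # pretend 10 new rows arrived
        # Return the data plus the offset where the next batch should start.
        return iter([(i,) for i in range(lo, hi)]), {"offset": hi}

    def readBetweenOffsets(self, start, end):
        # Must be deterministic: replayed during failure recovery.
        return iter([(i,) for i in range(start["offset"], end["offset"])])
```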

#### DataSourceWriter
Abstract base class for writing data to external sources.

**Key Methods:**
- `write(iterator) -> WriterCommitMessage` - Write data for a partition and return a commit message (Required)
- `abort(messages: List[WriterCommitMessage]) -> None` - Handle write failures (Optional)
- `commit(messages: List[WriterCommitMessage]) -> None` - Commit successful writes (Required)
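
A sketch of the commit protocol; the `CountMessage` dataclass is illustrative, since commit messages only need to be picklable subclasses of `WriterCommitMessage`:

```python
from dataclasses import dataclass
from typing import List

from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage

@dataclass
class CountMessage(WriterCommitMessage):
    count: int

class CountingWriter(DataSourceWriter):
    def write(self, iterator):
        # Runs once per partition on executors.
        n = 0
        for row in iterator:
            n += 1  # a real writer would stage the row externally here
        return CountMessage(count=n)

    def commit(self, messages: List[CountMessage]):
        # Runs on the driver after every partition's write() succeeds.
        print(f"committed {sum(m.count for m in messages)} rows")

    def abort(self, messages):
        pass  # runs on the driver if any write() fails; clean up staged data
```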

#### DataSourceStreamWriter
Abstract base class for writing data to external sinks in streaming queries.

**Key Methods:**
- `write(iterator) -> WriterCommitMessage` - Write data for a partition and return a commit message (Required)
- `commit(messages: List[WriterCommitMessage], batchId: int) -> None` - Commit successful microbatch writes (Required)
- `abort(messages: List[WriterCommitMessage], batchId: int) -> None` - Handle write failures for a microbatch (Optional)
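
A streaming counterpart of the batch sketch above; `batchId` lets the sink recognize a replayed microbatch:

```python
from dataclasses import dataclass
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

@dataclass
class PartitionCommit(WriterCommitMessage):
    count: int

class CountingStreamWriter(DataSourceStreamWriter):
    def write(self, iterator):
        return PartitionCommit(count=sum(1 for _ in iterator))

    def commit(self, messages, batchId):
        # Make the microbatch visible atomically; recording batchId lets a
        # replayed batch be detected and skipped (idempotency).
        print(f"batch {batchId}: {sum(m.count for m in messages)} rows")

    def abort(self, messages, batchId):
        pass  # discard anything staged for this microbatch
```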

#### DataSourceArrowWriter
Subclass of `DataSourceWriter` whose `write()` receives `pyarrow.RecordBatch` objects instead of rows, for improved performance.
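
A minimal sketch; only the counting logic is illustrative:

```python
from dataclasses import dataclass
from pyspark.sql.datasource import DataSourceArrowWriter, WriterCommitMessage

@dataclass
class BatchCommit(WriterCommitMessage):
    rows: int

class ArrowCountingWriter(DataSourceArrowWriter):
    def write(self, iterator):
        # iterator yields pyarrow.RecordBatch objects, so data stays
        # columnar instead of being converted to Python rows.
        return BatchCommit(rows=sum(batch.num_rows for batch in iterator))

    def commit(self, messages):
        print(f"committed {sum(m.rows for m in messages)} rows")
```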