
Commit 82554f4

Merge remote-tracking branch 'origin/master' into salesforce-sink
2 parents: ac0e63f + 7c4e881

21 files changed: +932 −335 lines

CLAUDE.md

Lines changed: 231 additions & 0 deletions
@@ -0,0 +1,231 @@
# PySpark Data Sources - Project Context for Claude

## Project Overview
This is a demonstration library showcasing custom Spark data sources built using Apache Spark 4.0's new Python Data Source API. The project provides various data source connectors for reading from external APIs and services.

**Important**: This is a demo/educational project and not intended for production use.

## Tech Stack
- **Language**: Python (3.9-3.12)
- **Framework**: Apache Spark 4.0+ (PySpark)
- **Package Management**: Poetry
- **Documentation**: MkDocs with Material theme
- **Testing**: pytest
- **Dependencies**: PyArrow, requests, faker, and optional extras

## Project Structure
```
pyspark_datasources/
├── __init__.py       # Main package exports
├── fake.py           # Fake data generator using Faker
├── github.py         # GitHub repository data connector
├── googlesheets.py   # Public Google Sheets reader
├── huggingface.py    # Hugging Face datasets connector
├── kaggle.py         # Kaggle datasets connector
├── lance.py          # Lance vector database connector
├── opensky.py        # OpenSky flight data connector
├── simplejson.py     # JSON writer for Databricks DBFS
├── stock.py          # Alpha Vantage stock data reader
└── weather.py        # Weather data connector
```

## Available Data Sources
| Short Name     | File            | Description                       | Dependencies       |
|----------------|-----------------|-----------------------------------|--------------------|
| `fake`         | fake.py         | Generate fake data using Faker    | faker              |
| `github`       | github.py       | Read GitHub repository PRs        | None               |
| `googlesheets` | googlesheets.py | Read public Google Sheets         | None               |
| `huggingface`  | huggingface.py  | Access Hugging Face datasets      | datasets           |
| `kaggle`       | kaggle.py       | Read Kaggle datasets              | kagglehub, pandas  |
| `opensky`      | opensky.py      | Flight data from OpenSky Network  | None               |
| `simplejson`   | simplejson.py   | Write JSON to Databricks DBFS     | databricks-sdk     |
| `stock`        | stock.py        | Stock data from Alpha Vantage     | None               |

## Development Commands

### Environment Setup
```bash
poetry install               # Install dependencies
poetry install --extras all  # Install all optional dependencies
poetry shell                 # Activate virtual environment
```

### Testing
```bash
# Note: On macOS, set this environment variable to avoid fork safety issues
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

pytest                                                                   # Run all tests
pytest tests/test_data_sources.py                                        # Run specific test file
pytest tests/test_data_sources.py::test_arrow_datasource_single_file -v  # Run a specific test
```

### Documentation
```bash
mkdocs serve  # Start local docs server
mkdocs build  # Build static documentation
```

### Code Formatting
This project uses [Ruff](https://github.com/astral-sh/ruff) for code formatting and linting.

```bash
poetry run ruff format .       # Format code
poetry run ruff check .        # Run linter
poetry run ruff check . --fix  # Run linter with auto-fix
```

### Package Management
Please refer to RELEASE.md for more details.
```bash
poetry build          # Build package
poetry publish        # Publish to PyPI (requires auth)
poetry add <package>  # Add new dependency
poetry update         # Update dependencies
```

## Usage Patterns
All data sources follow the Spark Data Source API pattern:

```python
from pyspark_datasources import FakeDataSource

# Register the data source
spark.dataSource.register(FakeDataSource)

# Batch reading
df = spark.read.format("fake").option("numRows", 100).load()

# Streaming (where supported)
stream = spark.readStream.format("fake").load()
```

## Testing Strategy
- Tests use pytest with PySpark session fixtures (a minimal fixture sketch follows this list)
- Each data source has basic functionality tests
- Tests verify data reading and schema validation
- Some tests may require external API access
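
A minimal sketch of the session-fixture pattern the bullets above assume; the real fixtures live in `tests/` and may differ, and the `FakeDataSource` import mirrors the usage example earlier in this file:

```python
# Hypothetical sketch of a session-scoped SparkSession fixture and a basic test.
import pytest
from pyspark.sql import SparkSession

from pyspark_datasources import FakeDataSource


@pytest.fixture(scope="session")
def spark():
    spark = SparkSession.builder.appName("datasource-tests").getOrCreate()
    yield spark
    spark.stop()


def test_fake_datasource_basic(spark):
    spark.dataSource.register(FakeDataSource)
    df = spark.read.format("fake").option("numRows", 5).load()
    assert df.count() == 5            # assumes numRows controls row count, as documented
    assert len(df.schema.fields) > 0  # schema is populated
```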

## Key Implementation Details
- All data sources inherit from Spark's DataSource base class
- Each implements a reader() method for batch reading
- Some also implement streamReader() for streaming
- Schemas are defined using PySpark StructType
- Options are passed via Spark's option() method

## External Dependencies
- **GitHub API**: Uses public API, no auth required
- **Alpha Vantage**: Stock data API (may require API key)
- **Google Sheets**: Public sheets only, no auth
- **Kaggle**: Requires Kaggle API credentials
- **Databricks**: SDK for DBFS access
- **OpenSky**: Public flight data API

## Common Issues
- Ensure PySpark >= 4.0.0 is installed
- Some data sources require API keys/credentials
- Network connectivity required for external APIs
- Rate limiting may affect some external services

## Python Data Source API Specification

### Core Abstract Base Classes

#### DataSource
Primary abstract base class for custom data sources supporting read/write operations.

**Key Methods:**
- `__init__(self, options: Dict[str, str])` - Initialize with user options
- `name() -> str` - Return format name (defaults to class name)
- `schema() -> StructType` - Define data source schema
- `reader(schema: StructType) -> DataSourceReader` - Create batch reader
- `writer(schema: StructType, overwrite: bool) -> DataSourceWriter` - Create batch writer
- `streamReader(schema: StructType) -> DataSourceStreamReader` - Create streaming reader
- `streamWriter(schema: StructType, overwrite: bool) -> DataSourceStreamWriter` - Create streaming writer
- `simpleStreamReader(schema: StructType) -> SimpleDataSourceStreamReader` - Create simple streaming reader

#### DataSourceReader
Abstract base class for reading data from sources; a sketch follows the method list below.

**Key Methods:**
- `read(partition) -> Iterator` - Read data from a partition; yields tuples/Rows/pyarrow.RecordBatch
- `partitions() -> List[InputPartition]` - Return input partitions for parallel reading
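
A hedged sketch of a partitioned batch reader; the class names, option, and single-column schema are illustrative, not from this repo:

```python
# Illustrative sketch: a batch reader that splits a row range into two partitions.
from pyspark.sql.datasource import DataSourceReader, InputPartition


class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end


class RangeReader(DataSourceReader):
    def __init__(self, options):
        self.num_rows = int(options.get("numRows", 100))

    def partitions(self):
        # Two partitions that Spark reads in parallel on executors.
        mid = self.num_rows // 2
        return [RangePartition(0, mid), RangePartition(mid, self.num_rows)]

    def read(self, partition):
        # Called once per partition; yield tuples matching a one-column integer schema.
        for i in range(partition.start, partition.end):
            yield (i,)
```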

#### DataSourceStreamReader
Abstract base class for streaming data sources with offset management; see the sketch after the method list.

**Key Methods:**
- `initialOffset() -> Offset` - Return starting offset
- `latestOffset() -> Offset` - Return latest available offset
- `partitions(start: Offset, end: Offset) -> List[InputPartition]` - Get partitions for offset range
- `read(partition) -> Iterator` - Read data from partition
- `commit(end: Offset)` - Mark offsets as processed
- `stop()` - Clean up resources
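
A hedged sketch of the offset lifecycle, loosely modeled on the micro-batch counter example in the Spark docs; in the Python API offsets are plain dicts, and all names below are illustrative:

```python
# Illustrative sketch: a stream reader that emits a fixed number of rows per micro-batch.
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class CounterStreamReader(DataSourceStreamReader):
    def __init__(self, options):
        self.current = 0
        self.rows_per_batch = int(options.get("rowsPerBatch", 10))

    def initialOffset(self):
        # Offsets are dicts that Spark serializes to JSON for checkpointing.
        return {"offset": 0}

    def latestOffset(self):
        # Advance the available offset each time Spark plans a micro-batch.
        self.current += self.rows_per_batch
        return {"offset": self.current}

    def partitions(self, start, end):
        # One partition covering this micro-batch's offset range.
        return [InputPartition((start["offset"], end["offset"]))]

    def read(self, partition):
        first, last = partition.value
        for i in range(first, last):
            yield (i,)

    def commit(self, end):
        # Everything up to `end` is durably processed; clean up state if needed.
        pass
```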

#### DataSourceWriter
Abstract base class for writing data to external sources; a sketch of the write/commit/abort contract follows the method list.

**Key Methods:**
- `write(iterator) -> WriterCommitMessage` - Write data from an iterator of rows (runs once per task)
- `abort(messages: List[WriterCommitMessage])` - Handle write failures
- `commit(messages: List[WriterCommitMessage]) -> WriteResult` - Commit successful writes
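
A hedged sketch of that contract: each task returns a commit message from `write()`, and the driver sees all messages in `commit()` or `abort()`. The counting logic and class names are illustrative:

```python
# Illustrative sketch: a writer that counts rows per task and totals them at commit time.
from dataclasses import dataclass

from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage


@dataclass
class CountCommitMessage(WriterCommitMessage):
    count: int


class CountingWriter(DataSourceWriter):
    def write(self, iterator):
        # Runs on executors, once per task; `iterator` yields Row objects.
        count = 0
        for row in iterator:
            count += 1  # a real writer would push `row` to the external system here
        return CountCommitMessage(count=count)

    def commit(self, messages):
        # Runs on the driver after all tasks succeed.
        total = sum(m.count for m in messages)
        print(f"Committed {total} rows")

    def abort(self, messages):
        # Runs on the driver if any task fails; roll back partial writes here.
        print("Write aborted; cleaning up partial output")
```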

#### DataSourceArrowWriter
Optimized writer using PyArrow RecordBatch for improved performance.
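
Assuming the same commit-message pattern, the Arrow variant's `write()` is understood to receive `pyarrow.RecordBatch` objects instead of rows; a minimal, hedged sketch:

```python
# Illustrative sketch: an Arrow-based writer; the iterator is assumed to yield
# pyarrow.RecordBatch objects rather than Rows.
from dataclasses import dataclass

from pyspark.sql.datasource import DataSourceArrowWriter, WriterCommitMessage


@dataclass
class ArrowCommitMessage(WriterCommitMessage):
    num_rows: int


class ArrowCountingWriter(DataSourceArrowWriter):
    def write(self, iterator):
        rows = 0
        for batch in iterator:      # each item is a pyarrow.RecordBatch
            rows += batch.num_rows  # a real writer would ship the batch out here
        return ArrowCommitMessage(num_rows=rows)
```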

### Implementation Requirements

1. **Serialization**: All classes must be pickle serializable (see the sketch after this list)
2. **Schema Definition**: Use PySpark StructType for schema specification
3. **Data Types**: Support standard Spark SQL data types
4. **Error Handling**: Implement proper exception handling
5. **Resource Management**: Clean up resources properly in streaming sources
6. **Use load() for paths**: Specify file paths in `load("/path")`, not `option("path", "/path")`
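
The serialization requirement usually means keeping only plain, picklable values on the instance and opening clients or sessions lazily inside `read()`, which runs on the executors. A hedged sketch; the URL and field names are made up:

```python
# Illustrative sketch: keep picklable state on self; create connections inside read().
from pyspark.sql.datasource import DataSourceReader


class ApiReader(DataSourceReader):
    def __init__(self, options):
        # Plain strings are picklable; avoid storing live clients or connections on self.
        self.url = options.get("url", "https://example.com/data")

    def read(self, partition):
        import requests  # imported lazily so the reader stays lightweight to pickle

        session = requests.Session()  # created per task, never serialized
        try:
            for item in session.get(self.url, timeout=30).json():
                yield (item.get("id"), item.get("name"))
        finally:
            session.close()
```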

### Usage Patterns

```python
# Custom data source implementation
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import IntegerType, StructField, StructType


class MyDataSource(DataSource):
    def __init__(self, options):
        self.options = options

    def name(self):
        return "myformat"

    def schema(self):
        return StructType([StructField("id", IntegerType(), True)])

    def reader(self, schema):
        return MyDataSourceReader(self.options, schema)


# Registration and usage
spark.dataSource.register(MyDataSource)
df = spark.read.format("myformat").option("key", "value").load()
```

### Performance Optimizations

1. **Arrow Integration**: Return `pyarrow.RecordBatch` for better serialization (sketched below)
2. **Partitioning**: Implement `partitions()` for parallel processing
3. **Lazy Evaluation**: Defer expensive operations until read time
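
A hedged sketch of point 1: `read()` yields Arrow RecordBatches instead of per-row tuples, so data crosses the Python/JVM boundary in columnar batches. The column names and generated data are illustrative and must match the declared schema:

```python
# Illustrative sketch: yielding pyarrow.RecordBatch objects from read().
import pyarrow as pa

from pyspark.sql.datasource import DataSourceReader


class ArrowBatchReader(DataSourceReader):
    def __init__(self, options):
        self.num_rows = int(options.get("numRows", 1000))

    def read(self, partition):
        # One Arrow batch per partition; column names must match the schema.
        ids = pa.array(range(self.num_rows), type=pa.int32())
        names = pa.array([f"name-{i}" for i in range(self.num_rows)])
        yield pa.RecordBatch.from_arrays([ids, names], names=["id", "name"])
```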

## Documentation
- Main docs: https://allisonwang-db.github.io/pyspark-data-sources/
- Individual data source docs in `docs/datasources/`
- Spark Data Source API: https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html
- API Source Code: https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py

### Data Source Docstring Guidelines
When creating new data sources, include these sections in the class docstring (a skeleton is sketched after the guidelines below):

**Required Sections:**
- Brief description and `Name: "format_name"`
- `Options` section documenting all parameters with types/defaults
- `Examples` section with registration and basic usage

**Key Guidelines:**
- **Include schema output**: Show `df.printSchema()` results for clarity
- **Document error cases**: Show what happens with missing files/invalid options
- **Document partitioning strategy**: Show how data sources leverage partitioning to increase performance
- **Document Arrow optimization**: Show how data sources use Arrow to transmit data
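
A hypothetical skeleton of such a docstring; the format name, options, and schema are invented for illustration:

```python
# Hypothetical docstring skeleton for a new data source class.
from pyspark.sql.datasource import DataSource


class ExampleDataSource(DataSource):
    """
    Read example records from a hypothetical service.

    Name: "example"

    Options
    -------
    - endpoint (str, required): Base URL of the service.
    - pageSize (int, default 100): Rows fetched per request.

    Examples
    --------
    >>> spark.dataSource.register(ExampleDataSource)
    >>> df = spark.read.format("example").option("endpoint", "https://api.example.com").load()
    >>> df.printSchema()
    root
     |-- id: integer (nullable = true)
     |-- name: string (nullable = true)

    Raises a clear error if `endpoint` is missing or unreachable.
    """
```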

README.md

Lines changed: 38 additions & 0 deletions
@@ -1,6 +1,7 @@
# PySpark Data Sources

[![pypi](https://img.shields.io/pypi/v/pyspark-data-sources.svg?color=blue)](https://pypi.org/project/pyspark-data-sources/)
[![code style: ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)

This repository showcases custom Spark data sources built using the new [**Python Data Source API**](https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html) introduced in Apache Spark 4.0.
For an in-depth understanding of the API, please refer to the [API source code](https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py).
@@ -58,6 +59,29 @@ For production use, consider these official data source implementations built wi
|--------------------------|-----------------------------------------------------------------------------------------------|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|
| **HuggingFace Datasets** | [@huggingface/pyspark_huggingface](https://github.com/huggingface/pyspark_huggingface) | Production-ready Spark Data Source for 🤗 Hugging Face Datasets | • Stream datasets as Spark DataFrames<br>• Select subsets/splits with filters<br>• Authentication support<br>• Save DataFrames to Hugging Face<br> |

## Data Source Naming Convention

When creating custom data sources using the Python Data Source API, follow these naming conventions for the `short_name` parameter:

### Recommended Approach
- **Use the system name directly**: Use lowercase system names like `huggingface`, `opensky`, `googlesheets`, etc.
- This provides clear, intuitive naming that matches the service being integrated

### Conflict Resolution
- **If there's a naming conflict**: Use the format `pyspark.datasource.<system_name>`
- Example: `pyspark.datasource.salesforce` if "salesforce" conflicts with existing naming

### Examples from this repository
```python
# Direct system naming (preferred)
spark.read.format("github").load()        # GithubDataSource
spark.read.format("googlesheets").load()  # GoogleSheetsDataSource
spark.read.format("opensky").load()       # OpenSkyDataSource

# Namespaced format (when conflicts exist)
spark.read.format("pyspark.datasource.opensky").load()
```

## Contributing
We welcome and appreciate any contributions to enhance and expand the custom data sources:
@@ -77,3 +101,17 @@ poetry env activate
```
mkdocs serve
```

### Code Formatting
This project uses [Ruff](https://github.com/astral-sh/ruff) for code formatting and linting.

```bash
# Format code
poetry run ruff format .

# Run linter
poetry run ruff check .

# Run linter with auto-fix
poetry run ruff check . --fix
```

RELEASE.md

Lines changed: 15 additions & 0 deletions
@@ -119,6 +119,21 @@ Then follow steps 2-4 above.
- Authenticate: `gh auth login`
- Check repository access: `gh repo view`

### PyArrow Compatibility Issues

If you see `objc_initializeAfterForkError` crashes on macOS, set this environment variable:

```bash
# For single commands
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES python your_script.py

# For Poetry environment
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES poetry run python your_script.py

# To set permanently in your shell (add to ~/.zshrc or ~/.bash_profile):
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
```

## Useful Poetry Version Commands

```bash