27 changes: 17 additions & 10 deletions README.md
@@ -38,16 +38,23 @@ spark.readStream.format("fake").load().writeStream.format("console").start()

## Example Data Sources

| Data Source | Short Name | Description | Dependencies |
|-------------------------------------------------------------------------|----------------|-----------------------------------------------|-----------------------|
| [GithubDataSource](pyspark_datasources/github.py) | `github` | Read pull requests from a Github repository | None |
| [FakeDataSource](pyspark_datasources/fake.py) | `fake` | Generate fake data using the `Faker` library | `faker` |
| [StockDataSource](pyspark_datasources/stock.py) | `stock` | Read stock data from Alpha Vantage | None |
| [GoogleSheetsDataSource](pyspark_datasources/googlesheets.py) | `googlesheets` | Read table from public Google Sheets | None |
| [KaggleDataSource](pyspark_datasources/kaggle.py) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
| [SimpleJsonDataSource](pyspark_datasources/simplejson.py) | `simplejson` | Write JSON data to Databricks DBFS | `databricks-sdk` |
| [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Read from OpenSky Network. | None |
| [SalesforceDataSource](pyspark_datasources/salesforce.py) | `pyspark.datasource.salesforce` | Streaming datasource for writing data to Salesforce | `simple-salesforce` |
| Data Source | Short Name | Type | Description | Dependencies | Example |
|-------------------------------------------------------------------------|----------------|----------------|-----------------------------------------------|-----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Batch Read** | | | | | |
| [ArrowDataSource](pyspark_datasources/arrow.py) | `arrow` | Batch Read | Read Apache Arrow files (.arrow) | `pyarrow` | `pip install pyspark-data-sources[arrow]`<br/>`spark.read.format("arrow").load("/path/to/file.arrow")` |
| [FakeDataSource](pyspark_datasources/fake.py) | `fake` | Batch/Streaming Read | Generate fake data using the `Faker` library | `faker` | `pip install pyspark-data-sources[fake]`<br/>`spark.read.format("fake").load()` or `spark.readStream.format("fake").load()` |
| [GithubDataSource](pyspark_datasources/github.py) | `github` | Batch Read | Read pull requests from a GitHub repository | None | `pip install pyspark-data-sources`<br/>`spark.read.format("github").load("apache/spark")` |
| [GoogleSheetsDataSource](pyspark_datasources/googlesheets.py) | `googlesheets` | Batch Read | Read a table from a public Google Sheets document | None | `pip install pyspark-data-sources`<br/>`spark.read.format("googlesheets").load("https://docs.google.com/spreadsheets/d/...")` |
| [HuggingFaceDatasets](pyspark_datasources/huggingface.py) | `huggingface` | Batch Read | Read datasets from HuggingFace Hub | `datasets` | `pip install pyspark-data-sources[huggingface]`<br/>`spark.read.format("huggingface").load("imdb")` |
| [KaggleDataSource](pyspark_datasources/kaggle.py) | `kaggle` | Batch Read | Read datasets from Kaggle | `kagglehub`, `pandas` | `pip install pyspark-data-sources[kaggle]`<br/>`spark.read.format("kaggle").load("titanic")` |
| [StockDataSource](pyspark_datasources/stock.py) | `stock` | Batch Read | Read stock data from Alpha Vantage | None | `pip install pyspark-data-sources`<br/>`spark.read.format("stock").option("symbols", "AAPL,GOOGL").option("api_key", "key").load()` |
| **Batch Write** | | | | | |
| [LanceSink](pyspark_datasources/lance.py) | `lance` | Batch Write | Write data in Lance format | `lance` | `pip install pyspark-data-sources[lance]`<br/>`df.write.format("lance").mode("append").save("/tmp/lance_data")` |
| **Streaming Read** | | | | | |
| [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Streaming Read | Read real-time aircraft tracking data from the OpenSky Network | None | `pip install pyspark-data-sources`<br/>`spark.readStream.format("opensky").option("region", "EUROPE").load()` |
| [WeatherDataSource](pyspark_datasources/weather.py) | `weather` | Streaming Read | Fetch weather data from Tomorrow.io | None | `pip install pyspark-data-sources`<br/>`spark.readStream.format("weather").option("locations", "[(37.7749, -122.4194)]").option("apikey", "key").load()` |
| **Streaming Write** | | | | | |
| [SalesforceDataSource](pyspark_datasources/salesforce.py) | `pyspark.datasource.salesforce` | Streaming Write | Write streaming data to Salesforce objects | `simple-salesforce` | `pip install pyspark-data-sources[salesforce]`<br/>`df.writeStream.format("pyspark.datasource.salesforce").option("username", "user").start()` |

See more here: https://allisonwang-db.github.io/pyspark-data-sources/.
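
The entries above share the same end-to-end pattern: install the package (with the extra for the source you need), register the data source class, then refer to it by its short name in the reader or writer API. A minimal sketch using the `fake` source (assumes Spark 4.0+ with the Python Data Source API and `pip install pyspark-data-sources[fake]`):

```python
from pyspark.sql import SparkSession
from pyspark_datasources import FakeDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(FakeDataSource)

# Batch read: generate a small DataFrame of fake rows
spark.read.format("fake").load().show()

# Streaming read: keep generating rows and print each micro-batch to the console
spark.readStream.format("fake").load().writeStream.format("console").start()
```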

6 changes: 6 additions & 0 deletions docs/datasources/arrow.md
@@ -0,0 +1,6 @@
# ArrowDataSource

> Requires the [`PyArrow`](https://arrow.apache.org/docs/python/) library. You can install it manually: `pip install pyarrow`
> or use `pip install pyspark-data-sources[arrow]`.

::: pyspark_datasources.arrow.ArrowDataSource
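
A minimal usage sketch (matching the example in the README table; `ArrowDataSource` is the class documented below):

```python
from pyspark_datasources.arrow import ArrowDataSource

spark.dataSource.register(ArrowDataSource)
df = spark.read.format("arrow").load("/path/to/file.arrow")
```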
3 changes: 3 additions & 0 deletions docs/datasources/jsonplaceholder.md
@@ -0,0 +1,3 @@
# JSONPlaceholderDataSource

::: pyspark_datasources.jsonplaceholder.JSONPlaceholderDataSource
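
A minimal usage sketch, mirroring the examples in the class docstring:

```python
from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource

spark.dataSource.register(JSONPlaceholderDataSource)

# Read blog posts (the default endpoint)
posts = spark.read.format("jsonplaceholder").load()

# Read a capped number of todos
todos = (
    spark.read.format("jsonplaceholder")
    .option("endpoint", "todos")
    .option("limit", "5")
    .load()
)

# Join posts with their authors (posts.userId == users.id)
users = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
posts.join(users, posts.userId == users.id).show()
```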
6 changes: 6 additions & 0 deletions docs/datasources/lance.md
@@ -0,0 +1,6 @@
# LanceSink

> Requires the [`Lance`](https://lancedb.github.io/lance/) library. You can install it manually: `pip install lance`
> or use `pip install pyspark-data-sources[lance]`.

::: pyspark_datasources.lance.LanceSink
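
A minimal usage sketch (matching the example in the README table; the sample DataFrame is only illustrative):

```python
from pyspark_datasources.lance import LanceSink

spark.dataSource.register(LanceSink)

# Any DataFrame works here; this one is just an example
df = spark.createDataFrame([(1, "a"), (2, "b")], "id INT, value STRING")
df.write.format("lance").mode("append").save("/tmp/lance_data")
```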
5 changes: 5 additions & 0 deletions docs/datasources/opensky.md
@@ -0,0 +1,5 @@
# OpenSkyDataSource

> No additional dependencies required. Uses the OpenSky Network REST API for real-time aircraft tracking data.

::: pyspark_datasources.opensky.OpenSkyDataSource
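
A minimal usage sketch (matching the example in the README table, written to the console sink):

```python
from pyspark_datasources.opensky import OpenSkyDataSource

spark.dataSource.register(OpenSkyDataSource)
query = (
    spark.readStream.format("opensky")
    .option("region", "EUROPE")
    .load()
    .writeStream.format("console")
    .start()
)
```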
5 changes: 5 additions & 0 deletions docs/datasources/weather.md
@@ -0,0 +1,5 @@
# WeatherDataSource

> No additional dependencies required. Uses the Tomorrow.io API for weather data. Requires an API key.

::: pyspark_datasources.weather.WeatherDataSource
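
A minimal usage sketch (matching the example in the README table; replace `"key"` with a real Tomorrow.io API key):

```python
from pyspark_datasources.weather import WeatherDataSource

spark.dataSource.register(WeatherDataSource)
query = (
    spark.readStream.format("weather")
    .option("locations", "[(37.7749, -122.4194)]")
    .option("apikey", "key")
    .load()
    .writeStream.format("console")
    .start()
)
```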
2 changes: 2 additions & 0 deletions docs/index.md
@@ -41,3 +41,5 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
| [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None |
| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
| [JSONPlaceholderDataSource](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data from the JSONPlaceholder API for testing and prototyping | None |
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -26,6 +26,7 @@ nav:
- datasources/salesforce.md
- datasources/googlesheets.md
- datasources/kaggle.md
- datasources/jsonplaceholder.md

markdown_extensions:
- pymdownx.highlight:
2 changes: 1 addition & 1 deletion poetry.lock

1 change: 1 addition & 0 deletions pyspark_datasources/__init__.py
@@ -8,3 +8,4 @@
from .salesforce import SalesforceDataSource
from .simplejson import SimpleJsonDataSource
from .stock import StockDataSource
from .jsonplaceholder import JSONPlaceholderDataSource
224 changes: 224 additions & 0 deletions pyspark_datasources/jsonplaceholder.py
@@ -0,0 +1,224 @@
from typing import Dict, Any, List, Iterator
import requests
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
from pyspark.sql.types import StructType
from pyspark.sql import Row


class JSONPlaceholderDataSource(DataSource):
"""
A PySpark data source for JSONPlaceholder API.

JSONPlaceholder is a free fake REST API for testing and prototyping.
This data source provides access to posts, users, todos, comments, albums, and photos.

Supported endpoints:
- posts: Blog posts with userId, id, title, body
- users: User profiles with complete information
- todos: Todo items with userId, id, title, completed
- comments: Comments with postId, id, name, email, body
- albums: Albums with userId, id, title
- photos: Photos with albumId, id, title, url, thumbnailUrl

Name: `jsonplaceholder`

Examples
--------
Register the data source:

>>> spark.dataSource.register(JSONPlaceholderDataSource)

Read posts (default):

>>> spark.read.format("jsonplaceholder").load().show()

Read users:

>>> spark.read.format("jsonplaceholder").option("endpoint", "users").load().show()

Read with limit:

>>> spark.read.format("jsonplaceholder").option("endpoint", "todos").option("limit", "5").load().show()

Read specific item:

>>> spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load().show()

Referential Integrity
    ---------------------
The data source supports joining related datasets:

1. Posts and Users relationship:
posts.userId = users.id
>>> posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
>>> users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
>>> posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)

2. Posts and Comments relationship:
comments.postId = posts.id
>>> comments_df = spark.read.format("jsonplaceholder").option("endpoint", "comments").load()
>>> posts_with_comments = posts_df.join(comments_df, posts_df.id == comments_df.postId)

3. Users, Albums and Photos relationship:
albums.userId = users.id
photos.albumId = albums.id
>>> albums_df = spark.read.format("jsonplaceholder").option("endpoint", "albums").load()
>>> photos_df = spark.read.format("jsonplaceholder").option("endpoint", "photos").load()
>>> user_albums = users_df.join(albums_df, users_df.id == albums_df.userId)
>>> user_photos = user_albums.join(photos_df, albums_df.id == photos_df.albumId)
"""

@classmethod
def name(cls) -> str:
return "jsonplaceholder"

def __init__(self, options=None):
self.options = options or {}

def schema(self) -> str:
""" Returns the schema for the selected endpoint."""
schemas = {
"posts": "userId INT, id INT, title STRING, body STRING",
"users": ("id INT, name STRING, username STRING, email STRING, phone STRING, "
"website STRING, address_street STRING, address_suite STRING, "
"address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
"address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
"company_bs STRING"),
"todos": "userId INT, id INT, title STRING, completed BOOLEAN",
"comments": "postId INT, id INT, name STRING, email STRING, body STRING",
"albums": "userId INT, id INT, title STRING",
"photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING"
}

endpoint = self.options.get("endpoint", "posts")
return schemas.get(endpoint, schemas["posts"])

def reader(self, schema: StructType) -> DataSourceReader:
return JSONPlaceholderReader(self.options)


class JSONPlaceholderReader(DataSourceReader):
"""Reader implementation for JSONPlaceholder API"""

def __init__(self, options: Dict[str, str]):
self.options = options
self.base_url = "https://jsonplaceholder.typicode.com"

self.endpoint = self.options.get("endpoint", "posts")
self.limit = self.options.get("limit")
self.id = self.options.get("id")

def partitions(self) -> List[InputPartition]:
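        # JSONPlaceholder exposes only a small, fixed dataset, so a single partition is enough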
return [InputPartition(0)]

def read(self, partition: InputPartition) -> Iterator[Row]:
url = f"{self.base_url}/{self.endpoint}"

if self.id:
url += f"/{self.id}"

params = {}
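        # JSONPlaceholder honors a _limit query parameter; skip it when a single item is fetched by id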
if self.limit and not self.id:
params["_limit"] = self.limit

try:
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()

data = response.json()

if isinstance(data, dict):
data = [data]
elif not isinstance(data, list):
data = []

return iter([self._process_item(item) for item in data])

except requests.RequestException as e:
print(f"Failed to fetch data from {url}: {e}")
return iter([])
except ValueError as e:
print(f"Failed to parse JSON from {url}: {e}")
return iter([])
except Exception as e:
print(f"Unexpected error while reading data: {e}")
return iter([])

def _process_item(self, item: Dict[str, Any]) -> Row:
"""Process individual items based on endpoint type"""

def _process_posts(item):
return Row(
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", ""),
body=item.get("body", "")
)

def _process_users(item):
address = item.get("address", {})
geo = address.get("geo", {})
company = item.get("company", {})

return Row(
id=item.get("id"),
name=item.get("name", ""),
username=item.get("username", ""),
email=item.get("email", ""),
phone=item.get("phone", ""),
website=item.get("website", ""),
address_street=address.get("street", ""),
address_suite=address.get("suite", ""),
address_city=address.get("city", ""),
address_zipcode=address.get("zipcode", ""),
address_geo_lat=geo.get("lat", ""),
address_geo_lng=geo.get("lng", ""),
company_name=company.get("name", ""),
company_catchPhrase=company.get("catchPhrase", ""),
company_bs=company.get("bs", "")
)

def _process_todos(item):
return Row(
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", ""),
completed=item.get("completed", False)
)

def _process_comments(item):
return Row(
postId=item.get("postId"),
id=item.get("id"),
name=item.get("name", ""),
email=item.get("email", ""),
body=item.get("body", "")
)

def _process_albums(item):
return Row(
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", "")
)

def _process_photos(item):
return Row(
albumId=item.get("albumId"),
id=item.get("id"),
title=item.get("title", ""),
url=item.get("url", ""),
thumbnailUrl=item.get("thumbnailUrl", "")
)

processors = {
"posts": _process_posts,
"users": _process_users,
"todos": _process_todos,
"comments": _process_comments,
"albums": _process_albums,
"photos": _process_photos
}

processor = processors.get(self.endpoint, _process_posts)
return processor(item)
15 changes: 14 additions & 1 deletion tests/test_data_sources.py
@@ -72,7 +72,6 @@ def test_opensky_datasource_stream(spark):
assert len(result.columns) == 18 # Check schema has expected number of fields
assert result.count() > 0 # Verify we got some data


def test_salesforce_datasource_registration(spark):
"""Test that Salesforce DataSource can be registered and validates required options."""
spark.dataSource.register(SalesforceDataSource)
@@ -176,3 +175,17 @@ def test_arrow_datasource_multiple_files(spark):
rows = df.collect()
names = {row["name"] for row in rows}
assert names == {"Alice", "Bob", "Charlie", "Diana"}

def test_jsonplaceholder_posts(spark):
spark.dataSource.register(JSONPlaceholderDataSource)
posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
assert posts_df.count() > 0 # Ensure we have some posts


def test_jsonplaceholder_referential_integrity(spark):
spark.dataSource.register(JSONPlaceholderDataSource)
users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
assert users_df.count() > 0 # Ensure we have some users
posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)
assert posts_with_authors.count() > 0 # Ensure join is valid and we have posts with authors
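

# A further test sketch (hypothetical): exercise the `limit` option, assuming the
# API honors the _limit query parameter and returns at most that many rows.
def test_jsonplaceholder_limit(spark):
    spark.dataSource.register(JSONPlaceholderDataSource)
    todos_df = (
        spark.read.format("jsonplaceholder")
        .option("endpoint", "todos")
        .option("limit", "5")
        .load()
    )
    assert todos_df.count() <= 5  # The limit option caps the number of fetched rows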