3 changes: 3 additions & 0 deletions docs/datasources/jsonplaceholder.md
@@ -0,0 +1,3 @@
# JSONPlaceholderDataSource

::: pyspark_datasources.jsonplaceholder.JSONPlaceholderDataSource
2 changes: 2 additions & 0 deletions docs/index.md
@@ -41,3 +41,5 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
| [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None |
| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
| [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None |
| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -26,6 +26,7 @@ nav:
- datasources/salesforce.md
- datasources/googlesheets.md
- datasources/kaggle.md
- datasources/jsonplaceholder.md

markdown_extensions:
- pymdownx.highlight:
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyspark_datasources/__init__.py
@@ -8,3 +8,4 @@
from .salesforce import SalesforceDataSource
from .simplejson import SimpleJsonDataSource
from .stock import StockDataSource
from .jsonplaceholder import JSONPlaceholderDataSource
224 changes: 224 additions & 0 deletions pyspark_datasources/jsonplaceholder.py
@@ -0,0 +1,224 @@
from typing import Dict, Any, List, Iterator
import requests
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
from pyspark.sql.types import StructType
from pyspark.sql import Row


class JSONPlaceholderDataSource(DataSource):
"""
A PySpark data source for JSONPlaceholder API.
JSONPlaceholder is a free fake REST API for testing and prototyping.
This data source provides access to posts, users, todos, comments, albums, and photos.
Supported endpoints:
- posts: Blog posts with userId, id, title, body
- users: User profiles with complete information
- todos: Todo items with userId, id, title, completed
- comments: Comments with postId, id, name, email, body
- albums: Albums with userId, id, title
- photos: Photos with albumId, id, title, url, thumbnailUrl
Name: `jsonplaceholder`
Examples
--------
Register the data source:
>>> spark.dataSource.register(JSONPlaceholderDataSource)
Read posts (default):
>>> spark.read.format("jsonplaceholder").load().show()
Read users:
>>> spark.read.format("jsonplaceholder").option("endpoint", "users").load().show()
Read with limit:
>>> spark.read.format("jsonplaceholder").option("endpoint", "todos").option("limit", "5").load().show()
Comment on lines +39 to +41
Owner

What's the default number of records?

Contributor Author

It's not huge.

{'users': 10,
'albums': 100,
'comments': 500,
'photos': 5000,
'posts': 100,
'todos': 200}
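
As a quick (hypothetical) sanity check of those defaults, using the data source registered in this PR and the counts from the reply above:

spark.dataSource.register(JSONPlaceholderDataSource)
# Full-endpoint record counts when no "limit" option is set (subset of the table above).
expected_counts = {"users": 10, "posts": 100, "todos": 200, "comments": 500}
for endpoint, expected in expected_counts.items():
    df = spark.read.format("jsonplaceholder").option("endpoint", endpoint).load()
    assert df.count() == expected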

Read specific item:
>>> spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load().show()
Referential Integrity
-------------------
The data source supports joining related datasets:
1. Posts and Users relationship:
posts.userId = users.id
>>> posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
>>> users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
>>> posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)
2. Posts and Comments relationship:
comments.postId = posts.id
>>> comments_df = spark.read.format("jsonplaceholder").option("endpoint", "comments").load()
>>> posts_with_comments = posts_df.join(comments_df, posts_df.id == comments_df.postId)
3. Users, Albums and Photos relationship:
albums.userId = users.id
photos.albumId = albums.id
>>> albums_df = spark.read.format("jsonplaceholder").option("endpoint", "albums").load()
>>> photos_df = spark.read.format("jsonplaceholder").option("endpoint", "photos").load()
>>> user_albums = users_df.join(albums_df, users_df.id == albums_df.userId)
>>> user_photos = user_albums.join(photos_df, albums_df.id == photos_df.albumId)
"""

@classmethod
def name(cls) -> str:
return "jsonplaceholder"

def __init__(self, options=None):
self.options = options or {}

def schema(self) -> str:
""" Returns the schema for the selected endpoint."""
schemas = {
"posts": "userId INT, id INT, title STRING, body STRING",
"users": ("id INT, name STRING, username STRING, email STRING, phone STRING, "
"website STRING, address_street STRING, address_suite STRING, "
"address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
"address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
"company_bs STRING"),
"todos": "userId INT, id INT, title STRING, completed BOOLEAN",
"comments": "postId INT, id INT, name STRING, email STRING, body STRING",
"albums": "userId INT, id INT, title STRING",
"photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING"
}

endpoint = self.options.get("endpoint", "posts")
return schemas.get(endpoint, schemas["posts"])

Comment on lines +93 to +95

🛠️ Refactor suggestion

Normalize endpoint and align schema()/reader() behavior; add visible fallback for unknown values

Without normalization, schema() falls back to posts for unknown endpoints while reader() will hit the unknown path and return empty, producing a confusing empty DF with a posts schema. Normalize the option once and use a visible fallback.

Apply this diff:

@@
-        endpoint = self.options.get("endpoint", "posts")
-        return schemas.get(endpoint, schemas["posts"])
+        endpoint_opt = self.options.get("endpoint", "posts")
+        endpoint = endpoint_opt.lower() if isinstance(endpoint_opt, str) else "posts"
+        return schemas.get(endpoint, schemas["posts"])
@@
-        self.endpoint = self.options.get("endpoint", "posts")
+        endpoint_opt = self.options.get("endpoint", "posts")
+        self.endpoint = endpoint_opt.lower() if isinstance(endpoint_opt, str) else "posts"
+        if self.endpoint not in ("posts", "users", "todos", "comments", "albums", "photos"):
+            print(f"Unknown endpoint '{self.endpoint}', defaulting to 'posts'")
+            self.endpoint = "posts"

Also applies to: 107-110

🤖 Prompt for AI Agents
pyspark_datasources/jsonplaceholder.py lines ~93-95 and ~107-110: normalize the
"endpoint" option once (e.g., endpoint =
self.options.get("endpoint","posts").strip().lower()), then check if endpoint is
in the schemas mapping; if not, emit a visible fallback (e.g., warning log via
self.spark._jvm or Python logger) and set endpoint = "posts". Use this
normalized/validated endpoint variable for both schema() and reader() paths so
unknown values consistently fall back to the posts schema (and the reader
returns rows for posts rather than an empty DF).

def reader(self, schema: StructType) -> DataSourceReader:
return JSONPlaceholderReader(self.options)


class JSONPlaceholderReader(DataSourceReader):
"""Reader implementation for JSONPlaceholder API"""

def __init__(self, options: Dict[str, str]):
self.options = options
self.base_url = "https://jsonplaceholder.typicode.com"

self.endpoint = self.options.get("endpoint", "posts")
self.limit = self.options.get("limit")
self.id = self.options.get("id")

Comment on lines +103 to +110

🛠️ Refactor suggestion

Normalize endpoint option and guard against unsupported values to keep schema/read consistent.

See proposed patch to normalize endpoint and default on unknown values (also keeps logs visible in executors via print).

-        self.endpoint = self.options.get("endpoint", "posts")
+        endpoint_opt = self.options.get("endpoint", "posts")
+        self.endpoint = endpoint_opt.lower() if isinstance(endpoint_opt, str) else "posts"
+        if self.endpoint not in ("posts", "users", "todos", "comments", "albums", "photos"):
+            print(f"Unknown endpoint '{self.endpoint}', defaulting to 'posts'")
+            self.endpoint = "posts"
🤖 Prompt for AI Agents
In pyspark_datasources/jsonplaceholder.py around lines 103 to 110, the endpoint
option is used raw which can lead to inconsistent behavior across executors;
normalize the endpoint by trimming whitespace and lowercasing it, validate it
against the supported endpoints (posts, comments, albums, photos, todos, users)
and if it is missing or unsupported default to "posts"; when defaulting or
rejecting an unsupported value emit a visible warning (use print so it appears
in executor logs) so callers know the fallback, and keep the rest of the
initialization unchanged.

def partitions(self) -> List[InputPartition]:
return [InputPartition(0)]
Comment on lines +111 to +112
Owner

Nit: by default it uses one partition so you don't have to override this method.
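
A minimal sketch of what that looks like (illustrative only, not part of this PR; assumes the base class's default single-partition planning described above):

from typing import Iterator
from pyspark.sql import Row
from pyspark.sql.datasource import DataSourceReader

class MinimalReader(DataSourceReader):  # hypothetical class, for illustration only
    # partitions() is deliberately not overridden; the default planning yields a
    # single partition, so read() runs once in one task.
    def read(self, partition) -> Iterator[Row]:
        yield Row(value=1)
        yield Row(value=2)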


def read(self, partition: InputPartition) -> Iterator[Row]:
url = f"{self.base_url}/{self.endpoint}"

if self.id:
url += f"/{self.id}"

params = {}
if self.limit and not self.id:
params["_limit"] = self.limit

Comment on lines +120 to +123

🛠️ Refactor suggestion

Validate and sanitize the 'limit' option; ignore invalid values.

Avoid passing arbitrary strings to the API and quietly ignore non-positive or invalid limits.

-        params = {}
-        if self.limit and not self.id:
-            params["_limit"] = self.limit
+        params = {}
+        if not self.id and self.limit is not None:
+            try:
+                limit_val = int(self.limit)
+                if limit_val > 0:
+                    params["_limit"] = str(limit_val)
+            except (TypeError, ValueError):
+                print(f"Ignoring invalid 'limit' option: {self.limit!r}")
🤖 Prompt for AI Agents
In pyspark_datasources/jsonplaceholder.py around lines 120 to 123, the code
unconditionally uses self.limit which may be a non-numeric or non-positive
value; validate and sanitize it before adding to params by attempting to coerce
to an integer (or using a safe parse), ensuring the result is a positive integer
(>0), and only then set params["_limit"] = parsed_limit; if parsing fails or the
value is <=0, do not add the parameter (silently ignore invalid limits).

try:
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()

data = response.json()

if isinstance(data, dict):
data = [data]
elif not isinstance(data, list):
data = []

return iter([self._process_item(item) for item in data])

except requests.RequestException as e:
print(f"Failed to fetch data from {url}: {e}")
return iter([])
except ValueError as e:
print(f"Failed to parse JSON from {url}: {e}")
return iter([])
except Exception as e:
print(f"Unexpected error while reading data: {e}")
return iter([])

def _process_item(self, item: Dict[str, Any]) -> Row:
"""Process individual items based on endpoint type"""

def _process_posts(item):
return Row(
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", ""),
body=item.get("body", "")
)

def _process_users(item):
address = item.get("address", {})
geo = address.get("geo", {})
company = item.get("company", {})

return Row(
id=item.get("id"),
name=item.get("name", ""),
username=item.get("username", ""),
email=item.get("email", ""),
phone=item.get("phone", ""),
website=item.get("website", ""),
address_street=address.get("street", ""),
address_suite=address.get("suite", ""),
address_city=address.get("city", ""),
address_zipcode=address.get("zipcode", ""),
address_geo_lat=geo.get("lat", ""),
address_geo_lng=geo.get("lng", ""),
company_name=company.get("name", ""),
company_catchPhrase=company.get("catchPhrase", ""),
company_bs=company.get("bs", "")
)

def _process_todos(item):
return Row(
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", ""),
completed=item.get("completed", False)
)

def _process_comments(item):
return Row(
postId=item.get("postId"),
id=item.get("id"),
name=item.get("name", ""),
email=item.get("email", ""),
body=item.get("body", "")
)

def _process_albums(item):
return Row(
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", "")
)

def _process_photos(item):
return Row(
albumId=item.get("albumId"),
id=item.get("id"),
title=item.get("title", ""),
url=item.get("url", ""),
thumbnailUrl=item.get("thumbnailUrl", "")
)

processors = {
"posts": _process_posts,
"users": _process_users,
"todos": _process_todos,
"comments": _process_comments,
"albums": _process_albums,
"photos": _process_photos
}

processor = processors.get(self.endpoint, _process_posts)
return processor(item)
15 changes: 14 additions & 1 deletion tests/test_data_sources.py
@@ -72,7 +72,6 @@ def test_opensky_datasource_stream(spark):
assert len(result.columns) == 18 # Check schema has expected number of fields
assert result.count() > 0 # Verify we got some data


def test_salesforce_datasource_registration(spark):
"""Test that Salesforce DataSource can be registered and validates required options."""
spark.dataSource.register(SalesforceDataSource)
@@ -176,3 +175,17 @@ def test_arrow_datasource_multiple_files(spark):
rows = df.collect()
names = {row["name"] for row in rows}
assert names == {"Alice", "Bob", "Charlie", "Diana"}

def test_jsonplaceholder_posts(spark):
spark.dataSource.register(JSONPlaceholderDataSource)

💡 Verification agent

🧩 Analysis chain

Resolve Ruff F405: avoid relying on star-imported JSONPlaceholderDataSource

Ruff flags JSONPlaceholderDataSource as coming from a star import (F405). Prefer explicit imports to keep linters and static analysis happy.

Two options:

  • Preferred: add a named import at the module level.
  • Minimal: suppress per-line with # noqa: F405.

Apply this minimal diff within the changed lines to unblock CI:

-    spark.dataSource.register(JSONPlaceholderDataSource)
+    spark.dataSource.register(JSONPlaceholderDataSource)  # noqa: F405

And similarly below:

-    spark.dataSource.register(JSONPlaceholderDataSource)
+    spark.dataSource.register(JSONPlaceholderDataSource)  # noqa: F405

Alternatively (preferred), add this import at the top of the file (outside the changed hunk):

from pyspark_datasources import JSONPlaceholderDataSource

To ensure the explicit import works, verify the symbol is exported from the package:

Also applies to: 186-186


🏁 Script executed:

#!/bin/bash
# 1) Confirm the class exists
rg -n "class JSONPlaceholderDataSource" -A 5

# 2) Confirm it's re-exported by the package (ideally in pyspark_datasources/__init__.py)
rg -n "from pyspark_datasources\.jsonplaceholder import JSONPlaceholderDataSource" -N

Length of output: 653


Action: Fix Ruff F405 in tests/test_data_sources.py — import from submodule or add per-line noqa

Verified: JSONPlaceholderDataSource is defined in pyspark_datasources/jsonplaceholder.py but is not re-exported from the package root, so from pyspark_datasources import JSONPlaceholderDataSource will not work.

Files to update:

  • tests/test_data_sources.py — occurrences at ~lines 180 and 186

Preferred fix — add an explicit named import from the submodule at the top of the test file:

+from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource

No other changes needed in the registration lines.

Minimal fix — suppress the linter per-line (apply to both occurrences):

-    spark.dataSource.register(JSONPlaceholderDataSource)
+    spark.dataSource.register(JSONPlaceholderDataSource)  # noqa: F405

Alternative (if you want package-level import): re-export the symbol from pyspark_datasources/__init__.py:

+# in pyspark_datasources/__init__.py
+from .jsonplaceholder import JSONPlaceholderDataSource
+__all__ = [*(globals().get("__all__", [])), "JSONPlaceholderDataSource"]

Please apply one of the above.

🧰 Tools
🪛 Ruff (0.12.2)

180-180: JSONPlaceholderDataSource may be undefined, or defined from star imports

(F405)

🤖 Prompt for AI Agents
In tests/test_data_sources.py around lines 180 and 186, Ruff F405 is raised
because JSONPlaceholderDataSource is defined in
pyspark_datasources/jsonplaceholder.py and not re-exported from the package
root; fix by adding an explicit named import of JSONPlaceholderDataSource from
the pyspark_datasources.jsonplaceholder submodule at the top of the test file,
or alternatively append a per-line noqa for F405 to both registration lines if
you prefer suppression.

posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
assert posts_df.count() > 0 # Ensure we have some posts


def test_jsonplaceholder_referential_integrity(spark):
spark.dataSource.register(JSONPlaceholderDataSource)
users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
assert users_df.count() > 0 # Ensure we have some users
posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)
assert posts_with_authors.count() > 0 # Ensure join is valid and we have posts with authors
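
A possible follow-up (hypothetical, not part of this PR) would be a test for the limit option, assuming JSONPlaceholder honors the _limit query parameter that the reader forwards:

def test_jsonplaceholder_limit(spark):  # hypothetical follow-up, not in this PR
    spark.dataSource.register(JSONPlaceholderDataSource)
    todos_df = (
        spark.read.format("jsonplaceholder")
        .option("endpoint", "todos")
        .option("limit", "5")
        .load()
    )
    assert todos_df.count() == 5  # the API truncates results to the requested _limit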