
Conversation

@chanukyapekala
Contributor

@chanukyapekala chanukyapekala commented Jul 22, 2025

Add JSONPlaceholder Data Source

Summary

Adds a new data source for the JSONPlaceholder API, providing a simple way to test and learn PySpark custom data sources with realistic fake data.

Features

  • ✅ Support for all 6 JSONPlaceholder endpoints (posts, users, todos, comments, albums, photos)
  • ✅ Configurable options (endpoint, limit, id)
  • ✅ Proper schema definitions for each endpoint with flattened nested data
  • ✅ Comprehensive error handling
  • ✅ Full test coverage
  • ✅ Complete documentation with examples

Use Cases

  • Learning: Perfect for teaching PySpark custom data sources
  • Testing: Reliable fake data for testing data pipelines
  • Prototyping: Quick setup without database dependencies
  • Relational Testing: Test joins across related entities (users, posts, comments, albums, photos, todos)

Example Usage

from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource

spark.dataSource.register(JSONPlaceholderDataSource)

# Read posts (default)
df = spark.read.format("jsonplaceholder").load()

# Read users with limit
df = spark.read.format("jsonplaceholder").option("endpoint", "users").option("limit", "5").load()
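A single record can also be fetched with the id option, and the flattened schemas keep joins simple; a sketch building on the snippet above:

# Read a single post by id
post_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load()

# Join posts with their authors (posts.userId references users.id)
users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)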

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

* **New Features**
  * Added a JSONPlaceholder data source for Spark (posts, users, todos, comments, albums, photos) and exposed it in the package API.
* **Documentation**
  * Updated site navigation and added docs entries for the JSONPlaceholder data source; added a Data Sources table row (and an additional Salesforce entry).
* **Tests**
  * Added tests for fetching posts and verifying referential integrity between users and posts.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

@coderabbitai

coderabbitai bot commented Jul 22, 2025

Walkthrough

Adds a new JSONPlaceholder PySpark data source and reader, exposes it in the package API, updates docs and mkdocs navigation, and adds two tests validating reads and referential integrity.

Changes

Cohort / File(s) | Change Summary

Data Source Implementation (pyspark_datasources/jsonplaceholder.py)
New file implementing JSONPlaceholderDataSource and JSONPlaceholderReader to fetch JSONPlaceholder endpoints (posts, users, todos, comments, albums, photos), handle optional id/_limit, normalize responses, and convert items into Spark Rows (including nested user/address/company fields).

Package API (pyspark_datasources/__init__.py)
Import added to expose JSONPlaceholderDataSource from the package public API.

Documentation Page (docs/datasources/jsonplaceholder.md)
New documentation file referencing the JSONPlaceholderDataSource class.

Documentation Index (docs/index.md)
Data Sources table updated with new rows: JSONPlaceHolder (jsonplaceholder) and a Salesforce entry (salesforce) with descriptions and dependencies.

MkDocs Navigation (mkdocs.yml)
Navigation updated to include datasources/jsonplaceholder.md under Data Sources.

Tests (tests/test_data_sources.py)
Two tests added: test_jsonplaceholder_posts (reads posts endpoint and asserts rows exist) and test_jsonplaceholder_referential_integrity (loads users and posts and asserts join returns rows).
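
For orientation, a condensed sketch of the reader structure described above; the class and option names follow the walkthrough, while the method bodies are illustrative assumptions rather than the PR's exact code:

import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader


class JSONPlaceholderDataSource(DataSource):
    @classmethod
    def name(cls):
        return "jsonplaceholder"

    def schema(self):
        # The PR defines one DDL string per endpoint; the "posts" schema is shown here.
        return "userId INT, id INT, title STRING, body STRING"

    def reader(self, schema):
        return JSONPlaceholderReader(self.options)


class JSONPlaceholderReader(DataSourceReader):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "posts")
        self.limit = options.get("limit")
        self.id = options.get("id")

    def read(self, partition):
        # Build /{endpoint} or /{endpoint}/{id}, passing _limit only for list reads.
        url = f"https://jsonplaceholder.typicode.com/{self.endpoint}"
        if self.id:
            url = f"{url}/{self.id}"
        params = {"_limit": self.limit} if self.limit and not self.id else {}
        data = requests.get(url, params=params, timeout=10).json()
        items = data if isinstance(data, list) else [data]  # single-id reads return one object
        for item in items:
            # Field order matches the declared posts schema above.
            yield Row(userId=item.get("userId"), id=item.get("id"),
                      title=item.get("title"), body=item.get("body"))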

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Spark
    participant JSONPlaceholderDataSource
    participant JSONPlaceholderReader
    participant JSONPlaceholderAPI

    User->>Spark: registerDataSource("jsonplaceholder")
    User->>Spark: spark.read.format("jsonplaceholder").option("endpoint","posts").load()
    Spark->>JSONPlaceholderDataSource: instantiate(options)
    JSONPlaceholderDataSource->>JSONPlaceholderReader: reader(schema)
    Spark->>JSONPlaceholderReader: read(partition)
    JSONPlaceholderReader->>JSONPlaceholderAPI: HTTP GET /{endpoint}[?id|_limit]
    JSONPlaceholderAPI-->>JSONPlaceholderReader: JSON payload
    JSONPlaceholderReader->>Spark: yield Rows
    Spark-->>User: DataFrame

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~40 minutes

Poem

I nibble on bytes by moon and by sun,
fetching small posts till the day is done.
Docs snug like carrots, tests hopping in line,
Spark stirs the rows — oh, how they shine! 🐇


@chanukyapekala chanukyapekala marked this pull request as ready for review July 22, 2025 05:12

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (1)
pyspark_datasources/__init__.py (1)

9-9: Address the unused import warning.

The import is correctly placed and necessary for exposing the data source in the package's public API. However, to address the static analysis warning, consider adding JSONPlaceholderDataSource to an __all__ list or using a redundant alias.

Option 1: Add to __all__ list (recommended):

from .jsonplaceholder import JSONPlaceholderDataSource
+
+__all__ = [
+    "FakeDataSource",
+    "GithubDataSource", 
+    "GoogleSheetsDataSource",
+    "HuggingFaceDatasets",
+    "JSONPlaceholderDataSource",
+    "KaggleDataSource",
+    "OpenSkyDataSource",
+    "SimpleJsonDataSource",
+    "StockDataSource",
+]

Option 2: Use redundant alias:

-from .jsonplaceholder import JSONPlaceholderDataSource
+from .jsonplaceholder import JSONPlaceholderDataSource as JSONPlaceholderDataSource
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ecaa405 and 28840bf.

📒 Files selected for processing (6)
  • docs/datasources/jsonplaceholder.md (1 hunks)
  • docs/index.md (1 hunks)
  • mkdocs.yml (1 hunks)
  • pyspark_datasources/__init__.py (1 hunks)
  • pyspark_datasources/jsonplaceholder.py (1 hunks)
  • tests/test_data_sources.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
pyspark_datasources/__init__.py (1)
pyspark_datasources/jsonplaceholder.py (1)
  • JSONPlaceholderDataSource (8-101)
🪛 Ruff (0.12.2)
pyspark_datasources/__init__.py

9-9: .jsonplaceholder.JSONPlaceholderDataSource imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

pyspark_datasources/jsonplaceholder.py

81-98: Use a dictionary instead of consecutive if statements

(SIM116)

🔇 Additional comments (8)
docs/datasources/jsonplaceholder.md (1)

1-3: LGTM! Documentation follows project conventions.

The minimal documentation approach using mkdocstrings is consistent with other data source documentation files in the project. The comprehensive docstring in the JSONPlaceholderDataSource class provides all necessary information including usage examples, supported endpoints, and referential integrity details.

docs/index.md (1)

34-43: LGTM! Well-integrated documentation entry.

The JSONPlaceholder data source entry is properly added to the documentation table with accurate information:

  • Correct link to documentation file
  • Appropriate short name matching the data source
  • Accurate description reflecting testing and prototyping use case
  • Correct "None" dependencies

The table formatting improvements also enhance readability.

mkdocs.yml (1)

28-28: LGTM! Navigation entry properly integrated.

The JSONPlaceholder documentation is correctly added to the site navigation, following the same pattern as other data source entries.

tests/test_data_sources.py (1)

68-81: Good test coverage for the new data source.

The tests appropriately cover:

  1. Basic functionality by reading posts data
  2. Referential integrity between users and posts through joins

The test assertions are appropriate and follow the same pattern as other tests in the file.

pyspark_datasources/jsonplaceholder.py (4)

1-6: Clean and appropriate import structure.

The imports are well-organized and all necessary for the data source implementation.


8-77: Excellent documentation and class structure.

The comprehensive docstring with usage examples and relationship documentation is very helpful for users. The class initialization and name method are implemented correctly.


100-102: Reader factory method is correctly implemented.

The reader method properly creates and returns a JSONPlaceholderReader instance.


104-117: Reader initialization and partitioning are well implemented.

The constructor properly extracts options and the single partition approach is appropriate for API calls.

Owner

@allisonwang-db allisonwang-db left a comment


Great! Thanks for adding this data source!

Comment on lines 81 to 98
if endpoint == "posts":
    return "userId INT, id INT, title STRING, body STRING"
elif endpoint == "users":
    return ("id INT, name STRING, username STRING, email STRING, phone STRING, "
            "website STRING, address_street STRING, address_suite STRING, "
            "address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
            "address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
            "company_bs STRING")
elif endpoint == "todos":
    return "userId INT, id INT, title STRING, completed BOOLEAN"
elif endpoint == "comments":
    return "postId INT, id INT, name STRING, email STRING, body STRING"
elif endpoint == "albums":
    return "userId INT, id INT, title STRING"
elif endpoint == "photos":
    return "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING"
else:
    return "userId INT, id INT, title STRING, body STRING"
Owner


Just curious, is there a way to get the schema programmatically?

Contributor Author


Yes, it's possible, but I'd recommend keeping the explicit schema definitions in the code rather than generating them programmatically. Explicit schemas perform better since they avoid the runtime overhead of inspecting objects, and they're clearer to read and review, which makes the code more maintainable. Since the schemas are simple and stable, and we already need to maintain the processors in _process_item, explicit schemas offer the best balance of clarity and maintainability. At work we used to extract the schema from the API itself, but schema changes on the API side (column renames or type changes) forced us to fall back to explicit schemas, which is why I took this approach here.
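
For reference, a standalone sketch of the dict-based mapping that later reviews note the revised code adopted (field lists copied from the snippet above; the helper name is illustrative, since the PR keeps this lookup inside the data source's schema() method):

_SCHEMAS = {
    "posts": "userId INT, id INT, title STRING, body STRING",
    "users": ("id INT, name STRING, username STRING, email STRING, phone STRING, "
              "website STRING, address_street STRING, address_suite STRING, "
              "address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
              "address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
              "company_bs STRING"),
    "todos": "userId INT, id INT, title STRING, completed BOOLEAN",
    "comments": "postId INT, id INT, name STRING, email STRING, body STRING",
    "albums": "userId INT, id INT, title STRING",
    "photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING",
}

def endpoint_schema(options: dict) -> str:
    # Unknown endpoints fall back to the posts schema, mirroring the else branch above.
    return _SCHEMAS.get(options.get("endpoint", "posts"), _SCHEMAS["posts"])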

Comment on lines +39 to +41
Read with limit:
>>> spark.read.format("jsonplaceholder").option("endpoint", "todos").option("limit", "5").load().show()
Owner


What's the default number of records?

Contributor Author


It's not huge. The default record counts per endpoint are:

{'users': 10,
'albums': 100,
'comments': 500,
'photos': 5000,
'posts': 100,
'todos': 200}

@chanukyapekala
Contributor Author

@allisonwang-db: I've addressed the review comments. Please take a look. Thanks.

@allisonwang-db
Owner

@chanukyapekala this looks good. Please rebase and we can trigger the CI

@chanukyapekala
Contributor Author

@allisonwang-db could you please check now?

@chanukyapekala
Contributor Author

@allisonwang-db : Would you please check and merge this?

Owner

@allisonwang-db allisonwang-db left a comment


Looks good!

def partitions(self) -> List[InputPartition]:
    return [InputPartition(0)]
Owner


Nit: by default it uses one partition so you don't have to override this method.

@allisonwang-db
Owner

@chanukyapekala The PR looks good. Can you resolve the merge conflicts? Then we can get the PR merged.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (3)
pyspark_datasources/jsonplaceholder.py (3)

78-95: Schema mapping via dict improves maintainability — nice refactor

This addresses the earlier feedback (SIM116) to avoid lengthy if-elif chains.


111-113: Nit: partitions() override is unnecessary.

Spark will default to one partition; keep it implicit for brevity. This also allows removing the InputPartition import (and List if unused elsewhere).

-    def partitions(self) -> List[InputPartition]:
-        return [InputPartition(0)]

Additionally (outside this range), clean up imports after removal:

# Before:
from typing import Dict, Any, List, Iterator
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

# After:
from typing import Dict, Any, Iterator
from pyspark.sql.datasource import DataSource, DataSourceReader

124-146: Good granular exception handling with executor-visible prints.

Catching RequestException/ValueError separately and using print aligns with prior feedback to surface logs in executors.
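
As a minimal standalone sketch of that shape (the PR's actual read() differs in details such as the helper name and timeout):

import requests

def fetch_items(url: str, params: dict) -> list:
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        return data if isinstance(data, list) else [data]
    except requests.RequestException as e:
        # print() so the message is visible in executor logs
        print(f"Request to {url} failed: {e}")
        return []
    except ValueError as e:
        # on older requests versions, invalid JSON surfaces as a plain ValueError
        print(f"Could not decode JSON from {url}: {e}")
        return []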

🧹 Nitpick comments (1)
pyspark_datasources/jsonplaceholder.py (1)

135-135: Avoid materializing the full list; return a generator for better memory footprint.

-            return iter([self._process_item(item) for item in data])
+            return (self._process_item(item) for item in data)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd20983 and 8c30d7e.

📒 Files selected for processing (1)
  • pyspark_datasources/jsonplaceholder.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
pyspark_datasources/jsonplaceholder.py (3)
pyspark_datasources/fake.py (2)
  • FakeDataSource (35-125)
  • FakeDataSourceReader (128-144)
pyspark_datasources/github.py (1)
  • GithubDataSource (7-45)
pyspark_datasources/stock.py (1)
  • StockDataSource (8-41)
🔇 Additional comments (2)
pyspark_datasources/jsonplaceholder.py (2)

8-69: Excellent docs, examples, and relationship guidance — LGTM

Clear examples, coverage of join relationships, and usage ergonomics look great.


114-146: Confirmed — returning pyspark.sql.Row() instances is fine (no tuple-only adapter found)

Short summary: I inspected readers and consumers in the repo. Some readers yield tuples (Fake, HuggingFace), others yield Row() (GitHub, JSONPlaceholder). There is no adapter/bridge in this codebase that requires tuple-only rows, and no code that indexes rows by position.

Relevant findings:

  • pyspark_datasources/jsonplaceholder.py — _process_item returns Row(...) and read() returns an iterator of those Rows.
  • pyspark_datasources/github.py — yields Row(...) for pull requests.
  • pyspark_datasources/fake.py — yields tuple(row) (tuple-based readers exist).
  • pyspark_datasources/huggingface.py — yields tuple([...]).
  • No adapter/bridge code found that enforces tuple-only rows; searches for row[0]/row[1]/row[2] returned no matches.

Conclusion: The JSONPlaceholderReader returning Row objects is consistent with other readers and safe to keep. No code changes required.
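
For illustration, the two accepted styles side by side (a sketch; the field set is taken from the posts schema and the function names are illustrative):

from pyspark.sql import Row

def read_as_tuples(items):
    # Position-based: values must follow the declared schema order.
    for item in items:
        yield (item["userId"], item["id"], item["title"], item["body"])

def read_as_rows(items):
    # Row-based: keyword order here is kept consistent with the schema as well.
    for item in items:
        yield Row(userId=item["userId"], id=item["id"],
                  title=item["title"], body=item["body"])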

Comment on lines +103 to +110
def __init__(self, options: Dict[str, str]):
    self.options = options
    self.base_url = "https://jsonplaceholder.typicode.com"

    self.endpoint = self.options.get("endpoint", "posts")
    self.limit = self.options.get("limit")
    self.id = self.options.get("id")


🛠️ Refactor suggestion

Normalize endpoint option and guard against unsupported values to keep schema/read consistent.

See proposed patch to normalize endpoint and default on unknown values (also keeps logs visible in executors via print).

-        self.endpoint = self.options.get("endpoint", "posts")
+        endpoint_opt = self.options.get("endpoint", "posts")
+        self.endpoint = endpoint_opt.lower() if isinstance(endpoint_opt, str) else "posts"
+        if self.endpoint not in ("posts", "users", "todos", "comments", "albums", "photos"):
+            print(f"Unknown endpoint '{self.endpoint}', defaulting to 'posts'")
+            self.endpoint = "posts"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def __init__(self, options: Dict[str, str]):
    self.options = options
    self.base_url = "https://jsonplaceholder.typicode.com"
    self.endpoint = self.options.get("endpoint", "posts")
    self.limit = self.options.get("limit")
    self.id = self.options.get("id")

def __init__(self, options: Dict[str, str]):
    self.options = options
    self.base_url = "https://jsonplaceholder.typicode.com"
    endpoint_opt = self.options.get("endpoint", "posts")
    self.endpoint = endpoint_opt.lower() if isinstance(endpoint_opt, str) else "posts"
    if self.endpoint not in ("posts", "users", "todos", "comments", "albums", "photos"):
        print(f"Unknown endpoint '{self.endpoint}', defaulting to 'posts'")
        self.endpoint = "posts"
    self.limit = self.options.get("limit")
    self.id = self.options.get("id")
🤖 Prompt for AI Agents
In pyspark_datasources/jsonplaceholder.py around lines 103 to 110, the endpoint
option is used raw which can lead to inconsistent behavior across executors;
normalize the endpoint by trimming whitespace and lowercasing it, validate it
against the supported endpoints (posts, comments, albums, photos, todos, users)
and if it is missing or unsupported default to "posts"; when defaulting or
rejecting an unsupported value emit a visible warning (use print so it appears
in executor logs) so callers know the fallback, and keep the rest of the
initialization unchanged.

Comment on lines +120 to +123
params = {}
if self.limit and not self.id:
    params["_limit"] = self.limit


🛠️ Refactor suggestion

Validate and sanitize the 'limit' option; ignore invalid values.

Avoid passing arbitrary strings to the API and quietly ignore non-positive or invalid limits.

-        params = {}
-        if self.limit and not self.id:
-            params["_limit"] = self.limit
+        params = {}
+        if not self.id and self.limit is not None:
+            try:
+                limit_val = int(self.limit)
+                if limit_val > 0:
+                    params["_limit"] = str(limit_val)
+            except (TypeError, ValueError):
+                print(f"Ignoring invalid 'limit' option: {self.limit!r}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
params = {}
if self.limit and not self.id:
    params["_limit"] = self.limit

params = {}
if not self.id and self.limit is not None:
    try:
        limit_val = int(self.limit)
        if limit_val > 0:
            params["_limit"] = str(limit_val)
    except (TypeError, ValueError):
        print(f"Ignoring invalid 'limit' option: {self.limit!r}")
🤖 Prompt for AI Agents
In pyspark_datasources/jsonplaceholder.py around lines 120 to 123, the code
unconditionally uses self.limit which may be a non-numeric or non-positive
value; validate and sanitize it before adding to params by attempting to coerce
to an integer (or using a safe parse), ensuring the result is a positive integer
(>0), and only then set params["_limit"] = parsed_limit; if parsing fails or the
value is <=0, do not add the parameter (silently ignore invalid limits).


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (3)
pyspark_datasources/jsonplaceholder.py (1)

111-112: You can drop partitions() override; default is a single partition

Spark’s DataSourceReader defaults to one partition; keeping this method adds noise without benefit.

tests/test_data_sources.py (2)

179-183: Fix missing spark fixture parameter (test will NameError on spark)

The test uses the spark fixture but does not declare it, causing a NameError.

-def test_jsonplaceholder_posts():
+def test_jsonplaceholder_posts(spark):
     spark.dataSource.register(JSONPlaceholderDataSource)
     posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
     assert posts_df.count() > 0 # Ensure we have some posts

185-191: Fix missing spark fixture parameter (referential integrity test)

Same issue here; declare the spark fixture.

-def test_jsonplaceholder_referential_integrity():
+def test_jsonplaceholder_referential_integrity(spark):
     spark.dataSource.register(JSONPlaceholderDataSource)
     users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
     assert users_df.count() > 0 # Ensure we have some users
     posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
     posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)
     assert posts_with_authors.count() > 0  # Ensure join is valid and we have posts with authors
🧹 Nitpick comments (5)
pyspark_datasources/jsonplaceholder.py (3)

120-123: Validate and sanitize the limit option before passing to API

Avoid sending arbitrary strings or non-positive values to the API; parse to int and ignore invalid values with a visible message.

-        params = {}
-        if self.limit and not self.id:
-            params["_limit"] = self.limit
+        params = {}
+        if not self.id and self.limit is not None:
+            try:
+                limit_val = int(self.limit)
+                if limit_val > 0:
+                    params["_limit"] = str(limit_val)
+            except (TypeError, ValueError):
+                print(f"Ignoring invalid 'limit' option: {self.limit!r}")

1-2: Modernize type hints: use built-in generics and collections.abc.Iterator

This addresses Ruff UP035/UP006 and keeps hints current for Python 3.9+.

-from typing import Dict, Any, List, Iterator
+from typing import Any
+from collections.abc import Iterator
@@
-    def __init__(self, options: Dict[str, str]):
+    def __init__(self, options: dict[str, str] | None):
@@
-    def partitions(self) -> List[InputPartition]:
+    def partitions(self) -> list[InputPartition]:
@@
-    def _process_item(self, item: Dict[str, Any]) -> Row:
+    def _process_item(self, item: dict[str, Any]) -> Row:

Also applies to: 103-104, 111-112, 147-147


41-41: Wrap example lines to satisfy line-length (E501) and improve readability

Long example lines exceed 100 chars. Breaking into multiple lines also makes doctests easier to read.

-    >>> spark.read.format("jsonplaceholder").option("endpoint", "todos").option("limit", "5").load().show()
+    >>> (spark.read.format("jsonplaceholder")
+    ...       .option("endpoint", "todos")
+    ...       .option("limit", "5")
+    ...       .load()
+    ...       .show())
@@
-    >>> spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load().show()
+    >>> (spark.read.format("jsonplaceholder")
+    ...       .option("endpoint", "posts")
+    ...       .option("id", "1")
+    ...       .load()
+    ...       .show())

Also applies to: 45-45

pyspark_datasources/__init__.py (1)

11-11: Silence F401 for re-export or add all entry

This import exists to expose the symbol via the package namespace (used by the tests' star import). Either mark the import as intentionally unused, or export it via __all__.

Minimal change (preferred if the rest of the file follows this pattern):

-from .jsonplaceholder import JSONPlaceholderDataSource
+from .jsonplaceholder import JSONPlaceholderDataSource  # noqa: F401

Alternatively, explicitly add it to __all__ (if you maintain one) to make the intent clear:

__all__ = [
    # ...existing exports...
    "JSONPlaceholderDataSource",
]
tests/test_data_sources.py (1)

181-181: Optional: use small limits in tests to reduce network time and flakiness

The API defaults can be large; specifying small limits speeds up CI and reduces flakes.

-    posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
+    posts_df = (spark.read.format("jsonplaceholder")
+                .option("endpoint", "posts")
+                .option("limit", "5")
+                .load())
@@
-    users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
+    users_df = (spark.read.format("jsonplaceholder")
+                .option("endpoint", "users")
+                .option("limit", "10")
+                .load())
@@
-    posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
+    posts_df = (spark.read.format("jsonplaceholder")
+                .option("endpoint", "posts")
+                .option("limit", "10")
+                .load())

Also applies to: 187-187, 189-189

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8c30d7e and d0b9c19.

⛔ Files ignored due to path filters (1)
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
  • docs/datasources/jsonplaceholder.md (1 hunks)
  • docs/index.md (1 hunks)
  • mkdocs.yml (1 hunks)
  • pyspark_datasources/__init__.py (1 hunks)
  • pyspark_datasources/jsonplaceholder.py (1 hunks)
  • tests/test_data_sources.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • docs/datasources/jsonplaceholder.md
🚧 Files skipped from review as they are similar to previous changes (2)
  • mkdocs.yml
  • docs/index.md
🧰 Additional context used
🧬 Code Graph Analysis (3)
pyspark_datasources/__init__.py (1)
pyspark_datasources/jsonplaceholder.py (1)
  • JSONPlaceholderDataSource (8-97)
pyspark_datasources/jsonplaceholder.py (4)
pyspark_datasources/github.py (1)
  • GithubDataSource (7-45)
pyspark_datasources/weather.py (1)
  • WeatherDataSource (9-72)
pyspark_datasources/stock.py (1)
  • StockDataSource (8-41)
pyspark_datasources/arrow.py (1)
  • ArrowDataSource (10-131)
tests/test_data_sources.py (2)
tests/test_google_sheets.py (1)
  • spark (9-12)
pyspark_datasources/jsonplaceholder.py (2)
  • JSONPlaceholderDataSource (8-97)
  • read (114-145)
🪛 Ruff (0.12.2)
pyspark_datasources/__init__.py

11-11: .jsonplaceholder.JSONPlaceholderDataSource imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

pyspark_datasources/jsonplaceholder.py

1-1: Import from collections.abc instead: Iterator

Import from collections.abc

(UP035)


1-1: typing.Dict is deprecated, use dict instead

(UP035)


1-1: typing.List is deprecated, use list instead

(UP035)


41-41: Line too long (107 > 100)

(E501)


45-45: Line too long (104 > 100)

(E501)


103-103: Use dict instead of Dict for type annotation

Replace with dict

(UP006)


111-111: Use list instead of List for type annotation

Replace with list

(UP006)


147-147: Use dict instead of Dict for type annotation

Replace with dict

(UP006)

tests/test_data_sources.py

180-180: JSONPlaceholderDataSource may be undefined, or defined from star imports

(F405)


186-186: JSONPlaceholderDataSource may be undefined, or defined from star imports

(F405)

Comment on lines +93 to +95
endpoint = self.options.get("endpoint", "posts")
return schemas.get(endpoint, schemas["posts"])


🛠️ Refactor suggestion

Normalize endpoint and align schema()/reader() behavior; add visible fallback for unknown values

Without normalization, schema() falls back to posts for unknown endpoints while reader() will hit the unknown path and return empty, producing a confusing empty DF with a posts schema. Normalize the option once and use a visible fallback.

Apply this diff:

@@
-        endpoint = self.options.get("endpoint", "posts")
-        return schemas.get(endpoint, schemas["posts"])
+        endpoint_opt = self.options.get("endpoint", "posts")
+        endpoint = endpoint_opt.lower() if isinstance(endpoint_opt, str) else "posts"
+        return schemas.get(endpoint, schemas["posts"])
@@
-        self.endpoint = self.options.get("endpoint", "posts")
+        endpoint_opt = self.options.get("endpoint", "posts")
+        self.endpoint = endpoint_opt.lower() if isinstance(endpoint_opt, str) else "posts"
+        if self.endpoint not in ("posts", "users", "todos", "comments", "albums", "photos"):
+            print(f"Unknown endpoint '{self.endpoint}', defaulting to 'posts'")
+            self.endpoint = "posts"

Also applies to: 107-110

🤖 Prompt for AI Agents
pyspark_datasources/jsonplaceholder.py lines ~93-95 and ~107-110: normalize the
"endpoint" option once (e.g., endpoint =
self.options.get("endpoint","posts").strip().lower()), then check if endpoint is
in the schemas mapping; if not, emit a visible fallback (e.g., warning log via
self.spark._jvm or Python logger) and set endpoint = "posts". Use this
normalized/validated endpoint variable for both schema() and reader() paths so
unknown values consistently fall back to the posts schema (and the reader
returns rows for posts rather than an empty DF).

@chanukyapekala
Contributor Author

@allisonwang-db : Merge conflicts resolved. Please merge.

@allisonwang-db
Owner

Thanks for the contribution. Waiting for CI to pass.

@allisonwang-db
Owner

@chanukyapekala looks like the tests are failing. Could you fix them?

=========================== short test summary info ============================
FAILED tests/test_data_sources.py::test_jsonplaceholder_posts - AttributeError: 'function' object has no attribute 'dataSource'
FAILED tests/test_data_sources.py::test_jsonplaceholder_referential_integrity - AttributeError: 'function' object has no attribute 'dataSource'
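
This failure is consistent with the test body resolving spark to the module-level fixture function rather than an injected SparkSession. A minimal sketch of the fixture-injection pattern the later fix relies on (the repository's actual fixture may be defined differently):

import pytest
from pyspark.sql import SparkSession

@pytest.fixture
def spark():
    return SparkSession.builder.getOrCreate()

def test_jsonplaceholder_posts(spark):  # fixture injected as a parameter
    ...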

@chanukyapekala
Contributor Author

@allisonwang-db: I tested locally and the tests passed. Please check the CI.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/test_data_sources.py (1)

181-181: Reduce network load and flakiness by limiting rows in tests

To make tests faster and more reliable against transient network issues, limit the number of rows fetched from the external API.

Apply these diffs:

-    posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
+    posts_df = (
+        spark.read.format("jsonplaceholder")
+        .option("endpoint", "posts")
+        .option("limit", "10")
+        .load()
+    )
-    users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
+    users_df = (
+        spark.read.format("jsonplaceholder")
+        .option("endpoint", "users")
+        .option("limit", "10")
+        .load()
+    )
-    posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
+    posts_df = (
+        spark.read.format("jsonplaceholder")
+        .option("endpoint", "posts")
+        .option("limit", "10")
+        .load()
+    )

Also applies to: 187-187, 189-189

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these settings in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between d0b9c19 and 18316ed.

📒 Files selected for processing (1)
  • tests/test_data_sources.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
tests/test_data_sources.py (1)
pyspark_datasources/jsonplaceholder.py (2)
  • JSONPlaceholderDataSource (8-97)
  • read (114-145)
🪛 Ruff (0.12.2)
tests/test_data_sources.py

180-180: JSONPlaceholderDataSource may be undefined, or defined from star imports

(F405)


186-186: JSONPlaceholderDataSource may be undefined, or defined from star imports

(F405)

🔇 Additional comments (2)
tests/test_data_sources.py (2)

179-183: LGTM: fixture injection fix resolves prior AttributeError

Using the spark fixture parameter here is correct and should address the previous "'function' object has no attribute 'dataSource'" failure.


185-191: LGTM: referential integrity check is correct

Joining posts on users via posts.userId == users.id is the right relationship for JSONPlaceholder data. Assertion ensures the join yields results.

assert names == {"Alice", "Bob", "Charlie", "Diana"}

def test_jsonplaceholder_posts(spark):
    spark.dataSource.register(JSONPlaceholderDataSource)

💡 Verification agent

🧩 Analysis chain

Resolve Ruff F405: avoid relying on star-imported JSONPlaceholderDataSource

Ruff flags JSONPlaceholderDataSource as coming from a star import (F405). Prefer explicit imports to keep linters and static analysis happy.

Two options:

  • Preferred: add a named import at the module level.
  • Minimal: suppress per-line with # noqa: F405.

Apply this minimal diff within the changed lines to unblock CI:

-    spark.dataSource.register(JSONPlaceholderDataSource)
+    spark.dataSource.register(JSONPlaceholderDataSource)  # noqa: F405

And similarly below:

-    spark.dataSource.register(JSONPlaceholderDataSource)
+    spark.dataSource.register(JSONPlaceholderDataSource)  # noqa: F405

Alternatively (preferred), add this import at the top of the file (outside the changed hunk):

from pyspark_datasources import JSONPlaceholderDataSource

To ensure the explicit import works, verify the symbol is exported from the package:

Also applies to: 186-186


🏁 Script executed:

#!/bin/bash
# 1) Confirm the class exists
rg -n "class JSONPlaceholderDataSource" -A 5

# 2) Confirm it's re-exported by the package (ideally in pyspark_datasources/__init__.py)
rg -n "from pyspark_datasources\.jsonplaceholder import JSONPlaceholderDataSource" -N

Length of output: 653


Action: Fix Ruff F405 in tests/test_data_sources.py — import from submodule or add per-line noqa

Verified: JSONPlaceholderDataSource is defined in pyspark_datasources/jsonplaceholder.py but is not re-exported from the package root, so from pyspark_datasources import JSONPlaceholderDataSource will not work.

Files to update:

  • tests/test_data_sources.py — occurrences at ~lines 180 and 186

Preferred fix — add an explicit named import from the submodule at the top of the test file:

+from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource

No other changes needed in the registration lines.

Minimal fix — suppress the linter per-line (apply to both occurrences):

-    spark.dataSource.register(JSONPlaceholderDataSource)
+    spark.dataSource.register(JSONPlaceholderDataSource)  # noqa: F405

Alternative (if you want a package-level import): re-export the symbol from pyspark_datasources/__init__.py:

+# in pyspark_datasources/__init__.py
+from .jsonplaceholder import JSONPlaceholderDataSource
+__all__ = [*(globals().get("__all__", [])), "JSONPlaceholderDataSource"]

Please apply one of the above.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    spark.dataSource.register(JSONPlaceholderDataSource)
    spark.dataSource.register(JSONPlaceholderDataSource)  # noqa: F405
🧰 Tools
🪛 Ruff (0.12.2)

180-180: JSONPlaceholderDataSource may be undefined, or defined from star imports

(F405)

🤖 Prompt for AI Agents
In tests/test_data_sources.py around lines 180 and 186, Ruff F405 is raised
because JSONPlaceholderDataSource is defined in
pyspark_datasources/jsonplaceholder.py and not re-exported from the package
root; fix by adding an explicit named import of JSONPlaceholderDataSource from
the pyspark_datasources.jsonplaceholder submodule at the top of the test file,
or alternatively append a per-line noqa for F405 to both registration lines if
you prefer suppression.

@chanukyapekala
Contributor Author

@allisonwang-db Can you please check this PR?

@allisonwang-db allisonwang-db merged commit 7c06061 into allisonwang-db:master Aug 19, 2025
1 of 5 checks passed
@chanukyapekala
Contributor Author

chanukyapekala commented Aug 22, 2025 via email
