Conversation
@allisonwang-db (Owner) commented Oct 17, 2025

This commit introduces a new data source for reading data from the Spotify API.

The data source currently supports reading a user's saved tracks. It uses OAuth 2.0 with a refresh token for authentication.

The following files have been added:

  • pyspark_datasources/spotify.py: The implementation of the data source.
  • docs/datasources/spotify.md: Documentation for the data source.
  • examples/spotify_example.py: An example of how to use the data source.
  • tests/test_spotify.py: Unit tests for the data source.
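
For quick reference, a minimal usage sketch assembled from the docs and example added in this PR (the "spotify" format name and option keys follow those files; credentials are placeholders, see examples/spotify_example.py for the full version):

```python
from pyspark.sql import SparkSession
from pyspark_datasources.spotify import SpotifyDataSource

spark = SparkSession.builder.appName("SpotifyExample").getOrCreate()

# Register the data source, then read the user's saved tracks via the
# OAuth 2.0 refresh-token flow (the credentials below are placeholders).
spark.dataSource.register(SpotifyDataSource)

df = (
    spark.read.format("spotify")
    .option("spotify.client.id", "YOUR_CLIENT_ID")
    .option("spotify.client.secret", "YOUR_CLIENT_SECRET")
    .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN")
    .option("type", "tracks")
    .load()
)

df.show()
spark.stop()
```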

Summary by CodeRabbit

  • New Features

    • Added Spotify data source for PySpark to load a user’s saved tracks via OAuth 2.0.
  • Documentation

    • Added a guide covering Spotify OAuth setup, usage examples, and expected track schema.
  • Examples

    • Added a runnable example demonstrating loading and displaying Spotify tracks with Spark.
  • Tests

    • Added unit tests for authentication, pagination, and error handling for the Spotify data source.
  • Chores

    • Updated .gitignore to ignore editor settings.


coderabbitai bot commented Oct 17, 2025

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 0.00%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (2 passed)

  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The pull request title "feat: Add Spotify data source" directly and accurately summarizes the primary objective of the changeset. The PR introduces a complete new Spotify data source implementation across multiple files (pyspark_datasources/spotify.py, documentation, examples, and tests), and the title clearly captures this main change. The title is concise, specific, and uses conventional commit format, making it immediately clear to developers reviewing history what the changeset accomplishes without requiring them to examine file details.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/spotify-datasource

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (9)
pyspark_datasources/spotify.py (5)

96-118: Consider basic rate-limit handling and retries.

Spotify can return 429. Add lightweight retry/backoff to improve resilience.

Example using Retry (keeps Sessions local to preserve picklability):

+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
@@
     def read(self, _partition):
-        access_token = self._get_access_token()
+        access_token = self._get_access_token()
         headers = {"Authorization": f"Bearer {access_token}"}
@@
-            url = "https://api.spotify.com/v1/me/tracks"
-            while url:
-                response = requests.get(url, headers=headers)
+            url = "https://api.spotify.com/v1/me/tracks"
+            retry = Retry(
+                total=5,
+                backoff_factor=0.5,
+                status_forcelist=(429, 500, 502, 503, 504),
+                allowed_methods=("GET",),
+                respect_retry_after_header=True,
+            )
+            with requests.Session() as s:
+                s.headers.update(headers)
+                s.mount("https://", HTTPAdapter(max_retries=retry))
+                while url:
+                    response = s.get(url, timeout=(3.05, 30))
                     response.raise_for_status()
                     data = response.json()
@@
-                url = data.get("next")
+                    url = data.get("next")

If adopted, mirror the retry for the token call in _get_access_token. Based on learnings
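
A hedged sketch of what mirroring that retry in _get_access_token could look like (the Retry/HTTPAdapter setup and imports match the diff above; allowing POST explicitly and the timeout values are illustrative, not part of the PR):

```python
    def _get_access_token(self):
        # Sketch only: Retry does not retry POST by default, so allow it explicitly.
        retry = Retry(
            total=5,
            backoff_factor=0.5,
            status_forcelist=(429, 500, 502, 503, 504),
            allowed_methods=("POST",),
            respect_retry_after_header=True,
        )
        with requests.Session() as s:
            s.mount("https://", HTTPAdapter(max_retries=retry))
            response = s.post(
                "https://accounts.spotify.com/api/token",
                data={"grant_type": "refresh_token", "refresh_token": self.refresh_token},
                auth=(self.client_id, self.client_secret),
                timeout=(3.05, 30),
            )
            response.raise_for_status()
            return response.json()["access_token"]
```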


79-81: Satisfy linter: mark unused parameters.

The interface requires these params but they’re unused. Prefix with underscore to appease Ruff.

-    def reader(self, schema):
+    def reader(self, _schema):
         return SpotifyReader(self.options)
@@
-    def read(self, partition):
+    def read(self, _partition):
         access_token = self._get_access_token()

Also applies to: 96-96


83-96: Provide a default single partition.

Implement partitions() to comply with guideline and enable future parallelism.

 class SpotifyReader(DataSourceReader):
     def __init__(self, options):
         self.options = options
@@
         if not all([self.client_id, self.client_secret, self.refresh_token]):
             raise ValueError(
                 "spotify.client.id, spotify.client.secret, and spotify.refresh.token must be specified in options"
             )
 
+    def partitions(self):
+        # Single-partition source; can later split by offset/range if needed.
+        return [None]

64-77: Consider TimestampType for added_at.

Using TimestampType improves type fidelity vs. string (ISO-8601).

-from pyspark.sql.types import (
+from pyspark.sql.types import (
     StructType,
     StructField,
     StringType,
     IntegerType,
     LongType,
     ArrayType,
+    TimestampType,
 )
@@
-                StructField("added_at", StringType(), True),
+                StructField("added_at", TimestampType(), True),

If you change the schema, parse the string to datetime or let Spark cast downstream; document the behavior.
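
If TimestampType is adopted, a minimal parsing sketch (the sample value is illustrative; Python 3.11+ can parse the trailing Z directly, but the replace keeps 3.9/3.10 working):

```python
from datetime import datetime

# Spotify returns added_at as ISO-8601 with a trailing "Z" (UTC).
added_at_raw = "2023-01-15T10:30:00Z"
added_at = datetime.fromisoformat(added_at_raw.replace("Z", "+00:00"))
print(added_at)  # 2023-01-15 10:30:00+00:00
```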


91-95: Keep error messages concise; avoid long literals in raise sites.

Minor style: shorten messages or centralize them if reused.

Also applies to: 119-119

docs/datasources/spotify.md (1)

61-85: Document options, defaults, and add spark.stop() for completeness.

Per guidelines, include an Options section and stop the SparkSession.

@@
 ## Usage
@@
 Once you have your credentials, you can use the Spotify data source in PySpark.
 
+### Options
+
+- spotify.client.id (string, required)
+- spotify.client.secret (string, required)
+- spotify.refresh.token (string, required)
+- type (string, default: "tracks"; supported: "tracks")
+- Note: Network requests use timeouts; handle 429 rate limits by retry/backoff.
+
 ```python
 from pyspark.sql import SparkSession
 
 # Create a SparkSession
 spark = SparkSession.builder.appName("SpotifyExample").getOrCreate()
@@
 # Show the data
 df.show()
+
+# Stop Spark when done
+spark.stop()
Additionally, consider a brief “Error cases” section (missing credentials, 401/429, network timeouts) and a note on partitioning (currently single partition) and Arrow usage.

tests/test_spotify.py (2)

72-75: Add assertions for parsed fields to harden coverage.

Verify artists/album/duration/popularity extraction.

     assert len(rows) == 2
     assert rows[0]["name"] == "Test Track 1"
     assert rows[1]["name"] == "Test Track 2"
+    assert rows[0]["artists"] == ["Artist 1"]
+    assert rows[0]["album"] == "Album 1"
+    assert rows[0]["duration_ms"] == 200000
+    assert rows[0]["popularity"] == 50

76-79: Add a test for unsupported type.

Ensure a clear error is raised when an unknown type is requested.

def test_spotify_reader_unsupported_type():
    options = {
        "spotify.client.id": "test_id",
        "spotify.client.secret": "test_secret",
        "spotify.refresh.token": "test_refresh",
        "type": "playlists",
    }
    with pytest.raises(ValueError, match="Unsupported type"):
        next(SpotifyReader(options).read(None))

Also applies to: 90-97

examples/spotify_example.py (1)

2-3: Avoid hardcoded secrets; catch specific exceptions; stop Spark.

Use env vars for creds, handle request errors explicitly, and always stop Spark.

-from pyspark.sql import SparkSession
+from pyspark.sql import SparkSession
+import os
+import requests
@@
-# Replace with your actual credentials
-CLIENT_ID = "YOUR_CLIENT_ID"
-CLIENT_SECRET = "YOUR_CLIENT_SECRET"
-REFRESH_TOKEN = "YOUR_REFRESH_TOKEN"
+# Replace with your actual credentials or set environment variables
+CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "YOUR_CLIENT_ID")
+CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "YOUR_CLIENT_SECRET")
+REFRESH_TOKEN = os.getenv("SPOTIFY_REFRESH_TOKEN", "YOUR_REFRESH_TOKEN")
@@
-try:
+try:
     df = (
         spark.read.format("spotify")
         .option("spotify.client.id", CLIENT_ID)
         .option("spotify.client.secret", CLIENT_SECRET)
         .option("spotify.refresh.token", REFRESH_TOKEN)
         .option("type", "tracks")
         .load()
     )
@@
-except Exception as e:
+except (requests.exceptions.RequestException, ValueError) as e:
     print(f"An error occurred: {e}")
     print("Please ensure you have replaced the placeholder credentials with your actual credentials.")
+finally:
+    spark.stop()

Also applies to: 15-19, 21-39

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5788049 and ba0af03.

📒 Files selected for processing (4)
  • docs/datasources/spotify.md (1 hunks)
  • examples/spotify_example.py (1 hunks)
  • pyspark_datasources/spotify.py (1 hunks)
  • tests/test_spotify.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
pyspark_datasources/!(__init__).py

📄 CodeRabbit inference engine (CLAUDE.md)

pyspark_datasources/!(__init__).py: All data source classes must inherit from Spark's DataSource base class
Implement schema() in each data source to define the schema using PySpark StructType
Implement reader(schema) for batch reads; if streaming is supported, implement streamReader() or simpleStreamReader() accordingly
If batch writing is supported, implement writer(schema, overwrite); if streaming writes are supported, implement streamWriter(schema, overwrite)
All classes used by the data sources (including readers/writers) must be pickle-serializable
Use only standard Spark SQL data types in schemas
Implement robust exception handling in data source read/write paths
For streaming sources, ensure resources are cleaned up (e.g., implement stop()/commit() as appropriate)
Prefer returning pyarrow.RecordBatch from readers for better serialization performance
Implement partitions() on DataSourceReader to enable parallel processing when beneficial
Defer expensive operations until read time (lazy evaluation) in data source implementations
Include comprehensive class docstrings for each data source with: brief description and Name: "format_name", an Options section (parameters/types/defaults), and Examples (registration and basic usage)
In docstrings, include schema output (e.g., df.printSchema()) for clarity
In docstrings, document error cases (e.g., missing files or invalid options)
In docstrings, document the partitioning strategy used to improve performance
In docstrings, document Arrow optimizations (how Arrow is used to transmit data)
DataSourceReader.read must return tuples, Rows, or pyarrow.RecordBatch; implement accordingly

Files:

  • pyspark_datasources/spotify.py
{pyspark_datasources,tests}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

{pyspark_datasources,tests}/**/*.py: When specifying file paths with Spark, use load("/path") instead of option("path", "/path")
Format code with Ruff (ruff format) and fix/lint with ruff check

Files:

  • pyspark_datasources/spotify.py
  • tests/test_spotify.py
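
As an aside on the reader guidelines quoted above (RecordBatch output and partitions()), a minimal sketch of that contract; the class name, fields, and single-partition choice are illustrative and not taken from this PR:

```python
import pyarrow as pa
from pyspark.sql.datasource import DataSourceReader, InputPartition


class ExampleBatchReader(DataSourceReader):
    def partitions(self):
        # A single partition; a real source could split by offset ranges later.
        return [InputPartition(0)]

    def read(self, partition):
        # Yielding pyarrow.RecordBatch avoids per-row serialization overhead.
        yield pa.RecordBatch.from_pylist([{"id": "abc", "name": "Example Track"}])
```
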
🧬 Code graph analysis (2)
examples/spotify_example.py (1)
pyspark_datasources/spotify.py (2)
  • SpotifyDataSource (15-80)
  • read (96-119)
tests/test_spotify.py (1)
pyspark_datasources/spotify.py (3)
  • SpotifyReader (83-128)
  • reader (79-80)
  • read (96-119)
🪛 LanguageTool
docs/datasources/spotify.md

[grammar] ~91-~91: There might be a mistake here.
Context: ...: | Field | Type | |-------------|---------------------| | ...

(QB_NEW_EN)


[grammar] ~92-~92: There might be a mistake here.
Context: ... | |-------------|---------------------| | id | string | | ...

(QB_NEW_EN)


[grammar] ~93-~93: There might be a mistake here.
Context: ...-| | id | string | | name | string | | ...

(QB_NEW_EN)


[grammar] ~94-~94: There might be a mistake here.
Context: ... | | name | string | | artists | array<string> | | ...

(QB_NEW_EN)


[grammar] ~95-~95: There might be a mistake here.
Context: ... | | artists | array<string> | | album | string | | ...

(QB_NEW_EN)


[grammar] ~96-~96: There might be a mistake here.
Context: ... | | album | string | | duration_ms | long | | ...

(QB_NEW_EN)


[grammar] ~97-~97: There might be a mistake here.
Context: ... | | duration_ms | long | | popularity | integer | | ...

(QB_NEW_EN)


[grammar] ~98-~98: There might be a mistake here.
Context: ... | | popularity | integer | | added_at | string |

(QB_NEW_EN)

🪛 Ruff (0.14.0)
pyspark_datasources/spotify.py

79-79: Unused method argument: schema

(ARG002)


92-94: Avoid specifying long messages outside the exception class

(TRY003)


96-96: Unused method argument: partition

(ARG002)


103-103: Probable use of requests call without timeout

(S113)


119-119: Avoid specifying long messages outside the exception class

(TRY003)


122-122: Probable use of requests call without timeout

(S113)

examples/spotify_example.py

17-17: Possible hardcoded password assigned to: "CLIENT_SECRET"

(S105)


18-18: Possible hardcoded password assigned to: "REFRESH_TOKEN"

(S105)


37-37: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: test (3.12)
  • GitHub Check: test (3.9)
  • GitHub Check: test (3.11)
  • GitHub Check: test (3.10)

Comment on lines +37 to +39
>>> from pyspark_datasources import SpotifyDataSource
>>> spark.dataSource.register(SpotifyDataSource)


⚠️ Potential issue | 🟡 Minor

Fix import path in docstring example.

The example should import from the module, not the package root.

-    >>> from pyspark_datasources import SpotifyDataSource
+    >>> from pyspark_datasources.spotify import SpotifyDataSource
🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 37 to 39, the docstring example
imports SpotifyDataSource from the package root; update the example to import
from the module path (from pyspark_datasources.spotify import SpotifyDataSource)
so the example reflects the correct import, leaving the
spark.dataSource.register(...) line unchanged.

Comment on lines +103 to +106
                response = requests.get(url, headers=headers)
                response.raise_for_status()
                data = response.json()
                for item in data["items"]:

⚠️ Potential issue | 🔴 Critical

Add timeouts to external HTTP calls to avoid indefinite hangs.

Both requests lack timeouts. This is a reliability and production-safety issue.

Apply:

@@
-                response = requests.get(url, headers=headers)
+                response = requests.get(url, headers=headers, timeout=(3.05, 30))
@@
-        response = requests.post(
+        response = requests.post(
             "https://accounts.spotify.com/api/token",
             data={"grant_type": "refresh_token", "refresh_token": self.refresh_token},
             auth=(self.client_id, self.client_secret),
-        )
+            timeout=(3.05, 30),
+        )

Optionally, hoist the timeout to a module constant (e.g., DEFAULT_TIMEOUT) for reuse. Based on learnings

Also applies to: 122-128

🧰 Tools
🪛 Ruff (0.14.0)

103-103: Probable use of requests call without timeout

(S113)

🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 103-106 (and also apply the same
fix to 122-128), the requests calls lack timeouts causing potential indefinite
hangs; define a module-level DEFAULT_TIMEOUT (e.g., DEFAULT_TIMEOUT = 10) near
the top of the file and pass timeout=DEFAULT_TIMEOUT to each
requests.get/requests.post call on those lines (and any other external HTTP
calls in the file) so the calls fail fast instead of hanging.

coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
pyspark_datasources/spotify.py (3)

37-39: Fix import path in docstring example.

The example imports from the package root instead of the module path.

Apply:

-    >>> from pyspark_datasources import SpotifyDataSource
+    >>> from pyspark_datasources.spotify import SpotifyDataSource

103-103: Add timeout to HTTP call to prevent indefinite hangs.

Missing timeout is a reliability issue that can cause jobs to hang indefinitely.


122-128: Add timeout to HTTP call to prevent indefinite hangs.

Missing timeout is a reliability issue.

Also consider validating the response structure:

         response.raise_for_status()
-        return response.json()["access_token"]
+        token_data = response.json()
+        if "access_token" not in token_data:
+            raise ValueError("Spotify API did not return an access token")
+        return token_data["access_token"]
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ba0af03 and c150969.

📒 Files selected for processing (3)
  • .gitignore (1 hunks)
  • docs/datasources/spotify.md (1 hunks)
  • pyspark_datasources/spotify.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • .gitignore
🚧 Files skipped from review as they are similar to previous changes (1)
  • docs/datasources/spotify.md
🧰 Additional context used
📓 Path-based instructions (2)
pyspark_datasources/!(__init__).py

📄 CodeRabbit inference engine (CLAUDE.md)

pyspark_datasources/!(__init__).py: All data source classes must inherit from Spark's DataSource base class
Implement schema() in each data source to define the schema using PySpark StructType
Implement reader(schema) for batch reads; if streaming is supported, implement streamReader() or simpleStreamReader() accordingly
If batch writing is supported, implement writer(schema, overwrite); if streaming writes are supported, implement streamWriter(schema, overwrite)
All classes used by the data sources (including readers/writers) must be pickle-serializable
Use only standard Spark SQL data types in schemas
Implement robust exception handling in data source read/write paths
For streaming sources, ensure resources are cleaned up (e.g., implement stop()/commit() as appropriate)
Prefer returning pyarrow.RecordBatch from readers for better serialization performance
Implement partitions() on DataSourceReader to enable parallel processing when beneficial
Defer expensive operations until read time (lazy evaluation) in data source implementations
Include comprehensive class docstrings for each data source with: brief description and Name: "format_name", an Options section (parameters/types/defaults), and Examples (registration and basic usage)
In docstrings, include schema output (e.g., df.printSchema()) for clarity
In docstrings, document error cases (e.g., missing files or invalid options)
In docstrings, document the partitioning strategy used to improve performance
In docstrings, document Arrow optimizations (how Arrow is used to transmit data)
DataSourceReader.read must return tuples, Rows, or pyarrow.RecordBatch; implement accordingly

Files:

  • pyspark_datasources/spotify.py
{pyspark_datasources,tests}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

{pyspark_datasources,tests}/**/*.py: When specifying file paths with Spark, use load("/path") instead of option("path", "/path")
Format code with Ruff (ruff format) and fix/lint with ruff check

Files:

  • pyspark_datasources/spotify.py
🪛 Ruff (0.14.1)
pyspark_datasources/spotify.py

79-79: Unused method argument: schema

(ARG002)


92-94: Avoid specifying long messages outside the exception class

(TRY003)


96-96: Unused method argument: partition

(ARG002)


103-103: Probable use of requests call without timeout

(S113)


119-119: Avoid specifying long messages outside the exception class

(TRY003)


122-122: Probable use of requests call without timeout

(S113)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: test (3.12)
  • GitHub Check: test (3.10)
  • GitHub Check: test (3.11)
🔇 Additional comments (4)
pyspark_datasources/spotify.py (4)

1-12: LGTM! Imports are appropriate.

All necessary imports are present for the Spotify data source implementation.


63-77: LGTM! Schema definition is correct.

The schema appropriately models Spotify track data with correct types.
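
For reference, a sketch of that schema reconstructed from the field/type table in the docs (field order and nullability are assumptions):

```python
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, ArrayType,
)

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("artists", ArrayType(StringType()), True),
    StructField("album", StringType(), True),
    StructField("duration_ms", LongType(), True),
    StructField("popularity", IntegerType(), True),
    StructField("added_at", StringType(), True),
])
```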


79-80: LGTM! Reader instantiation is correct.

The unused schema parameter is part of the interface contract and acceptable here since the schema is fixed.


84-94: LGTM! Option validation is appropriate.

The credential validation ensures required options are present before attempting API calls.

Comment on lines +96 to +119
    def read(self, partition):
        access_token = self._get_access_token()
        headers = {"Authorization": f"Bearer {access_token}"}

        if self.type == "tracks":
            url = "https://api.spotify.com/v1/me/tracks"
            while url:
                response = requests.get(url, headers=headers)
                response.raise_for_status()
                data = response.json()
                for item in data["items"]:
                    track = item["track"]
                    yield Row(
                        id=track["id"],
                        name=track["name"],
                        artists=[artist["name"] for artist in track["artists"]],
                        album=track["album"]["name"],
                        duration_ms=track["duration_ms"],
                        popularity=track["popularity"],
                        added_at=item["added_at"],
                    )
                url = data.get("next")
        else:
            raise ValueError(f"Unsupported type: {self.type}")

⚠️ Potential issue | 🟠 Major

Add validation for API response structure to prevent KeyError.

The code directly accesses nested JSON fields without validation. If the Spotify API returns a malformed response or missing fields, this will raise KeyError and crash the Spark job.

Consider adding defensive checks:

                 for item in data["items"]:
                     track = item["track"]
+                    # Validate required fields
+                    required_fields = ["id", "name", "artists", "album", "duration_ms", "popularity"]
+                    if not all(field in track for field in required_fields):
+                        continue  # Skip malformed tracks
+                    if "name" not in track.get("album", {}):
+                        continue
+                    
                     yield Row(
                         id=track["id"],
                         name=track["name"],
                         artists=[artist["name"] for artist in track["artists"]],
                         album=track["album"]["name"],
                         duration_ms=track["duration_ms"],
                         popularity=track["popularity"],
-                        added_at=item["added_at"],
+                        added_at=item.get("added_at"),
                     )

Alternatively, wrap the access in a try-except block to handle malformed responses gracefully.
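
A hedged sketch of that try/except variant, pulled out into a hypothetical helper so it is self-contained (the function name, logger, and skip-on-error policy are assumptions; the PR inlines this logic in read()):

```python
import logging

from pyspark.sql import Row

logger = logging.getLogger(__name__)


def _rows_from_page(data):
    # Parse one API page defensively, skipping malformed items.
    for item in data.get("items", []):
        try:
            track = item["track"]
            yield Row(
                id=track["id"],
                name=track["name"],
                artists=[artist["name"] for artist in track["artists"]],
                album=track["album"]["name"],
                duration_ms=track["duration_ms"],
                popularity=track["popularity"],
                added_at=item.get("added_at"),
            )
        except (KeyError, TypeError) as exc:
            # Skip the malformed item instead of failing the whole Spark task.
            logger.warning("Skipping malformed Spotify item: %s", exc)
```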

🧰 Tools
🪛 Ruff (0.14.1)

96-96: Unused method argument: partition

(ARG002)


103-103: Probable use of requests call without timeout

(S113)


119-119: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 96 to 119, the code assumes the
Spotify API response always contains nested keys (e.g., "items", item["track"],
track["id"]) which can raise KeyError on malformed responses; update the read
method to validate the top-level response (ensure response.ok and presence of
"items"), iterate defensively using dict.get(...) for safe access, wrap per-item
parsing in a try/except to catch KeyError/TypeError and log a warning (or
increment a metric) then skip the malformed item, and ensure you safely retrieve
the next page with data.get("next") so the loop continues gracefully instead of
crashing the Spark job.
