feat: Add Spotify data source #26
base: master
Conversation
This commit introduces a new data source for reading data from the Spotify API. The data source currently supports reading a user's saved tracks. It uses OAuth 2.0 with a refresh token for authentication. The following files have been added:

- pyspark_datasources/spotify.py: The implementation of the data source.
- docs/datasources/spotify.md: Documentation for the data source.
- examples/spotify_example.py: An example of how to use the data source.
- tests/test_spotify.py: Unit tests for the data source.
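For orientation, here is a minimal sketch of how the new source would be registered and queried, pieced together from the option names and example code quoted later in this review (credentials are placeholders):

```python
from pyspark.sql import SparkSession
from pyspark_datasources.spotify import SpotifyDataSource

spark = SparkSession.builder.appName("SpotifyExample").getOrCreate()

# Register the custom data source with this session
spark.dataSource.register(SpotifyDataSource)

# Read the authenticated user's saved tracks via the Spotify Web API
df = (
    spark.read.format("spotify")
    .option("spotify.client.id", "YOUR_CLIENT_ID")
    .option("spotify.client.secret", "YOUR_CLIENT_SECRET")
    .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN")
    .option("type", "tracks")
    .load()
)

df.show()
spark.stop()
```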
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🧹 Nitpick comments (9)
pyspark_datasources/spotify.py (5)
96-118: Consider basic rate-limit handling and retries. Spotify can return 429. Add lightweight retry/backoff to improve resilience.
Example using Retry (keeps Sessions local to preserve picklability):
```diff
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
@@
     def read(self, _partition):
-        access_token = self._get_access_token()
+        access_token = self._get_access_token()
         headers = {"Authorization": f"Bearer {access_token}"}
@@
-        url = "https://api.spotify.com/v1/me/tracks"
-        while url:
-            response = requests.get(url, headers=headers)
+        url = "https://api.spotify.com/v1/me/tracks"
+        retry = Retry(
+            total=5,
+            backoff_factor=0.5,
+            status_forcelist=(429, 500, 502, 503, 504),
+            allowed_methods=("GET",),
+            respect_retry_after_header=True,
+        )
+        with requests.Session() as s:
+            s.headers.update(headers)
+            s.mount("https://", HTTPAdapter(max_retries=retry))
+            while url:
+                response = s.get(url, timeout=(3.05, 30))
                 response.raise_for_status()
                 data = response.json()
@@
-                url = data.get("next")
+                url = data.get("next")
```

If adopted, mirror the retry for the token call in `_get_access_token`. Based on learnings.
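If adopted there too, the token exchange could be wrapped the same way. A rough standalone sketch (the free-function form and its parameter names are illustrative; the endpoint, payload, and basic-auth pattern mirror the token request quoted later in this review):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def get_access_token(client_id: str, client_secret: str, refresh_token: str) -> str:
    """Exchange a refresh token for an access token, retrying transient failures."""
    retry = Retry(
        total=5,
        backoff_factor=0.5,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("POST",),  # the token endpoint is a POST
        respect_retry_after_header=True,
    )
    with requests.Session() as session:
        session.mount("https://", HTTPAdapter(max_retries=retry))
        response = session.post(
            "https://accounts.spotify.com/api/token",
            data={"grant_type": "refresh_token", "refresh_token": refresh_token},
            auth=(client_id, client_secret),
            timeout=(3.05, 30),
        )
        response.raise_for_status()
        return response.json()["access_token"]
```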
79-81: Satisfy linter: mark unused parameters. The interface requires these params but they're unused. Prefix with underscore to appease Ruff.
```diff
-    def reader(self, schema):
+    def reader(self, _schema):
         return SpotifyReader(self.options)
@@
-    def read(self, partition):
+    def read(self, _partition):
         access_token = self._get_access_token()
```
83-96: Provide a default single partition. Implement partitions() to comply with the guideline and enable future parallelism.
```diff
 class SpotifyReader(DataSourceReader):
     def __init__(self, options):
         self.options = options
@@
         if not all([self.client_id, self.client_secret, self.refresh_token]):
             raise ValueError(
                 "spotify.client.id, spotify.client.secret, and spotify.refresh.token must be specified in options"
             )
+    def partitions(self):
+        # Single-partition source; can later split by offset/range if needed.
+        return [None]
```
64-77: Consider TimestampType for added_at. Using TimestampType improves type fidelity vs. string (ISO-8601).
```diff
-from pyspark.sql.types import (
+from pyspark.sql.types import (
     StructType,
     StructField,
     StringType,
     IntegerType,
     LongType,
     ArrayType,
+    TimestampType,
 )
@@
-    StructField("added_at", StringType(), True),
+    StructField("added_at", TimestampType(), True),
```

If you change the schema, parse the string to datetime or let Spark cast downstream; document the behavior.
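Should the schema move to TimestampType while the reader keeps yielding Rows, the ISO-8601 string would need converting to a datetime first. A small sketch of that conversion (the helper name is illustrative):

```python
from datetime import datetime, timezone

def parse_added_at(value: str) -> datetime:
    """Turn an ISO-8601 string such as '2024-01-15T08:30:00Z' into an aware datetime."""
    # fromisoformat() on older Pythons rejects a trailing 'Z', so normalize it first.
    return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)

print(parse_added_at("2024-01-15T08:30:00Z"))  # 2024-01-15 08:30:00+00:00
```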
91-95: Keep error messages concise; avoid long literals in raise sites. Minor style: shorten messages or centralize them if reused.
Also applies to: 119-119
docs/datasources/spotify.md (1)
61-85: Document options, defaults, and add spark.stop() for completeness. Per guidelines, include an Options section and stop the SparkSession.
````diff
@@
 ## Usage
@@
 Once you have your credentials, you can use the Spotify data source in PySpark.
+### Options
+
+- spotify.client.id (string, required)
+- spotify.client.secret (string, required)
+- spotify.refresh.token (string, required)
+- type (string, default: "tracks"; supported: "tracks")
+- Note: Network requests use timeouts; handle 429 rate limits by retry/backoff.
+
 ```python
 from pyspark.sql import SparkSession

 # Create a SparkSession
 spark = SparkSession.builder.appName("SpotifyExample").getOrCreate()
@@
 # Show the data
 df.show()
+
+# Stop Spark when done
+spark.stop()
````

Additionally, consider a brief "Error cases" section (missing credentials, 401/429, network timeouts) and a note on partitioning (currently single partition) and Arrow usage.

tests/test_spotify.py (2)

72-75: Add assertions for parsed fields to harden coverage. Verify artists/album/duration/popularity extraction.

```diff
     assert len(rows) == 2
     assert rows[0]["name"] == "Test Track 1"
     assert rows[1]["name"] == "Test Track 2"
+    assert rows[0]["artists"] == ["Artist 1"]
+    assert rows[0]["album"] == "Album 1"
+    assert rows[0]["duration_ms"] == 200000
+    assert rows[0]["popularity"] == 50
```
76-79: Add a test for unsupported type. Ensure a clear error is raised when an unknown type is requested.
```python
def test_spotify_reader_unsupported_type():
    options = {
        "spotify.client.id": "test_id",
        "spotify.client.secret": "test_secret",
        "spotify.refresh.token": "test_refresh",
        "type": "playlists",
    }
    with pytest.raises(ValueError, match="Unsupported type"):
        SpotifyReader(options).read(None).__iter__().__next__()
```

Also applies to: 90-97
examples/spotify_example.py (1)
2-3: Avoid hardcoded secrets; catch specific exceptions; stop Spark. Use env vars for creds, handle request errors explicitly, and always stop Spark.
```diff
-from pyspark.sql import SparkSession
+from pyspark.sql import SparkSession
+import os
+import requests
@@
-# Replace with your actual credentials
-CLIENT_ID = "YOUR_CLIENT_ID"
-CLIENT_SECRET = "YOUR_CLIENT_SECRET"
-REFRESH_TOKEN = "YOUR_REFRESH_TOKEN"
+# Replace with your actual credentials or set environment variables
+CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "YOUR_CLIENT_ID")
+CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "YOUR_CLIENT_SECRET")
+REFRESH_TOKEN = os.getenv("SPOTIFY_REFRESH_TOKEN", "YOUR_REFRESH_TOKEN")
@@
-try:
+try:
     df = (
         spark.read.format("spotify")
         .option("spotify.client.id", CLIENT_ID)
         .option("spotify.client.secret", CLIENT_SECRET)
         .option("spotify.refresh.token", REFRESH_TOKEN)
         .option("type", "tracks")
         .load()
     )
@@
-except Exception as e:
+except (requests.exceptions.RequestException, ValueError) as e:
     print(f"An error occurred: {e}")
     print("Please ensure you have replaced the placeholder credentials with your actual credentials.")
+finally:
+    spark.stop()
```

Also applies to: 15-19, 21-39
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- docs/datasources/spotify.md (1 hunks)
- examples/spotify_example.py (1 hunks)
- pyspark_datasources/spotify.py (1 hunks)
- tests/test_spotify.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
pyspark_datasources/!(__init__).py
📄 CodeRabbit inference engine (CLAUDE.md)
pyspark_datasources/!(__init__).py: All data source classes must inherit from Spark's DataSource base class
Implement schema() in each data source to define the schema using PySpark StructType
Implement reader(schema) for batch reads; if streaming is supported, implement streamReader() or simpleStreamReader() accordingly
If batch writing is supported, implement writer(schema, overwrite); if streaming writes are supported, implement streamWriter(schema, overwrite)
All classes used by the data sources (including readers/writers) must be pickle-serializable
Use only standard Spark SQL data types in schemas
Implement robust exception handling in data source read/write paths
For streaming sources, ensure resources are cleaned up (e.g., implement stop()/commit() as appropriate)
Prefer returning pyarrow.RecordBatch from readers for better serialization performance (see the sketch after this guideline block)
Implement partitions() on DataSourceReader to enable parallel processing when beneficial
Defer expensive operations until read time (lazy evaluation) in data source implementations
Include comprehensive class docstrings for each data source with: brief description and Name: "format_name", an Options section (parameters/types/defaults), and Examples (registration and basic usage)
In docstrings, include schema output (e.g., df.printSchema()) for clarity
In docstrings, document error cases (e.g., missing files or invalid options)
In docstrings, document the partitioning strategy used to improve performance
In docstrings, document Arrow optimizations (how Arrow is used to transmit data)
DataSourceReader.read must return tuples, Rows, or pyarrow.RecordBatch; implement accordingly
Files:
pyspark_datasources/spotify.py
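To illustrate the pyarrow.RecordBatch guideline above, here is a minimal sketch of a reader that emits one Arrow batch instead of per-record Rows. The class and its pre-fetched items are illustrative, not the code under review; a real reader would subclass DataSourceReader and fetch from the API inside read():

```python
import pyarrow as pa

class ArrowBatchReaderSketch:
    """Illustrative only: shows the shape of a read() that yields Arrow batches."""

    def __init__(self, items):
        self._items = items  # pre-fetched API items; a real reader would call the API here

    def read(self, partition):
        # Build plain Python columns, then emit a single Arrow batch; Arrow
        # serialization is cheaper than pickling one Row object per record.
        ids, names, popularity = [], [], []
        for item in self._items:
            track = item["track"]
            ids.append(track["id"])
            names.append(track["name"])
            popularity.append(track["popularity"])
        yield pa.RecordBatch.from_pydict({"id": ids, "name": names, "popularity": popularity})
```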
{pyspark_datasources,tests}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
{pyspark_datasources,tests}/**/*.py: When specifying file paths with Spark, use load("/path") instead of option("path", "/path")
Format code with Ruff (ruff format) and fix/lint with ruff check
Files:
pyspark_datasources/spotify.py
tests/test_spotify.py
🧬 Code graph analysis (2)
examples/spotify_example.py (1)
pyspark_datasources/spotify.py (2)
- SpotifyDataSource (15-80)
- read (96-119)
tests/test_spotify.py (1)
pyspark_datasources/spotify.py (3)
- SpotifyReader (83-128)
- reader (79-80)
- read (96-119)
🪛 LanguageTool
docs/datasources/spotify.md
[grammar] ~91-~91: There might be a mistake here.
Context: ...: | Field | Type | |-------------|---------------------| | ...
(QB_NEW_EN)
[grammar] ~92-~92: There might be a mistake here.
Context: ... | |-------------|---------------------| | id | string | | ...
(QB_NEW_EN)
[grammar] ~93-~93: There might be a mistake here.
Context: ...-| | id | string | | name | string | | ...
(QB_NEW_EN)
[grammar] ~94-~94: There might be a mistake here.
Context: ... | | name | string | | artists | array<string> | | ...
(QB_NEW_EN)
[grammar] ~95-~95: There might be a mistake here.
Context: ... | | artists | array<string> | | album | string | | ...
(QB_NEW_EN)
[grammar] ~96-~96: There might be a mistake here.
Context: ... | | album | string | | duration_ms | long | | ...
(QB_NEW_EN)
[grammar] ~97-~97: There might be a mistake here.
Context: ... | | duration_ms | long | | popularity | integer | | ...
(QB_NEW_EN)
[grammar] ~98-~98: There might be a mistake here.
Context: ... | | popularity | integer | | added_at | string |
(QB_NEW_EN)
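Reassembled from the quoted fragments, the schema table those warnings point at appears to be:

| Field | Type |
|-------------|---------------------|
| id | string |
| name | string |
| artists | array&lt;string&gt; |
| album | string |
| duration_ms | long |
| popularity | integer |
| added_at | string |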
🪛 Ruff (0.14.0)
pyspark_datasources/spotify.py
79-79: Unused method argument: schema
(ARG002)
92-94: Avoid specifying long messages outside the exception class
(TRY003)
96-96: Unused method argument: partition
(ARG002)
103-103: Probable use of requests call without timeout
(S113)
119-119: Avoid specifying long messages outside the exception class
(TRY003)
122-122: Probable use of requests call without timeout
(S113)
examples/spotify_example.py
17-17: Possible hardcoded password assigned to: "CLIENT_SECRET"
(S105)
18-18: Possible hardcoded password assigned to: "REFRESH_TOKEN"
(S105)
37-37: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: test (3.12)
- GitHub Check: test (3.9)
- GitHub Check: test (3.11)
- GitHub Check: test (3.10)
```python
>>> from pyspark_datasources import SpotifyDataSource
>>> spark.dataSource.register(SpotifyDataSource)
```
Fix import path in docstring example.
The example should import from the module, not the package root.
```diff
->>> from pyspark_datasources import SpotifyDataSource
+>>> from pyspark_datasources.spotify import SpotifyDataSource
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
>>> from pyspark_datasources.spotify import SpotifyDataSource
>>> spark.dataSource.register(SpotifyDataSource)
```
🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 37 to 39, the docstring example
imports SpotifyDataSource from the package root; update the example to import
from the module path (from pyspark_datasources.spotify import SpotifyDataSource)
so the example reflects the correct import, leaving the
spark.dataSource.register(...) line unchanged.
```python
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
for item in data["items"]:
```
Add timeouts to external HTTP calls to avoid indefinite hangs.
Both requests lack timeouts. This is a reliability and production-safety issue.
Apply:
```diff
@@
-        response = requests.get(url, headers=headers)
+        response = requests.get(url, headers=headers, timeout=(3.05, 30))
@@
-        response = requests.post(
+        response = requests.post(
             "https://accounts.spotify.com/api/token",
             data={"grant_type": "refresh_token", "refresh_token": self.refresh_token},
             auth=(self.client_id, self.client_secret),
-        )
+            timeout=(3.05, 30),
+        )
```

Optionally, hoist the timeout to a module constant (e.g., DEFAULT_TIMEOUT) for reuse. Based on learnings.
Also applies to: 122-128
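A sketch of the hoisted constant, should that route be taken (DEFAULT_TIMEOUT comes from the suggestion itself; the helper function and its name are illustrative):

```python
import requests

# Module-level default: (connect timeout, read timeout) in seconds,
# reused by every outbound call so the values stay consistent.
DEFAULT_TIMEOUT = (3.05, 30)

def fetch_saved_tracks_page(url: str, access_token: str) -> dict:
    """Illustrative helper: fetch one page of saved tracks with a bounded wait."""
    response = requests.get(
        url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=DEFAULT_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
```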
🧰 Tools
🪛 Ruff (0.14.0)
103-103: Probable use of requests call without timeout
(S113)
🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 103-106 (and also apply the same
fix to 122-128), the requests calls lack timeouts causing potential indefinite
hangs; define a module-level DEFAULT_TIMEOUT (e.g., DEFAULT_TIMEOUT = 10) near
the top of the file and pass timeout=DEFAULT_TIMEOUT to each
requests.get/requests.post call on those lines (and any other external HTTP
calls in the file) so the calls fail fast instead of hanging.
Actionable comments posted: 1
♻️ Duplicate comments (3)
pyspark_datasources/spotify.py (3)
37-39: Fix import path in docstring example. The example imports from the package root instead of the module path.
Apply:
```diff
->>> from pyspark_datasources import SpotifyDataSource
+>>> from pyspark_datasources.spotify import SpotifyDataSource
```
103-103: Add timeout to HTTP call to prevent indefinite hangs. Missing timeout is a reliability issue that can cause jobs to hang indefinitely.
122-128: Add timeout to HTTP call to prevent indefinite hangs. Missing timeout is a reliability issue.
Also consider validating the response structure:
```diff
         response.raise_for_status()
-        return response.json()["access_token"]
+        token_data = response.json()
+        if "access_token" not in token_data:
+            raise ValueError("Spotify API did not return an access token")
+        return token_data["access_token"]
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- .gitignore (1 hunks)
- docs/datasources/spotify.md (1 hunks)
- pyspark_datasources/spotify.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- .gitignore
🚧 Files skipped from review as they are similar to previous changes (1)
- docs/datasources/spotify.md
🧰 Additional context used
🪛 Ruff (0.14.1)
pyspark_datasources/spotify.py
79-79: Unused method argument: schema
(ARG002)
92-94: Avoid specifying long messages outside the exception class
(TRY003)
96-96: Unused method argument: partition
(ARG002)
103-103: Probable use of requests call without timeout
(S113)
119-119: Avoid specifying long messages outside the exception class
(TRY003)
122-122: Probable use of requests call without timeout
(S113)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (3.12)
- GitHub Check: test (3.10)
- GitHub Check: test (3.11)
🔇 Additional comments (4)
pyspark_datasources/spotify.py (4)
1-12: LGTM! Imports are appropriate. All necessary imports are present for the Spotify data source implementation.
63-77: LGTM! Schema definition is correct. The schema appropriately models Spotify track data with correct types.
79-80: LGTM! Reader instantiation is correct. The unused schema parameter is part of the interface contract and acceptable here since the schema is fixed.
84-94: LGTM! Option validation is appropriate. The credential validation ensures required options are present before attempting API calls.
```python
    def read(self, partition):
        access_token = self._get_access_token()
        headers = {"Authorization": f"Bearer {access_token}"}

        if self.type == "tracks":
            url = "https://api.spotify.com/v1/me/tracks"
            while url:
                response = requests.get(url, headers=headers)
                response.raise_for_status()
                data = response.json()
                for item in data["items"]:
                    track = item["track"]
                    yield Row(
                        id=track["id"],
                        name=track["name"],
                        artists=[artist["name"] for artist in track["artists"]],
                        album=track["album"]["name"],
                        duration_ms=track["duration_ms"],
                        popularity=track["popularity"],
                        added_at=item["added_at"],
                    )
                url = data.get("next")
        else:
            raise ValueError(f"Unsupported type: {self.type}")
```
Add validation for API response structure to prevent KeyError.
The code directly accesses nested JSON fields without validation. If the Spotify API returns a malformed response or missing fields, this will raise KeyError and crash the Spark job.
Consider adding defensive checks:
```diff
             for item in data["items"]:
                 track = item["track"]
+                # Validate required fields
+                required_fields = ["id", "name", "artists", "album", "duration_ms", "popularity"]
+                if not all(field in track for field in required_fields):
+                    continue  # Skip malformed tracks
+                if "name" not in track.get("album", {}):
+                    continue
+
                 yield Row(
                     id=track["id"],
                     name=track["name"],
                     artists=[artist["name"] for artist in track["artists"]],
                     album=track["album"]["name"],
                     duration_ms=track["duration_ms"],
                     popularity=track["popularity"],
-                    added_at=item["added_at"],
+                    added_at=item.get("added_at"),
                 )
```

Alternatively, wrap the access in a try-except block to handle malformed responses gracefully.
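A sketch of that try/except alternative for comparison (the generator wrapper and logger are illustrative; the field accesses mirror the loop above):

```python
import logging

from pyspark.sql import Row

logger = logging.getLogger(__name__)

def rows_from_items(items):
    """Illustrative helper: convert one page of API items into Rows, skipping malformed ones."""
    for item in items:
        try:
            track = item["track"]
            row = Row(
                id=track["id"],
                name=track["name"],
                artists=[artist["name"] for artist in track["artists"]],
                album=track["album"]["name"],
                duration_ms=track["duration_ms"],
                popularity=track["popularity"],
                added_at=item.get("added_at"),
            )
        except (KeyError, TypeError) as exc:
            # Skip the malformed item instead of failing the whole Spark task.
            logger.warning("Skipping malformed Spotify item: %s", exc)
            continue
        yield row
```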
🧰 Tools
🪛 Ruff (0.14.1)
96-96: Unused method argument: partition
(ARG002)
103-103: Probable use of requests call without timeout
(S113)
119-119: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 96 to 119, the code assumes the
Spotify API response always contains nested keys (e.g., "items", item["track"],
track["id"]) which can raise KeyError on malformed responses; update the read
method to validate the top-level response (ensure response.ok and presence of
"items"), iterate defensively using dict.get(...) for safe access, wrap per-item
parsing in a try/except to catch KeyError/TypeError and log a warning (or
increment a metric) then skip the malformed item, and ensure you safely retrieve
the next page with data.get("next") so the loop continues gracefully instead of
crashing the Spark job.
This commit introduces a new data source for reading data from the Spotify API.
The data source currently supports reading a user's saved tracks. It uses OAuth 2.0 with a refresh token for authentication.
The following files have been added:
- pyspark_datasources/spotify.py: The implementation of the data source.
- docs/datasources/spotify.md: Documentation for the data source.
- examples/spotify_example.py: An example of how to use the data source.
- tests/test_spotify.py: Unit tests for the data source.
Summary by CodeRabbit
New Features
Documentation
Examples
Tests
Chores