Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,4 @@ claude_cache/

# Gemini
.gemini/
.vscode
100 changes: 100 additions & 0 deletions docs/datasources/spotify.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@

# Spotify

The Spotify data source allows you to read data from the Spotify API as a Spark DataFrame. Currently, it supports reading a user's saved tracks.

## Authentication

To use the Spotify data source, you need to authenticate with the Spotify API. This is done using OAuth 2.0. You will need to provide your Client ID, Client Secret, and a Refresh Token.

### 1. Create a Spotify Developer App

1. Go to the [Spotify Developer Dashboard](https://developer.spotify.com/dashboard) and log in.
2. Click on "Create an App".
3. Give your app a name and description, and agree to the terms.
4. Once the app is created, you will see your **Client ID** and **Client Secret**. Copy these values.
5. Click on "Edit Settings" and add a **Redirect URI**. For the purpose of generating a refresh token, you can use `https://example.com/callback`. Click "Save".

### 2. Generate a Refresh Token

A refresh token is a long-lived credential that can be used to obtain new access tokens. You only need to generate this once.

To generate a refresh token, you can use the following Python script. You will need to install the `spotipy` library (`pip install spotipy`).

```python
import spotipy
from spotipy.oauth2 import SpotifyOAuth

# Replace with your Client ID, Client Secret, and Redirect URI
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
REDIRECT_URI = "https://example.com/callback"

# The scope determines what permissions your app is requesting.
# For reading saved tracks, you need the 'user-library-read' scope.
SCOPE = "user-library-read"

auth_manager = SpotifyOAuth(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uri=REDIRECT_URI,
scope=SCOPE,
open_browser=True
)

# This will open a browser window for you to log in and authorize the app.
# After you authorize, you will be redirected to the redirect URI.
# The URL of the redirected page will contain a 'code' parameter.
# Copy the entire URL and paste it into the terminal.

auth_manager.get_access_token(as_dict=False)

# The refresh token will be printed to the console.
# It will also be saved in a file named .cache in your current directory.

print(f"Refresh Token: {auth_manager.get_cached_token()['refresh_token']}")

```

Run this script, and it will guide you through the authorization process. At the end, it will print your refresh token. **Save this token securely.**

## Usage

Once you have your credentials, you can use the Spotify data source in PySpark.

```python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("SpotifyExample").getOrCreate()

# Register the data source
from pyspark_datasources.spotify import SpotifyDataSource
spark.dataSource.register(SpotifyDataSource)

# Load your saved tracks
df = spark.read.format("spotify") \
.option("spotify.client.id", "YOUR_CLIENT_ID") \
.option("spotify.client.secret", "YOUR_CLIENT_SECRET") \
.option("spotify.refresh.token", "YOUR_REFRESH_TOKEN") \
.option("type", "tracks") \
.load()

# Show the data
df.show()
```

## Schema

The schema for the `tracks` type is as follows:

| Field | Type |
|-------------|-----------------|
| id | `string` |
| name | `string` |
| artists | `array<string>` |
| album | `string` |
| duration_ms | `long` |
| popularity | `integer` |
| added_at | `string` |

40 changes: 40 additions & 0 deletions examples/spotify_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

from pyspark.sql import SparkSession

# This is an example of how to use the Spotify data source.
# Before running this, make sure you have followed the authentication
# instructions in docs/datasources/spotify.md to get your credentials.

# Create a SparkSession
spark = SparkSession.builder.appName("SpotifyExample").getOrCreate()

# Register the data source
from pyspark_datasources.spotify import SpotifyDataSource
spark.dataSource.register(SpotifyDataSource)

# Replace with your actual credentials
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
REFRESH_TOKEN = "YOUR_REFRESH_TOKEN"

# Load your saved tracks
try:
df = (
spark.read.format("spotify")
.option("spotify.client.id", CLIENT_ID)
.option("spotify.client.secret", CLIENT_SECRET)
.option("spotify.refresh.token", REFRESH_TOKEN)
.option("type", "tracks")
.load()
)

# Show the data
df.show()

# Print the schema
df.printSchema()

except Exception as e:
print(f"An error occurred: {e}")
print("Please ensure you have replaced the placeholder credentials with your actual credentials.")

129 changes: 129 additions & 0 deletions pyspark_datasources/spotify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@

import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import (
StructType,
StructField,
StringType,
IntegerType,
LongType,
ArrayType,
)


class SpotifyDataSource(DataSource):
"""
A DataSource for reading data from Spotify.

Name: `spotify`

The `type` option is used to specify what data to load. Currently, only
the `tracks` type is supported, which loads the user's saved songs.

Schema for `tracks` type:
- `id`: string
- `name`: string
- `artists`: array<string>
- `album`: string
- `duration_ms`: long
- `popularity`: integer
- `added_at`: string

Examples:
---------
Register the data source.

>>> from pyspark_datasources import SpotifyDataSource
>>> spark.dataSource.register(SpotifyDataSource)

Comment on lines +37 to +39
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix import path in docstring example.

The example should import from the module, not the package root.

-    >>> from pyspark_datasources import SpotifyDataSource
+    >>> from pyspark_datasources.spotify import SpotifyDataSource
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
>>> from pyspark_datasources import SpotifyDataSource
>>> spark.dataSource.register(SpotifyDataSource)
>>> from pyspark_datasources.spotify import SpotifyDataSource
>>> spark.dataSource.register(SpotifyDataSource)
🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 37 to 39, the docstring example
imports SpotifyDataSource from the package root; update the example to import
from the module path (from pyspark_datasources.spotify import SpotifyDataSource)
so the example reflects the correct import, leaving the
spark.dataSource.register(...) line unchanged.

Load your saved tracks from Spotify.

>>> df = (
... spark.read.format("spotify")
... .option("spotify.client.id", "YOUR_CLIENT_ID")
... .option("spotify.client.secret", "YOUR_CLIENT_SECRET")
... .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN")
... .option("type", "tracks")
... .load()
... )
>>> df.show()
+----------------------+----------------+------------------+---------+-----------+----------+--------------------+
|id |name |artists |album |duration_ms|popularity|added_at |
+----------------------+----------------+------------------+---------+-----------+----------+--------------------+
|1BxfuPKGuaTgP7aM0B... |All Too Well |[Taylor Swift] |Red |329466 |82 |2025-10-16T12:00:00Z|
|... |... |... |... |... |... |... |
+----------------------+----------------+------------------+---------+-----------+----------+--------------------+
"""

@classmethod
def name(cls):
return "spotify"

def schema(self):
# Simplified schema for tracks
return StructType(
[
StructField("id", StringType(), True),
StructField("name", StringType(), True),
StructField(
"artists", ArrayType(StringType(), True), True
),
StructField("album", StringType(), True),
StructField("duration_ms", LongType(), True),
StructField("popularity", IntegerType(), True),
StructField("added_at", StringType(), True),
]
)

def reader(self, schema):
return SpotifyReader(self.options)


class SpotifyReader(DataSourceReader):
def __init__(self, options):
self.options = options
self.client_id = self.options.get("spotify.client.id")
self.client_secret = self.options.get("spotify.client.secret")
self.refresh_token = self.options.get("spotify.refresh.token")
self.type = self.options.get("type", "tracks")

if not all([self.client_id, self.client_secret, self.refresh_token]):
raise ValueError(
"spotify.client.id, spotify.client.secret, and spotify.refresh.token must be specified in options"
)

def read(self, partition):
access_token = self._get_access_token()
headers = {"Authorization": f"Bearer {access_token}"}

if self.type == "tracks":
url = "https://api.spotify.com/v1/me/tracks"
while url:
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
for item in data["items"]:
Comment on lines +103 to +106
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add timeouts to external HTTP calls to avoid indefinite hangs.

Both requests lack timeouts. This is a reliability and production-safety issue.

Apply:

@@
-                response = requests.get(url, headers=headers)
+                response = requests.get(url, headers=headers, timeout=(3.05, 30))
@@
-        response = requests.post(
+        response = requests.post(
             "https://accounts.spotify.com/api/token",
             data={"grant_type": "refresh_token", "refresh_token": self.refresh_token},
             auth=(self.client_id, self.client_secret),
-        )
+            timeout=(3.05, 30),
+        )

Optionally, hoist the timeout to a module constant (e.g., DEFAULT_TIMEOUT) for reuse. Based on learnings

Also applies to: 122-128

🧰 Tools
🪛 Ruff (0.14.0)

103-103: Probable use of requests call without timeout

(S113)

🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 103-106 (and also apply the same
fix to 122-128), the requests calls lack timeouts causing potential indefinite
hangs; define a module-level DEFAULT_TIMEOUT (e.g., DEFAULT_TIMEOUT = 10) near
the top of the file and pass timeout=DEFAULT_TIMEOUT to each
requests.get/requests.post call on those lines (and any other external HTTP
calls in the file) so the calls fail fast instead of hanging.

track = item["track"]
yield Row(
id=track["id"],
name=track["name"],
artists=[artist["name"] for artist in track["artists"]],
album=track["album"]["name"],
duration_ms=track["duration_ms"],
popularity=track["popularity"],
added_at=item["added_at"],
)
url = data.get("next")
else:
raise ValueError(f"Unsupported type: {self.type}")
Comment on lines +96 to +119
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add validation for API response structure to prevent KeyError.

The code directly accesses nested JSON fields without validation. If the Spotify API returns a malformed response or missing fields, this will raise KeyError and crash the Spark job.

Consider adding defensive checks:

                 for item in data["items"]:
                     track = item["track"]
+                    # Validate required fields
+                    required_fields = ["id", "name", "artists", "album", "duration_ms", "popularity"]
+                    if not all(field in track for field in required_fields):
+                        continue  # Skip malformed tracks
+                    if "name" not in track.get("album", {}):
+                        continue
+                    
                     yield Row(
                         id=track["id"],
                         name=track["name"],
                         artists=[artist["name"] for artist in track["artists"]],
                         album=track["album"]["name"],
                         duration_ms=track["duration_ms"],
                         popularity=track["popularity"],
-                        added_at=item["added_at"],
+                        added_at=item.get("added_at"),
                     )

Alternatively, wrap the access in a try-except block to handle malformed responses gracefully.

🧰 Tools
🪛 Ruff (0.14.1)

96-96: Unused method argument: partition

(ARG002)


103-103: Probable use of requests call without timeout

(S113)


119-119: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In pyspark_datasources/spotify.py around lines 96 to 119, the code assumes the
Spotify API response always contains nested keys (e.g., "items", item["track"],
track["id"]) which can raise KeyError on malformed responses; update the read
method to validate the top-level response (ensure response.ok and presence of
"items"), iterate defensively using dict.get(...) for safe access, wrap per-item
parsing in a try/except to catch KeyError/TypeError and log a warning (or
increment a metric) then skip the malformed item, and ensure you safely retrieve
the next page with data.get("next") so the loop continues gracefully instead of
crashing the Spark job.


def _get_access_token(self):
response = requests.post(
"https://accounts.spotify.com/api/token",
data={"grant_type": "refresh_token", "refresh_token": self.refresh_token},
auth=(self.client_id, self.client_secret),
)
response.raise_for_status()
return response.json()["access_token"]

97 changes: 97 additions & 0 deletions tests/test_spotify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
Unit tests for the Spotify data source.

These tests focus on the SpotifyReader class directly to avoid issues with
SparkSession creation in certain test environments. This approach allows for
testing the core logic of the data source, including authentication and data
parsing, without a full SparkSession.
"""
import pytest
from unittest.mock import patch, MagicMock
import requests
from pyspark.sql import Row
from pyspark_datasources.spotify import SpotifyReader

@patch("pyspark_datasources.spotify.requests.post")
@patch("pyspark_datasources.spotify.requests.get")
def test_spotify_reader_success(mock_get, mock_post):
# Mock the response from the token endpoint
mock_post.return_value.json.return_value = {"access_token": "test_token"}
mock_post.return_value.raise_for_status = MagicMock()

# Mock the response from the tracks endpoint (with pagination)
mock_get.side_effect = [
MagicMock(
json=lambda: {
"items": [
{
"added_at": "2025-10-16T12:00:00Z",
"track": {
"id": "track1",
"name": "Test Track 1",
"artists": [{"name": "Artist 1"}],
"album": {"name": "Album 1"},
"duration_ms": 200000,
"popularity": 50,
},
}
],
"next": "https://api.spotify.com/v1/me/tracks?offset=1&limit=1",
},
raise_for_status=MagicMock(),
),
MagicMock(
json=lambda: {
"items": [
{
"added_at": "2025-10-16T12:05:00Z",
"track": {
"id": "track2",
"name": "Test Track 2",
"artists": [{"name": "Artist 2"}],
"album": {"name": "Album 2"},
"duration_ms": 220000,
"popularity": 60,
},
}
],
"next": None,
},
raise_for_status=MagicMock(),
),
]

options = {
"spotify.client.id": "test_id",
"spotify.client.secret": "test_secret",
"spotify.refresh.token": "test_refresh",
}
reader = SpotifyReader(options)
rows = list(reader.read(None))

assert len(rows) == 2
assert rows[0]["name"] == "Test Track 1"
assert rows[1]["name"] == "Test Track 2"

def test_spotify_reader_missing_credentials():
with pytest.raises(ValueError, match="must be specified in options"):
SpotifyReader({})

@patch("pyspark_datasources.spotify.requests.post")
@patch("pyspark_datasources.spotify.requests.get")
def test_spotify_reader_api_error(mock_get, mock_post):
mock_post.return_value.json.return_value = {"access_token": "test_token"}
mock_post.return_value.raise_for_status = MagicMock()

mock_get.return_value.raise_for_status.side_effect = requests.exceptions.HTTPError(
"401 Client Error: Unauthorized for url"
)

options = {
"spotify.client.id": "test_id",
"spotify.client.secret": "test_secret",
"spotify.refresh.token": "test_refresh",
}
reader = SpotifyReader(options)
with pytest.raises(requests.exceptions.HTTPError):
list(reader.read(None))