Commit ba0af03

feat: Add Spotify data source
This commit introduces a new data source for reading data from the Spotify API. The data source currently supports reading a user's saved tracks. It uses OAuth 2.0 with a refresh token for authentication.

The following files have been added:
- pyspark_datasources/spotify.py: The implementation of the data source.
- docs/datasources/spotify.md: Documentation for the data source.
- examples/spotify_example.py: An example of how to use the data source.
- tests/test_spotify.py: Unit tests for the data source.
1 parent 5788049 commit ba0af03

4 files changed, +366 -0 lines changed

docs/datasources/spotify.md

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@

# Spotify

The Spotify data source allows you to read data from the Spotify API as a Spark DataFrame. Currently, it supports reading a user's saved tracks.

## Authentication

To use the Spotify data source, you need to authenticate with the Spotify API. This is done using OAuth 2.0. You will need to provide your Client ID, Client Secret, and a Refresh Token.

### 1. Create a Spotify Developer App

1. Go to the [Spotify Developer Dashboard](https://developer.spotify.com/dashboard) and log in.
2. Click on "Create an App".
3. Give your app a name and description, and agree to the terms.
4. Once the app is created, you will see your **Client ID** and **Client Secret**. Copy these values.
5. Click on "Edit Settings" and add a **Redirect URI**. For the purpose of generating a refresh token, you can use `https://example.com/callback`. Click "Save".

### 2. Generate a Refresh Token

A refresh token is a long-lived credential that can be used to obtain new access tokens. You only need to generate this once.

To generate a refresh token, you can use the following Python script. You will need to install the `spotipy` library (`pip install spotipy`).

```python
import spotipy
from spotipy.oauth2 import SpotifyOAuth

# Replace with your Client ID, Client Secret, and Redirect URI
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
REDIRECT_URI = "https://example.com/callback"

# The scope determines what permissions your app is requesting.
# For reading saved tracks, you need the 'user-library-read' scope.
SCOPE = "user-library-read"

auth_manager = SpotifyOAuth(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    redirect_uri=REDIRECT_URI,
    scope=SCOPE,
    open_browser=True
)

# This will open a browser window for you to log in and authorize the app.
# After you authorize, you will be redirected to the redirect URI.
# The URL of the redirected page will contain a 'code' parameter.
# Copy the entire URL and paste it into the terminal.

auth_manager.get_access_token(as_dict=False)

# The refresh token will be printed to the console.
# It will also be saved in a file named .cache in your current directory.

print(f"Refresh Token: {auth_manager.get_cached_token()['refresh_token']}")

```

Run this script, and it will guide you through the authorization process. At the end, it will print your refresh token. **Save this token securely.**
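
For context, the data source later trades this refresh token for a short-lived access token at Spotify's token endpoint. The following is a minimal sketch of how that request could be assembled, assuming the standard OAuth 2.0 refresh grant with HTTP Basic client authentication. The helper name `build_refresh_request` is hypothetical and not part of this commit, and nothing is sent over the network.

```python
import base64

# Token endpoint used by the data source in this commit.
TOKEN_URL = "https://accounts.spotify.com/api/token"


def build_refresh_request(client_id: str, client_secret: str, refresh_token: str) -> dict:
    """Describe the POST request that trades a refresh token for an access token.

    Hypothetical helper for illustration only; it builds the request parts
    without sending anything.
    """
    # HTTP Basic authentication: base64 of "client_id:client_secret".
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(credentials).decode("ascii")
    return {
        "url": TOKEN_URL,
        "headers": {"Authorization": f"Basic {basic}"},
        "data": {"grant_type": "refresh_token", "refresh_token": refresh_token},
    }


if __name__ == "__main__":
    request = build_refresh_request("my-client-id", "my-client-secret", "my-refresh-token")
    print(request["url"])
    print(sorted(request["data"]))
```

Keeping request construction separate from the HTTP call makes the credential handling easy to inspect and test in isolation.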

## Usage

Once you have your credentials, you can use the Spotify data source in PySpark.

```python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("SpotifyExample").getOrCreate()

# Register the data source
from pyspark_datasources.spotify import SpotifyDataSource
spark.dataSource.register(SpotifyDataSource)

# Load your saved tracks
df = spark.read.format("spotify") \
    .option("spotify.client.id", "YOUR_CLIENT_ID") \
    .option("spotify.client.secret", "YOUR_CLIENT_SECRET") \
    .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN") \
    .option("type", "tracks") \
    .load()

# Show the data
df.show()
```

## Schema

The schema for the `tracks` type is as follows:

| Field       | Type            |
|-------------|-----------------|
| id          | `string`        |
| name        | `string`        |
| artists     | `array<string>` |
| album       | `string`        |
| duration_ms | `long`          |
| popularity  | `integer`       |
| added_at    | `string`        |
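
To illustrate how one saved-track item returned by the API maps onto this schema, here is a small sketch in plain Python (no Spark required). The helper `item_to_record` is hypothetical, and the sample payload is made up for illustration. It contains only the fields the data source reads.

```python
def item_to_record(item: dict) -> dict:
    """Flatten one saved-track item into the seven schema fields (illustrative helper)."""
    track = item["track"]
    return {
        "id": track["id"],
        "name": track["name"],
        # The API returns artist objects; the schema keeps only their names.
        "artists": [artist["name"] for artist in track["artists"]],
        # Likewise, only the album name is kept.
        "album": track["album"]["name"],
        "duration_ms": track["duration_ms"],
        "popularity": track["popularity"],
        # `added_at` lives on the outer item, not on the track itself.
        "added_at": item["added_at"],
    }


if __name__ == "__main__":
    sample_item = {
        "added_at": "2025-10-16T12:00:00Z",
        "track": {
            "id": "track1",
            "name": "Test Track 1",
            "artists": [{"name": "Artist 1"}, {"name": "Artist 2"}],
            "album": {"name": "Album 1"},
            "duration_ms": 200000,
            "popularity": 50,
        },
    }
    print(item_to_record(sample_item))
```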

examples/spotify_example.py

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@

from pyspark.sql import SparkSession

# This is an example of how to use the Spotify data source.
# Before running this, make sure you have followed the authentication
# instructions in docs/datasources/spotify.md to get your credentials.

# Create a SparkSession
spark = SparkSession.builder.appName("SpotifyExample").getOrCreate()

# Register the data source
from pyspark_datasources.spotify import SpotifyDataSource
spark.dataSource.register(SpotifyDataSource)

# Replace with your actual credentials
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
REFRESH_TOKEN = "YOUR_REFRESH_TOKEN"

# Load your saved tracks
try:
    df = (
        spark.read.format("spotify")
        .option("spotify.client.id", CLIENT_ID)
        .option("spotify.client.secret", CLIENT_SECRET)
        .option("spotify.refresh.token", REFRESH_TOKEN)
        .option("type", "tracks")
        .load()
    )

    # Show the data
    df.show()

    # Print the schema
    df.printSchema()

except Exception as e:
    print(f"An error occurred: {e}")
    print("Please ensure you have replaced the placeholder credentials with your actual credentials.")
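
Hard-coded secrets in a script are easy to leak. One possible alternative is sketched below, under the assumption that the credentials are exported as environment variables. The variable names `SPOTIFY_CLIENT_ID`, `SPOTIFY_CLIENT_SECRET`, and `SPOTIFY_REFRESH_TOKEN` and the helper `load_spotify_options` are hypothetical and not part of this commit.

```python
import os
from typing import Mapping, Optional

# Hypothetical environment variable names mapped to the option keys
# that the data source in this commit expects.
ENV_TO_OPTION = {
    "SPOTIFY_CLIENT_ID": "spotify.client.id",
    "SPOTIFY_CLIENT_SECRET": "spotify.client.secret",
    "SPOTIFY_REFRESH_TOKEN": "spotify.refresh.token",
}


def load_spotify_options(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build the reader options from environment variables, failing early if any is unset."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_OPTION if not env.get(name)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    return {option: env[name] for name, option in ENV_TO_OPTION.items()}


if __name__ == "__main__":
    fake_env = {
        "SPOTIFY_CLIENT_ID": "id",
        "SPOTIFY_CLIENT_SECRET": "secret",
        "SPOTIFY_REFRESH_TOKEN": "token",
    }
    print(load_spotify_options(fake_env))
```

The resulting dictionary could then be handed to the reader with `spark.read.format("spotify").options(**opts)` in place of the three separate `.option(...)` calls.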

pyspark_datasources/spotify.py

Lines changed: 129 additions & 0 deletions
@@ -0,0 +1,129 @@

import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    LongType,
    ArrayType,
)


class SpotifyDataSource(DataSource):
    """
    A DataSource for reading data from Spotify.

    Name: `spotify`

    The `type` option is used to specify what data to load. Currently, only
    the `tracks` type is supported, which loads the user's saved songs.

    Schema for `tracks` type:
    - `id`: string
    - `name`: string
    - `artists`: array<string>
    - `album`: string
    - `duration_ms`: long
    - `popularity`: integer
    - `added_at`: string

    Examples:
    ---------
    Register the data source.

    >>> from pyspark_datasources.spotify import SpotifyDataSource
    >>> spark.dataSource.register(SpotifyDataSource)

    Load your saved tracks from Spotify.

    >>> df = (
    ...     spark.read.format("spotify")
    ...     .option("spotify.client.id", "YOUR_CLIENT_ID")
    ...     .option("spotify.client.secret", "YOUR_CLIENT_SECRET")
    ...     .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN")
    ...     .option("type", "tracks")
    ...     .load()
    ... )
    >>> df.show()
    +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+
    |                    id|                name|             artists|               album|duration_ms|popularity|            added_at|
    +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+
    | 1BxfuPKGuaTgP7aM0B...|        All Too Well|      [Taylor Swift]|                 Red|     329466|        82|2025-10-16T12:00:00Z|
    |                   ...|                 ...|                 ...|                 ...|        ...|       ...|                 ...|
    +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+
    """

    @classmethod
    def name(cls):
        return "spotify"

    def schema(self):
        # Simplified schema for tracks
        return StructType(
            [
                StructField("id", StringType(), True),
                StructField("name", StringType(), True),
                StructField(
                    "artists", ArrayType(StringType(), True), True
                ),
                StructField("album", StringType(), True),
                StructField("duration_ms", LongType(), True),
                StructField("popularity", IntegerType(), True),
                StructField("added_at", StringType(), True),
            ]
        )

    def reader(self, schema):
        return SpotifyReader(self.options)


class SpotifyReader(DataSourceReader):
    def __init__(self, options):
        self.options = options
        self.client_id = self.options.get("spotify.client.id")
        self.client_secret = self.options.get("spotify.client.secret")
        self.refresh_token = self.options.get("spotify.refresh.token")
        self.type = self.options.get("type", "tracks")

        # Fail early if any of the three credentials is missing or empty.
        if not all([self.client_id, self.client_secret, self.refresh_token]):
            raise ValueError(
                "spotify.client.id, spotify.client.secret, and spotify.refresh.token must be specified in options"
            )

    def read(self, partition):
        # Exchange the refresh token for an access token on every read.
        access_token = self._get_access_token()
        headers = {"Authorization": f"Bearer {access_token}"}

        if self.type == "tracks":
            url = "https://api.spotify.com/v1/me/tracks"
            # Follow the `next` link until the API reports no further pages.
            while url:
                response = requests.get(url, headers=headers)
                response.raise_for_status()
                data = response.json()
                for item in data["items"]:
                    track = item["track"]
                    yield Row(
                        id=track["id"],
                        name=track["name"],
                        artists=[artist["name"] for artist in track["artists"]],
                        album=track["album"]["name"],
                        duration_ms=track["duration_ms"],
                        popularity=track["popularity"],
                        added_at=item["added_at"],
                    )
                url = data.get("next")
        else:
            raise ValueError(f"Unsupported type: {self.type}")

    def _get_access_token(self):
        # Refresh-token grant; client ID and secret are sent via HTTP Basic auth.
        response = requests.post(
            "https://accounts.spotify.com/api/token",
            data={"grant_type": "refresh_token", "refresh_token": self.refresh_token},
            auth=(self.client_id, self.client_secret),
        )
        response.raise_for_status()
        return response.json()["access_token"]
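
The `while url:` loop in `read` follows the `next` link of each response until none remains. The same pattern can be sketched in isolation with the HTTP call injected as a function, so it runs without network access. The names `iter_items` and `fetch_page` are hypothetical, and the fake pages below are made-up data.

```python
from typing import Callable, Iterator, Optional


def iter_items(first_url: str, fetch_page: Callable[[str], dict]) -> Iterator[dict]:
    """Yield every item across pages, following each page's `next` link.

    `fetch_page` is any callable that takes a URL and returns the decoded
    JSON body as a dict with an `items` list and an optional `next` URL.
    """
    url: Optional[str] = first_url
    while url:
        page = fetch_page(url)
        yield from page.get("items", [])
        # A missing or null `next` ends the loop.
        url = page.get("next")


if __name__ == "__main__":
    # Two fake pages keyed by URL stand in for real API responses.
    fake_pages = {
        "page-1": {"items": [{"n": 1}, {"n": 2}], "next": "page-2"},
        "page-2": {"items": [{"n": 3}], "next": None},
    }
    for item in iter_items("page-1", fake_pages.__getitem__):
        print(item)
```

Because the generator is lazy, a consumer that stops early never triggers requests for the remaining pages.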

tests/test_spotify.py

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
"""
Unit tests for the Spotify data source.

These tests focus on the SpotifyReader class directly to avoid issues with
SparkSession creation in certain test environments. This approach allows for
testing the core logic of the data source, including authentication and data
parsing, without a full SparkSession.
"""
import pytest
from unittest.mock import patch, MagicMock
import requests
from pyspark.sql import Row
from pyspark_datasources.spotify import SpotifyReader

@patch("pyspark_datasources.spotify.requests.post")
@patch("pyspark_datasources.spotify.requests.get")
def test_spotify_reader_success(mock_get, mock_post):
    # Mock the response from the token endpoint
    mock_post.return_value.json.return_value = {"access_token": "test_token"}
    mock_post.return_value.raise_for_status = MagicMock()

    # Mock the response from the tracks endpoint (with pagination)
    mock_get.side_effect = [
        MagicMock(
            json=lambda: {
                "items": [
                    {
                        "added_at": "2025-10-16T12:00:00Z",
                        "track": {
                            "id": "track1",
                            "name": "Test Track 1",
                            "artists": [{"name": "Artist 1"}],
                            "album": {"name": "Album 1"},
                            "duration_ms": 200000,
                            "popularity": 50,
                        },
                    }
                ],
                "next": "https://api.spotify.com/v1/me/tracks?offset=1&limit=1",
            },
            raise_for_status=MagicMock(),
        ),
        MagicMock(
            json=lambda: {
                "items": [
                    {
                        "added_at": "2025-10-16T12:05:00Z",
                        "track": {
                            "id": "track2",
                            "name": "Test Track 2",
                            "artists": [{"name": "Artist 2"}],
                            "album": {"name": "Album 2"},
                            "duration_ms": 220000,
                            "popularity": 60,
                        },
                    }
                ],
                "next": None,
            },
            raise_for_status=MagicMock(),
        ),
    ]

    options = {
        "spotify.client.id": "test_id",
        "spotify.client.secret": "test_secret",
        "spotify.refresh.token": "test_refresh",
    }
    reader = SpotifyReader(options)
    rows = list(reader.read(None))

    assert len(rows) == 2
    assert rows[0]["name"] == "Test Track 1"
    assert rows[1]["name"] == "Test Track 2"

def test_spotify_reader_missing_credentials():
    with pytest.raises(ValueError, match="must be specified in options"):
        SpotifyReader({})

@patch("pyspark_datasources.spotify.requests.post")
@patch("pyspark_datasources.spotify.requests.get")
def test_spotify_reader_api_error(mock_get, mock_post):
    mock_post.return_value.json.return_value = {"access_token": "test_token"}
    mock_post.return_value.raise_for_status = MagicMock()

    mock_get.return_value.raise_for_status.side_effect = requests.exceptions.HTTPError(
        "401 Client Error: Unauthorized for url"
    )

    options = {
        "spotify.client.id": "test_id",
        "spotify.client.secret": "test_secret",
        "spotify.refresh.token": "test_refresh",
    }
    reader = SpotifyReader(options)
    with pytest.raises(requests.exceptions.HTTPError):
        list(reader.read(None))
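
The two paginated responses in `test_spotify_reader_success` repeat the same structure. As one possible refactoring, the fake pages could come from small factory helpers. This is only a sketch: `make_item` and `make_page` are hypothetical names that do not exist in this commit, and the default field values are arbitrary.

```python
from typing import List, Optional
from unittest.mock import MagicMock


def make_item(track_id: str, name: str, added_at: str = "2025-10-16T12:00:00Z") -> dict:
    """Build one saved-track item with the fields the reader accesses."""
    return {
        "added_at": added_at,
        "track": {
            "id": track_id,
            "name": name,
            "artists": [{"name": "Artist"}],
            "album": {"name": "Album"},
            "duration_ms": 200000,
            "popularity": 50,
        },
    }


def make_page(items: List[dict], next_url: Optional[str] = None) -> MagicMock:
    """Build a mock response whose .json() returns one page of saved tracks."""
    response = MagicMock()
    response.json.return_value = {"items": items, "next": next_url}
    # A successful response: raise_for_status() does nothing.
    response.raise_for_status.return_value = None
    return response


if __name__ == "__main__":
    first = make_page([make_item("track1", "Test Track 1")], next_url="https://example.com/next")
    last = make_page([make_item("track2", "Test Track 2")])
    print(first.json()["next"], last.json()["next"])
```

With helpers like these, `mock_get.side_effect` could be written as a short list of `make_page(...)` calls, one per page.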
