-
Notifications
You must be signed in to change notification settings - Fork 22
feat: Add Spotify data source #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -165,3 +165,4 @@ claude_cache/ | |
|
|
||
| # Gemini | ||
| .gemini/ | ||
| .vscode | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
|
|
||
| # Spotify | ||
|
|
||
| The Spotify data source allows you to read data from the Spotify API as a Spark DataFrame. Currently, it supports reading a user's saved tracks. | ||
|
|
||
| ## Authentication | ||
|
|
||
| To use the Spotify data source, you need to authenticate with the Spotify API. This is done using OAuth 2.0. You will need to provide your Client ID, Client Secret, and a Refresh Token. | ||
|
|
||
| ### 1. Create a Spotify Developer App | ||
|
|
||
| 1. Go to the [Spotify Developer Dashboard](https://developer.spotify.com/dashboard) and log in. | ||
| 2. Click on "Create an App". | ||
| 3. Give your app a name and description, and agree to the terms. | ||
| 4. Once the app is created, you will see your **Client ID** and **Client Secret**. Copy these values. | ||
| 5. Click on "Edit Settings" and add a **Redirect URI**. For the purpose of generating a refresh token, you can use `https://example.com/callback`. Click "Save". | ||
|
|
||
| ### 2. Generate a Refresh Token | ||
|
|
||
| A refresh token is a long-lived credential that can be used to obtain new access tokens. You only need to generate this once. | ||
|
|
||
| To generate a refresh token, you can use the following Python script. You will need to install the `spotipy` library (`pip install spotipy`). | ||
|
|
||
| ```python | ||
| import spotipy | ||
| from spotipy.oauth2 import SpotifyOAuth | ||
|
|
||
| # Replace with your Client ID, Client Secret, and Redirect URI | ||
| CLIENT_ID = "YOUR_CLIENT_ID" | ||
| CLIENT_SECRET = "YOUR_CLIENT_SECRET" | ||
| REDIRECT_URI = "https://example.com/callback" | ||
|
|
||
| # The scope determines what permissions your app is requesting. | ||
| # For reading saved tracks, you need the 'user-library-read' scope. | ||
| SCOPE = "user-library-read" | ||
|
|
||
| auth_manager = SpotifyOAuth( | ||
| client_id=CLIENT_ID, | ||
| client_secret=CLIENT_SECRET, | ||
| redirect_uri=REDIRECT_URI, | ||
| scope=SCOPE, | ||
| open_browser=True | ||
| ) | ||
|
|
||
| # This will open a browser window for you to log in and authorize the app. | ||
| # After you authorize, you will be redirected to the redirect URI. | ||
| # The URL of the redirected page will contain a 'code' parameter. | ||
| # Copy the entire URL and paste it into the terminal. | ||
|
|
||
| auth_manager.get_access_token(as_dict=False) | ||
|
|
||
| # The refresh token will be printed to the console. | ||
| # It will also be saved in a file named .cache in your current directory. | ||
|
|
||
| print(f"Refresh Token: {auth_manager.get_cached_token()['refresh_token']}") | ||
|
|
||
| ``` | ||
|
|
||
| Run this script, and it will guide you through the authorization process. At the end, it will print your refresh token. **Save this token securely.** | ||
|
|
||
| ## Usage | ||
|
|
||
| Once you have your credentials, you can use the Spotify data source in PySpark. | ||
|
|
||
| ```python | ||
| from pyspark.sql import SparkSession | ||
|
|
||
| # Create a SparkSession | ||
| spark = SparkSession.builder.appName("SpotifyExample").getOrCreate() | ||
|
|
||
| # Register the data source | ||
| from pyspark_datasources.spotify import SpotifyDataSource | ||
| spark.dataSource.register(SpotifyDataSource) | ||
|
|
||
| # Load your saved tracks | ||
| df = spark.read.format("spotify") \ | ||
| .option("spotify.client.id", "YOUR_CLIENT_ID") \ | ||
| .option("spotify.client.secret", "YOUR_CLIENT_SECRET") \ | ||
| .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN") \ | ||
| .option("type", "tracks") \ | ||
| .load() | ||
|
|
||
| # Show the data | ||
| df.show() | ||
| ``` | ||
|
|
||
| ## Schema | ||
|
|
||
| The schema for the `tracks` type is as follows: | ||
|
|
||
| | Field | Type | | ||
| |-------------|-----------------| | ||
| | id | `string` | | ||
| | name | `string` | | ||
| | artists | `array<string>` | | ||
| | album | `string` | | ||
| | duration_ms | `long` | | ||
| | popularity | `integer` | | ||
| | added_at | `string` | | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
|
|
||
| from pyspark.sql import SparkSession | ||
|
|
||
| # This is an example of how to use the Spotify data source. | ||
| # Before running this, make sure you have followed the authentication | ||
| # instructions in docs/datasources/spotify.md to get your credentials. | ||
|
|
||
| # Create a SparkSession | ||
| spark = SparkSession.builder.appName("SpotifyExample").getOrCreate() | ||
|
|
||
| # Register the data source | ||
| from pyspark_datasources.spotify import SpotifyDataSource | ||
| spark.dataSource.register(SpotifyDataSource) | ||
|
|
||
| # Replace with your actual credentials | ||
| CLIENT_ID = "YOUR_CLIENT_ID" | ||
| CLIENT_SECRET = "YOUR_CLIENT_SECRET" | ||
| REFRESH_TOKEN = "YOUR_REFRESH_TOKEN" | ||
|
|
||
| # Load your saved tracks | ||
| try: | ||
| df = ( | ||
| spark.read.format("spotify") | ||
| .option("spotify.client.id", CLIENT_ID) | ||
| .option("spotify.client.secret", CLIENT_SECRET) | ||
| .option("spotify.refresh.token", REFRESH_TOKEN) | ||
| .option("type", "tracks") | ||
| .load() | ||
| ) | ||
|
|
||
| # Show the data | ||
| df.show() | ||
|
|
||
| # Print the schema | ||
| df.printSchema() | ||
|
|
||
| except Exception as e: | ||
| print(f"An error occurred: {e}") | ||
| print("Please ensure you have replaced the placeholder credentials with your actual credentials.") | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
|
|
||
| import requests | ||
| from pyspark.sql import Row | ||
| from pyspark.sql.datasource import DataSource, DataSourceReader | ||
| from pyspark.sql.types import ( | ||
| StructType, | ||
| StructField, | ||
| StringType, | ||
| IntegerType, | ||
| LongType, | ||
| ArrayType, | ||
| ) | ||
|
|
||
|
|
||
| class SpotifyDataSource(DataSource): | ||
| """ | ||
| A DataSource for reading data from Spotify. | ||
|
|
||
| Name: `spotify` | ||
|
|
||
| The `type` option is used to specify what data to load. Currently, only | ||
| the `tracks` type is supported, which loads the user's saved songs. | ||
|
|
||
| Schema for `tracks` type: | ||
| - `id`: string | ||
| - `name`: string | ||
| - `artists`: array<string> | ||
| - `album`: string | ||
| - `duration_ms`: long | ||
| - `popularity`: integer | ||
| - `added_at`: string | ||
|
|
||
| Examples: | ||
| --------- | ||
| Register the data source. | ||
|
|
||
| >>> from pyspark_datasources import SpotifyDataSource | ||
| >>> spark.dataSource.register(SpotifyDataSource) | ||
|
|
||
| Load your saved tracks from Spotify. | ||
|
|
||
| >>> df = ( | ||
| ... spark.read.format("spotify") | ||
| ... .option("spotify.client.id", "YOUR_CLIENT_ID") | ||
| ... .option("spotify.client.secret", "YOUR_CLIENT_SECRET") | ||
| ... .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN") | ||
| ... .option("type", "tracks") | ||
| ... .load() | ||
| ... ) | ||
| >>> df.show() | ||
| +----------------------+----------------+------------------+---------+-----------+----------+--------------------+ | ||
| |id |name |artists |album |duration_ms|popularity|added_at | | ||
| +----------------------+----------------+------------------+---------+-----------+----------+--------------------+ | ||
| |1BxfuPKGuaTgP7aM0B... |All Too Well |[Taylor Swift] |Red |329466 |82 |2025-10-16T12:00:00Z| | ||
| |... |... |... |... |... |... |... | | ||
| +----------------------+----------------+------------------+---------+-----------+----------+--------------------+ | ||
| """ | ||
|
|
||
| @classmethod | ||
| def name(cls): | ||
| return "spotify" | ||
|
|
||
| def schema(self): | ||
| # Simplified schema for tracks | ||
| return StructType( | ||
| [ | ||
| StructField("id", StringType(), True), | ||
| StructField("name", StringType(), True), | ||
| StructField( | ||
| "artists", ArrayType(StringType(), True), True | ||
| ), | ||
| StructField("album", StringType(), True), | ||
| StructField("duration_ms", LongType(), True), | ||
| StructField("popularity", IntegerType(), True), | ||
| StructField("added_at", StringType(), True), | ||
| ] | ||
| ) | ||
|
|
||
| def reader(self, schema): | ||
| return SpotifyReader(self.options) | ||
|
|
||
|
|
||
| class SpotifyReader(DataSourceReader): | ||
| def __init__(self, options): | ||
| self.options = options | ||
| self.client_id = self.options.get("spotify.client.id") | ||
| self.client_secret = self.options.get("spotify.client.secret") | ||
| self.refresh_token = self.options.get("spotify.refresh.token") | ||
| self.type = self.options.get("type", "tracks") | ||
|
|
||
| if not all([self.client_id, self.client_secret, self.refresh_token]): | ||
| raise ValueError( | ||
| "spotify.client.id, spotify.client.secret, and spotify.refresh.token must be specified in options" | ||
| ) | ||
|
|
||
| def read(self, partition): | ||
| access_token = self._get_access_token() | ||
| headers = {"Authorization": f"Bearer {access_token}"} | ||
|
|
||
| if self.type == "tracks": | ||
| url = "https://api.spotify.com/v1/me/tracks" | ||
| while url: | ||
| response = requests.get(url, headers=headers) | ||
| response.raise_for_status() | ||
| data = response.json() | ||
| for item in data["items"]: | ||
|
Comment on lines
+103
to
+106
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add timeouts to external HTTP calls to avoid indefinite hangs. Both requests lack timeouts. This is a reliability and production-safety issue. Apply: @@
- response = requests.get(url, headers=headers)
+ response = requests.get(url, headers=headers, timeout=(3.05, 30))
@@
- response = requests.post(
+ response = requests.post(
"https://accounts.spotify.com/api/token",
data={"grant_type": "refresh_token", "refresh_token": self.refresh_token},
auth=(self.client_id, self.client_secret),
- )
+ timeout=(3.05, 30),
+ )Optionally, hoist the timeout to a module constant (e.g., DEFAULT_TIMEOUT) for reuse. Based on learnings Also applies to: 122-128 🧰 Tools🪛 Ruff (0.14.0)103-103: Probable use of (S113) 🤖 Prompt for AI Agents |
||
| track = item["track"] | ||
| yield Row( | ||
| id=track["id"], | ||
| name=track["name"], | ||
| artists=[artist["name"] for artist in track["artists"]], | ||
| album=track["album"]["name"], | ||
| duration_ms=track["duration_ms"], | ||
| popularity=track["popularity"], | ||
| added_at=item["added_at"], | ||
| ) | ||
| url = data.get("next") | ||
| else: | ||
| raise ValueError(f"Unsupported type: {self.type}") | ||
|
Comment on lines
+96
to
+119
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add validation for API response structure to prevent KeyError. The code directly accesses nested JSON fields without validation. If the Spotify API returns a malformed response or missing fields, this will raise Consider adding defensive checks: for item in data["items"]:
track = item["track"]
+ # Validate required fields
+ required_fields = ["id", "name", "artists", "album", "duration_ms", "popularity"]
+ if not all(field in track for field in required_fields):
+ continue # Skip malformed tracks
+ if "name" not in track.get("album", {}):
+ continue
+
yield Row(
id=track["id"],
name=track["name"],
artists=[artist["name"] for artist in track["artists"]],
album=track["album"]["name"],
duration_ms=track["duration_ms"],
popularity=track["popularity"],
- added_at=item["added_at"],
+ added_at=item.get("added_at"),
)Alternatively, wrap the access in a try-except block to handle malformed responses gracefully. 🧰 Tools🪛 Ruff (0.14.1)96-96: Unused method argument: (ARG002) 103-103: Probable use of (S113) 119-119: Avoid specifying long messages outside the exception class (TRY003) 🤖 Prompt for AI Agents |
||
|
|
||
| def _get_access_token(self): | ||
| response = requests.post( | ||
| "https://accounts.spotify.com/api/token", | ||
| data={"grant_type": "refresh_token", "refresh_token": self.refresh_token}, | ||
| auth=(self.client_id, self.client_secret), | ||
| ) | ||
| response.raise_for_status() | ||
| return response.json()["access_token"] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| """ | ||
| Unit tests for the Spotify data source. | ||
|
|
||
| These tests focus on the SpotifyReader class directly to avoid issues with | ||
| SparkSession creation in certain test environments. This approach allows for | ||
| testing the core logic of the data source, including authentication and data | ||
| parsing, without a full SparkSession. | ||
| """ | ||
| import pytest | ||
| from unittest.mock import patch, MagicMock | ||
| import requests | ||
| from pyspark.sql import Row | ||
| from pyspark_datasources.spotify import SpotifyReader | ||
|
|
||
| @patch("pyspark_datasources.spotify.requests.post") | ||
| @patch("pyspark_datasources.spotify.requests.get") | ||
| def test_spotify_reader_success(mock_get, mock_post): | ||
| # Mock the response from the token endpoint | ||
| mock_post.return_value.json.return_value = {"access_token": "test_token"} | ||
| mock_post.return_value.raise_for_status = MagicMock() | ||
|
|
||
| # Mock the response from the tracks endpoint (with pagination) | ||
| mock_get.side_effect = [ | ||
| MagicMock( | ||
| json=lambda: { | ||
| "items": [ | ||
| { | ||
| "added_at": "2025-10-16T12:00:00Z", | ||
| "track": { | ||
| "id": "track1", | ||
| "name": "Test Track 1", | ||
| "artists": [{"name": "Artist 1"}], | ||
| "album": {"name": "Album 1"}, | ||
| "duration_ms": 200000, | ||
| "popularity": 50, | ||
| }, | ||
| } | ||
| ], | ||
| "next": "https://api.spotify.com/v1/me/tracks?offset=1&limit=1", | ||
| }, | ||
| raise_for_status=MagicMock(), | ||
| ), | ||
| MagicMock( | ||
| json=lambda: { | ||
| "items": [ | ||
| { | ||
| "added_at": "2025-10-16T12:05:00Z", | ||
| "track": { | ||
| "id": "track2", | ||
| "name": "Test Track 2", | ||
| "artists": [{"name": "Artist 2"}], | ||
| "album": {"name": "Album 2"}, | ||
| "duration_ms": 220000, | ||
| "popularity": 60, | ||
| }, | ||
| } | ||
| ], | ||
| "next": None, | ||
| }, | ||
| raise_for_status=MagicMock(), | ||
| ), | ||
| ] | ||
|
|
||
| options = { | ||
| "spotify.client.id": "test_id", | ||
| "spotify.client.secret": "test_secret", | ||
| "spotify.refresh.token": "test_refresh", | ||
| } | ||
| reader = SpotifyReader(options) | ||
| rows = list(reader.read(None)) | ||
|
|
||
| assert len(rows) == 2 | ||
| assert rows[0]["name"] == "Test Track 1" | ||
| assert rows[1]["name"] == "Test Track 2" | ||
|
|
||
| def test_spotify_reader_missing_credentials(): | ||
| with pytest.raises(ValueError, match="must be specified in options"): | ||
| SpotifyReader({}) | ||
|
|
||
| @patch("pyspark_datasources.spotify.requests.post") | ||
| @patch("pyspark_datasources.spotify.requests.get") | ||
| def test_spotify_reader_api_error(mock_get, mock_post): | ||
| mock_post.return_value.json.return_value = {"access_token": "test_token"} | ||
| mock_post.return_value.raise_for_status = MagicMock() | ||
|
|
||
| mock_get.return_value.raise_for_status.side_effect = requests.exceptions.HTTPError( | ||
| "401 Client Error: Unauthorized for url" | ||
| ) | ||
|
|
||
| options = { | ||
| "spotify.client.id": "test_id", | ||
| "spotify.client.secret": "test_secret", | ||
| "spotify.refresh.token": "test_refresh", | ||
| } | ||
| reader = SpotifyReader(options) | ||
| with pytest.raises(requests.exceptions.HTTPError): | ||
| list(reader.read(None)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix import path in docstring example.
The example should import from the module, not the package root.
📝 Committable suggestion
🤖 Prompt for AI Agents