Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 211 additions & 62 deletions streamrip/client/deezer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import hashlib
import logging

import re

import deezer
from deezer.errors import DataException
from Cryptodome.Cipher import AES

from ..config import Config
Expand Down Expand Up @@ -35,12 +38,25 @@ class DeezerClient(Client):
max_quality = 2

def __init__(self, config: Config):
"""
Initializes the DeezerClient.

Args:
config (Config): The configuration object.
"""
self.global_config = config
self.client = deezer.Deezer()
self.logged_in = False
self.config = config.session.deezer

async def login(self):
"""
Logs in to Deezer using the ARL (Authentication Request Login) token.

Raises:
MissingCredentialsError: If the ARL is missing from the config.
AuthenticationError: If login fails.
"""
# Used for track downloads
self.session = await self.get_session(
verify_ssl=self.global_config.session.downloads.verify_ssl
Expand All @@ -54,6 +70,19 @@ async def login(self):
self.logged_in = True

async def get_metadata(self, item_id: str, media_type: str) -> dict:
"""
Fetches metadata for a given item.

Args:
item_id (str): The ID of the item.
media_type (str): The type of media (track, album, playlist, artist).

Returns:
dict: The metadata of the item.

Raises:
Exception: If the media type is invalid.
"""
# TODO: open asyncio PR to deezer py and integrate
if media_type == "track":
return await self.get_track(item_id)
Expand All @@ -67,54 +96,152 @@ async def get_metadata(self, item_id: str, media_type: str) -> dict:
raise Exception(f"Media type {media_type} not available on deezer")

async def get_track(self, item_id: str) -> dict:
"""
Fetches metadata for a track.

Args:
item_id (str): The track ID.

Returns:
dict: The track metadata.

Raises:
NonStreamableError: If the track is not streamable.
"""
try:
item = await asyncio.to_thread(self.client.api.get_track, item_id)
except Exception as e:
raise NonStreamableError(e)

album_id = item["album"]["id"]
try:
album_metadata, album_tracks = await asyncio.gather(
asyncio.to_thread(self.client.api.get_album, album_id),
asyncio.to_thread(self.client.api.get_album_tracks, album_id),
)
# reuse get_album to handle redirects
album_metadata = await self.get_album(str(album_id))
except Exception as e:
logger.error(f"Error fetching album of track {item_id}: {e}")
return item

album_metadata["tracks"] = album_tracks["data"]
album_metadata["track_total"] = len(album_tracks["data"])
item["album"] = album_metadata

return item

async def get_album(self, item_id: str) -> dict:
album_metadata, album_tracks = await asyncio.gather(
asyncio.to_thread(self.client.api.get_album, item_id),
asyncio.to_thread(self.client.api.get_album_tracks, item_id),
)
"""
Fetches metadata for an album.

Args:
item_id (str): The album ID.

Returns:
dict: The album metadata.
"""
try:
album_metadata, album_tracks = await asyncio.gather(
asyncio.to_thread(self.client.api.get_album, item_id),
asyncio.to_thread(self.client.api.get_album_tracks, item_id),
)
except DataException:
new_id = await self._resolve_redirect("album", item_id)
if new_id:
return await self.get_album(new_id)
raise

album_metadata["tracks"] = album_tracks["data"]
album_metadata["track_total"] = len(album_tracks["data"])
return album_metadata

async def _resolve_redirect(self, media_type: str, item_id: str) -> str | None:
"""
Resolves potential Deezer redirects to find the actual item ID.

Returns:
The new ID if a redirect occurred, otherwise None.
"""
url = f"https://www.deezer.com/{media_type}/{item_id}"

try:
# Perform a HEAD request to follow redirects without downloading content
async with self.session.head(url, allow_redirects=True) as response:
final_url = str(response.url)
except Exception as e:
logger.warning(f"Failed to resolve redirect for {item_id}: {e}")
return None

# If the URL hasn't changed, return early
if final_url == url:
return None

# Attempt to extract the new ID from the final URL
match = re.search(rf"/{media_type}/(\d+)", final_url)

# If a valid new ID is found and it differs from the original
if match and (new_id := match.group(1)) != item_id:
logger.info(f"Resolved redirect for {media_type} {item_id} -> {new_id}")
return new_id

return None

async def get_playlist(self, item_id: str) -> dict:
pl_metadata, pl_tracks = await asyncio.gather(
asyncio.to_thread(self.client.api.get_playlist, item_id),
asyncio.to_thread(self.client.api.get_playlist_tracks, item_id),
)
"""
Fetches metadata for a playlist.

Args:
item_id (str): The playlist ID.

Returns:
dict: The playlist metadata.
"""
try:
pl_metadata, pl_tracks = await asyncio.gather(
asyncio.to_thread(self.client.api.get_playlist, item_id),
asyncio.to_thread(self.client.api.get_playlist_tracks, item_id),
)
except DataException:
new_id = await self._resolve_redirect("playlist", item_id)
if new_id:
return await self.get_playlist(new_id)
raise

pl_metadata["tracks"] = pl_tracks["data"]
pl_metadata["track_total"] = len(pl_tracks["data"])
return pl_metadata

async def get_artist(self, item_id: str) -> dict:
artist, albums = await asyncio.gather(
asyncio.to_thread(self.client.api.get_artist, item_id),
asyncio.to_thread(self.client.api.get_artist_albums, item_id),
)
"""
Fetches metadata for an artist.

Args:
item_id (str): The artist ID.

Returns:
dict: The artist metadata.
"""
try:
artist, albums = await asyncio.gather(
asyncio.to_thread(self.client.api.get_artist, item_id),
asyncio.to_thread(self.client.api.get_artist_albums, item_id),
)
except DataException:
new_id = await self._resolve_redirect("artist", item_id)
if new_id:
return await self.get_artist(new_id)
raise

artist["albums"] = albums["data"]
return artist

async def search(self, media_type: str, query: str, limit: int = 200) -> list[dict]:
"""
Searches for items on Deezer.

Args:
media_type (str): The type of media to search for.
query (str): The search query.
limit (int): The maximum number of results to return.

Returns:
list[dict]: A list of search results.
"""
# TODO: use limit parameter
if media_type == "featured":
try:
Expand All @@ -141,73 +268,84 @@ async def get_downloadable(
quality: int = 2,
is_retry: bool = False,
) -> DeezerDownloadable:
"""
Prepares a downloadable object for a track.

Args:
item_id (str): The track ID.
quality (int): The desired quality (0, 1, or 2).
is_retry (bool): whether this is a retry attempt (internal use).

Returns:
DeezerDownloadable: The downloadable object.

Raises:
NonStreamableError: If the track cannot be streamed.
"""
if item_id is None:
raise NonStreamableError(
"No item id provided. This can happen when searching for fallback songs.",
)
# TODO: optimize such that all of the ids are requested at once
dl_info: dict = {"quality": quality, "id": item_id}

track_info = self.client.gw.get_track(item_id)
# Ensure quality is within bounds [0, 2]
quality = max(0, min(quality, 2))

# TODO: optimize such that all of the ids are requested at once
track_info = self.client.gw.get_track(item_id)
fallback_id = track_info.get("FALLBACK", {}).get("SNG_ID")

# Mapping internal quality levels to Deezer API formats
# We list them in descending order to facilitate the fallback loop
quality_map = [
(9, "MP3_128"), # quality 0
(3, "MP3_320"), # quality 1
(1, "FLAC"), # quality 2
]
size_map = [
int(track_info.get(f"FILESIZE_{format}", 0)) for _, format in quality_map
(1, "FLAC"), # quality 2
]
dl_info["quality_to_size"] = size_map

# Check if requested quality is available
if size_map[quality] == 0:
if self.config.lower_quality_if_not_available:
# Fallback to lower quality
while size_map[quality] == 0 and quality > 0:
logger.warning(
"The requested quality %s is not available. Falling back to quality %s",
quality,
quality - 1,
)
quality -= 1
else:
# No fallback - raise error
raise NonStreamableError(
f"The requested quality {quality} is not available and fallback is disabled."
)

# Update the quality in dl_info to reflect the final quality used
dl_info["quality"] = quality

_, format_str = quality_map[quality]
# Pre-calculate file sizes for metadata
dl_info: dict = {"quality": quality, "id": item_id}
dl_info["quality_to_size"] = [
int(track_info.get(f"FILESIZE_{fmt}", 0)) for _, fmt in quality_map
]

token = track_info["TRACK_TOKEN"]
try:
logger.debug("Fetching deezer url with token %s", token)
url = self.client.get_track_url(token, format_str)
except deezer.WrongLicense:
raise NonStreamableError(
"The requested quality is not available with your subscription. "
"Deezer HiFi is required for quality 2. Otherwise, the maximum "
"quality allowed is 1.",
)
except deezer.WrongGeolocation:
if not is_retry and fallback_id:
return await self.get_downloadable(fallback_id, quality, is_retry=True)
raise NonStreamableError(
"The requested track is not available. This may be due to your country/location.",
)
url = None
final_quality = quality

# --- START OF FALLBACK LOOP ---
# We try from the requested quality down to 0 (MP3_128)
for q_level in range(quality, -1, -1):
_, format_str = quality_map[q_level]

try:
logger.debug(f"Attempting to fetch URL for quality {q_level} ({format_str})")
url = self.client.get_track_url(token, format_str)
if url:
final_quality = q_level
break # Success!
except deezer.WrongLicense:
logger.warning(f"Quality {q_level} not available for this account. Trying lower...")
continue
except deezer.WrongGeolocation:
if not is_retry and fallback_id:
logger.info(f"Geoblocked. Trying fallback ID: {fallback_id}")
return await self.get_downloadable(fallback_id, quality, is_retry=True)
raise NonStreamableError("Track geoblocked and no fallback available.")
# --- END OF FALLBACK LOOP ---

# If no URL was found through the official API, try the legacy encrypted method
if url is None:
logger.debug("Official API failed, trying encrypted file URL.")
url = self._get_encrypted_file_url(
item_id,
track_info["MD5_ORIGIN"],
track_info["MEDIA_VERSION"],
)

if not url:
raise NonStreamableError(f"Could not retrieve any download URL for track {item_id}")

dl_info["quality"] = final_quality
dl_info["url"] = url
logger.debug("dz track info: %s", track_info)
return DeezerDownloadable(self.session, dl_info)
Expand All @@ -218,6 +356,17 @@ def _get_encrypted_file_url(
track_hash: str,
media_version: str,
):
"""
Generates an encrypted file URL for a track when the standard API fails.

Args:
meta_id (str): The metadata ID.
track_hash (str): The track hash.
media_version (str): The media version.

Returns:
str: The encrypted file URL.
"""
logger.debug("Unable to fetch URL. Trying encryption method.")
format_number = 1

Expand Down