-
Notifications
You must be signed in to change notification settings - Fork 0
/
generator_engine.py
316 lines (281 loc) · 11.5 KB
/
generator_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
#!/usr/bin/env python3
"""
Replay Mix+ by soreikomori
https://github.com/soreikomori/ReplayMixPlus
"""
from ytmusicapi import YTMusic
import time
import pylast
import logging
import json_tools as jt
############### PRERUN FUNCTIONS ###############
# Module-wide logger, looked up by the fixed name 'rpmplusLogger'.
# No handler or level is configured in the visible part of this file.
logger = logging.getLogger('rpmplusLogger')
# Shared YouTube Music client used by the YTM functions below.
# It is created at import time from "oauth.json", so importing this module
# already requires that file to be readable from the working directory.
yt = YTMusic("oauth.json")
############### SETUP FUNCTIONS ###############
def lastFmNetworkConnect():
    """
    Build a Last.FM network session from "lastfmcreds.json" and return the
    user object for the configured account.
    Code inspired from @akraus53 on Github. Kudos!

    Returns
    -------
    pylast.User
        The user object that fetchTopTracks() and fetchRecentTracks() query.
    """
    logger.info("Connecting to Last.FM Network...")
    creds = jt.loadJson("lastfmcreds.json")
    # NOTE(review): the stored 'password' value is handed over as
    # password_hash, so it is presumably already hashed - confirm against
    # whatever writes lastfmcreds.json.
    connectionArgs = {
        "api_key": creds['apikey'],
        "api_secret": creds['apisecret'],
        "username": creds['username'],
        "password_hash": creds['password'],
    }
    lastFm = pylast.LastFMNetwork(**connectionArgs)
    account = lastFm.get_user(creds['username'])
    logger.info("Connected to Last.FM Network, returning UserSelf.")
    return account
############### IMPORT AND FETCHES ###############
def fetchTopTracks(userSelf):
    """
    Ask Last.FM for the user's top tracks over a fixed chart period.

    Both the period and the maximum number of tracks are plain local
    settings in this function; edit them to taste.

    Parameters
    ----------
    userSelf : pylast.User
        The user object returned by lastFmNetworkConnect().

    Returns
    -------
    list
        A list of TopItem objects.
    """
    logger.info("Fetching top tracks from Last.FM...")
    # NOTES FOR PERIOD
    # Accepted values: overall | 7day | 1month | 3month | 6month | 12month
    chartPeriod = "7day"
    maxTracks = 200
    return userSelf.get_top_tracks(period=chartPeriod, limit=maxTracks)
def fetchRecentTracks(userSelf):
    """
    Fetch every Last.FM scrobble from the past 7 days.
    Do note that this takes rather long.

    Parameters
    ----------
    userSelf : pylast.User
        The user object returned by lastFmNetworkConnect().

    Returns
    -------
    list
        A list of PlayedTrack objects.
    """
    # Only a 7-day window is supported for now; other timeframes may be
    # added in a later version if there is demand for them.
    secondsInWeek = 7 * 24 * 60 * 60
    windowStart = round(time.time() - secondsInWeek)
    logger.info("Fetching recent tracks from Last.FM...")
    # limit=None asks for every scrobble since windowStart, not a capped page.
    return userSelf.get_recent_tracks(time_from=windowStart, limit=None)
############### MASTERLIST CREATION ###############
def createMasterList():
    """
    Creates the main list that will be put into YTM later. It gathers the
    top and recent tracks from Last.FM, scores every top track, resolves it
    to a YTM video ID through the compendium and returns the result sorted
    by score.

    Returns
    -------
    list
        A list of dictionaries, each containing the track's title, YTM ID
        and score, sorted by score in descending order. Empty if Last.FM
        returned no top tracks or none of them is in the compendium.
    """
    # Imports
    userSelf = lastFmNetworkConnect()
    topTracks = fetchTopTracks(userSelf)
    recentTracks = fetchRecentTracks(userSelf)
    if not topTracks:
        # Nothing to rank; indexing topTracks[0] below would raise IndexError.
        logger.warning("Last.FM returned no top tracks, MasterList is empty.")
        return []
    # The first entry supplies the normalisation ceiling for scrobble counts
    # (the chart is taken to be ordered by play count, highest first).
    maxScrobbles = topTracks[0].weight
    # Evaluate each distinct title once instead of once per scrobble; the
    # maximum is the same but the history is scanned far fewer times.
    recentTitles = {played.track.get_title() for played in recentTracks}
    maxRepetitions = max(
        (repetitionChecker(recentTitle, recentTracks) for recentTitle in recentTitles),
        default=0,
    )
    # Keep the normaliser at 1 or more so the score never divides by zero
    # when the recent history is empty or contains no streak.
    maxRepetitions = max(maxRepetitions, 1)
    # Declarations
    masterList = []
    seenIds = set()
    # Engine
    logger.info("Creating MasterList...")
    for topItem in topTracks:
        scrobbles = topItem.weight
        title = topItem.item.get_title()
        logger.debug(f"MASTERLIST - Processing last.fm track \"{title}\"")
        # The artist is currently not used for matching, but is still passed
        # along so checkYTMId() can start using it without a signature change.
        artist = topItem.item.get_artist().get_name()
        lastPlayed = lastPlayedChecker(title, recentTracks)
        repetitions = repetitionChecker(title, recentTracks)
        score = calcScore(scrobbles, lastPlayed, repetitions, maxScrobbles, maxRepetitions)
        ytmId = checkYTMId(title, artist)
        logger.debug("Track: " + title + " | Score: " + str(score))
        # Skip tracks missing from the compendium and video IDs already taken
        # by an earlier (higher-charting) track.
        if ytmId is None or ytmId in seenIds:
            continue
        seenIds.add(ytmId)
        masterList.append({
            "title": title,
            # "artist": artist,
            "ytmid": ytmId,
            "score": score
        })
        #### PLAYLIST SIZE
        # You can change this if you want your playlist to be bigger or smaller.
        # CHANGE THIS LINE -----
        if len(masterList) == 100:
            break
        # ---------------------
    logger.debug("MASTERLIST:")
    for entry in masterList:
        logger.debug("Track fm Title: " + entry["title"] + " | Score: " + str(entry["score"]))
    logger.debug("-----------------")
    logger.info("MasterList created.")
    return sorted(masterList, key=lambda x: x["score"], reverse=True)
def checkYTMId(title, artist):
    """
    Cross-checks a last.fm title against the YTM compendium to find the YTM
    video ID of the track that should go into the playlist.

    Matching is case-insensitive on the title only. A compendium title also
    matches when everything before its first " (feat." equals the last.fm title.

    Parameters
    ----------
    title : str
        The title of the track.
    artist : str
        The artist of the track. Currently unused.

    Returns
    -------
    str or None
        The videoId of the first matching compendium track, or None if no
        compendium title matches.
    """
    compendium = jt.loadJson("ytm_compendium.json")
    wanted = title.lower()
    for entry in compendium:
        # NOTE: the commented line below checks whether the given artist is
        # present in a YTM track's "artists" value. It is unused because artist
        # names differ between YTM and last.fm far more often than titles do
        # (especially with multiple artists). If you need a stricter match, for
        # example for several tracks sharing one title, this is a starting point.
        # artistPresent = any(artist == artists['name'] for artists in track.get('artists', []))
        compendiumTitle = entry["title"].lower()
        noFeatureTitle = compendiumTitle.split(" (feat.")[0]
        if wanted != compendiumTitle and wanted != noFeatureTitle:
            continue
        if wanted != compendiumTitle:
            # Only the feature-stripped form matched; leave a trace for debugging.
            logger.debug("NOTE - Track matched by title without (feat.)")
            logger.debug(f"fmtitle: \"{wanted}\" - compendiumTitle: \"{compendiumTitle}\" - noFeatureTitle: \"{noFeatureTitle}\"")
        return entry["videoId"]
    logger.error("Could not find the track \"" + title + "\" in the Compendium.")
    return None
############### SCORE CALCULATION ###############
def repetitionChecker(title, recentTracks):
    """
    Checks the maximum amount of repetitions (uninterrupted loops) for a given track.

    A repetition streak is a run of consecutive entries in recentTracks that
    all carry the given title. Streaks that reach the very end of the list
    are counted as well.

    Parameters
    ----------
    title : str
        The title of the track.
    recentTracks : list
        A list of recent tracks in the selected timestamp, generated by fetchRecentTracks.

    Returns
    -------
    int
        The length of the longest streak for the given track. Returns 0 if
        the title is never found in recentTracks.
    """
    longest = 0
    current = 0
    for played in recentTracks:
        if played.track.get_title() == title:
            current += 1
            # Record the streak while it grows, so a streak that is still
            # open when the list ends is not lost.
            if current > longest:
                longest = current
        else:
            current = 0
    return longest
def lastPlayedChecker(title, recentTracks):
    """
    Looks up the play time of the first entry in recentTracks with the given title.

    Parameters
    ----------
    title : str
        The title of the track.
    recentTracks : list
        A list of recent tracks in the selected timestamp, generated by fetchRecentTracks.

    Returns
    -------
    int or None
        The "timestamp" attribute of the first matching entry, or None if
        the title is never found in recentTracks.
        NOTE(review): this is the *last* play only if recentTracks is ordered
        newest-first - confirm against the Last.FM client.
    """
    matchingTimestamps = (
        played.timestamp
        for played in recentTracks
        if played.track.get_title() == title
    )
    # The generator is lazy, so scanning stops at the first match.
    return next(matchingTimestamps, None)
def calcScore(scrobbles, lastPlayed, repetitions, maxScrobbles, maxRepetitions):
    """
    Calculates the score of a track as a weighted sum of three normalised
    values: scrobble share, recency and repetition share.

    Parameters
    ----------
    scrobbles : int
        The amount of scrobbles the track has.
    lastPlayed : int, str or None
        The unix timestamp of the last time the track was found in last.fm
        (anything int() accepts). None means the track was not found in the
        recent history; it then gets no recency bonus.
    repetitions : int
        The maximum amount of repetitions (uninterrupted loops) the track has.
    maxScrobbles : int
        The maximum scrobbles of any track in the selected timespan.
    maxRepetitions : int
        The maximum number of repetitions of any track in the selected timestamp.

    Returns
    -------
    float
        The calculated score of the track.
    """
    #### WEIGHTS
    # Note that these parameters are to be changed accordingly to what the user prefers the most.
    # I include my personal cocktail of weights, but feel free to change it as you see fit.
    # CHANGE THESE LINES -----
    scrobbleWeight = 1
    recenWeight = 0.7
    repetitionWeight = 0.50
    # ----------------------
    # 604800 is the amount of seconds in 7 days.
    secondsInWindow = 604800
    # Calculations
    # A zero maximum means nothing can be normalised against it; the share is
    # then 0 instead of a ZeroDivisionError.
    scroVal = scrobbles / maxScrobbles if maxScrobbles else 0
    repVal = repetitions / maxRepetitions if maxRepetitions else 0
    if lastPlayed is None:
        # Not in the recent history: treat the track as played a full window
        # ago, so (1 - lpVal) contributes nothing.
        lpVal = 1
    else:
        lpVal = (time.time() - int(lastPlayed)) / secondsInWindow
    return scrobbleWeight*scroVal + recenWeight*(1-lpVal) + repetitionWeight*repVal
############### YTM IMPLEMENTATION ###############
def recreatePlaylist():
    """
    Empties the ReplayMix+ playlist then readds all items in the MasterList in order.

    This is the only function that this entire script needs to run to work
    independently, as it calls all other functions. It reads the playlist ID
    from the "ytPlaylistId" entry of config.json and needs a compendium for
    createMasterList() to resolve video IDs.

    If the MasterList turns out empty, the playlist is still emptied but the
    add/retry loop is skipped, since there is nothing it could add.
    """
    logger.info("Playlist Recreation started.")
    playlistId = jt.loadJson("config.json")["ytPlaylistId"]
    currentTracks = yt.get_playlist(playlistId, None).get("tracks") # type: ignore
    masterList = createMasterList()
    videoIdList = []
    for track in masterList:
        logger.debug("Playlist Recreation - Adding track " + track["title"] + " to the playlist.")
        videoIdList.append(track["ytmid"])
    # Truthiness also covers a missing "tracks" entry (None), where len() would raise.
    if currentTracks:
        logger.info("Removing current tracks from the playlist. (Playlist was not empty.)")
        yt.remove_playlist_items(playlistId, currentTracks)
    if not videoIdList:
        # Adding an empty list can never make the playlist non-empty, so the
        # retry loop below would never finish normally.
        logger.warning("MasterList is empty, there are no tracks to add to the playlist.")
        logger.info("Playlist Recreation finished.")
        return
    logger.info("Adding new tracks to the playlist.")
    # Give YTM time to apply the removal before looking at the playlist again.
    time.sleep(15)
    currentTracks = yt.get_playlist(playlistId, None).get("tracks") # type: ignore
    # NOTE(review): if the removal is not yet visible at this point the loop
    # body never runs and nothing is added - confirm whether 15 seconds is
    # always enough.
    while not currentTracks:
        logger.error("Playlist is empty. Retrying...")
        yt.add_playlist_items(playlistId, videoIdList)
        time.sleep(15)
        currentTracks = yt.get_playlist(playlistId, None).get("tracks") # type: ignore
    logger.info("Playlist Recreation finished.")
############### CONSOLE HELPER FUNCTIONS ###############
def createPlaylist(name, desc):
    """
    Creates a YTM playlist with the given name and description.

    Parameters
    ----------
    name : str
        The title of the playlist.
    desc : str
        The description of the playlist.

    Returns
    -------
    str
        Whatever yt.create_playlist() returns, expected to be the ID of the
        newly created playlist.
        NOTE(review): failure handling is left entirely to the YTM client;
        confirm whether it raises or returns an error payload.
    """
    newPlaylistId = yt.create_playlist(name, desc)
    return newPlaylistId