mirror of
synced 2025-01-16 11:55:31 +00:00
Removed saveDownloadQueue and tagsLanguage from lib settings Revert embedded cover change Fixed bitrate fallback check Use overwriteFile setting when downloading embedded covers Fixed bitrate fallback not working Fixed some issues to make the lib work Implemented spotify plugin back Better handling of albums upcs Fixed queue item not cancelling correctly Code parity with deemix-js Code cleanup with pylint Even more rework on the library More work on the library (WIP) Total rework of the library (WIP) Some rework done on types Added start queue function Made nextitem work on a thread Removed dz as first parameter Started queuemanager refactoring Removed eventlet Co-authored-by: RemixDev <RemixDev64@gmail.com> Reviewed-on: https://git.freezer.life/RemixDev/deemix-py/pulls/4 Co-Authored-By: RemixDev <remixdev@noreply.localhost> Co-Committed-By: RemixDev <remixdev@noreply.localhost>
347 lines
14 KiB
347 lines
14 KiB
from concurrent.futures import ThreadPoolExecutor
import json
from pathlib import Path
import re
from urllib.request import urlopen
from deemix.plugins import Plugin
from deemix.utils.localpaths import getConfigFolder
from deemix.itemgen import generateTrackItem, generateAlbumItem, GenerationError, TrackNotOnDeezer, AlbumNotOnDeezer
from deemix.types.DownloadObjects import Convertable
import spotipy
SpotifyClientCredentials = spotipy.oauth2.SpotifyClientCredentials
class Spotify(Plugin):
def __init__(self, configFolder=None):
self.credentials = {'clientId': "", 'clientSecret': ""}
self.settings = {
'fallbackSearch': False
self.enabled = False
self.sp = None
self.configFolder = Path(configFolder or getConfigFolder())
self.configFolder /= 'spotify'
def setup(self):
if not self.configFolder.is_dir(): self.configFolder.mkdir()
return self
def parseLink(cls, link):
if 'link.tospotify.com' in link: link = urlopen(link).url
# Remove extra stuff
if '?' in link: link = link[:link.find('?')]
if '&' in link: link = link[:link.find('&')]
if link.endswith('/'): link = link[:-1] # Remove last slash if present
link_type = None
link_id = None
if not 'spotify' in link: return (link, link_type, link_id) # return if not a spotify link
if re.search(r"[/:]track[/:](.+)", link):
link_type = 'track'
link_id = re.search(r"[/:]track[/:](.+)", link).group(1)
elif re.search(r"[/:]album[/:](.+)", link):
link_type = 'album'
link_id = re.search(r"[/:]album[/:](.+)", link).group(1)
elif re.search(r"[/:]playlist[/:](.+)", link):
link_type = 'playlist'
link_id = re.search(r"[/:]playlist[/:](.+)", link).group(1)
return (link, link_type, link_id)
def generateDownloadObject(self, dz, link, bitrate):
(link, link_type, link_id) = self.parseLink(link)
if link_type is None or link_id is None: return None
if link_type == "track":
return self.generateTrackItem(dz, link_id, bitrate)
if link_type == "album":
return self.generateAlbumItem(dz, link_id, bitrate)
if link_type == "playlist":
return self.generatePlaylistItem(dz, link_id, bitrate)
return None
def generateTrackItem(self, dz, link_id, bitrate):
cache = self.loadCache()
if link_id in cache['tracks']:
cachedTrack = cache['tracks'][link_id]
cachedTrack = self.getTrack(link_id)
cache['tracks'][link_id] = cachedTrack
if 'isrc' in cachedTrack:
try: return generateTrackItem(dz, f"isrc:{cachedTrack['isrc']}", bitrate)
except GenerationError: pass
if self.settings['fallbackSearch']:
if 'id' not in cachedTrack or cachedTrack['id'] == "0":
trackID = dz.api.get_track_id_from_metadata(
if trackID != "0":
cachedTrack['id'] = trackID
cache['tracks'][link_id] = cachedTrack
if cachedTrack['id'] != "0": return generateTrackItem(dz, cachedTrack['id'], bitrate)
raise TrackNotOnDeezer(f"https://open.spotify.com/track/{link_id}")
def generateAlbumItem(self, dz, link_id, bitrate):
cache = self.loadCache()
if link_id in cache['albums']:
cachedAlbum = cache['albums'][link_id]
cachedAlbum = self.getAlbum(link_id)
cache['albums'][link_id] = cachedAlbum
try: return generateAlbumItem(dz, f"upc:{cachedAlbum['upc']}", bitrate)
except GenerationError as e: raise AlbumNotOnDeezer(f"https://open.spotify.com/album/{link_id}") from e
def generatePlaylistItem(self, dz, link_id, bitrate):
if not self.enabled: raise Exception("Spotify plugin not enabled")
spotifyPlaylist = self.sp.playlist(link_id)
playlistAPI = self._convertPlaylistStructure(spotifyPlaylist)
playlistAPI.various_artist = dz.api.get_artist(5080) # Useful for save as compilation
tracklistTemp = spotifyPlaylist.track.items
while spotifyPlaylist['tracks']['next']:
spotifyPlaylist['tracks'] = self.sp.next(spotifyPlaylist['tracks'])
tracklistTemp += spotifyPlaylist['tracks']['items']
tracklist = []
for item in tracklistTemp:
if item['track']:
if item['track']['explicit']:
playlistAPI['explicit'] = True
if 'explicit' not in playlistAPI: playlistAPI['explicit'] = False
return Convertable({
'type': 'spotify_playlist',
'id': link_id,
'bitrate': bitrate,
'title': spotifyPlaylist['name'],
'artist': spotifyPlaylist['owner']['display_name'],
'cover': playlistAPI['picture_thumbnail'],
'explicit': playlistAPI['explicit'],
'size': len(tracklist),
'collection': {
'tracks_gw': [],
'playlistAPI': playlistAPI
'plugin': 'spotify',
'conversion_data': tracklist
def getTrack(self, track_id, spotifyTrack=None):
if not self.enabled: raise Exception("Spotify plugin not enabled")
cachedTrack = {
'isrc': None,
'data': None
if not spotifyTrack:
spotifyTrack = self.sp.track(track_id)
if 'isrc' in spotifyTrack.get('external_ids', {}):
cachedTrack['isrc'] = spotifyTrack['external_ids']['isrc']
cachedTrack['data'] = {
'title': spotifyTrack['name'],
'artist': spotifyTrack['artists'][0]['name'],
'album': spotifyTrack['album']['name']
return cachedTrack
def getAlbum(self, album_id, spotifyAlbum=None):
if not self.enabled: raise Exception("Spotify plugin not enabled")
cachedAlbum = {
'upc': None,
'data': None
if not spotifyAlbum:
spotifyAlbum = self.sp.album(album_id)
if 'upc' in spotifyAlbum.get('external_ids', {}):
cachedAlbum['upc'] = spotifyAlbum['external_ids']['upc']
cachedAlbum['data'] = {
'title': spotifyAlbum['name'],
'artist': spotifyAlbum['artists'][0]['name']
return cachedAlbum
def convertTrack(self, dz, downloadObject, track, pos, conversion, conversionNext, cache, listener):
if downloadObject.isCanceled: return
if track['id'] in cache['tracks']:
cachedTrack = cache['tracks'][track['id']]
cachedTrack = self.getTrack(track['id'], track)
cache['tracks'][track['id']] = cachedTrack
if 'isrc' in cachedTrack:
trackAPI = dz.api.get_track_by_ISRC(cachedTrack['isrc'])
if 'id' not in trackAPI or 'title' not in trackAPI: trackAPI = None
except GenerationError: pass
if self.settings['fallbackSearch'] and not trackAPI:
if 'id' not in cachedTrack or cachedTrack['id'] == "0":
trackID = dz.api.get_track_id_from_metadata(
if trackID != "0":
cachedTrack['id'] = trackID
cache['tracks'][track['id']] = cachedTrack
if cachedTrack['id'] != "0": trackAPI = dz.api.get_track(cachedTrack['id'])
if not trackAPI:
deezerTrack = {
'SNG_ID': "0",
'SNG_TITLE': track['name'],
'MD5_ORIGIN': 0,
'ALB_TITLE': track['album']['name'],
'ART_ID': 0,
'ART_NAME': track['artists'][0]['name']
deezerTrack = dz.gw.get_track_with_fallback(trackAPI['id'])
deezerTrack['_EXTRA_TRACK'] = trackAPI
deezerTrack['POSITION'] = pos+1
conversionNext += (1 / downloadObject.size) * 100
if round(conversionNext) != conversion and round(conversionNext) % 2 == 0:
conversion = round(conversionNext)
if listener: listener.send("updateQueue", {'uuid': downloadObject.uuid, 'conversion': conversion})
def convert(self, dz, downloadObject, settings, listener=None):
cache = self.loadCache()
conversion = 0
conversionNext = 0
collection = [None] * len(downloadObject.conversion_data)
with ThreadPoolExecutor(settings['queueConcurrency']) as executor:
for pos, track in enumerate(downloadObject.conversion_data, start=0):
collection[pos] = executor.submit(self.convertTrack,
dz, downloadObject,
track, pos,
conversion, conversionNext,
cache, listener
def _convertPlaylistStructure(cls, spotifyPlaylist):
cover = None
if len(spotifyPlaylist['images']): cover = spotifyPlaylist['images'][0]['url']
deezerPlaylist = {
'checksum': spotifyPlaylist['snapshot_id'],
'collaborative': spotifyPlaylist['collaborative'],
'creation_date': "XXXX-00-00",
'creator': {
'id': spotifyPlaylist['owner']['id'],
'name': spotifyPlaylist['owner']['display_name'],
'tracklist': spotifyPlaylist['owner']['href'],
'type': "user"
'description': spotifyPlaylist['description'],
'duration': 0,
'fans': spotifyPlaylist['followers']['total'] if 'followers' in spotifyPlaylist else 0,
'id': spotifyPlaylist['id'],
'is_loved_track': False,
'link': spotifyPlaylist['external_urls']['spotify'],
'nb_tracks': spotifyPlaylist['tracks']['total'],
'picture': cover,
'picture_small': cover or "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/56x56-000000-80-0-0.jpg",
'picture_medium': cover or "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/250x250-000000-80-0-0.jpg",
'picture_big': cover or "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/500x500-000000-80-0-0.jpg",
'picture_xl': cover or "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/1000x1000-000000-80-0-0.jpg",
'public': spotifyPlaylist['public'],
'share': spotifyPlaylist['external_urls']['spotify'],
'title': spotifyPlaylist['name'],
'tracklist': spotifyPlaylist['tracks']['href'],
'type': "playlist"
return deezerPlaylist
def loadSettings(self):
if not (self.configFolder / 'settings.json').is_file():
with open(self.configFolder / 'settings.json', 'w') as f:
json.dump({**self.credentials, **self.settings}, f, indent=2)
with open(self.configFolder / 'settings.json', 'r') as settingsFile:
settings = json.load(settingsFile)
def saveSettings(self, newSettings=None):
if newSettings: self.setSettings(newSettings)
with open(self.configFolder / 'settings.json', 'w') as f:
json.dump({**self.credentials, **self.settings}, f, indent=2)
def getSettings(self):
return {**self.credentials, **self.settings}
def setSettings(self, newSettings):
self.credentials = { 'clientId': newSettings['clientId'], 'clientSecret': newSettings['clientSecret'] }
settings = {**newSettings}
del settings['clientId']
del settings['clientSecret']
self.settings = settings
def loadCache(self):
if (self.configFolder / 'cache.json').is_file():
with open(self.configFolder / 'cache.json', 'r') as f:
cache = json.load(f)
cache = {'tracks': {}, 'albums': {}}
return cache
def saveCache(self, newCache):
with open(self.configFolder / 'cache.json', 'w') as spotifyCache:
json.dump(newCache, spotifyCache)
def checkCredentials(self):
if self.credentials['clientId'] == "" or self.credentials['clientSecret'] == "":
self.enabled = False
client_credentials_manager = SpotifyClientCredentials(client_id=self.credentials['clientId'],
self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
self.enabled = True
except Exception:
self.enabled = False
def getCredentials(self):
return self.credentials
def setCredentials(self, clientId, clientSecret):
# Remove extra spaces, just to be sure
clientId = clientId.strip()
clientSecret = clientSecret.strip()
# Save them to disk
self.credentials = { 'clientId': clientId, 'clientSecret': clientSecret}