deemix-py/deemix/plugins/spotify.py
RemixDev f530a4e89f Merge refactoring (#4)
Removed saveDownloadQueue and tagsLanguage from lib settings

Revert embedded cover change

Fixed bitrate fallback check

Use overwriteFile setting when downloading embedded covers

Fixed bitrate fallback not working

Fixed some issues to make the lib work

Re-implemented the Spotify plugin

Better handling of album UPCs

Fixed queue item not cancelling correctly

Code parity with deemix-js

Code cleanup with pylint

Even more rework on the library

More work on the library (WIP)

Total rework of the library (WIP)

Some rework done on types

Added start queue function

Made nextitem work on a thread

Removed dz as first parameter

Started queuemanager refactoring

Removed eventlet

Co-authored-by: RemixDev <RemixDev64@gmail.com>
Reviewed-on: https://git.freezer.life/RemixDev/deemix-py/pulls/4
Co-Authored-By: RemixDev <remixdev@noreply.localhost>
Co-Committed-By: RemixDev <remixdev@noreply.localhost>
2021-06-27 16:29:41 -04:00

347 lines
14 KiB
Python

from concurrent.futures import ThreadPoolExecutor
import json
from pathlib import Path
import re
from urllib.request import urlopen
from deemix.plugins import Plugin
from deemix.utils.localpaths import getConfigFolder
from deemix.itemgen import generateTrackItem, generateAlbumItem, GenerationError, TrackNotOnDeezer, AlbumNotOnDeezer
from deemix.types.DownloadObjects import Convertable
import spotipy
# Module-level alias for spotipy's client-credentials auth manager; it is
# instantiated in Spotify.checkCredentials() with the stored client id/secret.
SpotifyClientCredentials = spotipy.oauth2.SpotifyClientCredentials
class Spotify(Plugin):
    """Plugin that turns Spotify track/album/playlist links into deemix items.

    Credentials and settings are persisted in ``<configFolder>/spotify/settings.json``
    and Spotify-id -> ISRC/UPC/Deezer-id lookups are memoised in
    ``<configFolder>/spotify/cache.json``.
    """

    def __init__(self, configFolder=None):
        """Initialise defaults; nothing is read from disk until ``setup()``.

        Args:
            configFolder: Base configuration folder. When falsy, the value
                returned by ``getConfigFolder()`` is used. The plugin stores
                its files in the ``spotify`` sub-folder of that path.
        """
        super().__init__()
        self.credentials = {'clientId': "", 'clientSecret': ""}
        self.settings = {
            'fallbackSearch': False
        }
        self.enabled = False
        self.sp = None
        self.configFolder = Path(configFolder or getConfigFolder())
        self.configFolder /= 'spotify'

    def setup(self):
        """Create the plugin folder if needed, load settings, and return ``self``."""
        # parents/exist_ok make this safe when the base folder is missing or
        # when another caller created the folder in the meantime.
        self.configFolder.mkdir(parents=True, exist_ok=True)
        self.loadSettings()
        return self

    @classmethod
    def parseLink(cls, link):
        """Normalise a link and extract its Spotify item type and id.

        Args:
            link: URL or ``spotify:`` URI.

        Returns:
            A ``(link, link_type, link_id)`` tuple. ``link`` has its query
            string and trailing slash removed; ``link_type`` is ``'track'``,
            ``'album'``, ``'playlist'`` or ``None``; ``link_id`` is ``None``
            whenever ``link_type`` is ``None``.
        """
        if 'link.tospotify.com' in link:
            # Follow the short-link redirect; close the response afterwards.
            with urlopen(link) as response:
                link = response.url
        # Remove extra stuff
        if '?' in link: link = link[:link.find('?')]
        if '&' in link: link = link[:link.find('&')]
        if link.endswith('/'): link = link[:-1]  # Remove last slash if present

        link_type = None
        link_id = None
        if 'spotify' not in link:
            return (link, link_type, link_id)  # return if not a spotify link

        # Same precedence as before: track, then album, then playlist.
        for candidate in ('track', 'album', 'playlist'):
            match = re.search(rf"[/:]{candidate}[/:](.+)", link)
            if match:
                link_type = candidate
                link_id = match.group(1)
                break
        return (link, link_type, link_id)

    def generateDownloadObject(self, dz, link, bitrate):
        """Build a download object for ``link``.

        Returns:
            The object produced by the matching ``generate*Item`` method, or
            ``None`` when ``link`` is not a recognised Spotify link.
        """
        (link, link_type, link_id) = self.parseLink(link)
        if link_type is None or link_id is None:
            return None
        if link_type == "track":
            return self.generateTrackItem(dz, link_id, bitrate)
        if link_type == "album":
            return self.generateAlbumItem(dz, link_id, bitrate)
        if link_type == "playlist":
            return self.generatePlaylistItem(dz, link_id, bitrate)
        return None

    def generateTrackItem(self, dz, link_id, bitrate):
        """Resolve a Spotify track id to a Deezer track item.

        The ISRC is tried first; when that fails and the ``fallbackSearch``
        setting is on, a metadata search is used.

        Raises:
            TrackNotOnDeezer: If no Deezer track could be found.
        """
        cache = self.loadCache()
        if link_id in cache['tracks']:
            cachedTrack = cache['tracks'][link_id]
        else:
            cachedTrack = self.getTrack(link_id)
            cache['tracks'][link_id] = cachedTrack
            self.saveCache(cache)

        # getTrack() always creates the 'isrc' key (possibly None), so test
        # the value rather than the key to avoid looking up "isrc:None".
        if cachedTrack.get('isrc'):
            # Inside a method body this name is the module-level itemgen
            # function, not this method.
            try:
                return generateTrackItem(dz, f"isrc:{cachedTrack['isrc']}", bitrate)
            except GenerationError:
                pass

        if self.settings.get('fallbackSearch', False):
            if cachedTrack.get('id', "0") == "0":
                trackID = dz.api.get_track_id_from_metadata(
                    cachedTrack['data']['artist'],
                    cachedTrack['data']['title'],
                    cachedTrack['data']['album'],
                )
                if trackID != "0":
                    cachedTrack['id'] = trackID
                    cache['tracks'][link_id] = cachedTrack
                    self.saveCache(cache)
            # .get(): 'id' is absent when the search above found nothing.
            if cachedTrack.get('id', "0") != "0":
                return generateTrackItem(dz, cachedTrack['id'], bitrate)

        raise TrackNotOnDeezer(f"https://open.spotify.com/track/{link_id}")

    def generateAlbumItem(self, dz, link_id, bitrate):
        """Resolve a Spotify album id to a Deezer album item through its UPC.

        Raises:
            AlbumNotOnDeezer: If the album has no UPC or the UPC lookup fails.
        """
        cache = self.loadCache()
        if link_id in cache['albums']:
            cachedAlbum = cache['albums'][link_id]
        else:
            cachedAlbum = self.getAlbum(link_id)
            cache['albums'][link_id] = cachedAlbum
            self.saveCache(cache)

        # Without a UPC there is nothing to look up; skip the "upc:None" call.
        if not cachedAlbum.get('upc'):
            raise AlbumNotOnDeezer(f"https://open.spotify.com/album/{link_id}")
        try:
            return generateAlbumItem(dz, f"upc:{cachedAlbum['upc']}", bitrate)
        except GenerationError as e:
            raise AlbumNotOnDeezer(f"https://open.spotify.com/album/{link_id}") from e

    def generatePlaylistItem(self, dz, link_id, bitrate):
        """Build a ``Convertable`` describing a Spotify playlist.

        All playlist pages are fetched; the Spotify track dicts are stored in
        ``conversion_data`` for a later ``convert()`` call.

        Raises:
            Exception: If the plugin is not enabled.
        """
        if not self.enabled: raise Exception("Spotify plugin not enabled")
        spotifyPlaylist = self.sp.playlist(link_id)

        playlistAPI = self._convertPlaylistStructure(spotifyPlaylist)
        # playlistAPI is a plain dict, so this has to be item assignment.
        playlistAPI['various_artist'] = dz.api.get_artist(5080)  # Useful for save as compilation

        # The playlist payload is indexed as a dict everywhere else in this
        # method; attribute access on it would raise AttributeError.
        tracklistTemp = spotifyPlaylist['tracks']['items']
        while spotifyPlaylist['tracks']['next']:
            spotifyPlaylist['tracks'] = self.sp.next(spotifyPlaylist['tracks'])
            tracklistTemp += spotifyPlaylist['tracks']['items']

        # Entries whose 'track' is falsy are dropped.
        tracklist = [item['track'] for item in tracklistTemp if item['track']]
        playlistAPI['explicit'] = any(track['explicit'] for track in tracklist)

        return Convertable({
            'type': 'spotify_playlist',
            'id': link_id,
            'bitrate': bitrate,
            'title': spotifyPlaylist['name'],
            'artist': spotifyPlaylist['owner']['display_name'],
            'cover': playlistAPI['picture_thumbnail'],
            'explicit': playlistAPI['explicit'],
            'size': len(tracklist),
            'collection': {
                'tracks_gw': [],
                'playlistAPI': playlistAPI
            },
            'plugin': 'spotify',
            'conversion_data': tracklist
        })

    def getTrack(self, track_id, spotifyTrack=None):
        """Return the cache entry (``isrc`` + basic metadata) for a track.

        Args:
            track_id: Spotify track id; only used when ``spotifyTrack`` is falsy.
            spotifyTrack: Already-fetched Spotify track dict, if available.

        Raises:
            Exception: If the plugin is not enabled.
        """
        if not self.enabled: raise Exception("Spotify plugin not enabled")
        if not spotifyTrack:
            spotifyTrack = self.sp.track(track_id)

        cachedTrack = {
            'isrc': None,
            'data': {
                'title': spotifyTrack['name'],
                'artist': spotifyTrack['artists'][0]['name'],
                'album': spotifyTrack['album']['name']
            }
        }
        externalIds = spotifyTrack.get('external_ids', {})
        if 'isrc' in externalIds:
            cachedTrack['isrc'] = externalIds['isrc']
        return cachedTrack

    def getAlbum(self, album_id, spotifyAlbum=None):
        """Return the cache entry (``upc`` + basic metadata) for an album.

        Args:
            album_id: Spotify album id; only used when ``spotifyAlbum`` is falsy.
            spotifyAlbum: Already-fetched Spotify album dict, if available.

        Raises:
            Exception: If the plugin is not enabled.
        """
        if not self.enabled: raise Exception("Spotify plugin not enabled")
        if not spotifyAlbum:
            spotifyAlbum = self.sp.album(album_id)

        cachedAlbum = {
            'upc': None,
            'data': {
                'title': spotifyAlbum['name'],
                'artist': spotifyAlbum['artists'][0]['name']
            }
        }
        externalIds = spotifyAlbum.get('external_ids', {})
        if 'upc' in externalIds:
            cachedAlbum['upc'] = externalIds['upc']
        return cachedAlbum

    def convertTrack(self, dz, downloadObject, track, pos, conversion, conversionNext, cache, listener):
        """Convert one Spotify playlist track into a Deezer GW-style track dict.

        Args:
            dz: Deezer client exposing ``api`` and ``gw``.
            downloadObject: The playlist download object (``isCanceled``,
                ``size`` and ``uuid`` are read).
            track: Spotify track dict.
            pos: Zero-based position of the track in the playlist.
            conversion: Last progress percentage that was reported.
            conversionNext: Progress accumulated so far, in percent.
            cache: Cache dict as returned by ``loadCache()``.
            listener: Optional object with a ``send(event, payload)`` method.

        Returns:
            The track dict, or ``None`` when the download was cancelled.
            A placeholder dict with ``SNG_ID == "0"`` is returned when the
            track could not be matched on Deezer.
        """
        if downloadObject.isCanceled: return None

        if track['id'] in cache['tracks']:
            cachedTrack = cache['tracks'][track['id']]
        else:
            cachedTrack = self.getTrack(track['id'], track)
            cache['tracks'][track['id']] = cachedTrack
            # NOTE(review): convert() runs this method on several threads that
            # share `cache`; saveCache() is called here without any locking.
            self.saveCache(cache)

        # Must be bound before the checks below, even if the ISRC lookup is
        # skipped or fails.
        trackAPI = None
        if cachedTrack.get('isrc'):
            try:
                trackAPI = dz.api.get_track_by_ISRC(cachedTrack['isrc'])
                if 'id' not in trackAPI or 'title' not in trackAPI: trackAPI = None
            except GenerationError:
                # NOTE(review): confirm this is the exception dz.api raises
                # for an unknown ISRC.
                trackAPI = None

        if self.settings.get('fallbackSearch', False) and not trackAPI:
            if cachedTrack.get('id', "0") == "0":
                trackID = dz.api.get_track_id_from_metadata(
                    cachedTrack['data']['artist'],
                    cachedTrack['data']['title'],
                    cachedTrack['data']['album'],
                )
                if trackID != "0":
                    cachedTrack['id'] = trackID
                    cache['tracks'][track['id']] = cachedTrack
                    self.saveCache(cache)
            # .get(): 'id' is absent when the search above found nothing.
            if cachedTrack.get('id', "0") != "0":
                trackAPI = dz.api.get_track(cachedTrack['id'])

        if not trackAPI:
            # Placeholder entry built from the Spotify metadata.
            deezerTrack = {
                'SNG_ID': "0",
                'SNG_TITLE': track['name'],
                'DURATION': 0,
                'MD5_ORIGIN': 0,
                'MEDIA_VERSION': 0,
                'FILESIZE': 0,
                'ALB_TITLE': track['album']['name'],
                'ALB_PICTURE': "",
                'ART_ID': 0,
                'ART_NAME': track['artists'][0]['name']
            }
        else:
            deezerTrack = dz.gw.get_track_with_fallback(trackAPI['id'])
            deezerTrack['_EXTRA_TRACK'] = trackAPI
        deezerTrack['POSITION'] = pos+1

        # `conversion` and `conversionNext` are plain numbers passed by value,
        # so these updates are local to this call.
        conversionNext += (1 / downloadObject.size) * 100
        if round(conversionNext) != conversion and round(conversionNext) % 2 == 0:
            conversion = round(conversionNext)
            if listener: listener.send("updateQueue", {'uuid': downloadObject.uuid, 'conversion': conversion})

        return deezerTrack

    def convert(self, dz, downloadObject, settings, listener=None):
        """Run ``convertTrack`` for every entry of ``downloadObject.conversion_data``.

        Work is submitted to a thread pool sized by
        ``settings['queueConcurrency']``; the method returns once the pool has
        shut down.
        """
        cache = self.loadCache()

        conversion = 0
        conversionNext = 0

        # NOTE(review): the futures are collected but their results are not
        # read here, so exceptions raised in workers are not re-raised.
        collection = [None] * len(downloadObject.conversion_data)
        with ThreadPoolExecutor(settings['queueConcurrency']) as executor:
            for pos, track in enumerate(downloadObject.conversion_data):
                collection[pos] = executor.submit(
                    self.convertTrack,
                    dz, downloadObject,
                    track, pos,
                    conversion, conversionNext,
                    cache, listener
                )

    @classmethod
    def _convertPlaylistStructure(cls, spotifyPlaylist):
        """Map a Spotify playlist dict onto a Deezer-style playlist dict.

        The first playlist image is used as cover; when there is none the
        sized ``picture_*`` fields fall back to placeholder URLs and
        ``picture`` is ``None``.
        """
        cover = None
        # Truthiness instead of len() so a None images value is tolerated too.
        if spotifyPlaylist['images']:
            cover = spotifyPlaylist['images'][0]['url']

        def picture(size):
            # Placeholder cover URL of the requested square size.
            return cover or f"https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/{size}x{size}-000000-80-0-0.jpg"

        owner = spotifyPlaylist['owner']
        deezerPlaylist = {
            'checksum': spotifyPlaylist['snapshot_id'],
            'collaborative': spotifyPlaylist['collaborative'],
            'creation_date': "XXXX-00-00",
            'creator': {
                'id': owner['id'],
                'name': owner['display_name'],
                'tracklist': owner['href'],
                'type': "user"
            },
            'description': spotifyPlaylist['description'],
            'duration': 0,
            'fans': spotifyPlaylist['followers']['total'] if 'followers' in spotifyPlaylist else 0,
            'id': spotifyPlaylist['id'],
            'is_loved_track': False,
            'link': spotifyPlaylist['external_urls']['spotify'],
            'nb_tracks': spotifyPlaylist['tracks']['total'],
            'picture': cover,
            'picture_small': picture(56),
            'picture_medium': picture(250),
            'picture_big': picture(500),
            'picture_xl': picture(1000),
            'public': spotifyPlaylist['public'],
            'share': spotifyPlaylist['external_urls']['spotify'],
            'title': spotifyPlaylist['name'],
            'tracklist': spotifyPlaylist['tracks']['href'],
            'type': "playlist"
        }
        return deezerPlaylist

    def loadSettings(self):
        """Load ``settings.json`` (writing the defaults first if it is missing)
        and re-check the credentials."""
        settingsPath = self.configFolder / 'settings.json'
        if not settingsPath.is_file():
            with open(settingsPath, 'w', encoding='utf-8') as f:
                json.dump({**self.credentials, **self.settings}, f, indent=2)
        with open(settingsPath, 'r', encoding='utf-8') as settingsFile:
            settings = json.load(settingsFile)
        self.setSettings(settings)
        self.checkCredentials()

    def saveSettings(self, newSettings=None):
        """Optionally apply ``newSettings``, re-check credentials, and write
        credentials plus settings to ``settings.json``."""
        if newSettings: self.setSettings(newSettings)
        self.checkCredentials()
        with open(self.configFolder / 'settings.json', 'w', encoding='utf-8') as f:
            json.dump({**self.credentials, **self.settings}, f, indent=2)

    def getSettings(self):
        """Return credentials and settings merged into one new dict."""
        return {**self.credentials, **self.settings}

    def setSettings(self, newSettings):
        """Split ``newSettings`` into credentials and the remaining settings.

        Raises:
            KeyError: If ``clientId`` or ``clientSecret`` is missing.
        """
        remaining = {**newSettings}  # copy so the caller's dict is untouched
        self.credentials = {
            'clientId': remaining.pop('clientId'),
            'clientSecret': remaining.pop('clientSecret')
        }
        self.settings = remaining

    def loadCache(self):
        """Return the cache from ``cache.json``, or an empty cache structure
        when the file does not exist."""
        cachePath = self.configFolder / 'cache.json'
        if not cachePath.is_file():
            return {'tracks': {}, 'albums': {}}
        with open(cachePath, 'r', encoding='utf-8') as f:
            return json.load(f)

    def saveCache(self, newCache):
        """Write ``newCache`` to ``cache.json``, replacing its contents."""
        with open(self.configFolder / 'cache.json', 'w', encoding='utf-8') as spotifyCache:
            json.dump(newCache, spotifyCache)

    def checkCredentials(self):
        """Create the spotipy client and set ``self.enabled`` accordingly.

        The plugin is disabled when either credential is empty or when the
        probe request raises.
        """
        if self.credentials['clientId'] == "" or self.credentials['clientSecret'] == "":
            self.enabled = False
            return

        try:
            client_credentials_manager = SpotifyClientCredentials(
                client_id=self.credentials['clientId'],
                client_secret=self.credentials['clientSecret']
            )
            self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
            # Probe request: it raises if the credentials are rejected.
            self.sp.user_playlists('spotify')
            self.enabled = True
        except Exception:
            # Deliberately broad: any failure just leaves the plugin disabled.
            self.enabled = False

    def getCredentials(self):
        """Return the credentials dict (``clientId`` / ``clientSecret``)."""
        return self.credentials

    def setCredentials(self, clientId, clientSecret):
        """Store whitespace-stripped credentials and persist them to disk."""
        # Remove extra spaces, just to be sure
        clientId = clientId.strip()
        clientSecret = clientSecret.strip()

        # Save them to disk
        self.credentials = {'clientId': clientId, 'clientSecret': clientSecret}
        self.saveSettings()