mirror of
https://gitlab.com/RemixDev/deemix-py.git
synced 2025-01-01 12:46:11 +00:00
f530a4e89f
Removed saveDownloadQueue and tagsLanguage from lib settings Revert embedded cover change Fixed bitrate fallback check Use overwriteFile setting when downloading embedded covers Fixed bitrate fallback not working Fixed some issues to make the lib work Implemented spotify plugin back Better handling of albums upcs Fixed queue item not cancelling correctly Code parity with deemix-js Code cleanup with pylint Even more rework on the library More work on the library (WIP) Total rework of the library (WIP) Some rework done on types Added start queue function Made nextitem work on a thread Removed dz as first parameter Started queuemanager refactoring Removed eventlet Co-authored-by: RemixDev <RemixDev64@gmail.com> Reviewed-on: https://git.freezer.life/RemixDev/deemix-py/pulls/4 Co-Authored-By: RemixDev <remixdev@noreply.localhost> Co-Committed-By: RemixDev <remixdev@noreply.localhost>
347 lines
14 KiB
Python
347 lines
14 KiB
Python
from concurrent.futures import ThreadPoolExecutor
|
|
import json
|
|
from pathlib import Path
|
|
import re
|
|
from urllib.request import urlopen
|
|
from deemix.plugins import Plugin
|
|
from deemix.utils.localpaths import getConfigFolder
|
|
from deemix.itemgen import generateTrackItem, generateAlbumItem, GenerationError, TrackNotOnDeezer, AlbumNotOnDeezer
|
|
from deemix.types.DownloadObjects import Convertable
|
|
|
|
import spotipy
|
|
SpotifyClientCredentials = spotipy.oauth2.SpotifyClientCredentials
|
|
|
|
class Spotify(Plugin):
|
|
def __init__(self, configFolder=None):
    """Create the plugin with blank credentials and default settings.

    configFolder: base configuration directory. When it is falsy the value
    returned by getConfigFolder() is used instead. The plugin's own files
    live in the 'spotify' sub-folder of that directory.
    """
    super().__init__()
    # The plugin starts disabled; checkCredentials() is what turns it on.
    self.enabled = False
    self.sp = None
    self.credentials = {'clientId': "", 'clientSecret': ""}
    self.settings = {'fallbackSearch': False}
    baseFolder = configFolder or getConfigFolder()
    self.configFolder = Path(baseFolder) / 'spotify'
|
|
|
|
def setup(self):
|
|
if not self.configFolder.is_dir(): self.configFolder.mkdir()
|
|
|
|
self.loadSettings()
|
|
return self
|
|
|
|
@classmethod
def parseLink(cls, link):
    """Clean up a link and extract the Spotify item type and id from it.

    Links containing 'link.tospotify.com' are resolved over the network to
    the URL they redirect to. Everything from the first '?' or '&' onwards
    and a single trailing '/' are removed.

    Returns a tuple (link, link_type, link_id). link is the cleaned link;
    link_type is 'track', 'album' or 'playlist'; link_id is the text that
    follows the type in the link. Both are None when the link does not
    contain 'spotify' or has none of the supported types.
    """
    if 'link.tospotify.com' in link:
        # Only the final URL is needed: close the response right away so the
        # underlying connection is not leaked.
        with urlopen(link) as response:
            link = response.url
    # Remove extra stuff
    link = link.split('?', 1)[0]
    link = link.split('&', 1)[0]
    if link.endswith('/'):
        link = link[:-1]  # Remove last slash if present

    link_type = None
    link_id = None

    if 'spotify' not in link:
        return (link, link_type, link_id)  # return if not a spotify link

    # Checked in this order: the first type that matches wins.
    for candidate in ('track', 'album', 'playlist'):
        match = re.search(rf"[/:]{candidate}[/:](.+)", link)
        if match:
            link_type = candidate
            link_id = match.group(1)
            break

    return (link, link_type, link_id)
|
|
|
|
def generateDownloadObject(self, dz, link, bitrate):
    """Build the download object that matches a Spotify link.

    The link is parsed with parseLink() and handed to the track, album or
    playlist generator of this plugin. Returns None when the link type or
    id cannot be determined or the type is not one of those three.
    """
    _, kind, itemId = self.parseLink(link)
    if kind is None or itemId is None:
        return None

    generatorNames = {
        'track': 'generateTrackItem',
        'album': 'generateAlbumItem',
        'playlist': 'generatePlaylistItem',
    }
    generatorName = generatorNames.get(kind)
    if generatorName is None:
        return None
    # Resolve the generator lazily so only the needed method is looked up.
    return getattr(self, generatorName)(dz, itemId, bitrate)
|
|
|
|
def generateTrackItem(self, dz, link_id, bitrate):
    """Generate the Deezer download item for a Spotify track id.

    The Spotify metadata is read from the plugin cache, or fetched with
    getTrack() and cached. The track is first looked up on Deezer by ISRC;
    if that fails and the 'fallbackSearch' setting is on, it is searched by
    artist/title/album and the resulting Deezer id is cached.

    Raises TrackNotOnDeezer when neither lookup yields a track.
    """
    cache = self.loadCache()

    if link_id in cache['tracks']:
        cachedTrack = cache['tracks'][link_id]
    else:
        cachedTrack = self.getTrack(link_id)
        cache['tracks'][link_id] = cachedTrack
        self.saveCache(cache)

    # getTrack() always stores an 'isrc' key (possibly None), so test the
    # value rather than the key to avoid querying "isrc:None".
    if cachedTrack.get('isrc'):
        try:
            # Module-level generateTrackItem from deemix.itemgen, not this method.
            return generateTrackItem(dz, f"isrc:{cachedTrack['isrc']}", bitrate)
        except GenerationError:
            pass  # not found by ISRC: fall through to the metadata search
    if self.settings['fallbackSearch']:
        if cachedTrack.get('id', "0") == "0":
            trackID = dz.api.get_track_id_from_metadata(
                cachedTrack['data']['artist'],
                cachedTrack['data']['title'],
                cachedTrack['data']['album'],
            )
            if trackID != "0":
                cachedTrack['id'] = trackID
                cache['tracks'][link_id] = cachedTrack
                self.saveCache(cache)
        # 'id' is still absent when the search found nothing, hence .get().
        if cachedTrack.get('id', "0") != "0":
            return generateTrackItem(dz, cachedTrack['id'], bitrate)
    raise TrackNotOnDeezer(f"https://open.spotify.com/track/{link_id}")
|
|
|
|
def generateAlbumItem(self, dz, link_id, bitrate):
    """Generate the Deezer download item for a Spotify album id.

    The Spotify metadata is read from the plugin cache, or fetched with
    getAlbum() and cached. The album is then looked up on Deezer by UPC.

    Raises AlbumNotOnDeezer when the album has no UPC or the UPC lookup
    fails with a GenerationError.
    """
    cache = self.loadCache()

    if link_id in cache['albums']:
        cachedAlbum = cache['albums'][link_id]
    else:
        cachedAlbum = self.getAlbum(link_id)
        cache['albums'][link_id] = cachedAlbum
        self.saveCache(cache)

    # getAlbum() stores None when Spotify reports no UPC; querying the
    # literal "upc:None" could never match, so fail early instead.
    upc = cachedAlbum.get('upc')
    if not upc:
        raise AlbumNotOnDeezer(f"https://open.spotify.com/album/{link_id}")

    try:
        # Module-level generateAlbumItem from deemix.itemgen, not this method.
        return generateAlbumItem(dz, f"upc:{upc}", bitrate)
    except GenerationError as e:
        raise AlbumNotOnDeezer(f"https://open.spotify.com/album/{link_id}") from e
|
|
|
|
def generatePlaylistItem(self, dz, link_id, bitrate):
    """Generate a Convertable download item for a Spotify playlist id.

    Fetches the playlist and all of its track pages from Spotify, converts
    the playlist metadata with _convertPlaylistStructure() and stores the
    raw Spotify tracks as 'conversion_data' for a later convert() call.

    Raises Exception when the plugin is not enabled.
    """
    if not self.enabled: raise Exception("Spotify plugin not enabled")
    spotifyPlaylist = self.sp.playlist(link_id)

    # Both objects are plain dicts, so they are accessed by key:
    # attribute access on them raises AttributeError.
    playlistAPI = self._convertPlaylistStructure(spotifyPlaylist)
    playlistAPI['various_artist'] = dz.api.get_artist(5080) # Useful for save as compilation

    # Copy the first page, then follow the 'next' links for the others.
    tracklistTemp = list(spotifyPlaylist['tracks']['items'])
    while spotifyPlaylist['tracks']['next']:
        spotifyPlaylist['tracks'] = self.sp.next(spotifyPlaylist['tracks'])
        tracklistTemp += spotifyPlaylist['tracks']['items']

    # Entries whose 'track' is empty are skipped.
    tracklist = []
    for item in tracklistTemp:
        if item['track']:
            if item['track']['explicit']:
                playlistAPI['explicit'] = True
            tracklist.append(item['track'])
    if 'explicit' not in playlistAPI: playlistAPI['explicit'] = False

    # Fall back to the small picture when the converted structure carries
    # no 'picture_thumbnail' entry.
    cover = playlistAPI.get('picture_thumbnail') or playlistAPI['picture_small']

    return Convertable({
        'type': 'spotify_playlist',
        'id': link_id,
        'bitrate': bitrate,
        'title': spotifyPlaylist['name'],
        'artist': spotifyPlaylist['owner']['display_name'],
        'cover': cover,
        'explicit': playlistAPI['explicit'],
        'size': len(tracklist),
        'collection': {
            'tracks_gw': [],
            'playlistAPI': playlistAPI
        },
        'plugin': 'spotify',
        'conversion_data': tracklist
    })
|
|
|
|
def getTrack(self, track_id, spotifyTrack=None):
    """Build the cache entry for a Spotify track.

    track_id: Spotify track id, used to fetch the track when spotifyTrack
    is not supplied (or is empty).
    spotifyTrack: optional Spotify track object that was already fetched.

    Returns {'isrc': <ISRC or None>, 'data': {'title', 'artist', 'album'}},
    where 'artist' is the name of the first listed artist.
    Raises Exception when the plugin is not enabled.
    """
    if not self.enabled:
        raise Exception("Spotify plugin not enabled")

    trackInfo = spotifyTrack or self.sp.track(track_id)
    externalIds = trackInfo.get('external_ids', {})
    isrc = externalIds['isrc'] if 'isrc' in externalIds else None
    return {
        'isrc': isrc,
        'data': {
            'title': trackInfo['name'],
            'artist': trackInfo['artists'][0]['name'],
            'album': trackInfo['album']['name'],
        },
    }
|
|
|
|
def getAlbum(self, album_id, spotifyAlbum=None):
    """Build the cache entry for a Spotify album.

    album_id: Spotify album id, used to fetch the album when spotifyAlbum
    is not supplied (or is empty).
    spotifyAlbum: optional Spotify album object that was already fetched.

    Returns {'upc': <UPC or None>, 'data': {'title', 'artist'}}, where
    'artist' is the name of the first listed artist.
    Raises Exception when the plugin is not enabled.
    """
    if not self.enabled:
        raise Exception("Spotify plugin not enabled")

    albumInfo = spotifyAlbum or self.sp.album(album_id)
    externalIds = albumInfo.get('external_ids', {})
    upc = externalIds['upc'] if 'upc' in externalIds else None
    return {
        'upc': upc,
        'data': {
            'title': albumInfo['name'],
            'artist': albumInfo['artists'][0]['name'],
        },
    }
|
|
|
|
def convertTrack(self, dz, downloadObject, track, pos, conversion, conversionNext, cache, listener):
    """Convert one Spotify playlist track into Deezer track data.

    track: Spotify track object; pos: zero-based position in the playlist.
    cache: plugin cache dict, updated (and saved) with newly resolved data.
    listener: optional object whose send() receives "updateQueue" events.

    The track is matched on Deezer by ISRC and, when that fails and the
    'fallbackSearch' setting is on, by artist/title/album. If nothing
    matches, a placeholder with SNG_ID "0" is built from the Spotify data.

    Returns the Deezer track dict with 'POSITION' set to pos + 1, or None
    when downloadObject has been cancelled.
    """
    if downloadObject.isCanceled: return None

    # Must be bound even when the ISRC lookup is skipped or fails,
    # otherwise the checks below raise UnboundLocalError.
    trackAPI = None

    if track['id'] in cache['tracks']:
        cachedTrack = cache['tracks'][track['id']]
    else:
        cachedTrack = self.getTrack(track['id'], track)
        cache['tracks'][track['id']] = cachedTrack
        self.saveCache(cache)

    # The 'isrc' key is always present in entries built by getTrack(), but
    # may be None: only query Deezer when there is an actual value.
    if cachedTrack.get('isrc'):
        try:
            trackAPI = dz.api.get_track_by_ISRC(cachedTrack['isrc'])
            if 'id' not in trackAPI or 'title' not in trackAPI: trackAPI = None
        except GenerationError:
            trackAPI = None
    if self.settings['fallbackSearch'] and not trackAPI:
        if cachedTrack.get('id', "0") == "0":
            trackID = dz.api.get_track_id_from_metadata(
                cachedTrack['data']['artist'],
                cachedTrack['data']['title'],
                cachedTrack['data']['album'],
            )
            if trackID != "0":
                cachedTrack['id'] = trackID
                cache['tracks'][track['id']] = cachedTrack
                self.saveCache(cache)
        # 'id' is still absent when the search found nothing, hence .get().
        if cachedTrack.get('id', "0") != "0":
            trackAPI = dz.api.get_track(cachedTrack['id'])

    if not trackAPI:
        # No Deezer match: placeholder entry built from the Spotify data.
        deezerTrack = {
            'SNG_ID': "0",
            'SNG_TITLE': track['name'],
            'DURATION': 0,
            'MD5_ORIGIN': 0,
            'MEDIA_VERSION': 0,
            'FILESIZE': 0,
            'ALB_TITLE': track['album']['name'],
            'ALB_PICTURE': "",
            'ART_ID': 0,
            'ART_NAME': track['artists'][0]['name']
        }
    else:
        deezerTrack = dz.gw.get_track_with_fallback(trackAPI['id'])
        deezerTrack['_EXTRA_TRACK'] = trackAPI
    deezerTrack['POSITION'] = pos+1

    # NOTE(review): conversion and conversionNext are plain numbers received
    # by value, so this progress does not accumulate across tracks.
    conversionNext += (1 / downloadObject.size) * 100
    if round(conversionNext) != conversion and round(conversionNext) % 2 == 0:
        conversion = round(conversionNext)
        if listener: listener.send("updateQueue", {'uuid': downloadObject.uuid, 'conversion': conversion})

    # Hand the result back so callers (e.g. the future created by convert())
    # can collect it instead of the work being thrown away.
    return deezerTrack
|
|
|
|
def convert(self, dz, downloadObject, settings, listener=None):
    """Submit every track of downloadObject.conversion_data to convertTrack().

    The tracks are processed on a thread pool whose size is
    settings['queueConcurrency']; the call returns (None) once the pool has
    shut down, i.e. after every submitted task has finished.

    NOTE(review): the futures are only stored in a local list, so their
    results and any exception raised by a worker are never read. The two
    progress counters are numbers passed by value to each task.
    """
    cache = self.loadCache()

    conversion = 0
    conversionNext = 0

    spotifyTracks = downloadObject.conversion_data
    collection = [None] * len(spotifyTracks)
    with ThreadPoolExecutor(settings['queueConcurrency']) as pool:
        for index, spotifyTrack in enumerate(spotifyTracks):
            collection[index] = pool.submit(
                self.convertTrack,
                dz, downloadObject,
                spotifyTrack, index,
                conversion, conversionNext,
                cache, listener,
            )
|
|
|
|
@classmethod
def _convertPlaylistStructure(cls, spotifyPlaylist):
    """Map a Spotify playlist object onto a Deezer-style playlist dict.

    The first Spotify image, if any, is used for every picture size;
    otherwise 'picture' is None and the sized entries point to placeholder
    cover URLs. 'fans' is 0 when the playlist carries no 'followers' entry.
    'duration' is always 0 and 'creation_date' a fixed placeholder.
    """
    # 'images' may be missing, None or empty: all mean "no cover".
    images = spotifyPlaylist.get('images') or []
    cover = images[0]['url'] if images else None

    placeholder = "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/{0}x{0}-000000-80-0-0.jpg"

    deezerPlaylist = {
        'checksum': spotifyPlaylist['snapshot_id'],
        'collaborative': spotifyPlaylist['collaborative'],
        'creation_date': "XXXX-00-00",
        'creator': {
            'id': spotifyPlaylist['owner']['id'],
            'name': spotifyPlaylist['owner']['display_name'],
            'tracklist': spotifyPlaylist['owner']['href'],
            'type': "user"
        },
        'description': spotifyPlaylist['description'],
        'duration': 0,
        'fans': spotifyPlaylist['followers']['total'] if 'followers' in spotifyPlaylist else 0,
        'id': spotifyPlaylist['id'],
        'is_loved_track': False,
        'link': spotifyPlaylist['external_urls']['spotify'],
        'nb_tracks': spotifyPlaylist['tracks']['total'],
        'picture': cover,
        'picture_small': cover or placeholder.format(56),
        'picture_medium': cover or placeholder.format(250),
        'picture_big': cover or placeholder.format(500),
        'picture_xl': cover or placeholder.format(1000),
        # Read by generatePlaylistItem() as the queue item's cover.
        'picture_thumbnail': cover or placeholder.format(75),
        'public': spotifyPlaylist['public'],
        'share': spotifyPlaylist['external_urls']['spotify'],
        'title': spotifyPlaylist['name'],
        'tracklist': spotifyPlaylist['tracks']['href'],
        'type': "playlist"
    }
    return deezerPlaylist
|
|
|
|
def loadSettings(self):
    """Load settings.json from the config folder and apply it.

    The file is created from the current credentials and settings when it
    does not exist, and rewritten from them when it does not contain valid
    JSON. Keys missing from the file keep their current in-memory value.
    The result is applied with setSettings(), then checkCredentials() runs.
    """
    settingsPath = self.configFolder / 'settings.json'
    current = {**self.credentials, **self.settings}

    if not settingsPath.is_file():
        with open(settingsPath, 'w') as f:
            json.dump(current, f, indent=2)

    try:
        with open(settingsPath, 'r') as settingsFile:
            settings = json.load(settingsFile)
    except json.JSONDecodeError:
        # A corrupt file must not prevent start-up: reset it on disk.
        with open(settingsPath, 'w') as f:
            json.dump(current, f, indent=2)
        settings = {}

    if not isinstance(settings, dict):
        settings = {}
    # Fill in whatever the file lacks so setSettings() always finds the
    # credential keys and every known setting.
    self.setSettings({**current, **settings})
    self.checkCredentials()
|
|
|
|
def saveSettings(self, newSettings=None):
    """Apply optional new settings, re-check credentials, write settings.json.

    newSettings: when truthy it is applied with setSettings() first.
    The file written holds the credentials merged with the settings,
    indented by two spaces.
    """
    if newSettings:
        self.setSettings(newSettings)
    self.checkCredentials()

    settingsPath = self.configFolder / 'settings.json'
    with settingsPath.open('w') as handle:
        payload = dict(self.credentials)
        payload.update(self.settings)
        json.dump(payload, handle, indent=2)
|
|
|
|
def getSettings(self):
    """Return a new dict merging the credentials with the settings.

    Settings entries win over credential entries on a key clash.
    """
    merged = dict(self.credentials)
    merged.update(self.settings)
    return merged
|
|
|
|
def setSettings(self, newSettings):
    """Split a flat settings dict into credentials and plugin settings.

    'clientId' and 'clientSecret' become self.credentials; every other
    entry becomes self.settings. Raises KeyError when either credential
    key is missing from newSettings.
    """
    credentialKeys = ('clientId', 'clientSecret')
    self.credentials = {
        'clientId': newSettings['clientId'],
        'clientSecret': newSettings['clientSecret'],
    }
    self.settings = {
        key: value
        for key, value in newSettings.items()
        if key not in credentialKeys
    }
|
|
|
|
def loadCache(self):
    """Read cache.json from the config folder.

    Returns the cached dict, guaranteed to have 'tracks' and 'albums'
    entries. An empty cache is returned when the file is missing, is not
    valid JSON, or does not hold a JSON object.
    """
    cachePath = self.configFolder / 'cache.json'
    cache = None
    if cachePath.is_file():
        try:
            with open(cachePath, 'r') as f:
                cache = json.load(f)
        except json.JSONDecodeError:
            # A damaged cache is only lost work; start over rather than
            # failing every conversion.
            cache = None
    if not isinstance(cache, dict):
        cache = {}
    # Callers index both sections directly.
    cache.setdefault('tracks', {})
    cache.setdefault('albums', {})
    return cache
|
|
|
|
def saveCache(self, newCache):
    """Overwrite cache.json in the config folder with newCache as JSON."""
    cachePath = self.configFolder / 'cache.json'
    with cachePath.open('w') as handle:
        json.dump(newCache, handle)
|
|
|
|
def checkCredentials(self):
    """Enable or disable the plugin depending on the stored credentials.

    With an empty client id or secret the plugin is disabled straight
    away. Otherwise a Spotify client is built from the credentials and
    probed with a user_playlists('spotify') request: the plugin is enabled
    when that succeeds and disabled when anything raises.
    """
    credentials = self.credentials
    hasBothKeys = credentials['clientId'] != "" and credentials['clientSecret'] != ""
    if not hasBothKeys:
        self.enabled = False
        return

    try:
        manager = SpotifyClientCredentials(
            client_id=credentials['clientId'],
            client_secret=credentials['clientSecret'],
        )
        self.sp = spotipy.Spotify(client_credentials_manager=manager)
        # Any request will do; it only proves the credentials are accepted.
        self.sp.user_playlists('spotify')
    except Exception:
        self.enabled = False
    else:
        self.enabled = True
|
|
|
|
def getCredentials(self):
    """Return the stored credentials object itself (not a copy)."""
    credentials = self.credentials
    return credentials
|
|
|
|
def setCredentials(self, clientId, clientSecret):
    """Store new Spotify API credentials and persist them.

    Leading and trailing whitespace is stripped from both values before
    they replace self.credentials; saveSettings() then writes them out.
    """
    self.credentials = {
        'clientId': clientId.strip(),
        'clientSecret': clientSecret.strip(),
    }
    # Save them to disk
    self.saveSettings()
|