deezer-py/deezer/gw.py

382 lines
13 KiB
Python

import requests
from time import sleep
import json
from deezer.utils import map_artist_album, map_user_track, map_user_artist, map_user_album, map_user_playlist
from deezer.errors import GWAPIError
class PlaylistStatus():
PUBLIC = 0
PRIVATE = 1
COLLABORATIVE = 2
EMPTY_TRACK_OBJ = {
'SNG_ID': 0,
'SNG_TITLE': '',
'DURATION': 0,
'MD5_ORIGIN': 0,
'MEDIA_VERSION': 0,
'FILESIZE': 0,
'ALB_TITLE': "",
'ALB_PICTURE': "",
'ART_ID': 0,
'ART_NAME': ""
}
class GW:
def __init__(self, session, headers):
self.http_headers = headers
self.session = session
self.api_token = None
def api_call(self, method, args=None, params=None):
if args is None: args = {}
if params is None: params = {}
if not self.api_token and method != 'deezer.getUserData': self.api_token = self._get_token()
p = {'api_version': "1.0",
'api_token': 'null' if method == 'deezer.getUserData' else self.api_token,
'input': '3',
'method': method}
p.update(params)
try:
result_json = self.session.post(
"http://www.deezer.com/ajax/gw-light.php",
params=p,
timeout=30,
json=args,
headers=self.http_headers
).json()
except (requests.ConnectionError, requests.Timeout):
sleep(2)
return self.api_call(method, args, params)
if len(result_json['error']):
if (
result_json['error'] == {"GATEWAY_ERROR": "invalid api token"} or
result_json['error'] == {"VALID_TOKEN_REQUIRED": "Invalid CSRF token"}
):
self.api_token = self._get_token()
return self.api_call(method, args, params)
if result_json.get('payload', {}) and result_json['payload'].get('FALLBACK', {}):
for key in result_json['payload']['FALLBACK'].keys():
args[key] = result_json['payload']['FALLBACK'][key]
return self.api_call(method, args, params)
raise GWAPIError(json.dumps(result_json['error']))
if not self.api_token and method == 'deezer.getUserData': self.api_token = result_json['results']['checkForm']
return result_json['results']
def _get_token(self):
token_data = self.get_user_data()
return token_data['checkForm']
def get_user_data(self):
return self.api_call('deezer.getUserData')
def get_user_profile_page(self, user_id, tab, limit=10):
return self.api_call('deezer.pageProfile', {'USER_ID': user_id, 'tab': tab, 'nb': limit})
def get_user_favorite_ids(self, checksum = None, limit = 10000, start = 0):
return self.api_call('song.getFavoriteIds', {'nb': limit, 'start': start, 'checksum': checksum})
def get_child_accounts(self):
return self.api_call('deezer.getChildAccounts')
def get_track(self, sng_id):
return self.api_call('song.getData', {'SNG_ID': sng_id})
def get_track_page(self, sng_id):
return self.api_call('deezer.pageTrack', {'SNG_ID': sng_id})
def get_track_lyrics(self, sng_id):
return self.api_call('song.getLyrics', {'SNG_ID': sng_id})
def get_tracks(self, sng_ids):
tracks_array = []
body = self.api_call('song.getListData', {'SNG_IDS': sng_ids})
errors = 0
for i in range(len(sng_ids)):
if sng_ids[i] != 0:
tracks_array.append(body['data'][i - errors])
else:
errors += 1
tracks_array.append(EMPTY_TRACK_OBJ)
return tracks_array
def get_album(self, alb_id):
return self.api_call('album.getData', {'ALB_ID': alb_id})
def get_album_page(self, alb_id):
return self.api_call('deezer.pageAlbum', {
'ALB_ID': alb_id,
'lang': 'en',
'header': True,
'tab': 0
})
def get_album_tracks(self, alb_id):
tracks_array = []
body = self.api_call('song.getListByAlbum', {'ALB_ID': alb_id, 'nb': -1})
for track in body['data']:
track['POSITION'] = body['data'].index(track)
tracks_array.append(track)
return tracks_array
def get_artist(self, art_id):
return self.api_call('artist.getData', {'ART_ID': art_id})
def get_artist_page(self, art_id):
return self.api_call('deezer.pageArtist', {
'ART_ID': art_id,
'lang': 'en',
'header': True,
'tab': 0
})
def get_artist_top_tracks(self, art_id, limit=100):
tracks_array = []
body = self.api_call('artist.getTopTrack', {'ART_ID': art_id, 'nb': limit})
for track in body['data']:
track['POSITION'] = body['data'].index(track)
tracks_array.append(track)
return tracks_array
def get_artist_discography(self, art_id, index=0, limit=25):
return self.api_call('album.getDiscography', {
'ART_ID': art_id,
"discography_mode":"all",
'nb': limit,
'nb_songs': 0,
'start': index
})
def get_playlist(self, playlist_id):
return self.get_playlist_page(playlist_id)
def get_playlist_page(self, playlist_id):
return self.api_call('deezer.pagePlaylist', {
'PLAYLIST_ID': playlist_id,
'lang': 'en',
'header': True,
'tab': 0
})
def get_playlist_tracks(self, playlist_id):
tracks_array = []
body = self.api_call('playlist.getSongs', {'PLAYLIST_ID': playlist_id, 'nb': -1})
for track in body['data']:
track['POSITION'] = body['data'].index(track)
tracks_array.append(track)
return tracks_array
def create_playlist(self, title, status=PlaylistStatus.PUBLIC, description=None, songs=[]):
newSongs = []
for song in songs:
newSongs.append([song, 0])
return self.api_call('playlist.create', {
'title': title,
'status': status,
'description': description,
'songs': newSongs
})
def edit_playlist(self, playlist_id, title, status=None, description=None, songs=[]):
newSongs = []
for song in songs:
newSongs.append([song, 0])
return self.api_call('playlist.update', {
'PLAYLIST_ID': playlist_id,
'title': title,
'status': status,
'description': description,
'songs': newSongs
})
def add_songs_to_playlist(self, playlist_id, songs, offset=-1):
newSongs = []
for song in songs:
newSongs.append([song, 0])
return self.api_call('playlist.addSongs', {
'PLAYLIST_ID': playlist_id,
'songs': newSongs,
'offset': offset
})
def add_song_to_playlist(self, playlist_id, sng_id, offset=-1):
return self.add_songs_to_playlist(playlist_id, [sng_id], offset)
def remove_songs_from_playlist(self, playlist_id, songs):
newSongs = []
for song in songs:
newSongs.append([song, 0])
return self.api_call('playlist.deleteSongs', {
'PLAYLIST_ID': playlist_id,
'songs': newSongs
})
def remove_song_from_playlist(self, playlist_id, sng_id):
return self.remove_songs_from_playlist(playlist_id, [sng_id])
def delete_playlist(self, playlist_id):
return self.api_call('playlist.delete', {'PLAYLIST_ID': playlist_id})
def add_song_to_favorites(self, sng_id):
return self.api_call('favorite_song.add', {'SNG_ID': sng_id})
def remove_song_from_favorites(self, sng_id):
return self.api_call('favorite_song.remove', {'SNG_ID': sng_id})
def add_album_to_favorites(self, alb_id):
return self.api_call('album.addFavorite', {'ALB_ID': alb_id})
def remove_album_from_favorites(self, alb_id):
return self.api_call('album.deleteFavorite', {'ALB_ID': alb_id})
def add_artist_to_favorites(self, art_id):
return self.api_call('artist.addFavorite', {'ART_ID': art_id})
def remove_artist_from_favorites(self, art_id):
return self.api_call('artist.deleteFavorite', {'ART_ID': art_id})
def add_playlist_to_favorites(self, playlist_id):
return self.api_call('playlist.addFavorite', {'PARENT_PLAYLIST_ID': playlist_id})
def remove_playlist_from_favorites(self, playlist_id):
return self.api_call('playlist.deleteFavorite', {'PLAYLIST_ID': playlist_id})
def get_page(self, page):
params = {
'gateway_input': json.dumps({
'PAGE': page,
'VERSION': '2.3',
'SUPPORT': {
'grid': [
'channel',
'album'
],
'horizontal-grid': [
'album'
],
},
'LANG': 'en'
})
}
return self.api_call('page.get', params=params)
def search(self, query, index=0, limit=10, suggest=True, artist_suggest=True, top_tracks=True):
return self.api_call('deezer.pageSearch', {
"query": query,
"start": index,
"nb": limit,
"suggest": suggest,
"artist_suggest": artist_suggest,
"top_tracks": top_tracks
})
def search_music(self, query, type, index=0, limit=10):
return self.api_call('search.music', {
"query": query,
"filter": "ALL",
"output": type,
"start": index,
"nb": limit
})
# Extra calls
def get_artist_discography_tabs(self, art_id, limit=100):
index = 0
releases = []
result = {'all': []}
ids = []
# Get all releases
while True:
response = self.get_artist_discography(art_id, index=index, limit=limit)
releases += response['data']
index += limit
if index > response['total']:
break
for release in releases:
if release['ALB_ID'] not in ids:
ids.append(release['ALB_ID'])
obj = map_artist_album(release)
if (release['ART_ID'] == art_id or release['ART_ID'] != art_id and release['ROLE_ID'] == 0) and release['ARTISTS_ALBUMS_IS_OFFICIAL']:
# Handle all base record types
if not obj['record_type'] in result:
result[obj['record_type']] = []
result[obj['record_type']].append(obj)
result['all'].append(obj)
else:
# Handle albums where the artist is featured
if release['ROLE_ID'] == 5:
if not 'featured' in result:
result['featured'] = []
result['featured'].append(obj)
# Handle "more" albums
elif release['ROLE_ID'] == 0:
if not 'more' in result:
result['more'] = []
result['more'].append(obj)
result['all'].append(obj)
return result
def get_track_with_fallback(self, sng_id):
body = None
if int(sng_id) > 0:
try:
body = self.get_track_page(sng_id)
except:
pass
if body:
if 'LYRICS' in body:
body['DATA']['LYRICS'] = body['LYRICS']
if 'ISRC' in body:
body['DATA']['ALBUM_FALLBACK'] = body['ISRC']
body = body['DATA']
else:
body = self.get_track(sng_id)
return body
def get_user_playlists(self, user_id, limit=25):
user_profile_page = self.get_user_profile_page(user_id, 'playlists', limit=limit)
blog_name = user_profile_page['DATA']['USER'].get('BLOG_NAME', "Unknown")
data = user_profile_page['TAB']['playlists']['data']
result = []
for playlist in data:
result.append(map_user_playlist(playlist, blog_name))
return result
def get_user_albums(self, user_id, limit=25):
data = self.get_user_profile_page(user_id, 'albums', limit=limit)['TAB']['albums']['data']
result = []
for album in data:
result.append(map_user_album(album))
return result
def get_user_artists(self, user_id, limit=25):
data = self.get_user_profile_page(user_id, 'artists', limit=limit)['TAB']['artists']['data']
result = []
for artist in data:
result.append(map_user_artist(artist))
return result
def get_user_tracks(self, user_id, limit=25):
user_data = self.get_user_data()
if user_data['USER']['USER_ID'] == user_id: return self.get_my_favorite_tracks()
data = self.get_user_profile_page(user_id, 'loved', limit=limit)['TAB']['loved']['data']
result = []
for track in data:
result.append(map_user_track(track))
return result
def get_my_favorite_tracks(self, limit=25):
ids_raw = self.get_user_favorite_ids(limit=limit)
ids = [ x['SNG_ID'] for x in ids_raw['data'] ]
if len(ids) == 0: return []
data = self.get_tracks(ids)
result = []
for (i, track) in enumerate(data):
track = dict(track, **ids_raw['data'][i])
result.append(map_user_track(track))
return result