mirror of
https://gitlab.com/RemixDev/deemix-py.git
synced 2025-01-19 21:28:43 +00:00
333 lines
10 KiB
Python
Executable file
333 lines
10 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
import binascii
|
||
from Cryptodome.Hash import MD5
|
||
from Cryptodome.Util.Padding import pad
|
||
|
||
from Cryptodome.Cipher import Blowfish, AES
|
||
import requests
|
||
|
||
import time
|
||
|
||
USER_AGENT_HEADER = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36"
|
||
|
||
|
||
class Deezer:
|
||
def __init__(self):
|
||
self.api_url = "http://www.deezer.com/ajax/gw-light.php"
|
||
self.legacy_api_url = "https://api.deezer.com/"
|
||
self.http_headers = {
|
||
"User-Agent": USER_AGENT_HEADER
|
||
}
|
||
self.album_pictures_host = "https://e-cdns-images.dzcdn.net/images/cover/"
|
||
self.artist_pictures_host = "https://e-cdns-images.dzcdn.net/images/artist/"
|
||
self.user = {}
|
||
self.session = requests.Session()
|
||
self.logged_in = False
|
||
self.session.post("http://www.deezer.com/", headers=self.http_headers)
|
||
self.sid = self.session.cookies.get('sid')
|
||
|
||
def get_token(self):
|
||
token_data = self.gw_api_call('deezer.getUserData')
|
||
return token_data["results"]["checkForm"]
|
||
|
||
def get_track_md5(self, sng_id):
|
||
try:
|
||
site = self.session.post(
|
||
"https://api.deezer.com/1.0/gateway.php",
|
||
params={
|
||
'api_key': "4VCYIJUCDLOUELGD1V8WBVYBNVDYOXEWSLLZDONGBBDFVXTZJRXPR29JRLQFO6ZE",
|
||
'sid': self.sid,
|
||
'input': '3',
|
||
'output': '3',
|
||
'method': 'song_getData'
|
||
},
|
||
timeout=30,
|
||
json={'sng_id': sng_id},
|
||
headers=self.http_headers
|
||
)
|
||
except:
|
||
time.sleep(2)
|
||
return self.get_track_md5(sng_id)
|
||
response = site.json()
|
||
return response['results']['PUID']
|
||
|
||
def gw_api_call(self, method, args={}):
|
||
try:
|
||
result = self.session.post(
|
||
self.api_url,
|
||
params={
|
||
'api_version': "1.0",
|
||
'api_token': 'null' if method == 'deezer.getUserData' else self.get_token(),
|
||
'input': '3',
|
||
'method': method
|
||
},
|
||
timeout=30,
|
||
json=args,
|
||
headers=self.http_headers
|
||
)
|
||
except:
|
||
time.sleep(2)
|
||
return self.gw_api_call(method, args)
|
||
return result.json()
|
||
|
||
def api_call(self, method, args={}):
|
||
try:
|
||
result = self.session.get(
|
||
self.legacy_api_url + method,
|
||
params=args,
|
||
headers=self.http_headers,
|
||
timeout=30
|
||
)
|
||
result_json = result.json()
|
||
except:
|
||
time.sleep(2)
|
||
return self.api_call(method, args)
|
||
if 'error' in result_json.keys():
|
||
raise APIError()
|
||
return result_json
|
||
|
||
def login(self, email, password, re_captcha_token):
|
||
check_form_login = self.gw_api_call("deezer.getUserData")
|
||
login = self.session.post(
|
||
"https://www.deezer.com/ajax/action.php",
|
||
data={
|
||
'type': 'login',
|
||
'mail': email,
|
||
'password': password,
|
||
'checkFormLogin': check_form_login['results']['checkFormLogin'],
|
||
'reCaptchaToken': re_captcha_token
|
||
},
|
||
headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', **self.http_headers}
|
||
)
|
||
if 'success' not in login.text:
|
||
self.logged_in = False
|
||
return False
|
||
user_data = self.gw_api_call("deezer.getUserData")
|
||
self.user = {
|
||
'email': email,
|
||
'id': user_data["results"]["USER"]["USER_ID"],
|
||
'name': user_data["results"]["USER"]["BLOG_NAME"],
|
||
'picture': user_data["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in user_data["results"][
|
||
"USER"] else ""
|
||
}
|
||
self.logged_in = True
|
||
return True
|
||
|
||
def login_via_arl(self, arl):
|
||
cookie_obj = requests.cookies.create_cookie(
|
||
domain='deezer.com',
|
||
name='arl',
|
||
value=arl,
|
||
path="/",
|
||
rest={'HttpOnly': True}
|
||
)
|
||
self.session.cookies.set_cookie(cookie_obj)
|
||
user_data = self.gw_api_call("deezer.getUserData")
|
||
if user_data["results"]["USER"]["USER_ID"] == 0:
|
||
self.logged_in = False
|
||
return 0
|
||
self.user = {
|
||
'id': user_data["results"]["USER"]["USER_ID"],
|
||
'name': user_data["results"]["USER"]["BLOG_NAME"],
|
||
'picture': user_data["results"]["USER"]["USER_PICTURE"] if "USER_PICTURE" in user_data["results"][
|
||
"USER"] else ""
|
||
}
|
||
self.logged_in = True
|
||
return 1
|
||
|
||
def get_track_gw(self, sng_id):
|
||
if int(sng_id) < 0:
|
||
body = self.gw_api_call('song.getData', {'sng_id': sng_id})
|
||
else:
|
||
body = self.gw_api_call('deezer.pageTrack', {'sng_id': sng_id})
|
||
if 'LYRICS' in body['results']:
|
||
body['results']['DATA']['LYRICS'] = body['results']['LYRICS']
|
||
body['results'] = body['results']['DATA']
|
||
return body['results']
|
||
|
||
def get_tracks_gw(self, ids):
|
||
tracks_array = []
|
||
body = self.gw_api_call('song.getListData', {'sng_ids': ids})
|
||
errors = 0
|
||
for i in range(len(ids)):
|
||
if ids[i] != 0:
|
||
tracks_array.append(body['results']['data'][i - errors])
|
||
else:
|
||
errors += 1
|
||
tracks_array.append({
|
||
'SNG_ID': 0,
|
||
'SNG_TITLE': '',
|
||
'DURATION': 0,
|
||
'MD5_ORIGIN': 0,
|
||
'MEDIA_VERSION': 0,
|
||
'FILESIZE': 0,
|
||
'ALB_TITLE': "",
|
||
'ALB_PICTURE': "",
|
||
'ART_ID': 0,
|
||
'ART_NAME': ""
|
||
})
|
||
return tracks_array
|
||
|
||
def get_album_gw(self, alb_id):
|
||
return self.gw_api_call('album.getData', {'alb_id': alb_id})['results']
|
||
|
||
def get_album_tracks_gw(self, alb_id):
|
||
tracks_array = []
|
||
body = self.gw_api_call('song.getListByAlbum', {'alb_id': alb_id, 'nb': -1})
|
||
for track in body['results']['data']:
|
||
_track = track
|
||
_track['position'] = body['results']['data'].index(track)
|
||
tracks_array.append(_track)
|
||
return tracks_array
|
||
|
||
def get_artist_gw(self, art_id):
|
||
return self.gw_api_call('deezer.pageArtist', {'art_id': art_id})
|
||
|
||
def get_playlist_gw(self, playlist_id):
|
||
return self.gw_api_call('deezer.pagePlaylist', {'playlist_id': playlist_id})
|
||
|
||
def get_playlist_tracks_gw(self, playlist_id):
|
||
tracks_array = []
|
||
body = self.gw_api_call('playlist.getSongs', {'playlist_id': playlist_id, 'nb': -1})
|
||
for track in body['results']['data']:
|
||
track['position'] = body['results']['data'].index(track)
|
||
tracks_array.append(track)
|
||
return tracks_array
|
||
|
||
def get_artist_toptracks_gw(self, art_id):
|
||
tracks_array = []
|
||
body = self.gw_api_call('artist.getTopTrack', {'art_id': art_id, 'nb': 100})
|
||
for track in body['results']['data']:
|
||
track['position'] = body['results']['data'].index(track)
|
||
tracks_array.append(track)
|
||
return tracks_array
|
||
|
||
def search_main_gw(self, term):
|
||
results = self.gw_api_call('deezer.pageSearch', {"query": term, "start": 0, "nb": 40, "suggest": True, "artist_suggest": True, "top_tracks": True})['results']
|
||
order = []
|
||
for x in results['ORDER']:
|
||
if x in ['TOP_RESULT', 'TRACK', 'ALBUM', 'ARTIST', 'PLAYLIST']:
|
||
order.append(x)
|
||
results['ORDER'] = order
|
||
return results
|
||
|
||
def search_gw(self, term, type, start, nb=20):
|
||
return self.gw_api_call('search.music', {"query": term, "filter":"ALL", "output":type, "start": start, "nb": nb})['results']
|
||
|
||
def get_lyrics_gw(self, sng_id):
|
||
return self.gw_api_call('song.getLyrics', {'sng_id': sng_id})["results"]
|
||
|
||
def get_user_playlist(self, user_id):
|
||
return self.api_call('user/' + str(user_id) + '/playlists', {'limit': -1})
|
||
|
||
def get_track(self, user_id):
|
||
return self.api_call('track/' + str(user_id))
|
||
|
||
def get_track_by_ISRC(self, isrc):
|
||
return self.api_call('track/isrc:' + isrc)
|
||
|
||
def get_charts_top_country(self):
|
||
return self.get_user_playlist('637006841')
|
||
|
||
def get_playlist(self, playlist_id):
|
||
return self.api_call('playlist/' + str(playlist_id))
|
||
|
||
def get_playlist_tracks(self, playlist_id):
|
||
return self.api_call('playlist/' + str(playlist_id) + '/tracks', {'limit': -1})
|
||
|
||
def get_album(self, album_id):
|
||
return self.api_call('album/' + str(album_id))
|
||
|
||
def get_album_by_UPC(self, upc):
|
||
return self.api_call('album/upc:' + str(upc))
|
||
|
||
def get_album_tracks(self, album_id):
|
||
return self.api_call('album/' + str(album_id) + '/tracks', {'limit': -1})
|
||
|
||
def get_artist(self, artist_id):
|
||
return self.api_call('artist/' + str(artist_id))
|
||
|
||
def get_artist_albums(self, artist_id):
|
||
return self.api_call('artist/' + str(artist_id) + '/albums', {'limit': -1})
|
||
|
||
def search(self, term, search_type, limit=30):
|
||
return self.api_call('search/' + search_type, {'q': term, 'limit': limit})
|
||
|
||
def decrypt_track(self, track_id, input, output):
|
||
response = open(input, 'rb')
|
||
outfile = open(output, 'wb')
|
||
blowfish_key = str.encode(self._get_blowfish_key(str(track_id)))
|
||
i = 0
|
||
while True:
|
||
chunk = response.read(2048)
|
||
if not chunk:
|
||
break
|
||
if (i % 3) == 0 and len(chunk) == 2048:
|
||
chunk = Blowfish.new(blowfish_key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07").decrypt(chunk)
|
||
outfile.write(chunk)
|
||
i += 1
|
||
|
||
def stream_track(self, track_id, url, stream):
|
||
try:
|
||
request = requests.get(url, headers=self.http_headers, stream=True, timeout=30)
|
||
except:
|
||
time.sleep(2)
|
||
return self.stream_track(track_id, url, stream)
|
||
request.raise_for_status()
|
||
blowfish_key = str.encode(self._get_blowfish_key(str(track_id)))
|
||
i = 0
|
||
for chunk in request.iter_content(2048):
|
||
if (i % 3) == 0 and len(chunk) == 2048:
|
||
chunk = Blowfish.new(blowfish_key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07").decrypt(chunk)
|
||
stream.write(chunk)
|
||
i += 1
|
||
|
||
def _md5(self, data):
|
||
h = MD5.new()
|
||
h.update(str.encode(data) if isinstance(data, str) else data)
|
||
return h.hexdigest()
|
||
|
||
def _get_blowfish_key(self, trackId):
|
||
SECRET = 'g4el58wc' + '0zvf9na1'
|
||
idMd5 = self._md5(trackId)
|
||
bfKey = ""
|
||
for i in range(16):
|
||
bfKey += chr(ord(idMd5[i]) ^ ord(idMd5[i + 16]) ^ ord(SECRET[i]))
|
||
return bfKey
|
||
|
||
def get_track_stream_url(self, sng_id, md5, media_version, format):
|
||
urlPart = b'\xa4'.join(
|
||
[str.encode(md5), str.encode(str(format)), str.encode(str(sng_id)), str.encode(str(media_version))])
|
||
md5val = self._md5(urlPart)
|
||
step2 = str.encode(md5val) + b'\xa4' + urlPart + b'\xa4'
|
||
step2 = pad(step2, 16)
|
||
urlPart = binascii.hexlify(AES.new(b'jo6aey6haid2Teih', AES.MODE_ECB).encrypt(step2))
|
||
return "https://e-cdns-proxy-" + md5[0] + ".dzcdn.net/mobile/1/" + urlPart.decode("utf-8")
|
||
|
||
def get_track_from_metadata(self, artist, track, album):
|
||
artist = artist.replace("–","-").replace("’", "'")
|
||
track = track.replace("–","-").replace("’", "'")
|
||
album = album.replace("–","-").replace("’", "'")
|
||
|
||
resp = self.search(f'artist:"{artist}" track:"{track}" album:"{album}"', "track", 1)
|
||
if len(resp['data'])>0:
|
||
return resp['data'][0]['id']
|
||
resp = self.search(f'artist:"{artist}" track:"{track}"', "track", 1)
|
||
if len(resp['data'])>0:
|
||
return resp['data'][0]['id']
|
||
if "(" in track and ")" in track and track.find("(") < track.find(")"):
|
||
resp = self.search(f'artist:"{artist}" track:"{track[:track.find("(")]}"', "track", 1)
|
||
if len(resp['data'])>0:
|
||
return resp['data'][0]['id']
|
||
elif " - " in track:
|
||
resp = self.search(f'artist:"{artist}" track:"{track[:track.find(" - ")]}"', "track", 1)
|
||
if len(resp['data'])>0:
|
||
return resp['data'][0]['id']
|
||
else:
|
||
return 0
|
||
return 0
|
||
|
||
|
||
class APIError(Exception):
|
||
pass
|