From 4515446a65b5d960bbc30be067d09171e7dbcd5f Mon Sep 17 00:00:00 2001 From: Stefano Date: Mon, 10 Feb 2020 11:20:02 +0100 Subject: [PATCH] Auto-refresh AuthCode flow token. (#435) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Auto-refresh AuthCode flow token. * Reformatted. * Reformatted. * Removed invalid syntax. * Removed abstract class SpotifyAuthManager. * Fix typo on docstrings. * Optionally try to fetch main input parameters from environment. Implements the capability of trying to fetch the following parameters from the environment, when they're not directly passed to the authorization handler. The affected parameters are: client_id, client_secret, redirect_uri. An SpotifyOauthError is raised if no value gets found. * Removed f-string for Python2 compatibility. * Fix line-too-long. * Remove useless import. * Add username to docstring. * Remove redundant return. * Fix empty lines print statement for backward compatibility with Python2. * Update simple4 example. * Set optional 'as_dict' parameter on OAuth 'get_access_token'. * Update changelog. Co-authored-by: Stéphane Bruckert --- CHANGELOG.md | 7 + examples/simple4.py | 12 + spotipy/client.py | 731 ++++++++++++++++++++++++++------------------ spotipy/oauth2.py | 285 ++++++++++++----- spotipy/util.py | 117 ++++--- 5 files changed, 715 insertions(+), 437 deletions(-) create mode 100644 examples/simple4.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2416b64..b6acc73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support for `playlist_cover_image` - Support `after` and `before` parameter in `current_user_recently_played` - CI for unit tests + - Automatic `token` refresh + - `auth_manager` and `oauth_manager` optional parameters added to `Spotify`'s init. + - Optional `username` parameter to be passed to SpotifyOAuth, to infer a `cache_path` automatically + - Optional `as_dict` parameter to control `SpotifyOAuth`'s `get_access_token` output type. However, this is going to be deprecated in the future, and the method will always return a token string + +### Changed + - Both `SpotifyClientCredentials` and `SpotifyOAuth` inherit from a common `SpotifyAuthBase` which handles common parameters and logics. ## [2.7.1] - 2020-01-20 diff --git a/examples/simple4.py b/examples/simple4.py new file mode 100644 index 0000000..c8761c7 --- /dev/null +++ b/examples/simple4.py @@ -0,0 +1,12 @@ +import spotipy +import os +from pprint import pprint + +def main(): + spotify = spotipy.Spotify(auth_manager=spotipy.SpotifyOAuth()) + me = spotify.me() + pprint(me) + + +if __name__ == "__main__": + main() diff --git a/spotipy/client.py b/spotipy/client.py index 621b4e6..ca7aeee 100644 --- a/spotipy/client.py +++ b/spotipy/client.py @@ -4,10 +4,7 @@ from __future__ import print_function -__all__ = [ - 'Spotify', - 'SpotifyException' -] +__all__ = ["Spotify", "SpotifyException"] import json import sys @@ -30,8 +27,9 @@ class SpotifyException(Exception): self.headers = headers def __str__(self): - return 'http status: {0}, code:{1} - {2}'.format( - self.http_status, self.code, self.msg) + return "http status: {0}, code:{1} - {2}".format( + self.http_status, self.code, self.msg + ) class Spotify(object): @@ -57,9 +55,16 @@ class Spotify(object): trace_out = False max_get_retries = 10 - def __init__(self, auth=None, requests_session=True, - client_credentials_manager=None, proxies=None, - requests_timeout=None): + def __init__( + self, + auth=None, + requests_session=True, + client_credentials_manager=None, + oauth_manager=None, + auth_manager=None, + proxies=None, + requests_timeout=None, + ): """ Creates a Spotify API client. @@ -71,15 +76,21 @@ class Spotify(object): for performance reasons (connection pooling). :param client_credentials_manager: SpotifyClientCredentials object + :param oauth_manager: + SpotifyOAuth object + :param auth_manager: + SpotifyOauth object or SpotifyClientCredentials object :param proxies: Definition of proxies (optional) :param requests_timeout: Tell Requests to stop waiting for a response after a given number of seconds """ - self.prefix = 'https://api.spotify.com/v1/' + self.prefix = "https://api.spotify.com/v1/" self._auth = auth self.client_credentials_manager = client_credentials_manager + self.oauth_manager = oauth_manager + self.auth_manager = auth_manager self.proxies = proxies self.requests_timeout = requests_timeout @@ -90,45 +101,60 @@ class Spotify(object): self._session = requests.Session() else: # Use the Requests API module as a "session". from requests import api + self._session = api + @property + def auth_manager(self): + return self._auth_manager + + @auth_manager.setter + def auth_manager(self, auth_manager): + if auth_manager is not None: + self._auth_manager = auth_manager + else: + self._auth_manager = ( + self.client_credentials_manager or self.oauth_manager + ) + def _auth_headers(self): if self._auth: - return {'Authorization': 'Bearer {0}'.format(self._auth)} - elif self.client_credentials_manager: - token = self.client_credentials_manager.get_access_token() - return {'Authorization': 'Bearer {0}'.format(token)} + return {"Authorization": "Bearer {0}".format(self._auth)} + elif self.auth_manager: + token = self.auth_manager.get_access_token(as_dict=False) + return {"Authorization": "Bearer {0}".format(token)} else: return {} def _internal_call(self, method, url, payload, params): args = dict(params=params) args["timeout"] = self.requests_timeout - if not url.startswith('http'): + if not url.startswith("http"): url = self.prefix + url headers = self._auth_headers() - if 'content_type' in args['params']: - headers['Content-Type'] = args['params']['content_type'] - del args['params']['content_type'] + if "content_type" in args["params"]: + headers["Content-Type"] = args["params"]["content_type"] + del args["params"]["content_type"] if payload: args["data"] = payload else: - headers['Content-Type'] = 'application/json' + headers["Content-Type"] = "application/json" if payload: args["data"] = json.dumps(payload) if self.trace_out: print(url) - with self._session.request(method, url, headers=headers, - proxies=self.proxies, **args) as r: + with self._session.request( + method, url, headers=headers, proxies=self.proxies, **args + ) as r: if self.trace: # pragma: no cover print() - print('Request headers:', headers) - print('Response headers:', r.headers) - print('HTTP status', r.status_code) + print("Request headers:", headers) + print("Response headers:", r.headers) + print("HTTP status", r.status_code) print(method, r.url) if payload: print("Data", json.dumps(payload)) @@ -137,12 +163,15 @@ class Spotify(object): r.raise_for_status() except BaseException: try: - msg = r.json()['error']['message'] + msg = r.json()["error"]["message"] except BaseException: - msg = 'error' - raise SpotifyException(r.status_code, - -1, '%s:\n %s' % (r.url, msg), - headers=r.headers) + msg = "error" + raise SpotifyException( + r.status_code, + -1, + "%s:\n %s" % (r.url, msg), + headers=r.headers, + ) try: results = r.json() @@ -150,7 +179,7 @@ class Spotify(object): results = None if self.trace: # pragma: no cover - print('Response:', results) + print("Response:", results) print() return results @@ -161,7 +190,7 @@ class Spotify(object): delay = 1 while retries > 0: try: - return self._internal_call('GET', url, payload, kwargs) + return self._internal_call("GET", url, payload, kwargs) except SpotifyException as e: retries -= 1 status = e.http_status @@ -171,21 +200,22 @@ class Spotify(object): raise else: sleep_seconds = int( - e.headers.get('Retry-After', delay)) - print('retrying ...' + str(sleep_seconds) + 'secs') + e.headers.get("Retry-After", delay) + ) + print("retrying ..." + str(sleep_seconds) + "secs") time.sleep(sleep_seconds + 1) delay += 1 else: raise except Exception as e: raise - print('exception', str(e)) + print("exception", str(e)) # some other exception. Requests have # been know to throw a BadStatusLine exception retries -= 1 if retries >= 0: - sleep_seconds = int(e.headers.get('Retry-After', delay)) - print('retrying ...' + str(delay) + 'secs') + sleep_seconds = int(e.headers.get("Retry-After", delay)) + print("retrying ..." + str(delay) + "secs") time.sleep(sleep_seconds + 1) delay += 1 else: @@ -194,17 +224,17 @@ class Spotify(object): def _post(self, url, args=None, payload=None, **kwargs): if args: kwargs.update(args) - return self._internal_call('POST', url, payload, kwargs) + return self._internal_call("POST", url, payload, kwargs) def _delete(self, url, args=None, payload=None, **kwargs): if args: kwargs.update(args) - return self._internal_call('DELETE', url, payload, kwargs) + return self._internal_call("DELETE", url, payload, kwargs) def _put(self, url, args=None, payload=None, **kwargs): if args: kwargs.update(args) - return self._internal_call('PUT', url, payload, kwargs) + return self._internal_call("PUT", url, payload, kwargs) def next(self, result): """ returns the next result given a paged result @@ -212,8 +242,8 @@ class Spotify(object): Parameters: - result - a previously returned paged result """ - if result['next']: - return self._get(result['next']) + if result["next"]: + return self._get(result["next"]) else: return None @@ -223,16 +253,16 @@ class Spotify(object): Parameters: - result - a previously returned paged result """ - if result['previous']: - return self._get(result['previous']) + if result["previous"]: + return self._get(result["previous"]) else: return None def _warn_old(self, msg): - print('warning:' + msg, file=sys.stderr) + print("warning:" + msg, file=sys.stderr) def _warn(self, msg, *args): - print('warning:' + msg.format(*args), file=sys.stderr) + print("warning:" + msg.format(*args), file=sys.stderr) def track(self, track_id): """ returns a single track given the track's ID, URI or URL @@ -241,8 +271,8 @@ class Spotify(object): - track_id - a spotify URI, URL or ID """ - trid = self._get_id('track', track_id) - return self._get('tracks/' + trid) + trid = self._get_id("track", track_id) + return self._get("tracks/" + trid) def tracks(self, tracks, market=None): """ returns a list of tracks given a list of track IDs, URIs, or URLs @@ -252,8 +282,8 @@ class Spotify(object): - market - an ISO 3166-1 alpha-2 country code. """ - tlist = [self._get_id('track', t) for t in tracks] - return self._get('tracks/?ids=' + ','.join(tlist), market=market) + tlist = [self._get_id("track", t) for t in tracks] + return self._get("tracks/?ids=" + ",".join(tlist), market=market) def artist(self, artist_id): """ returns a single artist given the artist's ID, URI or URL @@ -262,8 +292,8 @@ class Spotify(object): - artist_id - an artist ID, URI or URL """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid) + trid = self._get_id("artist", artist_id) + return self._get("artists/" + trid) def artists(self, artists): """ returns a list of artists given the artist IDs, URIs, or URLs @@ -272,11 +302,12 @@ class Spotify(object): - artists - a list of artist IDs, URIs or URLs """ - tlist = [self._get_id('artist', a) for a in artists] - return self._get('artists/?ids=' + ','.join(tlist)) + tlist = [self._get_id("artist", a) for a in artists] + return self._get("artists/?ids=" + ",".join(tlist)) - def artist_albums(self, artist_id, album_type=None, country=None, limit=20, - offset=0): + def artist_albums( + self, artist_id, album_type=None, country=None, limit=20, offset=0 + ): """ Get Spotify catalog information about an artist's albums Parameters: @@ -287,11 +318,16 @@ class Spotify(object): - offset - the index of the first album to return """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/albums', album_type=album_type, - country=country, limit=limit, offset=offset) + trid = self._get_id("artist", artist_id) + return self._get( + "artists/" + trid + "/albums", + album_type=album_type, + country=country, + limit=limit, + offset=offset, + ) - def artist_top_tracks(self, artist_id, country='US'): + def artist_top_tracks(self, artist_id, country="US"): """ Get Spotify catalog information about an artist's top 10 tracks by country. @@ -300,8 +336,8 @@ class Spotify(object): - country - limit the response to one particular country. """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/top-tracks', country=country) + trid = self._get_id("artist", artist_id) + return self._get("artists/" + trid + "/top-tracks", country=country) def artist_related_artists(self, artist_id): """ Get Spotify catalog information about artists similar to an @@ -311,8 +347,8 @@ class Spotify(object): Parameters: - artist_id - the artist ID, URI or URL """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/related-artists') + trid = self._get_id("artist", artist_id) + return self._get("artists/" + trid + "/related-artists") def album(self, album_id): """ returns a single album given the album's ID, URIs or URL @@ -321,8 +357,8 @@ class Spotify(object): - album_id - the album ID, URI or URL """ - trid = self._get_id('album', album_id) - return self._get('albums/' + trid) + trid = self._get_id("album", album_id) + return self._get("albums/" + trid) def album_tracks(self, album_id, limit=50, offset=0): """ Get Spotify catalog information about an album's tracks @@ -333,9 +369,10 @@ class Spotify(object): - offset - the index of the first item to return """ - trid = self._get_id('album', album_id) - return self._get('albums/' + trid + '/tracks/', limit=limit, - offset=offset) + trid = self._get_id("album", album_id) + return self._get( + "albums/" + trid + "/tracks/", limit=limit, offset=offset + ) def albums(self, albums): """ returns a list of albums given the album IDs, URIs, or URLs @@ -344,10 +381,10 @@ class Spotify(object): - albums - a list of album IDs, URIs or URLs """ - tlist = [self._get_id('album', a) for a in albums] - return self._get('albums/?ids=' + ','.join(tlist)) + tlist = [self._get_id("album", a) for a in albums] + return self._get("albums/?ids=" + ",".join(tlist)) - def search(self, q, limit=10, offset=0, type='track', market=None): + def search(self, q, limit=10, offset=0, type="track", market=None): """ searches for an item Parameters: @@ -360,8 +397,9 @@ class Spotify(object): - market - An ISO 3166-1 alpha-2 country code or the string from_token. """ - return self._get('search', q=q, limit=limit, - offset=offset, type=type, market=market) + return self._get( + "search", q=q, limit=limit, offset=offset, type=type, market=market + ) def user(self, user): """ Gets basic profile information about a Spotify User @@ -369,7 +407,7 @@ class Spotify(object): Parameters: - user - the id of the usr """ - return self._get('users/' + user) + return self._get("users/" + user) def current_user_playlists(self, limit=50, offset=0): """ Get current user playlists without required getting his profile @@ -388,11 +426,12 @@ class Spotify(object): - market - An ISO 3166-1 alpha-2 country code or the string from_token. """ - plid = self._get_id('playlist', playlist_id) + plid = self._get_id("playlist", playlist_id) return self._get("playlists/%s" % (plid), fields=fields, market=market) - def playlist_tracks(self, playlist_id, fields=None, - limit=100, offset=0, market=None): + def playlist_tracks( + self, playlist_id, fields=None, limit=100, offset=0, market=None + ): """ Get full details of the tracks of a playlist. Parameters: @@ -402,10 +441,14 @@ class Spotify(object): - offset - the index of the first track to return - market - an ISO 3166-1 alpha-2 country code. """ - plid = self._get_id('playlist', playlist_id) - return self._get("playlists/%s/tracks" % (plid), - limit=limit, offset=offset, fields=fields, - market=market) + plid = self._get_id("playlist", playlist_id) + return self._get( + "playlists/%s/tracks" % (plid), + limit=limit, + offset=offset, + fields=fields, + market=market, + ) def playlist_cover_image(self, playlist_id): """ Get cover of a playlist. @@ -413,12 +456,10 @@ class Spotify(object): Parameters: - playlist_id - the id of the playlist """ - plid = self._get_id('playlist', playlist_id) + plid = self._get_id("playlist", playlist_id) return self._get("playlists/%s/images" % (plid)) - def playlist_upload_cover_image(self, - playlist_id, - image_b64): + def playlist_upload_cover_image(self, playlist_id, image_b64): """ Replace the image used to represent a specific playlist Parameters: @@ -426,16 +467,18 @@ class Spotify(object): - image_b64 - image data as a Base64 encoded JPEG image string (maximum payload size is 256 KB) """ - plid = self._get_id('playlist', playlist_id) - return self._put("playlists/{}/images".format(plid), - payload=image_b64, - content_type="image/jpeg") + plid = self._get_id("playlist", playlist_id) + return self._put( + "playlists/{}/images".format(plid), + payload=image_b64, + content_type="image/jpeg", + ) - def user_playlist(self, user, playlist_id=None, - fields=None, market=None): + def user_playlist(self, user, playlist_id=None, fields=None, market=None): warnings.warn( "You should use `playlist(playlist_id)` instead", - DeprecationWarning) + DeprecationWarning, + ) """ Gets playlist of a user @@ -448,11 +491,19 @@ class Spotify(object): return self._get("users/%s/starred" % user) return self.playlist(playlist_id, fields=fields, market=market) - def user_playlist_tracks(self, user=None, playlist_id=None, fields=None, - limit=100, offset=0, market=None): + def user_playlist_tracks( + self, + user=None, + playlist_id=None, + fields=None, + limit=100, + offset=0, + market=None, + ): warnings.warn( "You should use `playlist_tracks(playlist_id)` instead", - DeprecationWarning) + DeprecationWarning, + ) """ Get full details of the tracks of a playlist owned by a user. @@ -464,8 +515,13 @@ class Spotify(object): - offset - the index of the first track to return - market - an ISO 3166-1 alpha-2 country code. """ - return self.playlist_tracks(playlist_id, limit=limit, offset=offset, - fields=fields, market=market) + return self.playlist_tracks( + playlist_id, + limit=limit, + offset=offset, + fields=fields, + market=market, + ) def user_playlists(self, user, limit=50, offset=0): """ Gets playlists of a user @@ -475,10 +531,11 @@ class Spotify(object): - limit - the number of items to return - offset - the index of the first item to return """ - return self._get("users/%s/playlists" % user, limit=limit, - offset=offset) + return self._get( + "users/%s/playlists" % user, limit=limit, offset=offset + ) - def user_playlist_create(self, user, name, public=True, description=''): + def user_playlist_create(self, user, name, public=True, description=""): """ Creates a playlist for a user Parameters: @@ -487,13 +544,19 @@ class Spotify(object): - public - is the created playlist public - description - the description of the playlist """ - data = {'name': name, 'public': public, 'description': description} + data = {"name": name, "public": public, "description": description} return self._post("users/%s/playlists" % (user,), payload=data) def user_playlist_change_details( - self, user, playlist_id, name=None, public=None, - collaborative=None, description=None): + self, + user, + playlist_id, + name=None, + public=None, + collaborative=None, + description=None, + ): """ Changes a playlist's name and/or public/private state Parameters: @@ -507,15 +570,16 @@ class Spotify(object): data = {} if isinstance(name, six.string_types): - data['name'] = name + data["name"] = name if isinstance(public, bool): - data['public'] = public + data["public"] = public if isinstance(collaborative, bool): - data['collaborative'] = collaborative + data["collaborative"] = collaborative if isinstance(description, six.string_types): - data['description'] = description - return self._put("users/%s/playlists/%s" % (user, playlist_id), - payload=data) + data["description"] = description + return self._put( + "users/%s/playlists/%s" % (user, playlist_id), payload=data + ) def user_playlist_unfollow(self, user, playlist_id): """ Unfollows (deletes) a playlist for a user @@ -524,11 +588,13 @@ class Spotify(object): - user - the id of the user - name - the name of the playlist """ - return self._delete("users/%s/playlists/%s/followers" % - (user, playlist_id)) + return self._delete( + "users/%s/playlists/%s/followers" % (user, playlist_id) + ) - def user_playlist_add_tracks(self, user, playlist_id, tracks, - position=None): + def user_playlist_add_tracks( + self, user, playlist_id, tracks, position=None + ): """ Adds tracks to a playlist Parameters: @@ -537,10 +603,13 @@ class Spotify(object): - tracks - a list of track URIs, URLs or IDs - position - the position to add the tracks """ - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] - return self._post("users/%s/playlists/%s/tracks" % (user, plid), - payload=ftracks, position=position) + plid = self._get_id("playlist", playlist_id) + ftracks = [self._get_uri("track", tid) for tid in tracks] + return self._post( + "users/%s/playlists/%s/tracks" % (user, plid), + payload=ftracks, + position=position, + ) def user_playlist_replace_tracks(self, user, playlist_id, tracks): """ Replace all tracks in a playlist @@ -550,15 +619,22 @@ class Spotify(object): - playlist_id - the id of the playlist - tracks - the list of track ids to add to the playlist """ - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] + plid = self._get_id("playlist", playlist_id) + ftracks = [self._get_uri("track", tid) for tid in tracks] payload = {"uris": ftracks} - return self._put("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) + return self._put( + "users/%s/playlists/%s/tracks" % (user, plid), payload=payload + ) def user_playlist_reorder_tracks( - self, user, playlist_id, range_start, insert_before, - range_length=1, snapshot_id=None): + self, + user, + playlist_id, + range_start, + insert_before, + range_length=1, + snapshot_id=None, + ): """ Reorder tracks in a playlist Parameters: @@ -571,17 +647,21 @@ class Spotify(object): inserted - snapshot_id - optional playlist's snapshot ID """ - plid = self._get_id('playlist', playlist_id) - payload = {"range_start": range_start, - "range_length": range_length, - "insert_before": insert_before} + plid = self._get_id("playlist", playlist_id) + payload = { + "range_start": range_start, + "range_length": range_length, + "insert_before": insert_before, + } if snapshot_id: payload["snapshot_id"] = snapshot_id - return self._put("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) + return self._put( + "users/%s/playlists/%s/tracks" % (user, plid), payload=payload + ) def user_playlist_remove_all_occurrences_of_tracks( - self, user, playlist_id, tracks, snapshot_id=None): + self, user, playlist_id, tracks, snapshot_id=None + ): """ Removes all occurrences of the given tracks from the given playlist Parameters: @@ -592,16 +672,18 @@ class Spotify(object): """ - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] + plid = self._get_id("playlist", playlist_id) + ftracks = [self._get_uri("track", tid) for tid in tracks] payload = {"tracks": [{"uri": track} for track in ftracks]} if snapshot_id: payload["snapshot_id"] = snapshot_id - return self._delete("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) + return self._delete( + "users/%s/playlists/%s/tracks" % (user, plid), payload=payload + ) def user_playlist_remove_specific_occurrences_of_tracks( - self, user, playlist_id, tracks, snapshot_id=None): + self, user, playlist_id, tracks, snapshot_id=None + ): """ Removes all occurrences of the given tracks from the given playlist Parameters: @@ -615,18 +697,21 @@ class Spotify(object): - snapshot_id - optional id of the playlist snapshot """ - plid = self._get_id('playlist', playlist_id) + plid = self._get_id("playlist", playlist_id) ftracks = [] for tr in tracks: - ftracks.append({ - "uri": self._get_uri("track", tr["uri"]), - "positions": tr["positions"], - }) + ftracks.append( + { + "uri": self._get_uri("track", tr["uri"]), + "positions": tr["positions"], + } + ) payload = {"tracks": ftracks} if snapshot_id: payload["snapshot_id"] = snapshot_id - return self._delete("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) + return self._delete( + "users/%s/playlists/%s/tracks" % (user, plid), payload=payload + ) def user_playlist_follow_playlist(self, playlist_owner_id, playlist_id): """ @@ -638,11 +723,14 @@ class Spotify(object): """ return self._put( - "users/{}/playlists/{}/followers".format(playlist_owner_id, - playlist_id)) + "users/{}/playlists/{}/followers".format( + playlist_owner_id, playlist_id + ) + ) def user_playlist_is_following( - self, playlist_owner_id, playlist_id, user_ids): + self, playlist_owner_id, playlist_id, user_ids + ): """ Check to see if the given users are following the given playlist @@ -654,15 +742,15 @@ class Spotify(object): """ endpoint = "users/{}/playlists/{}/followers/contains?ids={}" - return self._get(endpoint.format(playlist_owner_id, - playlist_id, - ','.join(user_ids))) + return self._get( + endpoint.format(playlist_owner_id, playlist_id, ",".join(user_ids)) + ) def me(self): """ Get detailed profile information about the current user. An alias for the 'current_user' method. """ - return self._get('me/') + return self._get("me/") def current_user(self): """ Get detailed profile information about the current user. @@ -671,9 +759,9 @@ class Spotify(object): return self.me() def current_user_playing_track(self): - ''' Get information about the current users currently playing track. - ''' - return self._get('me/player/currently-playing') + """ Get information about the current users currently playing track. + """ + return self._get("me/player/currently-playing") def current_user_saved_tracks(self, limit=20, offset=0): """ Gets a list of the tracks saved in the current authorized user's @@ -684,7 +772,7 @@ class Spotify(object): - offset - the index of the first track to return """ - return self._get('me/tracks', limit=limit, offset=offset) + return self._get("me/tracks", limit=limit, offset=offset) def current_user_followed_artists(self, limit=20, after=None): """ Gets a list of the artists followed by the current authorized user @@ -695,8 +783,9 @@ class Spotify(object): request """ - return self._get('me/following', type='artist', limit=limit, - after=after) + return self._get( + "me/following", type="artist", limit=limit, after=after + ) def current_user_saved_tracks_delete(self, tracks=None): """ Remove one or more tracks from the current user's @@ -707,8 +796,8 @@ class Spotify(object): """ tlist = [] if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._delete('me/tracks/?ids=' + ','.join(tlist)) + tlist = [self._get_id("track", t) for t in tracks] + return self._delete("me/tracks/?ids=" + ",".join(tlist)) def current_user_saved_tracks_contains(self, tracks=None): """ Check if one or more tracks is already saved in @@ -719,8 +808,8 @@ class Spotify(object): """ tlist = [] if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._get('me/tracks/contains?ids=' + ','.join(tlist)) + tlist = [self._get_id("track", t) for t in tracks] + return self._get("me/tracks/contains?ids=" + ",".join(tlist)) def current_user_saved_tracks_add(self, tracks=None): """ Add one or more tracks to the current user's @@ -731,11 +820,12 @@ class Spotify(object): """ tlist = [] if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._put('me/tracks/?ids=' + ','.join(tlist)) + tlist = [self._get_id("track", t) for t in tracks] + return self._put("me/tracks/?ids=" + ",".join(tlist)) - def current_user_top_artists(self, limit=20, offset=0, - time_range='medium_term'): + def current_user_top_artists( + self, limit=20, offset=0, time_range="medium_term" + ): """ Get the current user's top artists Parameters: @@ -744,11 +834,13 @@ class Spotify(object): - time_range - Over what time frame are the affinities computed Valid-values: short_term, medium_term, long_term """ - return self._get('me/top/artists', time_range=time_range, limit=limit, - offset=offset) + return self._get( + "me/top/artists", time_range=time_range, limit=limit, offset=offset + ) - def current_user_top_tracks(self, limit=20, offset=0, - time_range='medium_term'): + def current_user_top_tracks( + self, limit=20, offset=0, time_range="medium_term" + ): """ Get the current user's top tracks Parameters: @@ -757,12 +849,12 @@ class Spotify(object): - time_range - Over what time frame are the affinities computed Valid-values: short_term, medium_term, long_term """ - return self._get('me/top/tracks', time_range=time_range, limit=limit, - offset=offset) + return self._get( + "me/top/tracks", time_range=time_range, limit=limit, offset=offset + ) - def current_user_recently_played(self, limit=50, after=None, - before=None): - ''' Get the current user's recently played tracks + def current_user_recently_played(self, limit=50, after=None, before=None): + """ Get the current user's recently played tracks Parameters: - limit - the number of entities to return @@ -772,9 +864,13 @@ class Spotify(object): - before - unix timestamp in milliseconds. Returns all items before (but not including) this cursor position. Cannot be used if after is specified - ''' - return self._get('me/player/recently-played', limit=limit, after=after, - before=before) + """ + return self._get( + "me/player/recently-played", + limit=limit, + after=after, + before=before, + ) def current_user_saved_albums(self, limit=20, offset=0): """ Gets a list of the albums saved in the current authorized user's @@ -785,7 +881,7 @@ class Spotify(object): - offset - the index of the first album to return """ - return self._get('me/albums', limit=limit, offset=offset) + return self._get("me/albums", limit=limit, offset=offset) def current_user_saved_albums_contains(self, albums=[]): """ Check if one or more albums is already saved in @@ -794,8 +890,8 @@ class Spotify(object): Parameters: - albums - a list of album URIs, URLs or IDs """ - alist = [self._get_id('album', a) for a in albums] - return self._get('me/albums/contains?ids=' + ','.join(alist)) + alist = [self._get_id("album", a) for a in albums] + return self._get("me/albums/contains?ids=" + ",".join(alist)) def current_user_saved_albums_add(self, albums=[]): """ Add one or more albums to the current user's @@ -803,8 +899,8 @@ class Spotify(object): Parameters: - albums - a list of album URIs, URLs or IDs """ - alist = [self._get_id('album', a) for a in albums] - return self._put('me/albums?ids=' + ','.join(alist)) + alist = [self._get_id("album", a) for a in albums] + return self._put("me/albums?ids=" + ",".join(alist)) def current_user_saved_albums_delete(self, albums=[]): """ Remove one or more albums from the current user's @@ -813,39 +909,40 @@ class Spotify(object): Parameters: - albums - a list of album URIs, URLs or IDs """ - alist = [self._get_id('album', a) for a in albums] - return self._delete('me/albums/?ids=' + ','.join(alist)) + alist = [self._get_id("album", a) for a in albums] + return self._delete("me/albums/?ids=" + ",".join(alist)) def user_follow_artists(self, ids=[]): - ''' Follow one or more artists + """ Follow one or more artists Parameters: - ids - a list of artist IDs - ''' - return self._put('me/following?type=artist&ids=' + ','.join(ids)) + """ + return self._put("me/following?type=artist&ids=" + ",".join(ids)) def user_follow_users(self, ids=[]): - ''' Follow one or more users + """ Follow one or more users Parameters: - ids - a list of user IDs - ''' - return self._put('me/following?type=user&ids=' + ','.join(ids)) + """ + return self._put("me/following?type=user&ids=" + ",".join(ids)) def user_unfollow_artists(self, ids=[]): - ''' Unfollow one or more artists + """ Unfollow one or more artists Parameters: - ids - a list of artist IDs - ''' - return self._delete('me/following?type=artist&ids=' + ','.join(ids)) + """ + return self._delete("me/following?type=artist&ids=" + ",".join(ids)) def user_unfollow_users(self, ids=[]): - ''' Unfollow one or more users + """ Unfollow one or more users Parameters: - ids - a list of user IDs - ''' - return self._delete('me/following?type=user&ids=' + ','.join(ids)) + """ + return self._delete("me/following?type=user&ids=" + ",".join(ids)) - def featured_playlists(self, locale=None, country=None, timestamp=None, - limit=20, offset=0): + def featured_playlists( + self, locale=None, country=None, timestamp=None, limit=20, offset=0 + ): """ Get a list of Spotify featured playlists Parameters: @@ -867,9 +964,14 @@ class Spotify(object): (the first object). Use with limit to get the next set of items. """ - return self._get('browse/featured-playlists', locale=locale, - country=country, timestamp=timestamp, limit=limit, - offset=offset) + return self._get( + "browse/featured-playlists", + locale=locale, + country=country, + timestamp=timestamp, + limit=limit, + offset=offset, + ) def new_releases(self, country=None, limit=20, offset=0): """ Get a list of new album releases featured in Spotify @@ -884,8 +986,9 @@ class Spotify(object): (the first object). Use with limit to get the next set of items. """ - return self._get('browse/new-releases', country=country, limit=limit, - offset=offset) + return self._get( + "browse/new-releases", country=country, limit=limit, offset=offset + ) def categories(self, country=None, locale=None, limit=20, offset=0): """ Get a list of new album releases featured in Spotify @@ -903,11 +1006,17 @@ class Spotify(object): (the first object). Use with limit to get the next set of items. """ - return self._get('browse/categories', country=country, locale=locale, - limit=limit, offset=offset) + return self._get( + "browse/categories", + country=country, + locale=locale, + limit=limit, + offset=offset, + ) - def category_playlists(self, category_id=None, country=None, limit=20, - offset=0): + def category_playlists( + self, category_id=None, country=None, limit=20, offset=0 + ): """ Get a list of new album releases featured in Spotify Parameters: @@ -922,11 +1031,22 @@ class Spotify(object): (the first object). Use with limit to get the next set of items. """ - return self._get('browse/categories/' + category_id + '/playlists', - country=country, limit=limit, offset=offset) + return self._get( + "browse/categories/" + category_id + "/playlists", + country=country, + limit=limit, + offset=offset, + ) - def recommendations(self, seed_artists=None, seed_genres=None, - seed_tracks=None, limit=20, country=None, **kwargs): + def recommendations( + self, + seed_artists=None, + seed_genres=None, + seed_tracks=None, + limit=20, + country=None, + **kwargs + ): """ Get a list of recommended tracks for one to five seeds. Parameters: @@ -950,38 +1070,52 @@ class Spotify(object): """ params = dict(limit=limit) if seed_artists: - params['seed_artists'] = ','.join( - [self._get_id('artist', a) for a in seed_artists]) + params["seed_artists"] = ",".join( + [self._get_id("artist", a) for a in seed_artists] + ) if seed_genres: - params['seed_genres'] = ','.join(seed_genres) + params["seed_genres"] = ",".join(seed_genres) if seed_tracks: - params['seed_tracks'] = ','.join( - [self._get_id('track', t) for t in seed_tracks]) + params["seed_tracks"] = ",".join( + [self._get_id("track", t) for t in seed_tracks] + ) if country: - params['market'] = country + params["market"] = country - for attribute in ["acousticness", "danceability", "duration_ms", - "energy", "instrumentalness", "key", "liveness", - "loudness", "mode", "popularity", "speechiness", - "tempo", "time_signature", "valence"]: + for attribute in [ + "acousticness", + "danceability", + "duration_ms", + "energy", + "instrumentalness", + "key", + "liveness", + "loudness", + "mode", + "popularity", + "speechiness", + "tempo", + "time_signature", + "valence", + ]: for prefix in ["min_", "max_", "target_"]: param = prefix + attribute if param in kwargs: params[param] = kwargs[param] - return self._get('recommendations', **params) + return self._get("recommendations", **params) def recommendation_genre_seeds(self): """ Get a list of genres available for the recommendations function. """ - return self._get('recommendations/available-genre-seeds') + return self._get("recommendations/available-genre-seeds") def audio_analysis(self, track_id): """ Get audio analysis for a track based upon its Spotify ID Parameters: - track_id - a track URI, URL or ID """ - trid = self._get_id('track', track_id) - return self._get('audio-analysis/' + trid) + trid = self._get_id("track", track_id) + return self._get("audio-analysis/" + trid) def audio_features(self, tracks=[]): """ Get audio features for one or multiple tracks based upon their Spotify IDs @@ -989,41 +1123,41 @@ class Spotify(object): - tracks - a list of track URIs, URLs or IDs, maximum: 50 ids """ if isinstance(tracks, str): - trackid = self._get_id('track', tracks) - results = self._get('audio-features/?ids=' + trackid) + trackid = self._get_id("track", tracks) + results = self._get("audio-features/?ids=" + trackid) else: - tlist = [self._get_id('track', t) for t in tracks] - results = self._get('audio-features/?ids=' + ','.join(tlist)) + tlist = [self._get_id("track", t) for t in tracks] + results = self._get("audio-features/?ids=" + ",".join(tlist)) # the response has changed, look for the new style first, and if # its not there, fallback on the old style - if 'audio_features' in results: - return results['audio_features'] + if "audio_features" in results: + return results["audio_features"] else: return results def devices(self): - ''' Get a list of user's available devices. - ''' + """ Get a list of user's available devices. + """ return self._get("me/player/devices") def current_playback(self, market=None): - ''' Get information about user's current playback. + """ Get information about user's current playback. Parameters: - market - an ISO 3166-1 alpha-2 country code. - ''' + """ return self._get("me/player", market=market) def currently_playing(self, market=None): - ''' Get user's currently playing track. + """ Get user's currently playing track. Parameters: - market - an ISO 3166-1 alpha-2 country code. - ''' + """ return self._get("me/player/currently-playing", market=market) def transfer_playback(self, device_id, force_play=True): - ''' Transfer playback to another device. + """ Transfer playback to another device. Note that the API accepts a list of device ids, but only actually supports one. @@ -1031,16 +1165,14 @@ class Spotify(object): - device_id - transfer playback to this device - force_play - true: after transfer, play. false: keep current state. - ''' - data = { - 'device_ids': [device_id], - 'play': force_play - } + """ + data = {"device_ids": [device_id], "play": force_play} return self._put("me/player", payload=data) - def start_playback(self, device_id=None, - context_uri=None, uris=None, offset=None): - ''' Start or resume user's playback. + def start_playback( + self, device_id=None, context_uri=None, uris=None, offset=None + ): + """ Start or resume user's playback. Provide a `context_uri` to start playback or a album, artist, or playlist. @@ -1056,138 +1188,151 @@ class Spotify(object): - context_uri - spotify context uri to play - uris - spotify track uris - offset - offset into context by index or track - ''' + """ if context_uri is not None and uris is not None: - self._warn('specify either context uri or uris, not both') + self._warn("specify either context uri or uris, not both") return if uris is not None and not isinstance(uris, list): - self._warn('uris must be a list') + self._warn("uris must be a list") return data = {} if context_uri is not None: - data['context_uri'] = context_uri + data["context_uri"] = context_uri if uris is not None: - data['uris'] = uris + data["uris"] = uris if offset is not None: - data['offset'] = offset - return self._put(self._append_device_id( - "me/player/play", device_id), payload=data) + data["offset"] = offset + return self._put( + self._append_device_id("me/player/play", device_id), payload=data + ) def pause_playback(self, device_id=None): - ''' Pause user's playback. + """ Pause user's playback. Parameters: - device_id - device target for playback - ''' + """ return self._put(self._append_device_id("me/player/pause", device_id)) def next_track(self, device_id=None): - ''' Skip user's playback to next track. + """ Skip user's playback to next track. Parameters: - device_id - device target for playback - ''' + """ return self._post(self._append_device_id("me/player/next", device_id)) def previous_track(self, device_id=None): - ''' Skip user's playback to previous track. + """ Skip user's playback to previous track. Parameters: - device_id - device target for playback - ''' - return self._post(self._append_device_id( - "me/player/previous", device_id)) + """ + return self._post( + self._append_device_id("me/player/previous", device_id) + ) def seek_track(self, position_ms, device_id=None): - ''' Seek to position in current track. + """ Seek to position in current track. Parameters: - position_ms - position in milliseconds to seek to - device_id - device target for playback - ''' + """ if not isinstance(position_ms, int): - self._warn('position_ms must be an integer') + self._warn("position_ms must be an integer") return - return self._put(self._append_device_id( - "me/player/seek?position_ms=%s" % position_ms, device_id)) + return self._put( + self._append_device_id( + "me/player/seek?position_ms=%s" % position_ms, device_id + ) + ) def repeat(self, state, device_id=None): - ''' Set repeat mode for playback. + """ Set repeat mode for playback. Parameters: - state - `track`, `context`, or `off` - device_id - device target for playback - ''' - if state not in ['track', 'context', 'off']: - self._warn('invalid state') + """ + if state not in ["track", "context", "off"]: + self._warn("invalid state") return self._put( self._append_device_id( - "me/player/repeat?state=%s" % - state, device_id)) + "me/player/repeat?state=%s" % state, device_id + ) + ) def volume(self, volume_percent, device_id=None): - ''' Set playback volume. + """ Set playback volume. Parameters: - volume_percent - volume between 0 and 100 - device_id - device target for playback - ''' + """ if not isinstance(volume_percent, int): - self._warn('volume must be an integer') + self._warn("volume must be an integer") return if volume_percent < 0 or volume_percent > 100: - self._warn('volume must be between 0 and 100, inclusive') + self._warn("volume must be between 0 and 100, inclusive") return self._put( self._append_device_id( - "me/player/volume?volume_percent=%s" % - volume_percent, device_id)) + "me/player/volume?volume_percent=%s" % volume_percent, + device_id, + ) + ) def shuffle(self, state, device_id=None): - ''' Toggle playback shuffling. + """ Toggle playback shuffling. Parameters: - state - true or false - device_id - device target for playback - ''' + """ if not isinstance(state, bool): - self._warn('state must be a boolean') + self._warn("state must be a boolean") return state = str(state).lower() self._put( self._append_device_id( - "me/player/shuffle?state=%s" % - state, device_id)) + "me/player/shuffle?state=%s" % state, device_id + ) + ) def _append_device_id(self, path, device_id): - ''' Append device ID to API path. + """ Append device ID to API path. Parameters: - device_id - device id to append - ''' + """ if device_id: - if '?' in path: + if "?" in path: path += "&device_id=%s" % device_id else: path += "?device_id=%s" % device_id return path def _get_id(self, type, id): - fields = id.split(':') + fields = id.split(":") if len(fields) >= 3: if type != fields[-2]: - self._warn('expected id of type %s but found type %s %s' % - (type, fields[-2], id)) + self._warn( + "expected id of type %s but found type %s %s" + % (type, fields[-2], id) + ) return fields[-1] - fields = id.split('/') + fields = id.split("/") if len(fields) >= 3: itype = fields[-2] if type != itype: - self._warn('expected id of type %s but found type %s %s' % - (type, itype, id)) - return fields[-1].split('?')[0] + self._warn( + "expected id of type %s but found type %s %s" + % (type, itype, id) + ) + return fields[-1].split("?")[0] return id def _get_uri(self, type, id): - return 'spotify:' + type + ":" + self._get_id(type, id) + return "spotify:" + type + ":" + self._get_id(type, id) diff --git a/spotipy/oauth2.py b/spotipy/oauth2.py index 1c8c3d1..1f00da2 100644 --- a/spotipy/oauth2.py +++ b/spotipy/oauth2.py @@ -3,10 +3,10 @@ from __future__ import print_function __all__ = [ - 'is_token_expired', - 'SpotifyClientCredentials', - 'SpotifyOAuth', - 'SpotifyOauthError' + "is_token_expired", + "SpotifyClientCredentials", + "SpotifyOAuth", + "SpotifyOauthError", ] import base64 @@ -14,8 +14,10 @@ import json import os import sys import time +import warnings import requests +from spotipy.util import CLIENT_CREDS_ENV_VARS # Workaround to support both python 2 & 3 import six @@ -28,19 +30,56 @@ class SpotifyOauthError(Exception): def _make_authorization_headers(client_id, client_secret): auth_header = base64.b64encode( - six.text_type( - '{}:{}'.format(client_id, client_secret) - ).encode('ascii')) - return {'Authorization': 'Basic %s' % auth_header.decode('ascii')} + six.text_type(client_id + ":" + client_secret).encode("ascii") + ) + return {"Authorization": "Basic %s" % auth_header.decode("ascii")} def is_token_expired(token_info): now = int(time.time()) - return token_info['expires_at'] - now < 60 + return token_info["expires_at"] - now < 60 -class SpotifyClientCredentials(object): - OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token' +def _ensure_value(value, env_key): + env_val = CLIENT_CREDS_ENV_VARS[env_key] + _val = value or os.getenv(env_val) + if _val is None: + msg = "No %s. Pass it or set a %s environment variable." % ( + env_key, + env_val, + ) + raise SpotifyOauthError(msg) + return _val + + +class SpotifyAuthBase(object): + @property + def client_id(self): + return self._client_id + + @client_id.setter + def client_id(self, val): + self._client_id = _ensure_value(val, "client_id") + + @property + def client_secret(self): + return self._client_secret + + @client_secret.setter + def client_secret(self, val): + self._client_secret = _ensure_value(val, "client_secret") + + @property + def redirect_uri(self): + return self._redirect_uri + + @redirect_uri.setter + def redirect_uri(self, val): + self._redirect_uri = _ensure_value(val, "redirect_uri") + + +class SpotifyClientCredentials(SpotifyAuthBase): + OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token" def __init__(self, client_id=None, client_secret=None, proxies=None): """ @@ -48,17 +87,6 @@ class SpotifyClientCredentials(object): constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET environment variables """ - if not client_id: - client_id = os.getenv('SPOTIPY_CLIENT_ID') - - if not client_secret: - client_secret = os.getenv('SPOTIPY_CLIENT_SECRET') - - if not client_id: - raise SpotifyOauthError('No client id') - - if not client_secret: - raise SpotifyOauthError('No client secret') self.client_id = client_id self.client_secret = client_secret @@ -71,23 +99,28 @@ class SpotifyClientCredentials(object): Else feches a new token and returns it """ if self.token_info and not self.is_token_expired(self.token_info): - return self.token_info['access_token'] + return self.token_info["access_token"] token_info = self._request_access_token() token_info = self._add_custom_values_to_token_info(token_info) self.token_info = token_info - return self.token_info['access_token'] + return self.token_info["access_token"] def _request_access_token(self): """Gets client credentials access token """ - payload = {'grant_type': 'client_credentials'} + payload = {"grant_type": "client_credentials"} headers = _make_authorization_headers( - self.client_id, self.client_secret) + self.client_id, self.client_secret + ) - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, verify=True, - proxies=self.proxies) + response = requests.post( + self.OAUTH_TOKEN_URL, + data=payload, + headers=headers, + verify=True, + proxies=self.proxies, + ) if response.status_code != 200: raise SpotifyOauthError(response.reason) token_info = response.json() @@ -101,21 +134,30 @@ class SpotifyClientCredentials(object): Store some values that aren't directly provided by a Web API response. """ - token_info['expires_at'] = int(time.time()) + token_info['expires_in'] + token_info["expires_at"] = int(time.time()) + token_info["expires_in"] return token_info -class SpotifyOAuth(object): - ''' +class SpotifyOAuth(SpotifyAuthBase): + """ Implements Authorization Code Flow for Spotify's OAuth implementation. - ''' + """ - OAUTH_AUTHORIZE_URL = 'https://accounts.spotify.com/authorize' - OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token' + OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize" + OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token" - def __init__(self, client_id, client_secret, redirect_uri, - state=None, scope=None, cache_path=None, proxies=None): - ''' + def __init__( + self, + client_id=None, + client_secret=None, + redirect_uri=None, + state=None, + scope=None, + cache_path=None, + username=None, + proxies=None, + ): + """ Creates a SpotifyOAuth object Parameters: @@ -125,20 +167,32 @@ class SpotifyOAuth(object): - state - security state - scope - the desired scope of the request - cache_path - path to location to save tokens - ''' + - username - username of current client + """ self.client_id = client_id self.client_secret = client_secret self.redirect_uri = redirect_uri self.state = state self.cache_path = cache_path + self.username = username or os.getenv( + CLIENT_CREDS_ENV_VARS["client_username"] + ) self.scope = self._normalize_scope(scope) self.proxies = proxies def get_cached_token(self): - ''' Gets a cached auth token - ''' + """ Gets a cached auth token + """ token_info = None + + if not self.cache_path and self.username: + self.cache_path = ".cache-" + str(self.username) + elif not self.cache_path and not self.username: + raise SpotifyOauthError( + "You must either set a cache_path or a username." + ) + if self.cache_path: try: f = open(self.cache_path) @@ -147,13 +201,15 @@ class SpotifyOAuth(object): token_info = json.loads(token_info_string) # if scopes don't match, then bail - if 'scope' not in token_info or not self._is_scope_subset( - self.scope, token_info['scope']): + if "scope" not in token_info or not self._is_scope_subset( + self.scope, token_info["scope"] + ): return None if self.is_token_expired(token_info): token_info = self.refresh_access_token( - token_info['refresh_token']) + token_info["refresh_token"] + ) except IOError: pass @@ -162,7 +218,7 @@ class SpotifyOAuth(object): def _save_token_info(self, token_info): if self.cache_path: try: - f = open(self.cache_path, 'w') + f = open(self.cache_path, "w") f.write(json.dumps(token_info)) f.close() except IOError: @@ -171,8 +227,9 @@ class SpotifyOAuth(object): def _is_scope_subset(self, needle_scope, haystack_scope): needle_scope = set(needle_scope.split()) if needle_scope else set() - haystack_scope = set( - haystack_scope.split()) if haystack_scope else set() + haystack_scope = ( + set(haystack_scope.split()) if haystack_scope else set() + ) return needle_scope <= haystack_scope def is_token_expired(self, token_info): @@ -181,17 +238,19 @@ class SpotifyOAuth(object): def get_authorize_url(self, state=None, show_dialog=False): """ Gets the URL to use to authorize this app """ - payload = {'client_id': self.client_id, - 'response_type': 'code', - 'redirect_uri': self.redirect_uri} + payload = { + "client_id": self.client_id, + "response_type": "code", + "redirect_uri": self.redirect_uri, + } if self.scope: - payload['scope'] = self.scope + payload["scope"] = self.scope if state is None: state = self.state if state is not None: - payload['state'] = state + payload["state"] = state if show_dialog: - payload['show_dialog'] = True + payload["show_dialog"] = True urlparams = urllibparse.urlencode(payload) @@ -212,70 +271,138 @@ class SpotifyOAuth(object): def _make_authorization_headers(self): return _make_authorization_headers(self.client_id, self.client_secret) - def get_access_token(self, code): + def get_auth_response(self): + print( + """ + + User authentication requires interaction with your + web browser. Once you enter your credentials and + give authorization, you will be redirected to + a url. Paste that url you were directed to to + complete the authorization. + + """ + ) + auth_url = self.get_authorize_url() + try: + import webbrowser + + webbrowser.open(auth_url) + print("Opened %s in your browser" % auth_url) + except BaseException: + print("Please navigate here: %s" % auth_url) + print("") + print("") + try: + response = raw_input("Enter the URL you were redirected to: ") + except NameError: + response = input("Enter the URL you were redirected to: ") + print("") + print("") + return response + + def get_authorization_code(self, response=None): + return self.parse_response_code(response or self.get_auth_response()) + + def get_access_token(self, code=None, as_dict=True): """ Gets the access token for the app given the code Parameters: - code - the response code + - as_dict - a boolean indicating if returning the access token + as a token_info dictionary, otherwise it will be returned + as a string. """ + if as_dict: + print("") + warnings.warn( + "You're using 'as_dict = True'." + "get_access_token will return the token string directly in future " + "versions. Please adjust your code accordingly, or use " + "get_cached_token instead.", + DeprecationWarning, + stacklevel=2, + ) + print("") + token_info = self.get_cached_token() + if token_info is not None: + if is_token_expired(token_info): + token_info = self.refresh_access_token( + token_info["refresh_token"] + ) + return token_info if as_dict else token_info["access_token"] - payload = {'redirect_uri': self.redirect_uri, - 'code': code, - 'grant_type': 'authorization_code'} + payload = { + "redirect_uri": self.redirect_uri, + "code": code or self.get_authorization_code(), + "grant_type": "authorization_code", + } if self.scope: - payload['scope'] = self.scope + payload["scope"] = self.scope if self.state: - payload['state'] = self.state + payload["state"] = self.state headers = self._make_authorization_headers() - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, verify=True, - proxies=self.proxies) + response = requests.post( + self.OAUTH_TOKEN_URL, + data=payload, + headers=headers, + verify=True, + proxies=self.proxies, + ) if response.status_code != 200: raise SpotifyOauthError(response.reason) token_info = response.json() token_info = self._add_custom_values_to_token_info(token_info) self._save_token_info(token_info) - return token_info + return token_info if as_dict else token_info["access_token"] def _normalize_scope(self, scope): if scope: scopes = sorted(scope.split()) - return ' '.join(scopes) + return " ".join(scopes) else: return None def refresh_access_token(self, refresh_token): - payload = {'refresh_token': refresh_token, - 'grant_type': 'refresh_token'} + payload = { + "refresh_token": refresh_token, + "grant_type": "refresh_token", + } headers = self._make_authorization_headers() - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, proxies=self.proxies) + response = requests.post( + self.OAUTH_TOKEN_URL, + data=payload, + headers=headers, + proxies=self.proxies, + ) if response.status_code != 200: if False: # debugging code - print('headers', headers) - print('request', response.url) - self._warn("couldn't refresh token: code:%d reason:%s" - % (response.status_code, response.reason)) + print("headers", headers) + print("request", response.url) + self._warn( + "couldn't refresh token: code:%d reason:%s" + % (response.status_code, response.reason) + ) return None token_info = response.json() token_info = self._add_custom_values_to_token_info(token_info) - if 'refresh_token' not in token_info: - token_info['refresh_token'] = refresh_token + if "refresh_token" not in token_info: + token_info["refresh_token"] = refresh_token self._save_token_info(token_info) return token_info def _add_custom_values_to_token_info(self, token_info): - ''' + """ Store some values that aren't directly provided by a Web API response. - ''' - token_info['expires_at'] = int(time.time()) + token_info['expires_in'] - token_info['scope'] = self.scope + """ + token_info["expires_at"] = int(time.time()) + token_info["expires_in"] + token_info["scope"] = self.scope return token_info def _warn(self, msg): - print('warning:' + msg, file=sys.stderr) + print("warning:" + msg, file=sys.stderr) diff --git a/spotipy/util.py b/spotipy/util.py index d8b7585..ebe434b 100644 --- a/spotipy/util.py +++ b/spotipy/util.py @@ -4,29 +4,30 @@ from __future__ import print_function -__all__ = [ - 'CLIENT_CREDS_ENV_VARS', - 'prompt_for_user_token' -] +__all__ = ["CLIENT_CREDS_ENV_VARS", "prompt_for_user_token"] import os -from . import oauth2 - import spotipy CLIENT_CREDS_ENV_VARS = { - 'client_id': 'SPOTIPY_CLIENT_ID', - 'client_secret': 'SPOTIPY_CLIENT_SECRET', - 'client_username': 'SPOTIPY_CLIENT_USERNAME', - 'redirect_uri': 'SPOTIPY_REDIRECT_URI' + "client_id": "SPOTIPY_CLIENT_ID", + "client_secret": "SPOTIPY_CLIENT_SECRET", + "client_username": "SPOTIPY_CLIENT_USERNAME", + "redirect_uri": "SPOTIPY_REDIRECT_URI", } -def prompt_for_user_token(username, scope=None, client_id=None, - client_secret=None, redirect_uri=None, - cache_path=None): - ''' prompts the user to login if necessary and returns +def prompt_for_user_token( + username, + scope=None, + client_id=None, + client_secret=None, + redirect_uri=None, + cache_path=None, + oauth_manager=None, +): + """ prompts the user to login if necessary and returns the user token suitable for use with the spotipy.Spotify constructor @@ -38,35 +39,44 @@ def prompt_for_user_token(username, scope=None, client_id=None, - client_secret - the client secret of your app - redirect_uri - the redirect URI of your app - cache_path - path to location to save tokens + - oauth_manager - Oauth manager object. - ''' + """ + if not oauth_manager: + if not client_id: + client_id = os.getenv("SPOTIPY_CLIENT_ID") - if not client_id: - client_id = os.getenv('SPOTIPY_CLIENT_ID') + if not client_secret: + client_secret = os.getenv("SPOTIPY_CLIENT_SECRET") - if not client_secret: - client_secret = os.getenv('SPOTIPY_CLIENT_SECRET') + if not redirect_uri: + redirect_uri = os.getenv("SPOTIPY_REDIRECT_URI") - if not redirect_uri: - redirect_uri = os.getenv('SPOTIPY_REDIRECT_URI') + if not client_id: + print( + """ + You need to set your Spotify API credentials. + You can do this by setting environment variables like so: - if not client_id: - print(''' - You need to set your Spotify API credentials. You can do this by - setting environment variables like so: + export SPOTIPY_CLIENT_ID='your-spotify-client-id' + export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret' + export SPOTIPY_REDIRECT_URI='your-app-redirect-url' - export SPOTIPY_CLIENT_ID='your-spotify-client-id' - export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret' - export SPOTIPY_REDIRECT_URI='your-app-redirect-url' + Get your credentials at + https://developer.spotify.com/my-applications + """ + ) + raise spotipy.SpotifyException(550, -1, "no credentials set") - Get your credentials at - https://developer.spotify.com/my-applications - ''') - raise spotipy.SpotifyException(550, -1, 'no credentials set') + cache_path = cache_path or ".cache-" + username - cache_path = cache_path or ".cache-" + username - sp_oauth = oauth2.SpotifyOAuth(client_id, client_secret, redirect_uri, - scope=scope, cache_path=cache_path) + sp_oauth = oauth_manager or spotipy.SpotifyOAuth( + client_id, + client_secret, + redirect_uri, + scope=scope, + cache_path=cache_path, + ) # try to get a valid token for this user, from the cache, # if not in the cache, the create a new (this will send @@ -75,37 +85,14 @@ def prompt_for_user_token(username, scope=None, client_id=None, token_info = sp_oauth.get_cached_token() if not token_info: - print(''' + url = sp_oauth.get_auth_response() + code = sp_oauth.parse_response_code(url) + token = sp_oauth.get_access_token(code, as_dict=False) + else: + return token_info["access_token"] - User authentication requires interaction with your - web browser. Once you enter your credentials and - give authorization, you will be redirected to - a url. Paste that url you were directed to to - complete the authorization. - - ''') - auth_url = sp_oauth.get_authorize_url() - try: - import webbrowser - webbrowser.open(auth_url) - print("Opened %s in your browser" % auth_url) - except BaseException: - print("Please navigate here: %s" % auth_url) - - print() - print() - try: - response = raw_input("Enter the URL you were redirected to: ") - except NameError: - response = input("Enter the URL you were redirected to: ") - - print() - print() - - code = sp_oauth.parse_response_code(response) - token_info = sp_oauth.get_access_token(code) # Auth'ed API request - if token_info: - return token_info['access_token'] + if token: + return token else: return None