diff --git a/CHANGELOG.md b/CHANGELOG.md index 2416b64..b6acc73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support for `playlist_cover_image` - Support `after` and `before` parameter in `current_user_recently_played` - CI for unit tests + - Automatic `token` refresh + - `auth_manager` and `oauth_manager` optional parameters added to `Spotify`'s init. + - Optional `username` parameter to be passed to SpotifyOAuth, to infer a `cache_path` automatically + - Optional `as_dict` parameter to control `SpotifyOAuth`'s `get_access_token` output type. However, this is going to be deprecated in the future, and the method will always return a token string + +### Changed + - Both `SpotifyClientCredentials` and `SpotifyOAuth` inherit from a common `SpotifyAuthBase` which handles common parameters and logics. ## [2.7.1] - 2020-01-20 diff --git a/examples/simple4.py b/examples/simple4.py new file mode 100644 index 0000000..c8761c7 --- /dev/null +++ b/examples/simple4.py @@ -0,0 +1,12 @@ +import spotipy +import os +from pprint import pprint + +def main(): + spotify = spotipy.Spotify(auth_manager=spotipy.SpotifyOAuth()) + me = spotify.me() + pprint(me) + + +if __name__ == "__main__": + main() diff --git a/spotipy/client.py b/spotipy/client.py index 621b4e6..ca7aeee 100644 --- a/spotipy/client.py +++ b/spotipy/client.py @@ -4,10 +4,7 @@ from __future__ import print_function -__all__ = [ - 'Spotify', - 'SpotifyException' -] +__all__ = ["Spotify", "SpotifyException"] import json import sys @@ -30,8 +27,9 @@ class SpotifyException(Exception): self.headers = headers def __str__(self): - return 'http status: {0}, code:{1} - {2}'.format( - self.http_status, self.code, self.msg) + return "http status: {0}, code:{1} - {2}".format( + self.http_status, self.code, self.msg + ) class Spotify(object): @@ -57,9 +55,16 @@ class Spotify(object): trace_out = False max_get_retries = 10 - def __init__(self, auth=None, requests_session=True, - client_credentials_manager=None, proxies=None, - requests_timeout=None): + def __init__( + self, + auth=None, + requests_session=True, + client_credentials_manager=None, + oauth_manager=None, + auth_manager=None, + proxies=None, + requests_timeout=None, + ): """ Creates a Spotify API client. @@ -71,15 +76,21 @@ class Spotify(object): for performance reasons (connection pooling). :param client_credentials_manager: SpotifyClientCredentials object + :param oauth_manager: + SpotifyOAuth object + :param auth_manager: + SpotifyOauth object or SpotifyClientCredentials object :param proxies: Definition of proxies (optional) :param requests_timeout: Tell Requests to stop waiting for a response after a given number of seconds """ - self.prefix = 'https://api.spotify.com/v1/' + self.prefix = "https://api.spotify.com/v1/" self._auth = auth self.client_credentials_manager = client_credentials_manager + self.oauth_manager = oauth_manager + self.auth_manager = auth_manager self.proxies = proxies self.requests_timeout = requests_timeout @@ -90,45 +101,60 @@ class Spotify(object): self._session = requests.Session() else: # Use the Requests API module as a "session". from requests import api + self._session = api + @property + def auth_manager(self): + return self._auth_manager + + @auth_manager.setter + def auth_manager(self, auth_manager): + if auth_manager is not None: + self._auth_manager = auth_manager + else: + self._auth_manager = ( + self.client_credentials_manager or self.oauth_manager + ) + def _auth_headers(self): if self._auth: - return {'Authorization': 'Bearer {0}'.format(self._auth)} - elif self.client_credentials_manager: - token = self.client_credentials_manager.get_access_token() - return {'Authorization': 'Bearer {0}'.format(token)} + return {"Authorization": "Bearer {0}".format(self._auth)} + elif self.auth_manager: + token = self.auth_manager.get_access_token(as_dict=False) + return {"Authorization": "Bearer {0}".format(token)} else: return {} def _internal_call(self, method, url, payload, params): args = dict(params=params) args["timeout"] = self.requests_timeout - if not url.startswith('http'): + if not url.startswith("http"): url = self.prefix + url headers = self._auth_headers() - if 'content_type' in args['params']: - headers['Content-Type'] = args['params']['content_type'] - del args['params']['content_type'] + if "content_type" in args["params"]: + headers["Content-Type"] = args["params"]["content_type"] + del args["params"]["content_type"] if payload: args["data"] = payload else: - headers['Content-Type'] = 'application/json' + headers["Content-Type"] = "application/json" if payload: args["data"] = json.dumps(payload) if self.trace_out: print(url) - with self._session.request(method, url, headers=headers, - proxies=self.proxies, **args) as r: + with self._session.request( + method, url, headers=headers, proxies=self.proxies, **args + ) as r: if self.trace: # pragma: no cover print() - print('Request headers:', headers) - print('Response headers:', r.headers) - print('HTTP status', r.status_code) + print("Request headers:", headers) + print("Response headers:", r.headers) + print("HTTP status", r.status_code) print(method, r.url) if payload: print("Data", json.dumps(payload)) @@ -137,12 +163,15 @@ class Spotify(object): r.raise_for_status() except BaseException: try: - msg = r.json()['error']['message'] + msg = r.json()["error"]["message"] except BaseException: - msg = 'error' - raise SpotifyException(r.status_code, - -1, '%s:\n %s' % (r.url, msg), - headers=r.headers) + msg = "error" + raise SpotifyException( + r.status_code, + -1, + "%s:\n %s" % (r.url, msg), + headers=r.headers, + ) try: results = r.json() @@ -150,7 +179,7 @@ class Spotify(object): results = None if self.trace: # pragma: no cover - print('Response:', results) + print("Response:", results) print() return results @@ -161,7 +190,7 @@ class Spotify(object): delay = 1 while retries > 0: try: - return self._internal_call('GET', url, payload, kwargs) + return self._internal_call("GET", url, payload, kwargs) except SpotifyException as e: retries -= 1 status = e.http_status @@ -171,21 +200,22 @@ class Spotify(object): raise else: sleep_seconds = int( - e.headers.get('Retry-After', delay)) - print('retrying ...' + str(sleep_seconds) + 'secs') + e.headers.get("Retry-After", delay) + ) + print("retrying ..." + str(sleep_seconds) + "secs") time.sleep(sleep_seconds + 1) delay += 1 else: raise except Exception as e: raise - print('exception', str(e)) + print("exception", str(e)) # some other exception. Requests have # been know to throw a BadStatusLine exception retries -= 1 if retries >= 0: - sleep_seconds = int(e.headers.get('Retry-After', delay)) - print('retrying ...' + str(delay) + 'secs') + sleep_seconds = int(e.headers.get("Retry-After", delay)) + print("retrying ..." + str(delay) + "secs") time.sleep(sleep_seconds + 1) delay += 1 else: @@ -194,17 +224,17 @@ class Spotify(object): def _post(self, url, args=None, payload=None, **kwargs): if args: kwargs.update(args) - return self._internal_call('POST', url, payload, kwargs) + return self._internal_call("POST", url, payload, kwargs) def _delete(self, url, args=None, payload=None, **kwargs): if args: kwargs.update(args) - return self._internal_call('DELETE', url, payload, kwargs) + return self._internal_call("DELETE", url, payload, kwargs) def _put(self, url, args=None, payload=None, **kwargs): if args: kwargs.update(args) - return self._internal_call('PUT', url, payload, kwargs) + return self._internal_call("PUT", url, payload, kwargs) def next(self, result): """ returns the next result given a paged result @@ -212,8 +242,8 @@ class Spotify(object): Parameters: - result - a previously returned paged result """ - if result['next']: - return self._get(result['next']) + if result["next"]: + return self._get(result["next"]) else: return None @@ -223,16 +253,16 @@ class Spotify(object): Parameters: - result - a previously returned paged result """ - if result['previous']: - return self._get(result['previous']) + if result["previous"]: + return self._get(result["previous"]) else: return None def _warn_old(self, msg): - print('warning:' + msg, file=sys.stderr) + print("warning:" + msg, file=sys.stderr) def _warn(self, msg, *args): - print('warning:' + msg.format(*args), file=sys.stderr) + print("warning:" + msg.format(*args), file=sys.stderr) def track(self, track_id): """ returns a single track given the track's ID, URI or URL @@ -241,8 +271,8 @@ class Spotify(object): - track_id - a spotify URI, URL or ID """ - trid = self._get_id('track', track_id) - return self._get('tracks/' + trid) + trid = self._get_id("track", track_id) + return self._get("tracks/" + trid) def tracks(self, tracks, market=None): """ returns a list of tracks given a list of track IDs, URIs, or URLs @@ -252,8 +282,8 @@ class Spotify(object): - market - an ISO 3166-1 alpha-2 country code. """ - tlist = [self._get_id('track', t) for t in tracks] - return self._get('tracks/?ids=' + ','.join(tlist), market=market) + tlist = [self._get_id("track", t) for t in tracks] + return self._get("tracks/?ids=" + ",".join(tlist), market=market) def artist(self, artist_id): """ returns a single artist given the artist's ID, URI or URL @@ -262,8 +292,8 @@ class Spotify(object): - artist_id - an artist ID, URI or URL """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid) + trid = self._get_id("artist", artist_id) + return self._get("artists/" + trid) def artists(self, artists): """ returns a list of artists given the artist IDs, URIs, or URLs @@ -272,11 +302,12 @@ class Spotify(object): - artists - a list of artist IDs, URIs or URLs """ - tlist = [self._get_id('artist', a) for a in artists] - return self._get('artists/?ids=' + ','.join(tlist)) + tlist = [self._get_id("artist", a) for a in artists] + return self._get("artists/?ids=" + ",".join(tlist)) - def artist_albums(self, artist_id, album_type=None, country=None, limit=20, - offset=0): + def artist_albums( + self, artist_id, album_type=None, country=None, limit=20, offset=0 + ): """ Get Spotify catalog information about an artist's albums Parameters: @@ -287,11 +318,16 @@ class Spotify(object): - offset - the index of the first album to return """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/albums', album_type=album_type, - country=country, limit=limit, offset=offset) + trid = self._get_id("artist", artist_id) + return self._get( + "artists/" + trid + "/albums", + album_type=album_type, + country=country, + limit=limit, + offset=offset, + ) - def artist_top_tracks(self, artist_id, country='US'): + def artist_top_tracks(self, artist_id, country="US"): """ Get Spotify catalog information about an artist's top 10 tracks by country. @@ -300,8 +336,8 @@ class Spotify(object): - country - limit the response to one particular country. """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/top-tracks', country=country) + trid = self._get_id("artist", artist_id) + return self._get("artists/" + trid + "/top-tracks", country=country) def artist_related_artists(self, artist_id): """ Get Spotify catalog information about artists similar to an @@ -311,8 +347,8 @@ class Spotify(object): Parameters: - artist_id - the artist ID, URI or URL """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/related-artists') + trid = self._get_id("artist", artist_id) + return self._get("artists/" + trid + "/related-artists") def album(self, album_id): """ returns a single album given the album's ID, URIs or URL @@ -321,8 +357,8 @@ class Spotify(object): - album_id - the album ID, URI or URL """ - trid = self._get_id('album', album_id) - return self._get('albums/' + trid) + trid = self._get_id("album", album_id) + return self._get("albums/" + trid) def album_tracks(self, album_id, limit=50, offset=0): """ Get Spotify catalog information about an album's tracks @@ -333,9 +369,10 @@ class Spotify(object): - offset - the index of the first item to return """ - trid = self._get_id('album', album_id) - return self._get('albums/' + trid + '/tracks/', limit=limit, - offset=offset) + trid = self._get_id("album", album_id) + return self._get( + "albums/" + trid + "/tracks/", limit=limit, offset=offset + ) def albums(self, albums): """ returns a list of albums given the album IDs, URIs, or URLs @@ -344,10 +381,10 @@ class Spotify(object): - albums - a list of album IDs, URIs or URLs """ - tlist = [self._get_id('album', a) for a in albums] - return self._get('albums/?ids=' + ','.join(tlist)) + tlist = [self._get_id("album", a) for a in albums] + return self._get("albums/?ids=" + ",".join(tlist)) - def search(self, q, limit=10, offset=0, type='track', market=None): + def search(self, q, limit=10, offset=0, type="track", market=None): """ searches for an item Parameters: @@ -360,8 +397,9 @@ class Spotify(object): - market - An ISO 3166-1 alpha-2 country code or the string from_token. """ - return self._get('search', q=q, limit=limit, - offset=offset, type=type, market=market) + return self._get( + "search", q=q, limit=limit, offset=offset, type=type, market=market + ) def user(self, user): """ Gets basic profile information about a Spotify User @@ -369,7 +407,7 @@ class Spotify(object): Parameters: - user - the id of the usr """ - return self._get('users/' + user) + return self._get("users/" + user) def current_user_playlists(self, limit=50, offset=0): """ Get current user playlists without required getting his profile @@ -388,11 +426,12 @@ class Spotify(object): - market - An ISO 3166-1 alpha-2 country code or the string from_token. """ - plid = self._get_id('playlist', playlist_id) + plid = self._get_id("playlist", playlist_id) return self._get("playlists/%s" % (plid), fields=fields, market=market) - def playlist_tracks(self, playlist_id, fields=None, - limit=100, offset=0, market=None): + def playlist_tracks( + self, playlist_id, fields=None, limit=100, offset=0, market=None + ): """ Get full details of the tracks of a playlist. Parameters: @@ -402,10 +441,14 @@ class Spotify(object): - offset - the index of the first track to return - market - an ISO 3166-1 alpha-2 country code. """ - plid = self._get_id('playlist', playlist_id) - return self._get("playlists/%s/tracks" % (plid), - limit=limit, offset=offset, fields=fields, - market=market) + plid = self._get_id("playlist", playlist_id) + return self._get( + "playlists/%s/tracks" % (plid), + limit=limit, + offset=offset, + fields=fields, + market=market, + ) def playlist_cover_image(self, playlist_id): """ Get cover of a playlist. @@ -413,12 +456,10 @@ class Spotify(object): Parameters: - playlist_id - the id of the playlist """ - plid = self._get_id('playlist', playlist_id) + plid = self._get_id("playlist", playlist_id) return self._get("playlists/%s/images" % (plid)) - def playlist_upload_cover_image(self, - playlist_id, - image_b64): + def playlist_upload_cover_image(self, playlist_id, image_b64): """ Replace the image used to represent a specific playlist Parameters: @@ -426,16 +467,18 @@ class Spotify(object): - image_b64 - image data as a Base64 encoded JPEG image string (maximum payload size is 256 KB) """ - plid = self._get_id('playlist', playlist_id) - return self._put("playlists/{}/images".format(plid), - payload=image_b64, - content_type="image/jpeg") + plid = self._get_id("playlist", playlist_id) + return self._put( + "playlists/{}/images".format(plid), + payload=image_b64, + content_type="image/jpeg", + ) - def user_playlist(self, user, playlist_id=None, - fields=None, market=None): + def user_playlist(self, user, playlist_id=None, fields=None, market=None): warnings.warn( "You should use `playlist(playlist_id)` instead", - DeprecationWarning) + DeprecationWarning, + ) """ Gets playlist of a user @@ -448,11 +491,19 @@ class Spotify(object): return self._get("users/%s/starred" % user) return self.playlist(playlist_id, fields=fields, market=market) - def user_playlist_tracks(self, user=None, playlist_id=None, fields=None, - limit=100, offset=0, market=None): + def user_playlist_tracks( + self, + user=None, + playlist_id=None, + fields=None, + limit=100, + offset=0, + market=None, + ): warnings.warn( "You should use `playlist_tracks(playlist_id)` instead", - DeprecationWarning) + DeprecationWarning, + ) """ Get full details of the tracks of a playlist owned by a user. @@ -464,8 +515,13 @@ class Spotify(object): - offset - the index of the first track to return - market - an ISO 3166-1 alpha-2 country code. """ - return self.playlist_tracks(playlist_id, limit=limit, offset=offset, - fields=fields, market=market) + return self.playlist_tracks( + playlist_id, + limit=limit, + offset=offset, + fields=fields, + market=market, + ) def user_playlists(self, user, limit=50, offset=0): """ Gets playlists of a user @@ -475,10 +531,11 @@ class Spotify(object): - limit - the number of items to return - offset - the index of the first item to return """ - return self._get("users/%s/playlists" % user, limit=limit, - offset=offset) + return self._get( + "users/%s/playlists" % user, limit=limit, offset=offset + ) - def user_playlist_create(self, user, name, public=True, description=''): + def user_playlist_create(self, user, name, public=True, description=""): """ Creates a playlist for a user Parameters: @@ -487,13 +544,19 @@ class Spotify(object): - public - is the created playlist public - description - the description of the playlist """ - data = {'name': name, 'public': public, 'description': description} + data = {"name": name, "public": public, "description": description} return self._post("users/%s/playlists" % (user,), payload=data) def user_playlist_change_details( - self, user, playlist_id, name=None, public=None, - collaborative=None, description=None): + self, + user, + playlist_id, + name=None, + public=None, + collaborative=None, + description=None, + ): """ Changes a playlist's name and/or public/private state Parameters: @@ -507,15 +570,16 @@ class Spotify(object): data = {} if isinstance(name, six.string_types): - data['name'] = name + data["name"] = name if isinstance(public, bool): - data['public'] = public + data["public"] = public if isinstance(collaborative, bool): - data['collaborative'] = collaborative + data["collaborative"] = collaborative if isinstance(description, six.string_types): - data['description'] = description - return self._put("users/%s/playlists/%s" % (user, playlist_id), - payload=data) + data["description"] = description + return self._put( + "users/%s/playlists/%s" % (user, playlist_id), payload=data + ) def user_playlist_unfollow(self, user, playlist_id): """ Unfollows (deletes) a playlist for a user @@ -524,11 +588,13 @@ class Spotify(object): - user - the id of the user - name - the name of the playlist """ - return self._delete("users/%s/playlists/%s/followers" % - (user, playlist_id)) + return self._delete( + "users/%s/playlists/%s/followers" % (user, playlist_id) + ) - def user_playlist_add_tracks(self, user, playlist_id, tracks, - position=None): + def user_playlist_add_tracks( + self, user, playlist_id, tracks, position=None + ): """ Adds tracks to a playlist Parameters: @@ -537,10 +603,13 @@ class Spotify(object): - tracks - a list of track URIs, URLs or IDs - position - the position to add the tracks """ - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] - return self._post("users/%s/playlists/%s/tracks" % (user, plid), - payload=ftracks, position=position) + plid = self._get_id("playlist", playlist_id) + ftracks = [self._get_uri("track", tid) for tid in tracks] + return self._post( + "users/%s/playlists/%s/tracks" % (user, plid), + payload=ftracks, + position=position, + ) def user_playlist_replace_tracks(self, user, playlist_id, tracks): """ Replace all tracks in a playlist @@ -550,15 +619,22 @@ class Spotify(object): - playlist_id - the id of the playlist - tracks - the list of track ids to add to the playlist """ - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] + plid = self._get_id("playlist", playlist_id) + ftracks = [self._get_uri("track", tid) for tid in tracks] payload = {"uris": ftracks} - return self._put("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) + return self._put( + "users/%s/playlists/%s/tracks" % (user, plid), payload=payload + ) def user_playlist_reorder_tracks( - self, user, playlist_id, range_start, insert_before, - range_length=1, snapshot_id=None): + self, + user, + playlist_id, + range_start, + insert_before, + range_length=1, + snapshot_id=None, + ): """ Reorder tracks in a playlist Parameters: @@ -571,17 +647,21 @@ class Spotify(object): inserted - snapshot_id - optional playlist's snapshot ID """ - plid = self._get_id('playlist', playlist_id) - payload = {"range_start": range_start, - "range_length": range_length, - "insert_before": insert_before} + plid = self._get_id("playlist", playlist_id) + payload = { + "range_start": range_start, + "range_length": range_length, + "insert_before": insert_before, + } if snapshot_id: payload["snapshot_id"] = snapshot_id - return self._put("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) + return self._put( + "users/%s/playlists/%s/tracks" % (user, plid), payload=payload + ) def user_playlist_remove_all_occurrences_of_tracks( - self, user, playlist_id, tracks, snapshot_id=None): + self, user, playlist_id, tracks, snapshot_id=None + ): """ Removes all occurrences of the given tracks from the given playlist Parameters: @@ -592,16 +672,18 @@ class Spotify(object): """ - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] + plid = self._get_id("playlist", playlist_id) + ftracks = [self._get_uri("track", tid) for tid in tracks] payload = {"tracks": [{"uri": track} for track in ftracks]} if snapshot_id: payload["snapshot_id"] = snapshot_id - return self._delete("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) + return self._delete( + "users/%s/playlists/%s/tracks" % (user, plid), payload=payload + ) def user_playlist_remove_specific_occurrences_of_tracks( - self, user, playlist_id, tracks, snapshot_id=None): + self, user, playlist_id, tracks, snapshot_id=None + ): """ Removes all occurrences of the given tracks from the given playlist Parameters: @@ -615,18 +697,21 @@ class Spotify(object): - snapshot_id - optional id of the playlist snapshot """ - plid = self._get_id('playlist', playlist_id) + plid = self._get_id("playlist", playlist_id) ftracks = [] for tr in tracks: - ftracks.append({ - "uri": self._get_uri("track", tr["uri"]), - "positions": tr["positions"], - }) + ftracks.append( + { + "uri": self._get_uri("track", tr["uri"]), + "positions": tr["positions"], + } + ) payload = {"tracks": ftracks} if snapshot_id: payload["snapshot_id"] = snapshot_id - return self._delete("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) + return self._delete( + "users/%s/playlists/%s/tracks" % (user, plid), payload=payload + ) def user_playlist_follow_playlist(self, playlist_owner_id, playlist_id): """ @@ -638,11 +723,14 @@ class Spotify(object): """ return self._put( - "users/{}/playlists/{}/followers".format(playlist_owner_id, - playlist_id)) + "users/{}/playlists/{}/followers".format( + playlist_owner_id, playlist_id + ) + ) def user_playlist_is_following( - self, playlist_owner_id, playlist_id, user_ids): + self, playlist_owner_id, playlist_id, user_ids + ): """ Check to see if the given users are following the given playlist @@ -654,15 +742,15 @@ class Spotify(object): """ endpoint = "users/{}/playlists/{}/followers/contains?ids={}" - return self._get(endpoint.format(playlist_owner_id, - playlist_id, - ','.join(user_ids))) + return self._get( + endpoint.format(playlist_owner_id, playlist_id, ",".join(user_ids)) + ) def me(self): """ Get detailed profile information about the current user. An alias for the 'current_user' method. """ - return self._get('me/') + return self._get("me/") def current_user(self): """ Get detailed profile information about the current user. @@ -671,9 +759,9 @@ class Spotify(object): return self.me() def current_user_playing_track(self): - ''' Get information about the current users currently playing track. - ''' - return self._get('me/player/currently-playing') + """ Get information about the current users currently playing track. + """ + return self._get("me/player/currently-playing") def current_user_saved_tracks(self, limit=20, offset=0): """ Gets a list of the tracks saved in the current authorized user's @@ -684,7 +772,7 @@ class Spotify(object): - offset - the index of the first track to return """ - return self._get('me/tracks', limit=limit, offset=offset) + return self._get("me/tracks", limit=limit, offset=offset) def current_user_followed_artists(self, limit=20, after=None): """ Gets a list of the artists followed by the current authorized user @@ -695,8 +783,9 @@ class Spotify(object): request """ - return self._get('me/following', type='artist', limit=limit, - after=after) + return self._get( + "me/following", type="artist", limit=limit, after=after + ) def current_user_saved_tracks_delete(self, tracks=None): """ Remove one or more tracks from the current user's @@ -707,8 +796,8 @@ class Spotify(object): """ tlist = [] if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._delete('me/tracks/?ids=' + ','.join(tlist)) + tlist = [self._get_id("track", t) for t in tracks] + return self._delete("me/tracks/?ids=" + ",".join(tlist)) def current_user_saved_tracks_contains(self, tracks=None): """ Check if one or more tracks is already saved in @@ -719,8 +808,8 @@ class Spotify(object): """ tlist = [] if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._get('me/tracks/contains?ids=' + ','.join(tlist)) + tlist = [self._get_id("track", t) for t in tracks] + return self._get("me/tracks/contains?ids=" + ",".join(tlist)) def current_user_saved_tracks_add(self, tracks=None): """ Add one or more tracks to the current user's @@ -731,11 +820,12 @@ class Spotify(object): """ tlist = [] if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._put('me/tracks/?ids=' + ','.join(tlist)) + tlist = [self._get_id("track", t) for t in tracks] + return self._put("me/tracks/?ids=" + ",".join(tlist)) - def current_user_top_artists(self, limit=20, offset=0, - time_range='medium_term'): + def current_user_top_artists( + self, limit=20, offset=0, time_range="medium_term" + ): """ Get the current user's top artists Parameters: @@ -744,11 +834,13 @@ class Spotify(object): - time_range - Over what time frame are the affinities computed Valid-values: short_term, medium_term, long_term """ - return self._get('me/top/artists', time_range=time_range, limit=limit, - offset=offset) + return self._get( + "me/top/artists", time_range=time_range, limit=limit, offset=offset + ) - def current_user_top_tracks(self, limit=20, offset=0, - time_range='medium_term'): + def current_user_top_tracks( + self, limit=20, offset=0, time_range="medium_term" + ): """ Get the current user's top tracks Parameters: @@ -757,12 +849,12 @@ class Spotify(object): - time_range - Over what time frame are the affinities computed Valid-values: short_term, medium_term, long_term """ - return self._get('me/top/tracks', time_range=time_range, limit=limit, - offset=offset) + return self._get( + "me/top/tracks", time_range=time_range, limit=limit, offset=offset + ) - def current_user_recently_played(self, limit=50, after=None, - before=None): - ''' Get the current user's recently played tracks + def current_user_recently_played(self, limit=50, after=None, before=None): + """ Get the current user's recently played tracks Parameters: - limit - the number of entities to return @@ -772,9 +864,13 @@ class Spotify(object): - before - unix timestamp in milliseconds. Returns all items before (but not including) this cursor position. Cannot be used if after is specified - ''' - return self._get('me/player/recently-played', limit=limit, after=after, - before=before) + """ + return self._get( + "me/player/recently-played", + limit=limit, + after=after, + before=before, + ) def current_user_saved_albums(self, limit=20, offset=0): """ Gets a list of the albums saved in the current authorized user's @@ -785,7 +881,7 @@ class Spotify(object): - offset - the index of the first album to return """ - return self._get('me/albums', limit=limit, offset=offset) + return self._get("me/albums", limit=limit, offset=offset) def current_user_saved_albums_contains(self, albums=[]): """ Check if one or more albums is already saved in @@ -794,8 +890,8 @@ class Spotify(object): Parameters: - albums - a list of album URIs, URLs or IDs """ - alist = [self._get_id('album', a) for a in albums] - return self._get('me/albums/contains?ids=' + ','.join(alist)) + alist = [self._get_id("album", a) for a in albums] + return self._get("me/albums/contains?ids=" + ",".join(alist)) def current_user_saved_albums_add(self, albums=[]): """ Add one or more albums to the current user's @@ -803,8 +899,8 @@ class Spotify(object): Parameters: - albums - a list of album URIs, URLs or IDs """ - alist = [self._get_id('album', a) for a in albums] - return self._put('me/albums?ids=' + ','.join(alist)) + alist = [self._get_id("album", a) for a in albums] + return self._put("me/albums?ids=" + ",".join(alist)) def current_user_saved_albums_delete(self, albums=[]): """ Remove one or more albums from the current user's @@ -813,39 +909,40 @@ class Spotify(object): Parameters: - albums - a list of album URIs, URLs or IDs """ - alist = [self._get_id('album', a) for a in albums] - return self._delete('me/albums/?ids=' + ','.join(alist)) + alist = [self._get_id("album", a) for a in albums] + return self._delete("me/albums/?ids=" + ",".join(alist)) def user_follow_artists(self, ids=[]): - ''' Follow one or more artists + """ Follow one or more artists Parameters: - ids - a list of artist IDs - ''' - return self._put('me/following?type=artist&ids=' + ','.join(ids)) + """ + return self._put("me/following?type=artist&ids=" + ",".join(ids)) def user_follow_users(self, ids=[]): - ''' Follow one or more users + """ Follow one or more users Parameters: - ids - a list of user IDs - ''' - return self._put('me/following?type=user&ids=' + ','.join(ids)) + """ + return self._put("me/following?type=user&ids=" + ",".join(ids)) def user_unfollow_artists(self, ids=[]): - ''' Unfollow one or more artists + """ Unfollow one or more artists Parameters: - ids - a list of artist IDs - ''' - return self._delete('me/following?type=artist&ids=' + ','.join(ids)) + """ + return self._delete("me/following?type=artist&ids=" + ",".join(ids)) def user_unfollow_users(self, ids=[]): - ''' Unfollow one or more users + """ Unfollow one or more users Parameters: - ids - a list of user IDs - ''' - return self._delete('me/following?type=user&ids=' + ','.join(ids)) + """ + return self._delete("me/following?type=user&ids=" + ",".join(ids)) - def featured_playlists(self, locale=None, country=None, timestamp=None, - limit=20, offset=0): + def featured_playlists( + self, locale=None, country=None, timestamp=None, limit=20, offset=0 + ): """ Get a list of Spotify featured playlists Parameters: @@ -867,9 +964,14 @@ class Spotify(object): (the first object). Use with limit to get the next set of items. """ - return self._get('browse/featured-playlists', locale=locale, - country=country, timestamp=timestamp, limit=limit, - offset=offset) + return self._get( + "browse/featured-playlists", + locale=locale, + country=country, + timestamp=timestamp, + limit=limit, + offset=offset, + ) def new_releases(self, country=None, limit=20, offset=0): """ Get a list of new album releases featured in Spotify @@ -884,8 +986,9 @@ class Spotify(object): (the first object). Use with limit to get the next set of items. """ - return self._get('browse/new-releases', country=country, limit=limit, - offset=offset) + return self._get( + "browse/new-releases", country=country, limit=limit, offset=offset + ) def categories(self, country=None, locale=None, limit=20, offset=0): """ Get a list of new album releases featured in Spotify @@ -903,11 +1006,17 @@ class Spotify(object): (the first object). Use with limit to get the next set of items. """ - return self._get('browse/categories', country=country, locale=locale, - limit=limit, offset=offset) + return self._get( + "browse/categories", + country=country, + locale=locale, + limit=limit, + offset=offset, + ) - def category_playlists(self, category_id=None, country=None, limit=20, - offset=0): + def category_playlists( + self, category_id=None, country=None, limit=20, offset=0 + ): """ Get a list of new album releases featured in Spotify Parameters: @@ -922,11 +1031,22 @@ class Spotify(object): (the first object). Use with limit to get the next set of items. """ - return self._get('browse/categories/' + category_id + '/playlists', - country=country, limit=limit, offset=offset) + return self._get( + "browse/categories/" + category_id + "/playlists", + country=country, + limit=limit, + offset=offset, + ) - def recommendations(self, seed_artists=None, seed_genres=None, - seed_tracks=None, limit=20, country=None, **kwargs): + def recommendations( + self, + seed_artists=None, + seed_genres=None, + seed_tracks=None, + limit=20, + country=None, + **kwargs + ): """ Get a list of recommended tracks for one to five seeds. Parameters: @@ -950,38 +1070,52 @@ class Spotify(object): """ params = dict(limit=limit) if seed_artists: - params['seed_artists'] = ','.join( - [self._get_id('artist', a) for a in seed_artists]) + params["seed_artists"] = ",".join( + [self._get_id("artist", a) for a in seed_artists] + ) if seed_genres: - params['seed_genres'] = ','.join(seed_genres) + params["seed_genres"] = ",".join(seed_genres) if seed_tracks: - params['seed_tracks'] = ','.join( - [self._get_id('track', t) for t in seed_tracks]) + params["seed_tracks"] = ",".join( + [self._get_id("track", t) for t in seed_tracks] + ) if country: - params['market'] = country + params["market"] = country - for attribute in ["acousticness", "danceability", "duration_ms", - "energy", "instrumentalness", "key", "liveness", - "loudness", "mode", "popularity", "speechiness", - "tempo", "time_signature", "valence"]: + for attribute in [ + "acousticness", + "danceability", + "duration_ms", + "energy", + "instrumentalness", + "key", + "liveness", + "loudness", + "mode", + "popularity", + "speechiness", + "tempo", + "time_signature", + "valence", + ]: for prefix in ["min_", "max_", "target_"]: param = prefix + attribute if param in kwargs: params[param] = kwargs[param] - return self._get('recommendations', **params) + return self._get("recommendations", **params) def recommendation_genre_seeds(self): """ Get a list of genres available for the recommendations function. """ - return self._get('recommendations/available-genre-seeds') + return self._get("recommendations/available-genre-seeds") def audio_analysis(self, track_id): """ Get audio analysis for a track based upon its Spotify ID Parameters: - track_id - a track URI, URL or ID """ - trid = self._get_id('track', track_id) - return self._get('audio-analysis/' + trid) + trid = self._get_id("track", track_id) + return self._get("audio-analysis/" + trid) def audio_features(self, tracks=[]): """ Get audio features for one or multiple tracks based upon their Spotify IDs @@ -989,41 +1123,41 @@ class Spotify(object): - tracks - a list of track URIs, URLs or IDs, maximum: 50 ids """ if isinstance(tracks, str): - trackid = self._get_id('track', tracks) - results = self._get('audio-features/?ids=' + trackid) + trackid = self._get_id("track", tracks) + results = self._get("audio-features/?ids=" + trackid) else: - tlist = [self._get_id('track', t) for t in tracks] - results = self._get('audio-features/?ids=' + ','.join(tlist)) + tlist = [self._get_id("track", t) for t in tracks] + results = self._get("audio-features/?ids=" + ",".join(tlist)) # the response has changed, look for the new style first, and if # its not there, fallback on the old style - if 'audio_features' in results: - return results['audio_features'] + if "audio_features" in results: + return results["audio_features"] else: return results def devices(self): - ''' Get a list of user's available devices. - ''' + """ Get a list of user's available devices. + """ return self._get("me/player/devices") def current_playback(self, market=None): - ''' Get information about user's current playback. + """ Get information about user's current playback. Parameters: - market - an ISO 3166-1 alpha-2 country code. - ''' + """ return self._get("me/player", market=market) def currently_playing(self, market=None): - ''' Get user's currently playing track. + """ Get user's currently playing track. Parameters: - market - an ISO 3166-1 alpha-2 country code. - ''' + """ return self._get("me/player/currently-playing", market=market) def transfer_playback(self, device_id, force_play=True): - ''' Transfer playback to another device. + """ Transfer playback to another device. Note that the API accepts a list of device ids, but only actually supports one. @@ -1031,16 +1165,14 @@ class Spotify(object): - device_id - transfer playback to this device - force_play - true: after transfer, play. false: keep current state. - ''' - data = { - 'device_ids': [device_id], - 'play': force_play - } + """ + data = {"device_ids": [device_id], "play": force_play} return self._put("me/player", payload=data) - def start_playback(self, device_id=None, - context_uri=None, uris=None, offset=None): - ''' Start or resume user's playback. + def start_playback( + self, device_id=None, context_uri=None, uris=None, offset=None + ): + """ Start or resume user's playback. Provide a `context_uri` to start playback or a album, artist, or playlist. @@ -1056,138 +1188,151 @@ class Spotify(object): - context_uri - spotify context uri to play - uris - spotify track uris - offset - offset into context by index or track - ''' + """ if context_uri is not None and uris is not None: - self._warn('specify either context uri or uris, not both') + self._warn("specify either context uri or uris, not both") return if uris is not None and not isinstance(uris, list): - self._warn('uris must be a list') + self._warn("uris must be a list") return data = {} if context_uri is not None: - data['context_uri'] = context_uri + data["context_uri"] = context_uri if uris is not None: - data['uris'] = uris + data["uris"] = uris if offset is not None: - data['offset'] = offset - return self._put(self._append_device_id( - "me/player/play", device_id), payload=data) + data["offset"] = offset + return self._put( + self._append_device_id("me/player/play", device_id), payload=data + ) def pause_playback(self, device_id=None): - ''' Pause user's playback. + """ Pause user's playback. Parameters: - device_id - device target for playback - ''' + """ return self._put(self._append_device_id("me/player/pause", device_id)) def next_track(self, device_id=None): - ''' Skip user's playback to next track. + """ Skip user's playback to next track. Parameters: - device_id - device target for playback - ''' + """ return self._post(self._append_device_id("me/player/next", device_id)) def previous_track(self, device_id=None): - ''' Skip user's playback to previous track. + """ Skip user's playback to previous track. Parameters: - device_id - device target for playback - ''' - return self._post(self._append_device_id( - "me/player/previous", device_id)) + """ + return self._post( + self._append_device_id("me/player/previous", device_id) + ) def seek_track(self, position_ms, device_id=None): - ''' Seek to position in current track. + """ Seek to position in current track. Parameters: - position_ms - position in milliseconds to seek to - device_id - device target for playback - ''' + """ if not isinstance(position_ms, int): - self._warn('position_ms must be an integer') + self._warn("position_ms must be an integer") return - return self._put(self._append_device_id( - "me/player/seek?position_ms=%s" % position_ms, device_id)) + return self._put( + self._append_device_id( + "me/player/seek?position_ms=%s" % position_ms, device_id + ) + ) def repeat(self, state, device_id=None): - ''' Set repeat mode for playback. + """ Set repeat mode for playback. Parameters: - state - `track`, `context`, or `off` - device_id - device target for playback - ''' - if state not in ['track', 'context', 'off']: - self._warn('invalid state') + """ + if state not in ["track", "context", "off"]: + self._warn("invalid state") return self._put( self._append_device_id( - "me/player/repeat?state=%s" % - state, device_id)) + "me/player/repeat?state=%s" % state, device_id + ) + ) def volume(self, volume_percent, device_id=None): - ''' Set playback volume. + """ Set playback volume. Parameters: - volume_percent - volume between 0 and 100 - device_id - device target for playback - ''' + """ if not isinstance(volume_percent, int): - self._warn('volume must be an integer') + self._warn("volume must be an integer") return if volume_percent < 0 or volume_percent > 100: - self._warn('volume must be between 0 and 100, inclusive') + self._warn("volume must be between 0 and 100, inclusive") return self._put( self._append_device_id( - "me/player/volume?volume_percent=%s" % - volume_percent, device_id)) + "me/player/volume?volume_percent=%s" % volume_percent, + device_id, + ) + ) def shuffle(self, state, device_id=None): - ''' Toggle playback shuffling. + """ Toggle playback shuffling. Parameters: - state - true or false - device_id - device target for playback - ''' + """ if not isinstance(state, bool): - self._warn('state must be a boolean') + self._warn("state must be a boolean") return state = str(state).lower() self._put( self._append_device_id( - "me/player/shuffle?state=%s" % - state, device_id)) + "me/player/shuffle?state=%s" % state, device_id + ) + ) def _append_device_id(self, path, device_id): - ''' Append device ID to API path. + """ Append device ID to API path. Parameters: - device_id - device id to append - ''' + """ if device_id: - if '?' in path: + if "?" in path: path += "&device_id=%s" % device_id else: path += "?device_id=%s" % device_id return path def _get_id(self, type, id): - fields = id.split(':') + fields = id.split(":") if len(fields) >= 3: if type != fields[-2]: - self._warn('expected id of type %s but found type %s %s' % - (type, fields[-2], id)) + self._warn( + "expected id of type %s but found type %s %s" + % (type, fields[-2], id) + ) return fields[-1] - fields = id.split('/') + fields = id.split("/") if len(fields) >= 3: itype = fields[-2] if type != itype: - self._warn('expected id of type %s but found type %s %s' % - (type, itype, id)) - return fields[-1].split('?')[0] + self._warn( + "expected id of type %s but found type %s %s" + % (type, itype, id) + ) + return fields[-1].split("?")[0] return id def _get_uri(self, type, id): - return 'spotify:' + type + ":" + self._get_id(type, id) + return "spotify:" + type + ":" + self._get_id(type, id) diff --git a/spotipy/oauth2.py b/spotipy/oauth2.py index 1c8c3d1..1f00da2 100644 --- a/spotipy/oauth2.py +++ b/spotipy/oauth2.py @@ -3,10 +3,10 @@ from __future__ import print_function __all__ = [ - 'is_token_expired', - 'SpotifyClientCredentials', - 'SpotifyOAuth', - 'SpotifyOauthError' + "is_token_expired", + "SpotifyClientCredentials", + "SpotifyOAuth", + "SpotifyOauthError", ] import base64 @@ -14,8 +14,10 @@ import json import os import sys import time +import warnings import requests +from spotipy.util import CLIENT_CREDS_ENV_VARS # Workaround to support both python 2 & 3 import six @@ -28,19 +30,56 @@ class SpotifyOauthError(Exception): def _make_authorization_headers(client_id, client_secret): auth_header = base64.b64encode( - six.text_type( - '{}:{}'.format(client_id, client_secret) - ).encode('ascii')) - return {'Authorization': 'Basic %s' % auth_header.decode('ascii')} + six.text_type(client_id + ":" + client_secret).encode("ascii") + ) + return {"Authorization": "Basic %s" % auth_header.decode("ascii")} def is_token_expired(token_info): now = int(time.time()) - return token_info['expires_at'] - now < 60 + return token_info["expires_at"] - now < 60 -class SpotifyClientCredentials(object): - OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token' +def _ensure_value(value, env_key): + env_val = CLIENT_CREDS_ENV_VARS[env_key] + _val = value or os.getenv(env_val) + if _val is None: + msg = "No %s. Pass it or set a %s environment variable." % ( + env_key, + env_val, + ) + raise SpotifyOauthError(msg) + return _val + + +class SpotifyAuthBase(object): + @property + def client_id(self): + return self._client_id + + @client_id.setter + def client_id(self, val): + self._client_id = _ensure_value(val, "client_id") + + @property + def client_secret(self): + return self._client_secret + + @client_secret.setter + def client_secret(self, val): + self._client_secret = _ensure_value(val, "client_secret") + + @property + def redirect_uri(self): + return self._redirect_uri + + @redirect_uri.setter + def redirect_uri(self, val): + self._redirect_uri = _ensure_value(val, "redirect_uri") + + +class SpotifyClientCredentials(SpotifyAuthBase): + OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token" def __init__(self, client_id=None, client_secret=None, proxies=None): """ @@ -48,17 +87,6 @@ class SpotifyClientCredentials(object): constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET environment variables """ - if not client_id: - client_id = os.getenv('SPOTIPY_CLIENT_ID') - - if not client_secret: - client_secret = os.getenv('SPOTIPY_CLIENT_SECRET') - - if not client_id: - raise SpotifyOauthError('No client id') - - if not client_secret: - raise SpotifyOauthError('No client secret') self.client_id = client_id self.client_secret = client_secret @@ -71,23 +99,28 @@ class SpotifyClientCredentials(object): Else feches a new token and returns it """ if self.token_info and not self.is_token_expired(self.token_info): - return self.token_info['access_token'] + return self.token_info["access_token"] token_info = self._request_access_token() token_info = self._add_custom_values_to_token_info(token_info) self.token_info = token_info - return self.token_info['access_token'] + return self.token_info["access_token"] def _request_access_token(self): """Gets client credentials access token """ - payload = {'grant_type': 'client_credentials'} + payload = {"grant_type": "client_credentials"} headers = _make_authorization_headers( - self.client_id, self.client_secret) + self.client_id, self.client_secret + ) - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, verify=True, - proxies=self.proxies) + response = requests.post( + self.OAUTH_TOKEN_URL, + data=payload, + headers=headers, + verify=True, + proxies=self.proxies, + ) if response.status_code != 200: raise SpotifyOauthError(response.reason) token_info = response.json() @@ -101,21 +134,30 @@ class SpotifyClientCredentials(object): Store some values that aren't directly provided by a Web API response. """ - token_info['expires_at'] = int(time.time()) + token_info['expires_in'] + token_info["expires_at"] = int(time.time()) + token_info["expires_in"] return token_info -class SpotifyOAuth(object): - ''' +class SpotifyOAuth(SpotifyAuthBase): + """ Implements Authorization Code Flow for Spotify's OAuth implementation. - ''' + """ - OAUTH_AUTHORIZE_URL = 'https://accounts.spotify.com/authorize' - OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token' + OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize" + OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token" - def __init__(self, client_id, client_secret, redirect_uri, - state=None, scope=None, cache_path=None, proxies=None): - ''' + def __init__( + self, + client_id=None, + client_secret=None, + redirect_uri=None, + state=None, + scope=None, + cache_path=None, + username=None, + proxies=None, + ): + """ Creates a SpotifyOAuth object Parameters: @@ -125,20 +167,32 @@ class SpotifyOAuth(object): - state - security state - scope - the desired scope of the request - cache_path - path to location to save tokens - ''' + - username - username of current client + """ self.client_id = client_id self.client_secret = client_secret self.redirect_uri = redirect_uri self.state = state self.cache_path = cache_path + self.username = username or os.getenv( + CLIENT_CREDS_ENV_VARS["client_username"] + ) self.scope = self._normalize_scope(scope) self.proxies = proxies def get_cached_token(self): - ''' Gets a cached auth token - ''' + """ Gets a cached auth token + """ token_info = None + + if not self.cache_path and self.username: + self.cache_path = ".cache-" + str(self.username) + elif not self.cache_path and not self.username: + raise SpotifyOauthError( + "You must either set a cache_path or a username." + ) + if self.cache_path: try: f = open(self.cache_path) @@ -147,13 +201,15 @@ class SpotifyOAuth(object): token_info = json.loads(token_info_string) # if scopes don't match, then bail - if 'scope' not in token_info or not self._is_scope_subset( - self.scope, token_info['scope']): + if "scope" not in token_info or not self._is_scope_subset( + self.scope, token_info["scope"] + ): return None if self.is_token_expired(token_info): token_info = self.refresh_access_token( - token_info['refresh_token']) + token_info["refresh_token"] + ) except IOError: pass @@ -162,7 +218,7 @@ class SpotifyOAuth(object): def _save_token_info(self, token_info): if self.cache_path: try: - f = open(self.cache_path, 'w') + f = open(self.cache_path, "w") f.write(json.dumps(token_info)) f.close() except IOError: @@ -171,8 +227,9 @@ class SpotifyOAuth(object): def _is_scope_subset(self, needle_scope, haystack_scope): needle_scope = set(needle_scope.split()) if needle_scope else set() - haystack_scope = set( - haystack_scope.split()) if haystack_scope else set() + haystack_scope = ( + set(haystack_scope.split()) if haystack_scope else set() + ) return needle_scope <= haystack_scope def is_token_expired(self, token_info): @@ -181,17 +238,19 @@ class SpotifyOAuth(object): def get_authorize_url(self, state=None, show_dialog=False): """ Gets the URL to use to authorize this app """ - payload = {'client_id': self.client_id, - 'response_type': 'code', - 'redirect_uri': self.redirect_uri} + payload = { + "client_id": self.client_id, + "response_type": "code", + "redirect_uri": self.redirect_uri, + } if self.scope: - payload['scope'] = self.scope + payload["scope"] = self.scope if state is None: state = self.state if state is not None: - payload['state'] = state + payload["state"] = state if show_dialog: - payload['show_dialog'] = True + payload["show_dialog"] = True urlparams = urllibparse.urlencode(payload) @@ -212,70 +271,138 @@ class SpotifyOAuth(object): def _make_authorization_headers(self): return _make_authorization_headers(self.client_id, self.client_secret) - def get_access_token(self, code): + def get_auth_response(self): + print( + """ + + User authentication requires interaction with your + web browser. Once you enter your credentials and + give authorization, you will be redirected to + a url. Paste that url you were directed to to + complete the authorization. + + """ + ) + auth_url = self.get_authorize_url() + try: + import webbrowser + + webbrowser.open(auth_url) + print("Opened %s in your browser" % auth_url) + except BaseException: + print("Please navigate here: %s" % auth_url) + print("") + print("") + try: + response = raw_input("Enter the URL you were redirected to: ") + except NameError: + response = input("Enter the URL you were redirected to: ") + print("") + print("") + return response + + def get_authorization_code(self, response=None): + return self.parse_response_code(response or self.get_auth_response()) + + def get_access_token(self, code=None, as_dict=True): """ Gets the access token for the app given the code Parameters: - code - the response code + - as_dict - a boolean indicating if returning the access token + as a token_info dictionary, otherwise it will be returned + as a string. """ + if as_dict: + print("") + warnings.warn( + "You're using 'as_dict = True'." + "get_access_token will return the token string directly in future " + "versions. Please adjust your code accordingly, or use " + "get_cached_token instead.", + DeprecationWarning, + stacklevel=2, + ) + print("") + token_info = self.get_cached_token() + if token_info is not None: + if is_token_expired(token_info): + token_info = self.refresh_access_token( + token_info["refresh_token"] + ) + return token_info if as_dict else token_info["access_token"] - payload = {'redirect_uri': self.redirect_uri, - 'code': code, - 'grant_type': 'authorization_code'} + payload = { + "redirect_uri": self.redirect_uri, + "code": code or self.get_authorization_code(), + "grant_type": "authorization_code", + } if self.scope: - payload['scope'] = self.scope + payload["scope"] = self.scope if self.state: - payload['state'] = self.state + payload["state"] = self.state headers = self._make_authorization_headers() - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, verify=True, - proxies=self.proxies) + response = requests.post( + self.OAUTH_TOKEN_URL, + data=payload, + headers=headers, + verify=True, + proxies=self.proxies, + ) if response.status_code != 200: raise SpotifyOauthError(response.reason) token_info = response.json() token_info = self._add_custom_values_to_token_info(token_info) self._save_token_info(token_info) - return token_info + return token_info if as_dict else token_info["access_token"] def _normalize_scope(self, scope): if scope: scopes = sorted(scope.split()) - return ' '.join(scopes) + return " ".join(scopes) else: return None def refresh_access_token(self, refresh_token): - payload = {'refresh_token': refresh_token, - 'grant_type': 'refresh_token'} + payload = { + "refresh_token": refresh_token, + "grant_type": "refresh_token", + } headers = self._make_authorization_headers() - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, proxies=self.proxies) + response = requests.post( + self.OAUTH_TOKEN_URL, + data=payload, + headers=headers, + proxies=self.proxies, + ) if response.status_code != 200: if False: # debugging code - print('headers', headers) - print('request', response.url) - self._warn("couldn't refresh token: code:%d reason:%s" - % (response.status_code, response.reason)) + print("headers", headers) + print("request", response.url) + self._warn( + "couldn't refresh token: code:%d reason:%s" + % (response.status_code, response.reason) + ) return None token_info = response.json() token_info = self._add_custom_values_to_token_info(token_info) - if 'refresh_token' not in token_info: - token_info['refresh_token'] = refresh_token + if "refresh_token" not in token_info: + token_info["refresh_token"] = refresh_token self._save_token_info(token_info) return token_info def _add_custom_values_to_token_info(self, token_info): - ''' + """ Store some values that aren't directly provided by a Web API response. - ''' - token_info['expires_at'] = int(time.time()) + token_info['expires_in'] - token_info['scope'] = self.scope + """ + token_info["expires_at"] = int(time.time()) + token_info["expires_in"] + token_info["scope"] = self.scope return token_info def _warn(self, msg): - print('warning:' + msg, file=sys.stderr) + print("warning:" + msg, file=sys.stderr) diff --git a/spotipy/util.py b/spotipy/util.py index d8b7585..ebe434b 100644 --- a/spotipy/util.py +++ b/spotipy/util.py @@ -4,29 +4,30 @@ from __future__ import print_function -__all__ = [ - 'CLIENT_CREDS_ENV_VARS', - 'prompt_for_user_token' -] +__all__ = ["CLIENT_CREDS_ENV_VARS", "prompt_for_user_token"] import os -from . import oauth2 - import spotipy CLIENT_CREDS_ENV_VARS = { - 'client_id': 'SPOTIPY_CLIENT_ID', - 'client_secret': 'SPOTIPY_CLIENT_SECRET', - 'client_username': 'SPOTIPY_CLIENT_USERNAME', - 'redirect_uri': 'SPOTIPY_REDIRECT_URI' + "client_id": "SPOTIPY_CLIENT_ID", + "client_secret": "SPOTIPY_CLIENT_SECRET", + "client_username": "SPOTIPY_CLIENT_USERNAME", + "redirect_uri": "SPOTIPY_REDIRECT_URI", } -def prompt_for_user_token(username, scope=None, client_id=None, - client_secret=None, redirect_uri=None, - cache_path=None): - ''' prompts the user to login if necessary and returns +def prompt_for_user_token( + username, + scope=None, + client_id=None, + client_secret=None, + redirect_uri=None, + cache_path=None, + oauth_manager=None, +): + """ prompts the user to login if necessary and returns the user token suitable for use with the spotipy.Spotify constructor @@ -38,35 +39,44 @@ def prompt_for_user_token(username, scope=None, client_id=None, - client_secret - the client secret of your app - redirect_uri - the redirect URI of your app - cache_path - path to location to save tokens + - oauth_manager - Oauth manager object. - ''' + """ + if not oauth_manager: + if not client_id: + client_id = os.getenv("SPOTIPY_CLIENT_ID") - if not client_id: - client_id = os.getenv('SPOTIPY_CLIENT_ID') + if not client_secret: + client_secret = os.getenv("SPOTIPY_CLIENT_SECRET") - if not client_secret: - client_secret = os.getenv('SPOTIPY_CLIENT_SECRET') + if not redirect_uri: + redirect_uri = os.getenv("SPOTIPY_REDIRECT_URI") - if not redirect_uri: - redirect_uri = os.getenv('SPOTIPY_REDIRECT_URI') + if not client_id: + print( + """ + You need to set your Spotify API credentials. + You can do this by setting environment variables like so: - if not client_id: - print(''' - You need to set your Spotify API credentials. You can do this by - setting environment variables like so: + export SPOTIPY_CLIENT_ID='your-spotify-client-id' + export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret' + export SPOTIPY_REDIRECT_URI='your-app-redirect-url' - export SPOTIPY_CLIENT_ID='your-spotify-client-id' - export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret' - export SPOTIPY_REDIRECT_URI='your-app-redirect-url' + Get your credentials at + https://developer.spotify.com/my-applications + """ + ) + raise spotipy.SpotifyException(550, -1, "no credentials set") - Get your credentials at - https://developer.spotify.com/my-applications - ''') - raise spotipy.SpotifyException(550, -1, 'no credentials set') + cache_path = cache_path or ".cache-" + username - cache_path = cache_path or ".cache-" + username - sp_oauth = oauth2.SpotifyOAuth(client_id, client_secret, redirect_uri, - scope=scope, cache_path=cache_path) + sp_oauth = oauth_manager or spotipy.SpotifyOAuth( + client_id, + client_secret, + redirect_uri, + scope=scope, + cache_path=cache_path, + ) # try to get a valid token for this user, from the cache, # if not in the cache, the create a new (this will send @@ -75,37 +85,14 @@ def prompt_for_user_token(username, scope=None, client_id=None, token_info = sp_oauth.get_cached_token() if not token_info: - print(''' + url = sp_oauth.get_auth_response() + code = sp_oauth.parse_response_code(url) + token = sp_oauth.get_access_token(code, as_dict=False) + else: + return token_info["access_token"] - User authentication requires interaction with your - web browser. Once you enter your credentials and - give authorization, you will be redirected to - a url. Paste that url you were directed to to - complete the authorization. - - ''') - auth_url = sp_oauth.get_authorize_url() - try: - import webbrowser - webbrowser.open(auth_url) - print("Opened %s in your browser" % auth_url) - except BaseException: - print("Please navigate here: %s" % auth_url) - - print() - print() - try: - response = raw_input("Enter the URL you were redirected to: ") - except NameError: - response = input("Enter the URL you were redirected to: ") - - print() - print() - - code = sp_oauth.parse_response_code(response) - token_info = sp_oauth.get_access_token(code) # Auth'ed API request - if token_info: - return token_info['access_token'] + if token: + return token else: return None