From 38515689bc505e714825f0095577225e51179fdc Mon Sep 17 00:00:00 2001 From: IdmFoundInHim Date: Thu, 25 Jun 2020 16:39:51 -0400 Subject: [PATCH] Support Implicit Grant Authorization Flow (Fix #499) (#515) * Add SpotifyImplicitGrant with get_access_token and get_cached_token (and minimum related functions) * Add some overlooked necessary methods/values in SpotifyImplicitGrant * Remove unsuppported functionality and make SpotifyImplicitGrant public * Allow/Expose integration of SpotifyImplicitGrant in client * Add Implicit Grant tests and decrease abilities of prompt_for_user_token Remove Implicit Grant and state support from prompt_for_user_token * Add documentation and changelog entry * Touch up PEP8 compliance * Ignore long line with link for flake8 * Correct changelog * Restore compatibility with Python 2.7 * Correct help(SpotifyImplicitGrant.get_access_token) * Remove as_dict from SpotifyImplicitGrant.get_access_token * Combine status check functionality with implicit grant support In oauth2.py: * Add state checking to SpotifyImplicitGrant * Add dedicated SpotifyStateError as subclass of SpotifyOauthError * Moved `_get_user_input` from SpotifyOAuth to superclass SpotifyAuthBase * Renamed `parse_oauth_response_url` to `parse_auth_response_url` * Moved error handling into `parse_auth_response_url` Made minor changes in tests and client.py accordingly * Update changelog * Trim down tests for SpotifyImplicitGrant * Fix trailing whitespace --- CHANGELOG.md | 6 + spotipy/client.py | 13 +- spotipy/oauth2.py | 294 +++++++++++++++++++++-- spotipy/util.py | 3 +- tests/integration/test_user_endpoints.py | 34 +++ tests/unit/test_oauth.py | 130 +++++++++- 6 files changed, 451 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f035a6..f34fdc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added + - Added `SpotifyImplicitGrant` as an auth manager option. It provides + user authentication without a client secret but sacrifices the ability + to refresh the token without user input. (However, read the class + docstring for security advisory.) + - Added built-in verification of the `state` query parameter - Added two new attributes: error and error_description to `SpotifyOauthError` exception class to show authorization/authentication web api errors details. + - Added `SpotifyStateError` subclass of `SpotifyOauthError` - Allow extending `SpotifyClientCredentials` and `SpotifyOAuth` - Added the market paramter to `album_tracks` diff --git a/spotipy/client.py b/spotipy/client.py index ac72eb0..91a001b 100644 --- a/spotipy/client.py +++ b/spotipy/client.py @@ -63,7 +63,8 @@ class Spotify(object): :param oauth_manager: SpotifyOAuth object :param auth_manager: - SpotifyOauth object or SpotifyClientCredentials object + SpotifyOauth, SpotifyClientCredentials, + or SpotifyImplicitGrant object :param proxies: Definition of proxies (optional). See Requests doc https://2.python-requests.org/en/master/user/advanced/#proxies @@ -138,11 +139,13 @@ class Spotify(object): def _auth_headers(self): if self._auth: return {"Authorization": "Bearer {0}".format(self._auth)} - elif self.auth_manager: - token = self.auth_manager.get_access_token(as_dict=False) - return {"Authorization": "Bearer {0}".format(token)} - else: + if not self.auth_manager: return {} + try: + token = self.auth_manager.get_access_token(as_dict=False) + except TypeError: + token = self.auth_manager.get_access_token() + return {"Authorization": "Bearer {0}".format(token)} def _internal_call(self, method, url, payload, params): args = dict(params=params) diff --git a/spotipy/oauth2.py b/spotipy/oauth2.py index 06dd886..4dbefd7 100644 --- a/spotipy/oauth2.py +++ b/spotipy/oauth2.py @@ -5,6 +5,8 @@ __all__ = [ "SpotifyClientCredentials", "SpotifyOAuth", "SpotifyOauthError", + "SpotifyStateError", + "SpotifyImplicitGrant", ] import base64 @@ -29,12 +31,26 @@ logger = logging.getLogger(__name__) class SpotifyOauthError(Exception): + """ Error during Auth Code or Implicit Grant flow """ def __init__(self, message, error=None, error_description=None, *args, **kwargs): self.error = error self.error_description = error_description + self.__dict__.update(kwargs) super(SpotifyOauthError, self).__init__(message, *args, **kwargs) +class SpotifyStateError(SpotifyOauthError): + """ The state sent and state recieved were different """ + def __init__(self, local_state=None, remote_state=None, message=None, + error=None, error_description=None, *args, **kwargs): + if not message: + message = ("Expected " + local_state + " but recieved " + + remote_state) + super(SpotifyOauthError, self).__init__(message, error, + error_description, *args, + **kwargs) + + def _make_authorization_headers(client_id, client_secret): auth_header = base64.b64encode( six.text_type(client_id + ":" + client_secret).encode("ascii") @@ -94,6 +110,13 @@ class SpotifyAuthBase(object): def redirect_uri(self, val): self._redirect_uri = _ensure_value(val, "redirect_uri") + @staticmethod + def _get_user_input(prompt): + try: + return raw_input(prompt) + except NameError: + return input(prompt) + def __del__(self): """Make sure the connection (pool) gets closed""" if isinstance(self._session, requests.Session): @@ -322,24 +345,21 @@ class SpotifyOAuth(SpotifyAuthBase): Parameters: - url - the response url """ - _, code, _ = self.parse_oauth_response_url(url) + _, code = self.parse_auth_response_url(url) if code is None: return url else: return code @staticmethod - def parse_oauth_response_url(url): + def parse_auth_response_url(url): query_s = urlparse(url).query form = dict(parse_qsl(query_s)) - return tuple(form.get(param) for param in ['state', 'code', 'error']) - - @staticmethod - def _get_user_input(prompt): - try: - return raw_input(prompt) - except NameError: - return input(prompt) + if "error" in form: + raise SpotifyOauthError("Received error from auth server: " + "{}".format(form["error"]), + error=form["error"]) + return tuple(form.get(param) for param in ["state", "code"]) def _make_authorization_headers(self): return _make_authorization_headers(self.client_id, self.client_secret) @@ -355,9 +375,9 @@ class SpotifyOAuth(SpotifyAuthBase): def _get_auth_response_interactive(self): self._open_auth_url() response = SpotifyOAuth._get_user_input("Enter the URL you were redirected to: ") - state, code, _ = SpotifyOAuth.parse_oauth_response_url(response) + state, code = SpotifyOAuth.parse_auth_response_url(response) if self.state is not None and self.state != state: - raise SpotifyOauthError("Received inconsistent state from OAuth server.") + raise SpotifyStateError(self.state, state) return code def _get_auth_response_local_server(self, redirect_port): @@ -366,7 +386,7 @@ class SpotifyOAuth(SpotifyAuthBase): server.handle_request() if self.state is not None and server.state != self.state: - raise SpotifyOauthError("Received inconsistent state from OAuth server.") + raise SpotifyStateError(self.state, server.state) if server.auth_code is not None: return server.auth_code @@ -520,12 +540,251 @@ class SpotifyOAuth(SpotifyAuthBase): return token_info +class SpotifyImplicitGrant(SpotifyAuthBase): + """ Implements Implicit Grant Flow for client apps + + This auth manager enables *user and non-user* endpoints with only + a client secret, redirect uri, and username. The user will need to + copy and paste a URI from the browser every hour. + + Security Advisory + ----------------- + The Implicit Grant Flow is part of the + [OAuth 2.0 standard](https://oauth.net/2/grant-types/implicit/). + It is intended for client-side (running in browser or a native app) + interactions where the client secret would have to be hard-coded and + exposed. OAuth no longer recommends its use because sensitive + info (the auth token) can be yanked from the browser address bar or + history, instead recommending the Auth Code flow with PKCE. However, + Spotify [does not support PKCE](https://community.spotify.com/t5/Spotify-for-Developers/Authentication-API-failing-in-production-right-now/m-p/4960693/highlight/true#M234), + so Implicit Grant is the only viable options for client-side Spotify + API requests. + """ + OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize" + + def __init__(self, + client_id=None, + redirect_uri=None, + state=None, + scope=None, + cache_path=None, + username=None, + show_dialog=False): + """ Creates Auth Manager using the Implicit Grant flow + + **See help(SpotifyImplictGrant) for Security Advisory** + + Parameters + ---------- + * client_id: Must be supplied or set as environment variable + * redirect_uri: Must be supplied or set as environment variable + * state: May be supplied, no verification is performed + * scope: May be supplied, intuitively converted to proper format + * cache_path: May be supplied, will otherwise be generated + * username: Must be supplied or set as environment variable + * show_dialog: Interpreted as boolean + """ + self.client_id = client_id + self.redirect_uri = redirect_uri + self.state = state + self.cache_path = cache_path + self.username = username or os.getenv( + CLIENT_CREDS_ENV_VARS["client_username"] + ) + self.scope = self._normalize_scope(scope) + self.show_dialog = show_dialog + self._session = None # As to not break inherited __del__ + + def get_cached_token(self): + """ Gets a cached auth token + """ + token_info = None + + if not self.cache_path and self.username: + self.cache_path = ".cache-" + str(self.username) + elif not self.cache_path and not self.username: + raise SpotifyOauthError( + "You must either set a cache_path or a username." + ) + + if self.cache_path: + try: + f = open(self.cache_path) + token_info_string = f.read() + f.close() + token_info = json.loads(token_info_string) + + # if scopes don't match, then bail + if "scope" not in token_info or not self._is_scope_subset( + self.scope, token_info["scope"] + ): + return None + + if self.is_token_expired(token_info): + return None + + except IOError: + pass + return token_info + + def _save_token_info(self, token_info): + if not self.cache_path and self.username: + self.cache_path = ".cache-" + str(self.username) + if self.cache_path: + try: + f = open(self.cache_path, "w") + f.write(json.dumps(token_info)) + f.close() + except IOError: + logger.warning('Couldn\'t write token to cache at: %s', + self.cache_path) + + def _is_scope_subset(self, needle_scope, haystack_scope): + needle_scope = set(needle_scope.split()) if needle_scope else set() + haystack_scope = ( + set(haystack_scope.split()) if haystack_scope else set() + ) + return needle_scope <= haystack_scope + + def is_token_expired(self, token_info): + return is_token_expired(token_info) + + def get_access_token(self, + state=None, + response=None, + check_cache=True): + """ Gets Auth Token from cache (preferred) or user interaction + + Parameters + ---------- + * state: May be given, overrides (without changing) self.state + * response: URI with token, can break expiration checks + * check_cache: Interpreted as boolean + """ + if check_cache: + token_info = self.get_cached_token() + if not (token_info is None or is_token_expired(token_info)): + return token_info["access_token"] + + if response: + token_info = self.parse_response_token(response) + else: + token_info = self.get_auth_response(state) + token_info = self._add_custom_values_to_token_info(token_info) + self._save_token_info(token_info) + + return token_info["access_token"] + + def _normalize_scope(self, scope): + if scope: + scopes = sorted(scope.split()) + return " ".join(scopes) + else: + return None + + def get_authorize_url(self, state=None): + """ Gets the URL to use to authorize this app """ + payload = { + "client_id": self.client_id, + "response_type": "token", + "redirect_uri": self.redirect_uri, + } + if self.scope: + payload["scope"] = self.scope + if state is None: + state = self.state + if state is not None: + payload["state"] = state + if self.show_dialog: + payload["show_dialog"] = True + + urlparams = urllibparse.urlencode(payload) + + return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams) + + def parse_response_token(self, url, state=None): + """ Parse the response code in the given response url """ + remote_state, token, t_type, exp_in = self.parse_auth_response_url(url) + if state is None: + state = self.state + if state is not None and remote_state != state: + raise SpotifyStateError(state, remote_state) + return {"access_token": token, "token_type": t_type, + "expires_in": exp_in, "state": state} + + @staticmethod + def parse_auth_response_url(url): + url_components = urlparse(url) + fragment_s = url_components.fragment + query_s = url_components.query + form = dict(i.split('=') for i + in (fragment_s or query_s or url).split('&')) + if "error" in form: + raise SpotifyOauthError("Received error from auth server: " + "{}".format(form["error"]), + state=form["state"]) + if "expires_in" in form: + form["expires_in"] = int(form["expires_in"]) + return tuple(form.get(param) for param in ["state", "access_token", + "token_type", "expires_in"]) + + def _open_auth_url(self, state=None): + auth_url = self.get_authorize_url(state) + try: + webbrowser.open(auth_url) + logger.info("Opened %s in your browser", auth_url) + except webbrowser.Error: + logger.error("Please navigate here: %s", auth_url) + + def get_auth_response(self, state=None): + """ Gets a new auth **token** with user interaction """ + logger.info('User authentication requires interaction with your ' + 'web browser. Once you enter your credentials and ' + 'give authorization, you will be redirected to ' + 'a url. Paste that url you were directed to to ' + 'complete the authorization.') + + redirect_info = urlparse(self.redirect_uri) + redirect_host, redirect_port = get_host_port(redirect_info.netloc) + # Implicit Grant tokens are returned in a hash fragment + # which is only available to the browser. Therefore, interactive + # URL retrival is required. + if (redirect_host in ("127.0.0.1", "localhost") + and redirect_info.scheme == "http" and redirect_port): + logger.warning('Using a local redirect URI with a ' + 'port, likely expecting automatic ' + 'retrieval. Due to technical limitations, ' + 'the authentication token cannot be ' + 'automatically retrieved and must be ' + 'copied and pasted.') + + self._open_auth_url(state) + logger.info('Paste that url you were directed to in order to ' + 'complete the authorization') + response = SpotifyImplicitGrant._get_user_input("Enter the URL you " + "were redirected to: ") + return self.parse_response_token(response, state) + + def _add_custom_values_to_token_info(self, token_info): + """ + Store some values that aren't directly provided by a Web API + response. + """ + token_info["expires_at"] = int(time.time()) + token_info["expires_in"] + token_info["scope"] = self.scope + return token_info + + class RequestHandler(BaseHTTPRequestHandler): def do_GET(self): - state, auth_code, error = SpotifyOAuth.parse_oauth_response_url(self.path) - self.server.state = state - self.server.auth_code = auth_code - self.server.error = error + self.server.auth_code = self.server.error = None + try: + state, auth_code = SpotifyOAuth.parse_auth_response_url(self.path) + self.server.state = state + self.server.auth_code = auth_code + except SpotifyOauthError as err: + self.server.state = err.state + self.server.error = err.error self.send_response(200) self.send_header("Content-Type", "text/html") @@ -560,5 +819,6 @@ def start_local_http_server(port, handler=RequestHandler): server = HTTPServer(("127.0.0.1", port), handler) server.allow_reuse_address = True server.auth_code = None + server.auth_token_form = None server.error = None return server diff --git a/spotipy/util.py b/spotipy/util.py index dd8e0d2..3a842cf 100644 --- a/spotipy/util.py +++ b/spotipy/util.py @@ -26,7 +26,6 @@ def prompt_for_user_token( client_id=None, client_secret=None, redirect_uri=None, - state=None, cache_path=None, oauth_manager=None, show_dialog=False @@ -51,6 +50,7 @@ def prompt_for_user_token( - redirect_uri - the redirect URI of your app - cache_path - path to location to save tokens - oauth_manager - Oauth manager object. + - show_dialog - If true, a login prompt always shows """ if not oauth_manager: @@ -85,7 +85,6 @@ def prompt_for_user_token( client_id, client_secret, redirect_uri, - state=state, scope=scope, cache_path=cache_path, show_dialog=show_dialog diff --git a/tests/integration/test_user_endpoints.py b/tests/integration/test_user_endpoints.py index 7b70398..490e8dd 100644 --- a/tests/integration/test_user_endpoints.py +++ b/tests/integration/test_user_endpoints.py @@ -5,6 +5,7 @@ from spotipy import ( prompt_for_user_token, Spotify, SpotifyException, + SpotifyImplicitGrant ) import unittest import requests @@ -395,3 +396,36 @@ class SpotipyPlayerApiTests(unittest.TestCase): after=res['cursors']['before']) self.assertLessEqual(len(res['items']), 50) self.assertGreater(res['items'][0]['played_at'], played_at) + + +class SpotipyImplicitGrantTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + scope = ( + 'user-follow-read ' + 'user-follow-modify ' + ) + auth_manager = SpotifyImplicitGrant(scope=scope, + cache_path=".cache-implicittest") + cls.spotify = Spotify(auth_manager=auth_manager) + + def test_user_follows_and_unfollows_artist(self): + # Initially follows 1 artist + current_user_followed_artists = self.spotify.current_user_followed_artists()[ + 'artists']['total'] + + # Follow 2 more artists + artists = ["6DPYiyq5kWVQS4RGwxzPC7", "0NbfKEOTQCcwd6o7wSDOHI"] + self.spotify.user_follow_artists(artists) + res = self.spotify.current_user_followed_artists() + self.assertEqual(res['artists']['total'], current_user_followed_artists + len(artists)) + + # Unfollow these 2 artists + self.spotify.user_unfollow_artists(artists) + res = self.spotify.current_user_followed_artists() + self.assertEqual(res['artists']['total'], current_user_followed_artists) + + def test_current_user(self): + c_user = self.spotify.current_user() + user = self.spotify.user(c_user['id']) + self.assertEqual(c_user['display_name'], user['display_name']) diff --git a/tests/unit/test_oauth.py b/tests/unit/test_oauth.py index 539c7da..06afaaa 100644 --- a/tests/unit/test_oauth.py +++ b/tests/unit/test_oauth.py @@ -5,8 +5,9 @@ import unittest import six.moves.urllib.parse as urllibparse -from spotipy import SpotifyOAuth +from spotipy import SpotifyOAuth, SpotifyImplicitGrant from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOauthError +from spotipy.oauth2 import SpotifyStateError try: import unittest.mock as mock @@ -41,6 +42,10 @@ def _make_oauth(*args, **kwargs): return SpotifyOAuth("CLID", "CLISEC", "REDIR", "STATE", *args, **kwargs) +def _make_implicitgrantauth(*args, **kwargs): + return SpotifyImplicitGrant("CLID", "REDIR", "STATE", *args, **kwargs) + + class OAuthCacheTest(unittest.TestCase): @patch.multiple(SpotifyOAuth, @@ -199,10 +204,7 @@ class TestSpotifyOAuthGetAuthResponseInteractive(unittest.TestCase): def test_get_auth_response_with_inconsistent_state(self, webbrowser_mock, get_user_input_mock): oauth = SpotifyOAuth("CLID", "CLISEC", "redir.io", state='wxyz') - with self.assertRaisesRegexp( - SpotifyOauthError, - "Received inconsistent state from OAuth server." - ): + with self.assertRaises(SpotifyStateError): oauth.get_auth_response() @@ -213,3 +215,121 @@ class TestSpotifyClientCredentials(unittest.TestCase): with self.assertRaises(SpotifyOauthError) as error: oauth.get_access_token() self.assertEqual(error.exception.error, 'invalid_client') + + +class ImplicitGrantCacheTest(unittest.TestCase): + + @patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT) + @patch('spotipy.oauth2.open', create=True) + def test_gets_from_cache_path(self, opener, is_token_expired): + scope = "playlist-modify-private" + path = ".cache-username" + tok = _make_fake_token(1, 1, scope) + + opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False)) + is_token_expired.return_value = False + + spot = _make_implicitgrantauth(scope, path) + cached_tok = spot.get_cached_token() + + opener.assert_called_with(path) + self.assertIsNotNone(cached_tok) + + @patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT) + @patch('spotipy.oauth2.open', create=True) + def test_expired_token_returns_none(self, opener, is_token_expired): + scope = "playlist-modify-private" + path = ".cache-username" + expired_tok = _make_fake_token(0, None, scope) + + token_file = _token_file(json.dumps(expired_tok, ensure_ascii=False)) + opener.return_value = token_file + + spot = _make_implicitgrantauth(scope, path) + cached_tok = spot.get_cached_token() + + is_token_expired.assert_called_with(expired_tok) + opener.assert_any_call(path) + self.assertIsNone(cached_tok) + + @patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT) + @patch('spotipy.oauth2.open', create=True) + def test_badly_scoped_token_bails(self, opener, is_token_expired): + token_scope = "playlist-modify-public" + requested_scope = "playlist-modify-private" + path = ".cache-username" + tok = _make_fake_token(1, 1, token_scope) + + opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False)) + is_token_expired.return_value = False + + spot = _make_implicitgrantauth(requested_scope, path) + cached_tok = spot.get_cached_token() + + opener.assert_called_with(path) + self.assertIsNone(cached_tok) + + @patch('spotipy.oauth2.open', create=True) + def test_saves_to_cache_path(self, opener): + scope = "playlist-modify-private" + path = ".cache-username" + tok = _make_fake_token(1, 1, scope) + + fi = _fake_file() + opener.return_value = fi + + spot = SpotifyImplicitGrant("CLID", "REDIR", "STATE", scope, path) + spot._save_token_info(tok) + + opener.assert_called_with(path, 'w') + self.assertTrue(fi.write.called) + + +class TestSpotifyImplicitGrant(unittest.TestCase): + + def test_get_authorize_url_doesnt_pass_state_by_default(self): + auth = SpotifyImplicitGrant("CLID", "REDIR") + + url = auth.get_authorize_url() + + parsed_url = urllibparse.urlparse(url) + parsed_qs = urllibparse.parse_qs(parsed_url.query) + self.assertNotIn('state', parsed_qs) + + def test_get_authorize_url_passes_state_from_constructor(self): + state = "STATE" + auth = SpotifyImplicitGrant("CLID", "REDIR", state) + + url = auth.get_authorize_url() + + parsed_url = urllibparse.urlparse(url) + parsed_qs = urllibparse.parse_qs(parsed_url.query) + self.assertEqual(parsed_qs['state'][0], state) + + def test_get_authorize_url_passes_state_from_func_call(self): + state = "STATE" + auth = SpotifyImplicitGrant("CLID", "REDIR", "NOT STATE") + + url = auth.get_authorize_url(state=state) + + parsed_url = urllibparse.urlparse(url) + parsed_qs = urllibparse.parse_qs(parsed_url.query) + self.assertEqual(parsed_qs['state'][0], state) + + def test_get_authorize_url_does_not_show_dialog_by_default(self): + auth = SpotifyImplicitGrant("CLID", "REDIR") + + url = auth.get_authorize_url() + + parsed_url = urllibparse.urlparse(url) + parsed_qs = urllibparse.parse_qs(parsed_url.query) + self.assertNotIn('show_dialog', parsed_qs) + + def test_get_authorize_url_shows_dialog_when_requested(self): + auth = SpotifyImplicitGrant("CLID", "REDIR", show_dialog=True) + + url = auth.get_authorize_url() + + parsed_url = urllibparse.urlparse(url) + parsed_qs = urllibparse.parse_qs(parsed_url.query) + self.assertTrue(parsed_qs['show_dialog'])