diff --git a/CHANGELOG.md b/CHANGELOG.md index d8c647a..03c75c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Proper replacements for all deprecated playlist endpoints (See https://developer.spotify.com/community/news/2018/06/12/changes-to-playlist-uris/ and below) - Allow for OAuth 2.0 authorization by instructing the user to open the URL in a browser instead of opening the browser. +- Support for the PKCE Auth Flow ### Deprecated diff --git a/spotipy/oauth2.py b/spotipy/oauth2.py index 0498d5c..dcb595d 100644 --- a/spotipy/oauth2.py +++ b/spotipy/oauth2.py @@ -7,6 +7,7 @@ __all__ = [ "SpotifyOauthError", "SpotifyStateError", "SpotifyImplicitGrant", + "SpotifyPKCE" ] import base64 @@ -552,6 +553,359 @@ class SpotifyOAuth(SpotifyAuthBase): return token_info +class SpotifyPKCE(SpotifyAuthBase): + """ Implements PKCE Authorization Flow for client apps + + This auth manager enables *user and non-user* endpoints with only + a client secret, redirect uri, and username. When the app requests + an an access token for the first time, the user is prompted to + authorize the new client app. After authorizing the app, the client + app is then given both access and refresh tokens. This is the + preferred way of authorizing a mobile/desktop client. + + """ + + OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize" + OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token" + + def __init__(self, + client_id=None, + redirect_uri=None, + state=None, + scope=None, + cache_path=None, + username=None, + proxies=None, + requests_timeout=None, + requests_session=True,): + """ + Creates Auth Manager with the PKCE Auth flow. + + Parameters: + - client_id - the client id of your app + - redirect_uri - the redirect URI of your app + - state - security state + - scope - the desired scope of the request + - cache_path - path to location to save tokens + - username - username of current client + - proxies - proxy for the requests library to route through + - requests_timeout - tell Requests to stop waiting for a response + after a given number of seconds + """ + super(SpotifyPKCE, self).__init__(requests_session) + self.client_id = client_id + self.redirect_uri = redirect_uri + self.state = state + self.scope = self._normalize_scope(scope) + self.cache_path = cache_path + self.username = username or os.getenv( + CLIENT_CREDS_ENV_VARS["client_username"] + ) + self.proxies = proxies + self.requests_timeout = requests_timeout + + self._code_challenge_method = "S256" # Spotify requires SHA256 + self.code_verifier = None + self.code_challenge = None + self.authorization_code = None + + def _normalize_scope(self, scope): + if scope: + scopes = sorted(scope.split()) + return " ".join(scopes) + else: + return None + + def _get_code_verifier(self): + ''' Spotify PCKE code verifier - See step 1 of the reference guide below + Reference: + https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce + ''' + # Range (33,96) is used to select between 44-128 base64 characters for the + # next operation. The range looks weird because base64 is 6 bytes + import random + length = random.randint(33, 96) + + # The seeded length generates between a 44 and 128 base64 characters encoded string + try: + import secrets + verifier = secrets.token_urlsafe(length) + except ImportError: # For python 3.5 support + import os + import base64 + rand_bytes = os.urandom(length) + verifier = base64.urlsafe_b64encode(rand_bytes).decode('utf-8').replace('=', '') + return verifier + + def _get_code_challenge(self): + ''' Spotify PCKE code challenge - See step 1 of the reference guide below + Reference: + https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce + ''' + import hashlib + import base64 + code_challenge_digest = hashlib.sha256(self.code_verifier.encode('utf-8')).digest() + code_challenge = base64.urlsafe_b64encode(code_challenge_digest).decode('utf-8') + return code_challenge.replace('=', '') + + def get_authorize_url(self, state=None): + """ Gets the URL to use to authorize this app """ + payload = { + "client_id": self.client_id, + "response_type": "code", + "redirect_uri": self.redirect_uri, + "code_challenge_method": self._code_challenge_method, + "code_challenge": self.code_challenge + } + if self.scope: + payload["scope"] = self.scope + if state is None: + state = self.state + if state is not None: + payload["state"] = state + urlparams = urllibparse.urlencode(payload) + return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams) + + def _open_auth_url(self, state=None): + auth_url = self.get_authorize_url(state) + try: + webbrowser.open(auth_url) + logger.info("Opened %s in your browser", auth_url) + except webbrowser.Error: + logger.error("Please navigate here: %s", auth_url) + + def _get_auth_response(self, open_browser=True): + logger.info('User authentication requires interaction with your ' + 'web browser. Once you enter your credentials and ' + 'give authorization, you will be redirected to ' + 'a url. Paste that url you were directed to to ' + 'complete the authorization.') + + redirect_info = urlparse(self.redirect_uri) + redirect_host, redirect_port = get_host_port(redirect_info.netloc) + + if ( + open_browser + and redirect_host in ("127.0.0.1", "localhost") + and redirect_info.scheme == "http" + ): + # Only start a local http server if a port is specified + if redirect_port: + return self._get_auth_response_local_server(redirect_port) + else: + logger.warning('Using `%s` as redirect URI without a port. ' + 'Specify a port (e.g. `%s:8080`) to allow ' + 'automatic retrieval of authentication code ' + 'instead of having to copy and paste ' + 'the URL your browser is redirected to.', + redirect_host, redirect_host) + return self._get_auth_response_interactive(open_browser=open_browser) + + def _get_auth_response_local_server(self, redirect_port): + server = start_local_http_server(redirect_port) + self._open_auth_url() + server.handle_request() + + if self.state is not None and server.state != self.state: + raise SpotifyStateError(self.state, server.state) + + if server.auth_code is not None: + return server.auth_code + elif server.error is not None: + raise SpotifyOauthError("Received error from OAuth server: {}".format(server.error)) + else: + raise SpotifyOauthError("Server listening on localhost has not been accessed") + + def _get_auth_response_interactive(self, open_browser=True): + if open_browser: + self._open_auth_url() + prompt = "Enter the URL you were redirected to: " + else: + url = self.get_authorize_url() + prompt = ( + "Go to the following URL: {}\n" + "Enter the URL you were redirected to: ".format(url) + ) + response = SpotifyOAuth._get_user_input(prompt) + state, code = SpotifyOAuth.parse_auth_response_url(response) + if self.state is not None and self.state != state: + raise SpotifyStateError(self.state, state) + return code + + def get_authorization_code(self, response=None): + if response: + return self.parse_response_code(response) + return self._get_auth_response() + + def get_cached_token(self): + """ Gets a cached auth token + """ + token_info = None + + if not self.cache_path and self.username: + self.cache_path = ".cache-" + str(self.username) + elif not self.cache_path and not self.username: + raise SpotifyOauthError( + "You must either set a cache_path or a username." + ) + + if self.cache_path: + try: + f = open(self.cache_path) + token_info_string = f.read() + f.close() + token_info = json.loads(token_info_string) + + # if scopes don't match, then bail + if "scope" not in token_info or not self._is_scope_subset( + self.scope, token_info["scope"] + ): + return None + + if self.is_token_expired(token_info): + token_info = self.refresh_access_token( + token_info["refresh_token"] + ) + + except IOError: + pass + return token_info + + def _is_scope_subset(self, needle_scope, haystack_scope): + needle_scope = set(needle_scope.split()) if needle_scope else set() + haystack_scope = ( + set(haystack_scope.split()) if haystack_scope else set() + ) + return needle_scope <= haystack_scope + + def is_token_expired(self, token_info): + return is_token_expired(token_info) + + def _save_token_info(self, token_info): + if self.cache_path: + try: + f = open(self.cache_path, "w") + f.write(json.dumps(token_info)) + f.close() + except IOError: + logger.warning('Couldn\'t write token to cache at: %s', + self.cache_path) + + def _add_custom_values_to_token_info(self, token_info): + """ + Store some values that aren't directly provided by a Web API + response. + """ + token_info["expires_at"] = int(time.time()) + token_info["expires_in"] + return token_info + + def get_pkce_handshake_parameters(self): + self.code_verifier = self._get_code_verifier() + self.code_challenge = self._get_code_challenge() + + def get_access_token(self, check_cache=True): + """ Gets the access token for the app given the code + + Parameters: + - check_cache - check for locally stored token before request + a new token if True + """ + + if check_cache: + token_info = self.get_cached_token() + if token_info is not None: + if is_token_expired(token_info): + token_info = self.refresh_access_token( + token_info["refresh_token"] + ) + return token_info["access_token"] + + if self.code_verifier is None or self.code_challenge is None: + self.get_pkce_handshake_parameters() + + payload = { + "client_id": self.client_id, + "grant_type": "authorization_code", + "code": self.get_authorization_code(), + "redirect_uri": self.redirect_uri, + "code_verifier": self.code_verifier + } + + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + response = self._session.post( + self.OAUTH_TOKEN_URL, + data=payload, + headers=headers, + verify=True, + proxies=self.proxies, + timeout=self.requests_timeout, + ) + if response.status_code != 200: + error_payload = response.json() + raise SpotifyOauthError('error: {0}, error_descr: {1}'.format(error_payload['error'], + error_payload[ + 'error_description' + ]), + error=error_payload['error'], + error_description=error_payload['error_description']) + token_info = response.json() + token_info = self._add_custom_values_to_token_info(token_info) + self._save_token_info(token_info) + return token_info["access_token"] + + def refresh_access_token(self, refresh_token): + payload = { + "refresh_token": refresh_token, + "grant_type": "refresh_token", + "client_id": self.client_id, + } + + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + response = self._session.post( + self.OAUTH_TOKEN_URL, + data=payload, + headers=headers, + proxies=self.proxies, + timeout=self.requests_timeout, + ) + + try: + response.raise_for_status() + except BaseException: + logger.error('Couldn\'t refresh token. Response Status Code: %s ' + 'Reason: %s', response.status_code, response.reason) + + message = "Couldn't refresh token: code:%d reason:%s" % ( + response.status_code, + response.reason, + ) + raise SpotifyException(response.status_code, + -1, + message, + headers) + + token_info = response.json() + token_info = self._add_custom_values_to_token_info(token_info) + if "refresh_token" not in token_info: + token_info["refresh_token"] = refresh_token + self._save_token_info(token_info) + return token_info + + def parse_response_code(self, url): + """ Parse the response code in the given response url + + Parameters: + - url - the response url + """ + _, code = SpotifyOAuth.parse_auth_response_url(url) + if code is None: + return url + else: + return code + + class SpotifyImplicitGrant(SpotifyAuthBase): """ Implements Implicit Grant Flow for client apps @@ -817,6 +1171,10 @@ window.close()

Authentication status: {}

This window can be closed. + + """.format(status)) diff --git a/tests/integration/test_user_endpoints.py b/tests/integration/test_user_endpoints.py index d259f82..4438b65 100644 --- a/tests/integration/test_user_endpoints.py +++ b/tests/integration/test_user_endpoints.py @@ -5,7 +5,8 @@ from spotipy import ( prompt_for_user_token, Spotify, SpotifyException, - SpotifyImplicitGrant + SpotifyImplicitGrant, + SpotifyPKCE ) import unittest import requests @@ -424,3 +425,36 @@ class SpotipyImplicitGrantTests(unittest.TestCase): c_user = self.spotify.current_user() user = self.spotify.user(c_user['id']) self.assertEqual(c_user['display_name'], user['display_name']) + + +class SpotifyPKCETests(unittest.TestCase): + + @classmethod + def setUpClass(cls): + scope = ( + 'user-follow-read ' + 'user-follow-modify ' + ) + auth_manager = SpotifyPKCE(scope=scope, cache_path=".cache-implicittest") + cls.spotify = Spotify(auth_manager=auth_manager) + + def test_user_follows_and_unfollows_artist(self): + # Initially follows 1 artist + current_user_followed_artists = self.spotify.current_user_followed_artists()[ + 'artists']['total'] + + # Follow 2 more artists + artists = ["6DPYiyq5kWVQS4RGwxzPC7", "0NbfKEOTQCcwd6o7wSDOHI"] + self.spotify.user_follow_artists(artists) + res = self.spotify.current_user_followed_artists() + self.assertEqual(res['artists']['total'], current_user_followed_artists + len(artists)) + + # Unfollow these 2 artists + self.spotify.user_unfollow_artists(artists) + res = self.spotify.current_user_followed_artists() + self.assertEqual(res['artists']['total'], current_user_followed_artists) + + def test_current_user(self): + c_user = self.spotify.current_user() + user = self.spotify.user(c_user['id']) + self.assertEqual(c_user['display_name'], user['display_name']) diff --git a/tests/unit/test_oauth.py b/tests/unit/test_oauth.py index 06afaaa..fdd9b31 100644 --- a/tests/unit/test_oauth.py +++ b/tests/unit/test_oauth.py @@ -5,7 +5,7 @@ import unittest import six.moves.urllib.parse as urllibparse -from spotipy import SpotifyOAuth, SpotifyImplicitGrant +from spotipy import SpotifyOAuth, SpotifyImplicitGrant, SpotifyPKCE from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOauthError from spotipy.oauth2 import SpotifyStateError @@ -46,6 +46,10 @@ def _make_implicitgrantauth(*args, **kwargs): return SpotifyImplicitGrant("CLID", "REDIR", "STATE", *args, **kwargs) +def _make_pkceauth(*args, **kwargs): + return SpotifyPKCE("CLID", "REDIR", "STATE", *args, **kwargs) + + class OAuthCacheTest(unittest.TestCase): @patch.multiple(SpotifyOAuth, @@ -333,3 +337,135 @@ class TestSpotifyImplicitGrant(unittest.TestCase): parsed_url = urllibparse.urlparse(url) parsed_qs = urllibparse.parse_qs(parsed_url.query) self.assertTrue(parsed_qs['show_dialog']) + + +class SpotifyPKCECacheTest(unittest.TestCase): + + @patch.multiple(SpotifyPKCE, + is_token_expired=DEFAULT, refresh_access_token=DEFAULT) + @patch('spotipy.oauth2.open', create=True) + def test_gets_from_cache_path(self, opener, + is_token_expired, refresh_access_token): + scope = "playlist-modify-private" + path = ".cache-username" + tok = _make_fake_token(1, 1, scope) + + opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False)) + is_token_expired.return_value = False + + spot = _make_pkceauth(scope, path) + cached_tok = spot.get_cached_token() + + opener.assert_called_with(path) + self.assertIsNotNone(cached_tok) + self.assertEqual(refresh_access_token.call_count, 0) + + @patch.multiple(SpotifyPKCE, + is_token_expired=DEFAULT, refresh_access_token=DEFAULT) + @patch('spotipy.oauth2.open', create=True) + def test_expired_token_refreshes(self, opener, + is_token_expired, refresh_access_token): + scope = "playlist-modify-private" + path = ".cache-username" + expired_tok = _make_fake_token(0, None, scope) + fresh_tok = _make_fake_token(1, 1, scope) + + token_file = _token_file(json.dumps(expired_tok, ensure_ascii=False)) + opener.return_value = token_file + refresh_access_token.return_value = fresh_tok + + spot = _make_pkceauth(scope, path) + spot.get_cached_token() + + is_token_expired.assert_called_with(expired_tok) + refresh_access_token.assert_called_with(expired_tok['refresh_token']) + opener.assert_any_call(path) + + @patch.multiple(SpotifyPKCE, + is_token_expired=DEFAULT, refresh_access_token=DEFAULT) + @patch('spotipy.oauth2.open', create=True) + def test_badly_scoped_token_bails(self, opener, + is_token_expired, refresh_access_token): + token_scope = "playlist-modify-public" + requested_scope = "playlist-modify-private" + path = ".cache-username" + tok = _make_fake_token(1, 1, token_scope) + + opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False)) + is_token_expired.return_value = False + + spot = _make_pkceauth(requested_scope, path) + cached_tok = spot.get_cached_token() + + opener.assert_called_with(path) + self.assertIsNone(cached_tok) + self.assertEqual(refresh_access_token.call_count, 0) + + @patch('spotipy.oauth2.open', create=True) + def test_saves_to_cache_path(self, opener): + scope = "playlist-modify-private" + path = ".cache-username" + tok = _make_fake_token(1, 1, scope) + + fi = _fake_file() + opener.return_value = fi + + spot = SpotifyPKCE("CLID", "REDIR", "STATE", scope, path) + spot._save_token_info(tok) + + opener.assert_called_with(path, 'w') + self.assertTrue(fi.write.called) + + +class TestSpotifyPKCE(unittest.TestCase): + + def test_generate_code_verifier_for_pkce(self): + auth = SpotifyPKCE("CLID", "REDIR") + auth.get_pkce_handshake_parameters() + self.assertTrue(auth.code_verifier) + + def test_generate_code_challenge_for_pkce(self): + auth = SpotifyPKCE("CLID", "REDIR") + auth.get_pkce_handshake_parameters() + self.assertTrue(auth.code_challenge) + + def test_code_verifier_and_code_challenge_are_correct(self): + import hashlib + import base64 + auth = SpotifyPKCE("CLID", "REDIR") + auth.get_pkce_handshake_parameters() + self.assertEqual(auth.code_challenge, + base64.urlsafe_b64encode( + hashlib.sha256(auth.code_verifier.encode('utf-8')) + .digest()) + .decode('utf-8') + .replace('=', '')) + + def test_get_authorize_url_doesnt_pass_state_by_default(self): + auth = SpotifyPKCE("CLID", "REDIR") + + url = auth.get_authorize_url() + + parsed_url = urllibparse.urlparse(url) + parsed_qs = urllibparse.parse_qs(parsed_url.query) + self.assertNotIn('state', parsed_qs) + + def test_get_authorize_url_passes_state_from_constructor(self): + state = "STATE" + auth = SpotifyPKCE("CLID", "REDIR", state) + + url = auth.get_authorize_url() + + parsed_url = urllibparse.urlparse(url) + parsed_qs = urllibparse.parse_qs(parsed_url.query) + self.assertEqual(parsed_qs['state'][0], state) + + def test_get_authorize_url_passes_state_from_func_call(self): + state = "STATE" + auth = SpotifyPKCE("CLID", "REDIR") + + url = auth.get_authorize_url(state=state) + + parsed_url = urllibparse.urlparse(url) + parsed_qs = urllibparse.parse_qs(parsed_url.query) + self.assertEqual(parsed_qs['state'][0], state)