Adding Initial PKCE Auth Flow Support (#542)

* Added base funtionality for PKCE Authorization - i538

* fixed a mistake with the auth code

* fixed more misunderstandings. fixed grant_access_token to now call authorization if needed

* added comments and references to code verifier and code challenge

* removed debug print statement

* updated unit tests for new PKCE flow

* cleaned up username issues - added doc strings to class

* fixed import issue, added user endpoint tests

* forgot to commit this file

* linted

* clarified comment

* no longer generates code verifier or challenge in constructor, only when needed

* fixed flake8 complaints, added forgotten unit tests

* fixed linting with unit tests

* anotha one

* added python3.5 support

* linting

* added to CHANGELOG

* removed as_dict option from get_access_token()

Co-authored-by: tomCLANCC <26153156+tomCLANCC@users.noreply.github.com>
This commit is contained in:
DJSdev 2020-07-21 12:08:09 -04:00 committed by GitHub
parent 88cf75b778
commit c425102659
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 531 additions and 2 deletions

View File

@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Proper replacements for all deprecated playlist endpoints - Proper replacements for all deprecated playlist endpoints
(See https://developer.spotify.com/community/news/2018/06/12/changes-to-playlist-uris/ and below) (See https://developer.spotify.com/community/news/2018/06/12/changes-to-playlist-uris/ and below)
- Allow for OAuth 2.0 authorization by instructing the user to open the URL in a browser instead of opening the browser. - Allow for OAuth 2.0 authorization by instructing the user to open the URL in a browser instead of opening the browser.
- Support for the PKCE Auth Flow
### Deprecated ### Deprecated

View File

@ -7,6 +7,7 @@ __all__ = [
"SpotifyOauthError", "SpotifyOauthError",
"SpotifyStateError", "SpotifyStateError",
"SpotifyImplicitGrant", "SpotifyImplicitGrant",
"SpotifyPKCE"
] ]
import base64 import base64
@ -552,6 +553,359 @@ class SpotifyOAuth(SpotifyAuthBase):
return token_info return token_info
class SpotifyPKCE(SpotifyAuthBase):
""" Implements PKCE Authorization Flow for client apps
This auth manager enables *user and non-user* endpoints with only
a client secret, redirect uri, and username. When the app requests
an an access token for the first time, the user is prompted to
authorize the new client app. After authorizing the app, the client
app is then given both access and refresh tokens. This is the
preferred way of authorizing a mobile/desktop client.
"""
OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
def __init__(self,
client_id=None,
redirect_uri=None,
state=None,
scope=None,
cache_path=None,
username=None,
proxies=None,
requests_timeout=None,
requests_session=True,):
"""
Creates Auth Manager with the PKCE Auth flow.
Parameters:
- client_id - the client id of your app
- redirect_uri - the redirect URI of your app
- state - security state
- scope - the desired scope of the request
- cache_path - path to location to save tokens
- username - username of current client
- proxies - proxy for the requests library to route through
- requests_timeout - tell Requests to stop waiting for a response
after a given number of seconds
"""
super(SpotifyPKCE, self).__init__(requests_session)
self.client_id = client_id
self.redirect_uri = redirect_uri
self.state = state
self.scope = self._normalize_scope(scope)
self.cache_path = cache_path
self.username = username or os.getenv(
CLIENT_CREDS_ENV_VARS["client_username"]
)
self.proxies = proxies
self.requests_timeout = requests_timeout
self._code_challenge_method = "S256" # Spotify requires SHA256
self.code_verifier = None
self.code_challenge = None
self.authorization_code = None
def _normalize_scope(self, scope):
if scope:
scopes = sorted(scope.split())
return " ".join(scopes)
else:
return None
def _get_code_verifier(self):
''' Spotify PCKE code verifier - See step 1 of the reference guide below
Reference:
https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce
'''
# Range (33,96) is used to select between 44-128 base64 characters for the
# next operation. The range looks weird because base64 is 6 bytes
import random
length = random.randint(33, 96)
# The seeded length generates between a 44 and 128 base64 characters encoded string
try:
import secrets
verifier = secrets.token_urlsafe(length)
except ImportError: # For python 3.5 support
import os
import base64
rand_bytes = os.urandom(length)
verifier = base64.urlsafe_b64encode(rand_bytes).decode('utf-8').replace('=', '')
return verifier
def _get_code_challenge(self):
''' Spotify PCKE code challenge - See step 1 of the reference guide below
Reference:
https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce
'''
import hashlib
import base64
code_challenge_digest = hashlib.sha256(self.code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge_digest).decode('utf-8')
return code_challenge.replace('=', '')
def get_authorize_url(self, state=None):
""" Gets the URL to use to authorize this app """
payload = {
"client_id": self.client_id,
"response_type": "code",
"redirect_uri": self.redirect_uri,
"code_challenge_method": self._code_challenge_method,
"code_challenge": self.code_challenge
}
if self.scope:
payload["scope"] = self.scope
if state is None:
state = self.state
if state is not None:
payload["state"] = state
urlparams = urllibparse.urlencode(payload)
return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)
def _open_auth_url(self, state=None):
auth_url = self.get_authorize_url(state)
try:
webbrowser.open(auth_url)
logger.info("Opened %s in your browser", auth_url)
except webbrowser.Error:
logger.error("Please navigate here: %s", auth_url)
def _get_auth_response(self, open_browser=True):
logger.info('User authentication requires interaction with your '
'web browser. Once you enter your credentials and '
'give authorization, you will be redirected to '
'a url. Paste that url you were directed to to '
'complete the authorization.')
redirect_info = urlparse(self.redirect_uri)
redirect_host, redirect_port = get_host_port(redirect_info.netloc)
if (
open_browser
and redirect_host in ("127.0.0.1", "localhost")
and redirect_info.scheme == "http"
):
# Only start a local http server if a port is specified
if redirect_port:
return self._get_auth_response_local_server(redirect_port)
else:
logger.warning('Using `%s` as redirect URI without a port. '
'Specify a port (e.g. `%s:8080`) to allow '
'automatic retrieval of authentication code '
'instead of having to copy and paste '
'the URL your browser is redirected to.',
redirect_host, redirect_host)
return self._get_auth_response_interactive(open_browser=open_browser)
def _get_auth_response_local_server(self, redirect_port):
server = start_local_http_server(redirect_port)
self._open_auth_url()
server.handle_request()
if self.state is not None and server.state != self.state:
raise SpotifyStateError(self.state, server.state)
if server.auth_code is not None:
return server.auth_code
elif server.error is not None:
raise SpotifyOauthError("Received error from OAuth server: {}".format(server.error))
else:
raise SpotifyOauthError("Server listening on localhost has not been accessed")
def _get_auth_response_interactive(self, open_browser=True):
if open_browser:
self._open_auth_url()
prompt = "Enter the URL you were redirected to: "
else:
url = self.get_authorize_url()
prompt = (
"Go to the following URL: {}\n"
"Enter the URL you were redirected to: ".format(url)
)
response = SpotifyOAuth._get_user_input(prompt)
state, code = SpotifyOAuth.parse_auth_response_url(response)
if self.state is not None and self.state != state:
raise SpotifyStateError(self.state, state)
return code
def get_authorization_code(self, response=None):
if response:
return self.parse_response_code(response)
return self._get_auth_response()
def get_cached_token(self):
""" Gets a cached auth token
"""
token_info = None
if not self.cache_path and self.username:
self.cache_path = ".cache-" + str(self.username)
elif not self.cache_path and not self.username:
raise SpotifyOauthError(
"You must either set a cache_path or a username."
)
if self.cache_path:
try:
f = open(self.cache_path)
token_info_string = f.read()
f.close()
token_info = json.loads(token_info_string)
# if scopes don't match, then bail
if "scope" not in token_info or not self._is_scope_subset(
self.scope, token_info["scope"]
):
return None
if self.is_token_expired(token_info):
token_info = self.refresh_access_token(
token_info["refresh_token"]
)
except IOError:
pass
return token_info
def _is_scope_subset(self, needle_scope, haystack_scope):
needle_scope = set(needle_scope.split()) if needle_scope else set()
haystack_scope = (
set(haystack_scope.split()) if haystack_scope else set()
)
return needle_scope <= haystack_scope
def is_token_expired(self, token_info):
return is_token_expired(token_info)
def _save_token_info(self, token_info):
if self.cache_path:
try:
f = open(self.cache_path, "w")
f.write(json.dumps(token_info))
f.close()
except IOError:
logger.warning('Couldn\'t write token to cache at: %s',
self.cache_path)
def _add_custom_values_to_token_info(self, token_info):
"""
Store some values that aren't directly provided by a Web API
response.
"""
token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
return token_info
def get_pkce_handshake_parameters(self):
self.code_verifier = self._get_code_verifier()
self.code_challenge = self._get_code_challenge()
def get_access_token(self, check_cache=True):
""" Gets the access token for the app given the code
Parameters:
- check_cache - check for locally stored token before request
a new token if True
"""
if check_cache:
token_info = self.get_cached_token()
if token_info is not None:
if is_token_expired(token_info):
token_info = self.refresh_access_token(
token_info["refresh_token"]
)
return token_info["access_token"]
if self.code_verifier is None or self.code_challenge is None:
self.get_pkce_handshake_parameters()
payload = {
"client_id": self.client_id,
"grant_type": "authorization_code",
"code": self.get_authorization_code(),
"redirect_uri": self.redirect_uri,
"code_verifier": self.code_verifier
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = self._session.post(
self.OAUTH_TOKEN_URL,
data=payload,
headers=headers,
verify=True,
proxies=self.proxies,
timeout=self.requests_timeout,
)
if response.status_code != 200:
error_payload = response.json()
raise SpotifyOauthError('error: {0}, error_descr: {1}'.format(error_payload['error'],
error_payload[
'error_description'
]),
error=error_payload['error'],
error_description=error_payload['error_description'])
token_info = response.json()
token_info = self._add_custom_values_to_token_info(token_info)
self._save_token_info(token_info)
return token_info["access_token"]
def refresh_access_token(self, refresh_token):
payload = {
"refresh_token": refresh_token,
"grant_type": "refresh_token",
"client_id": self.client_id,
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = self._session.post(
self.OAUTH_TOKEN_URL,
data=payload,
headers=headers,
proxies=self.proxies,
timeout=self.requests_timeout,
)
try:
response.raise_for_status()
except BaseException:
logger.error('Couldn\'t refresh token. Response Status Code: %s '
'Reason: %s', response.status_code, response.reason)
message = "Couldn't refresh token: code:%d reason:%s" % (
response.status_code,
response.reason,
)
raise SpotifyException(response.status_code,
-1,
message,
headers)
token_info = response.json()
token_info = self._add_custom_values_to_token_info(token_info)
if "refresh_token" not in token_info:
token_info["refresh_token"] = refresh_token
self._save_token_info(token_info)
return token_info
def parse_response_code(self, url):
""" Parse the response code in the given response url
Parameters:
- url - the response url
"""
_, code = SpotifyOAuth.parse_auth_response_url(url)
if code is None:
return url
else:
return code
class SpotifyImplicitGrant(SpotifyAuthBase): class SpotifyImplicitGrant(SpotifyAuthBase):
""" Implements Implicit Grant Flow for client apps """ Implements Implicit Grant Flow for client apps
@ -817,6 +1171,10 @@ window.close()
<body> <body>
<h1>Authentication status: {}</h1> <h1>Authentication status: {}</h1>
This window can be closed. This window can be closed.
<script>
window.close()
</script>
<button class="closeButton" style="cursor: pointer" onclick="window.close();">Close Window</button>
</body> </body>
</html>""".format(status)) </html>""".format(status))

View File

@ -5,7 +5,8 @@ from spotipy import (
prompt_for_user_token, prompt_for_user_token,
Spotify, Spotify,
SpotifyException, SpotifyException,
SpotifyImplicitGrant SpotifyImplicitGrant,
SpotifyPKCE
) )
import unittest import unittest
import requests import requests
@ -424,3 +425,36 @@ class SpotipyImplicitGrantTests(unittest.TestCase):
c_user = self.spotify.current_user() c_user = self.spotify.current_user()
user = self.spotify.user(c_user['id']) user = self.spotify.user(c_user['id'])
self.assertEqual(c_user['display_name'], user['display_name']) self.assertEqual(c_user['display_name'], user['display_name'])
class SpotifyPKCETests(unittest.TestCase):
@classmethod
def setUpClass(cls):
scope = (
'user-follow-read '
'user-follow-modify '
)
auth_manager = SpotifyPKCE(scope=scope, cache_path=".cache-implicittest")
cls.spotify = Spotify(auth_manager=auth_manager)
def test_user_follows_and_unfollows_artist(self):
# Initially follows 1 artist
current_user_followed_artists = self.spotify.current_user_followed_artists()[
'artists']['total']
# Follow 2 more artists
artists = ["6DPYiyq5kWVQS4RGwxzPC7", "0NbfKEOTQCcwd6o7wSDOHI"]
self.spotify.user_follow_artists(artists)
res = self.spotify.current_user_followed_artists()
self.assertEqual(res['artists']['total'], current_user_followed_artists + len(artists))
# Unfollow these 2 artists
self.spotify.user_unfollow_artists(artists)
res = self.spotify.current_user_followed_artists()
self.assertEqual(res['artists']['total'], current_user_followed_artists)
def test_current_user(self):
c_user = self.spotify.current_user()
user = self.spotify.user(c_user['id'])
self.assertEqual(c_user['display_name'], user['display_name'])

View File

@ -5,7 +5,7 @@ import unittest
import six.moves.urllib.parse as urllibparse import six.moves.urllib.parse as urllibparse
from spotipy import SpotifyOAuth, SpotifyImplicitGrant from spotipy import SpotifyOAuth, SpotifyImplicitGrant, SpotifyPKCE
from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOauthError from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOauthError
from spotipy.oauth2 import SpotifyStateError from spotipy.oauth2 import SpotifyStateError
@ -46,6 +46,10 @@ def _make_implicitgrantauth(*args, **kwargs):
return SpotifyImplicitGrant("CLID", "REDIR", "STATE", *args, **kwargs) return SpotifyImplicitGrant("CLID", "REDIR", "STATE", *args, **kwargs)
def _make_pkceauth(*args, **kwargs):
return SpotifyPKCE("CLID", "REDIR", "STATE", *args, **kwargs)
class OAuthCacheTest(unittest.TestCase): class OAuthCacheTest(unittest.TestCase):
@patch.multiple(SpotifyOAuth, @patch.multiple(SpotifyOAuth,
@ -333,3 +337,135 @@ class TestSpotifyImplicitGrant(unittest.TestCase):
parsed_url = urllibparse.urlparse(url) parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query) parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertTrue(parsed_qs['show_dialog']) self.assertTrue(parsed_qs['show_dialog'])
class SpotifyPKCECacheTest(unittest.TestCase):
@patch.multiple(SpotifyPKCE,
is_token_expired=DEFAULT, refresh_access_token=DEFAULT)
@patch('spotipy.oauth2.open', create=True)
def test_gets_from_cache_path(self, opener,
is_token_expired, refresh_access_token):
scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, scope)
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
is_token_expired.return_value = False
spot = _make_pkceauth(scope, path)
cached_tok = spot.get_cached_token()
opener.assert_called_with(path)
self.assertIsNotNone(cached_tok)
self.assertEqual(refresh_access_token.call_count, 0)
@patch.multiple(SpotifyPKCE,
is_token_expired=DEFAULT, refresh_access_token=DEFAULT)
@patch('spotipy.oauth2.open', create=True)
def test_expired_token_refreshes(self, opener,
is_token_expired, refresh_access_token):
scope = "playlist-modify-private"
path = ".cache-username"
expired_tok = _make_fake_token(0, None, scope)
fresh_tok = _make_fake_token(1, 1, scope)
token_file = _token_file(json.dumps(expired_tok, ensure_ascii=False))
opener.return_value = token_file
refresh_access_token.return_value = fresh_tok
spot = _make_pkceauth(scope, path)
spot.get_cached_token()
is_token_expired.assert_called_with(expired_tok)
refresh_access_token.assert_called_with(expired_tok['refresh_token'])
opener.assert_any_call(path)
@patch.multiple(SpotifyPKCE,
is_token_expired=DEFAULT, refresh_access_token=DEFAULT)
@patch('spotipy.oauth2.open', create=True)
def test_badly_scoped_token_bails(self, opener,
is_token_expired, refresh_access_token):
token_scope = "playlist-modify-public"
requested_scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, token_scope)
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
is_token_expired.return_value = False
spot = _make_pkceauth(requested_scope, path)
cached_tok = spot.get_cached_token()
opener.assert_called_with(path)
self.assertIsNone(cached_tok)
self.assertEqual(refresh_access_token.call_count, 0)
@patch('spotipy.oauth2.open', create=True)
def test_saves_to_cache_path(self, opener):
scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, scope)
fi = _fake_file()
opener.return_value = fi
spot = SpotifyPKCE("CLID", "REDIR", "STATE", scope, path)
spot._save_token_info(tok)
opener.assert_called_with(path, 'w')
self.assertTrue(fi.write.called)
class TestSpotifyPKCE(unittest.TestCase):
def test_generate_code_verifier_for_pkce(self):
auth = SpotifyPKCE("CLID", "REDIR")
auth.get_pkce_handshake_parameters()
self.assertTrue(auth.code_verifier)
def test_generate_code_challenge_for_pkce(self):
auth = SpotifyPKCE("CLID", "REDIR")
auth.get_pkce_handshake_parameters()
self.assertTrue(auth.code_challenge)
def test_code_verifier_and_code_challenge_are_correct(self):
import hashlib
import base64
auth = SpotifyPKCE("CLID", "REDIR")
auth.get_pkce_handshake_parameters()
self.assertEqual(auth.code_challenge,
base64.urlsafe_b64encode(
hashlib.sha256(auth.code_verifier.encode('utf-8'))
.digest())
.decode('utf-8')
.replace('=', ''))
def test_get_authorize_url_doesnt_pass_state_by_default(self):
auth = SpotifyPKCE("CLID", "REDIR")
url = auth.get_authorize_url()
parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertNotIn('state', parsed_qs)
def test_get_authorize_url_passes_state_from_constructor(self):
state = "STATE"
auth = SpotifyPKCE("CLID", "REDIR", state)
url = auth.get_authorize_url()
parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertEqual(parsed_qs['state'][0], state)
def test_get_authorize_url_passes_state_from_func_call(self):
state = "STATE"
auth = SpotifyPKCE("CLID", "REDIR")
url = auth.get_authorize_url(state=state)
parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertEqual(parsed_qs['state'][0], state)