mirror of
https://github.com/spotipy-dev/spotipy.git
synced 2026-06-19 09:13:53 +00:00
* Add SpotifyImplicitGrant with get_access_token and get_cached_token (and minimum related functions) * Add some overlooked necessary methods/values in SpotifyImplicitGrant * Remove unsuppported functionality and make SpotifyImplicitGrant public * Allow/Expose integration of SpotifyImplicitGrant in client * Add Implicit Grant tests and decrease abilities of prompt_for_user_token Remove Implicit Grant and state support from prompt_for_user_token * Add documentation and changelog entry * Touch up PEP8 compliance * Ignore long line with link for flake8 * Correct changelog * Restore compatibility with Python 2.7 * Correct help(SpotifyImplicitGrant.get_access_token) * Remove as_dict from SpotifyImplicitGrant.get_access_token * Combine status check functionality with implicit grant support In oauth2.py: * Add state checking to SpotifyImplicitGrant * Add dedicated SpotifyStateError as subclass of SpotifyOauthError * Moved `_get_user_input` from SpotifyOAuth to superclass SpotifyAuthBase * Renamed `parse_oauth_response_url` to `parse_auth_response_url` * Moved error handling into `parse_auth_response_url` Made minor changes in tests and client.py accordingly * Update changelog * Trim down tests for SpotifyImplicitGrant * Fix trailing whitespace
This commit is contained in:
parent
ed136d15df
commit
38515689bc
@ -9,8 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added `SpotifyImplicitGrant` as an auth manager option. It provides
|
||||
user authentication without a client secret but sacrifices the ability
|
||||
to refresh the token without user input. (However, read the class
|
||||
docstring for security advisory.)
|
||||
- Added built-in verification of the `state` query parameter
|
||||
- Added two new attributes: error and error_description to `SpotifyOauthError` exception class to show
|
||||
authorization/authentication web api errors details.
|
||||
- Added `SpotifyStateError` subclass of `SpotifyOauthError`
|
||||
- Allow extending `SpotifyClientCredentials` and `SpotifyOAuth`
|
||||
- Added the market paramter to `album_tracks`
|
||||
|
||||
|
||||
@ -63,7 +63,8 @@ class Spotify(object):
|
||||
:param oauth_manager:
|
||||
SpotifyOAuth object
|
||||
:param auth_manager:
|
||||
SpotifyOauth object or SpotifyClientCredentials object
|
||||
SpotifyOauth, SpotifyClientCredentials,
|
||||
or SpotifyImplicitGrant object
|
||||
:param proxies:
|
||||
Definition of proxies (optional).
|
||||
See Requests doc https://2.python-requests.org/en/master/user/advanced/#proxies
|
||||
@ -138,11 +139,13 @@ class Spotify(object):
|
||||
def _auth_headers(self):
|
||||
if self._auth:
|
||||
return {"Authorization": "Bearer {0}".format(self._auth)}
|
||||
elif self.auth_manager:
|
||||
token = self.auth_manager.get_access_token(as_dict=False)
|
||||
return {"Authorization": "Bearer {0}".format(token)}
|
||||
else:
|
||||
if not self.auth_manager:
|
||||
return {}
|
||||
try:
|
||||
token = self.auth_manager.get_access_token(as_dict=False)
|
||||
except TypeError:
|
||||
token = self.auth_manager.get_access_token()
|
||||
return {"Authorization": "Bearer {0}".format(token)}
|
||||
|
||||
def _internal_call(self, method, url, payload, params):
|
||||
args = dict(params=params)
|
||||
|
||||
@ -5,6 +5,8 @@ __all__ = [
|
||||
"SpotifyClientCredentials",
|
||||
"SpotifyOAuth",
|
||||
"SpotifyOauthError",
|
||||
"SpotifyStateError",
|
||||
"SpotifyImplicitGrant",
|
||||
]
|
||||
|
||||
import base64
|
||||
@ -29,12 +31,26 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SpotifyOauthError(Exception):
|
||||
""" Error during Auth Code or Implicit Grant flow """
|
||||
def __init__(self, message, error=None, error_description=None, *args, **kwargs):
|
||||
self.error = error
|
||||
self.error_description = error_description
|
||||
self.__dict__.update(kwargs)
|
||||
super(SpotifyOauthError, self).__init__(message, *args, **kwargs)
|
||||
|
||||
|
||||
class SpotifyStateError(SpotifyOauthError):
|
||||
""" The state sent and state recieved were different """
|
||||
def __init__(self, local_state=None, remote_state=None, message=None,
|
||||
error=None, error_description=None, *args, **kwargs):
|
||||
if not message:
|
||||
message = ("Expected " + local_state + " but recieved "
|
||||
+ remote_state)
|
||||
super(SpotifyOauthError, self).__init__(message, error,
|
||||
error_description, *args,
|
||||
**kwargs)
|
||||
|
||||
|
||||
def _make_authorization_headers(client_id, client_secret):
|
||||
auth_header = base64.b64encode(
|
||||
six.text_type(client_id + ":" + client_secret).encode("ascii")
|
||||
@ -94,6 +110,13 @@ class SpotifyAuthBase(object):
|
||||
def redirect_uri(self, val):
|
||||
self._redirect_uri = _ensure_value(val, "redirect_uri")
|
||||
|
||||
@staticmethod
|
||||
def _get_user_input(prompt):
|
||||
try:
|
||||
return raw_input(prompt)
|
||||
except NameError:
|
||||
return input(prompt)
|
||||
|
||||
def __del__(self):
|
||||
"""Make sure the connection (pool) gets closed"""
|
||||
if isinstance(self._session, requests.Session):
|
||||
@ -322,24 +345,21 @@ class SpotifyOAuth(SpotifyAuthBase):
|
||||
Parameters:
|
||||
- url - the response url
|
||||
"""
|
||||
_, code, _ = self.parse_oauth_response_url(url)
|
||||
_, code = self.parse_auth_response_url(url)
|
||||
if code is None:
|
||||
return url
|
||||
else:
|
||||
return code
|
||||
|
||||
@staticmethod
|
||||
def parse_oauth_response_url(url):
|
||||
def parse_auth_response_url(url):
|
||||
query_s = urlparse(url).query
|
||||
form = dict(parse_qsl(query_s))
|
||||
return tuple(form.get(param) for param in ['state', 'code', 'error'])
|
||||
|
||||
@staticmethod
|
||||
def _get_user_input(prompt):
|
||||
try:
|
||||
return raw_input(prompt)
|
||||
except NameError:
|
||||
return input(prompt)
|
||||
if "error" in form:
|
||||
raise SpotifyOauthError("Received error from auth server: "
|
||||
"{}".format(form["error"]),
|
||||
error=form["error"])
|
||||
return tuple(form.get(param) for param in ["state", "code"])
|
||||
|
||||
def _make_authorization_headers(self):
|
||||
return _make_authorization_headers(self.client_id, self.client_secret)
|
||||
@ -355,9 +375,9 @@ class SpotifyOAuth(SpotifyAuthBase):
|
||||
def _get_auth_response_interactive(self):
|
||||
self._open_auth_url()
|
||||
response = SpotifyOAuth._get_user_input("Enter the URL you were redirected to: ")
|
||||
state, code, _ = SpotifyOAuth.parse_oauth_response_url(response)
|
||||
state, code = SpotifyOAuth.parse_auth_response_url(response)
|
||||
if self.state is not None and self.state != state:
|
||||
raise SpotifyOauthError("Received inconsistent state from OAuth server.")
|
||||
raise SpotifyStateError(self.state, state)
|
||||
return code
|
||||
|
||||
def _get_auth_response_local_server(self, redirect_port):
|
||||
@ -366,7 +386,7 @@ class SpotifyOAuth(SpotifyAuthBase):
|
||||
server.handle_request()
|
||||
|
||||
if self.state is not None and server.state != self.state:
|
||||
raise SpotifyOauthError("Received inconsistent state from OAuth server.")
|
||||
raise SpotifyStateError(self.state, server.state)
|
||||
|
||||
if server.auth_code is not None:
|
||||
return server.auth_code
|
||||
@ -520,12 +540,251 @@ class SpotifyOAuth(SpotifyAuthBase):
|
||||
return token_info
|
||||
|
||||
|
||||
class SpotifyImplicitGrant(SpotifyAuthBase):
|
||||
""" Implements Implicit Grant Flow for client apps
|
||||
|
||||
This auth manager enables *user and non-user* endpoints with only
|
||||
a client secret, redirect uri, and username. The user will need to
|
||||
copy and paste a URI from the browser every hour.
|
||||
|
||||
Security Advisory
|
||||
-----------------
|
||||
The Implicit Grant Flow is part of the
|
||||
[OAuth 2.0 standard](https://oauth.net/2/grant-types/implicit/).
|
||||
It is intended for client-side (running in browser or a native app)
|
||||
interactions where the client secret would have to be hard-coded and
|
||||
exposed. OAuth no longer recommends its use because sensitive
|
||||
info (the auth token) can be yanked from the browser address bar or
|
||||
history, instead recommending the Auth Code flow with PKCE. However,
|
||||
Spotify [does not support PKCE](https://community.spotify.com/t5/Spotify-for-Developers/Authentication-API-failing-in-production-right-now/m-p/4960693/highlight/true#M234), <!---# noqa: E501-->
|
||||
so Implicit Grant is the only viable options for client-side Spotify
|
||||
API requests.
|
||||
"""
|
||||
OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
|
||||
|
||||
def __init__(self,
|
||||
client_id=None,
|
||||
redirect_uri=None,
|
||||
state=None,
|
||||
scope=None,
|
||||
cache_path=None,
|
||||
username=None,
|
||||
show_dialog=False):
|
||||
""" Creates Auth Manager using the Implicit Grant flow
|
||||
|
||||
**See help(SpotifyImplictGrant) for Security Advisory**
|
||||
|
||||
Parameters
|
||||
----------
|
||||
* client_id: Must be supplied or set as environment variable
|
||||
* redirect_uri: Must be supplied or set as environment variable
|
||||
* state: May be supplied, no verification is performed
|
||||
* scope: May be supplied, intuitively converted to proper format
|
||||
* cache_path: May be supplied, will otherwise be generated
|
||||
* username: Must be supplied or set as environment variable
|
||||
* show_dialog: Interpreted as boolean
|
||||
"""
|
||||
self.client_id = client_id
|
||||
self.redirect_uri = redirect_uri
|
||||
self.state = state
|
||||
self.cache_path = cache_path
|
||||
self.username = username or os.getenv(
|
||||
CLIENT_CREDS_ENV_VARS["client_username"]
|
||||
)
|
||||
self.scope = self._normalize_scope(scope)
|
||||
self.show_dialog = show_dialog
|
||||
self._session = None # As to not break inherited __del__
|
||||
|
||||
def get_cached_token(self):
|
||||
""" Gets a cached auth token
|
||||
"""
|
||||
token_info = None
|
||||
|
||||
if not self.cache_path and self.username:
|
||||
self.cache_path = ".cache-" + str(self.username)
|
||||
elif not self.cache_path and not self.username:
|
||||
raise SpotifyOauthError(
|
||||
"You must either set a cache_path or a username."
|
||||
)
|
||||
|
||||
if self.cache_path:
|
||||
try:
|
||||
f = open(self.cache_path)
|
||||
token_info_string = f.read()
|
||||
f.close()
|
||||
token_info = json.loads(token_info_string)
|
||||
|
||||
# if scopes don't match, then bail
|
||||
if "scope" not in token_info or not self._is_scope_subset(
|
||||
self.scope, token_info["scope"]
|
||||
):
|
||||
return None
|
||||
|
||||
if self.is_token_expired(token_info):
|
||||
return None
|
||||
|
||||
except IOError:
|
||||
pass
|
||||
return token_info
|
||||
|
||||
def _save_token_info(self, token_info):
|
||||
if not self.cache_path and self.username:
|
||||
self.cache_path = ".cache-" + str(self.username)
|
||||
if self.cache_path:
|
||||
try:
|
||||
f = open(self.cache_path, "w")
|
||||
f.write(json.dumps(token_info))
|
||||
f.close()
|
||||
except IOError:
|
||||
logger.warning('Couldn\'t write token to cache at: %s',
|
||||
self.cache_path)
|
||||
|
||||
def _is_scope_subset(self, needle_scope, haystack_scope):
|
||||
needle_scope = set(needle_scope.split()) if needle_scope else set()
|
||||
haystack_scope = (
|
||||
set(haystack_scope.split()) if haystack_scope else set()
|
||||
)
|
||||
return needle_scope <= haystack_scope
|
||||
|
||||
def is_token_expired(self, token_info):
|
||||
return is_token_expired(token_info)
|
||||
|
||||
def get_access_token(self,
|
||||
state=None,
|
||||
response=None,
|
||||
check_cache=True):
|
||||
""" Gets Auth Token from cache (preferred) or user interaction
|
||||
|
||||
Parameters
|
||||
----------
|
||||
* state: May be given, overrides (without changing) self.state
|
||||
* response: URI with token, can break expiration checks
|
||||
* check_cache: Interpreted as boolean
|
||||
"""
|
||||
if check_cache:
|
||||
token_info = self.get_cached_token()
|
||||
if not (token_info is None or is_token_expired(token_info)):
|
||||
return token_info["access_token"]
|
||||
|
||||
if response:
|
||||
token_info = self.parse_response_token(response)
|
||||
else:
|
||||
token_info = self.get_auth_response(state)
|
||||
token_info = self._add_custom_values_to_token_info(token_info)
|
||||
self._save_token_info(token_info)
|
||||
|
||||
return token_info["access_token"]
|
||||
|
||||
def _normalize_scope(self, scope):
|
||||
if scope:
|
||||
scopes = sorted(scope.split())
|
||||
return " ".join(scopes)
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_authorize_url(self, state=None):
|
||||
""" Gets the URL to use to authorize this app """
|
||||
payload = {
|
||||
"client_id": self.client_id,
|
||||
"response_type": "token",
|
||||
"redirect_uri": self.redirect_uri,
|
||||
}
|
||||
if self.scope:
|
||||
payload["scope"] = self.scope
|
||||
if state is None:
|
||||
state = self.state
|
||||
if state is not None:
|
||||
payload["state"] = state
|
||||
if self.show_dialog:
|
||||
payload["show_dialog"] = True
|
||||
|
||||
urlparams = urllibparse.urlencode(payload)
|
||||
|
||||
return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)
|
||||
|
||||
def parse_response_token(self, url, state=None):
|
||||
""" Parse the response code in the given response url """
|
||||
remote_state, token, t_type, exp_in = self.parse_auth_response_url(url)
|
||||
if state is None:
|
||||
state = self.state
|
||||
if state is not None and remote_state != state:
|
||||
raise SpotifyStateError(state, remote_state)
|
||||
return {"access_token": token, "token_type": t_type,
|
||||
"expires_in": exp_in, "state": state}
|
||||
|
||||
@staticmethod
|
||||
def parse_auth_response_url(url):
|
||||
url_components = urlparse(url)
|
||||
fragment_s = url_components.fragment
|
||||
query_s = url_components.query
|
||||
form = dict(i.split('=') for i
|
||||
in (fragment_s or query_s or url).split('&'))
|
||||
if "error" in form:
|
||||
raise SpotifyOauthError("Received error from auth server: "
|
||||
"{}".format(form["error"]),
|
||||
state=form["state"])
|
||||
if "expires_in" in form:
|
||||
form["expires_in"] = int(form["expires_in"])
|
||||
return tuple(form.get(param) for param in ["state", "access_token",
|
||||
"token_type", "expires_in"])
|
||||
|
||||
def _open_auth_url(self, state=None):
|
||||
auth_url = self.get_authorize_url(state)
|
||||
try:
|
||||
webbrowser.open(auth_url)
|
||||
logger.info("Opened %s in your browser", auth_url)
|
||||
except webbrowser.Error:
|
||||
logger.error("Please navigate here: %s", auth_url)
|
||||
|
||||
def get_auth_response(self, state=None):
|
||||
""" Gets a new auth **token** with user interaction """
|
||||
logger.info('User authentication requires interaction with your '
|
||||
'web browser. Once you enter your credentials and '
|
||||
'give authorization, you will be redirected to '
|
||||
'a url. Paste that url you were directed to to '
|
||||
'complete the authorization.')
|
||||
|
||||
redirect_info = urlparse(self.redirect_uri)
|
||||
redirect_host, redirect_port = get_host_port(redirect_info.netloc)
|
||||
# Implicit Grant tokens are returned in a hash fragment
|
||||
# which is only available to the browser. Therefore, interactive
|
||||
# URL retrival is required.
|
||||
if (redirect_host in ("127.0.0.1", "localhost")
|
||||
and redirect_info.scheme == "http" and redirect_port):
|
||||
logger.warning('Using a local redirect URI with a '
|
||||
'port, likely expecting automatic '
|
||||
'retrieval. Due to technical limitations, '
|
||||
'the authentication token cannot be '
|
||||
'automatically retrieved and must be '
|
||||
'copied and pasted.')
|
||||
|
||||
self._open_auth_url(state)
|
||||
logger.info('Paste that url you were directed to in order to '
|
||||
'complete the authorization')
|
||||
response = SpotifyImplicitGrant._get_user_input("Enter the URL you "
|
||||
"were redirected to: ")
|
||||
return self.parse_response_token(response, state)
|
||||
|
||||
def _add_custom_values_to_token_info(self, token_info):
|
||||
"""
|
||||
Store some values that aren't directly provided by a Web API
|
||||
response.
|
||||
"""
|
||||
token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
|
||||
token_info["scope"] = self.scope
|
||||
return token_info
|
||||
|
||||
|
||||
class RequestHandler(BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
state, auth_code, error = SpotifyOAuth.parse_oauth_response_url(self.path)
|
||||
self.server.auth_code = self.server.error = None
|
||||
try:
|
||||
state, auth_code = SpotifyOAuth.parse_auth_response_url(self.path)
|
||||
self.server.state = state
|
||||
self.server.auth_code = auth_code
|
||||
self.server.error = error
|
||||
except SpotifyOauthError as err:
|
||||
self.server.state = err.state
|
||||
self.server.error = err.error
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/html")
|
||||
@ -560,5 +819,6 @@ def start_local_http_server(port, handler=RequestHandler):
|
||||
server = HTTPServer(("127.0.0.1", port), handler)
|
||||
server.allow_reuse_address = True
|
||||
server.auth_code = None
|
||||
server.auth_token_form = None
|
||||
server.error = None
|
||||
return server
|
||||
|
||||
@ -26,7 +26,6 @@ def prompt_for_user_token(
|
||||
client_id=None,
|
||||
client_secret=None,
|
||||
redirect_uri=None,
|
||||
state=None,
|
||||
cache_path=None,
|
||||
oauth_manager=None,
|
||||
show_dialog=False
|
||||
@ -51,6 +50,7 @@ def prompt_for_user_token(
|
||||
- redirect_uri - the redirect URI of your app
|
||||
- cache_path - path to location to save tokens
|
||||
- oauth_manager - Oauth manager object.
|
||||
- show_dialog - If true, a login prompt always shows
|
||||
|
||||
"""
|
||||
if not oauth_manager:
|
||||
@ -85,7 +85,6 @@ def prompt_for_user_token(
|
||||
client_id,
|
||||
client_secret,
|
||||
redirect_uri,
|
||||
state=state,
|
||||
scope=scope,
|
||||
cache_path=cache_path,
|
||||
show_dialog=show_dialog
|
||||
|
||||
@ -5,6 +5,7 @@ from spotipy import (
|
||||
prompt_for_user_token,
|
||||
Spotify,
|
||||
SpotifyException,
|
||||
SpotifyImplicitGrant
|
||||
)
|
||||
import unittest
|
||||
import requests
|
||||
@ -395,3 +396,36 @@ class SpotipyPlayerApiTests(unittest.TestCase):
|
||||
after=res['cursors']['before'])
|
||||
self.assertLessEqual(len(res['items']), 50)
|
||||
self.assertGreater(res['items'][0]['played_at'], played_at)
|
||||
|
||||
|
||||
class SpotipyImplicitGrantTests(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
scope = (
|
||||
'user-follow-read '
|
||||
'user-follow-modify '
|
||||
)
|
||||
auth_manager = SpotifyImplicitGrant(scope=scope,
|
||||
cache_path=".cache-implicittest")
|
||||
cls.spotify = Spotify(auth_manager=auth_manager)
|
||||
|
||||
def test_user_follows_and_unfollows_artist(self):
|
||||
# Initially follows 1 artist
|
||||
current_user_followed_artists = self.spotify.current_user_followed_artists()[
|
||||
'artists']['total']
|
||||
|
||||
# Follow 2 more artists
|
||||
artists = ["6DPYiyq5kWVQS4RGwxzPC7", "0NbfKEOTQCcwd6o7wSDOHI"]
|
||||
self.spotify.user_follow_artists(artists)
|
||||
res = self.spotify.current_user_followed_artists()
|
||||
self.assertEqual(res['artists']['total'], current_user_followed_artists + len(artists))
|
||||
|
||||
# Unfollow these 2 artists
|
||||
self.spotify.user_unfollow_artists(artists)
|
||||
res = self.spotify.current_user_followed_artists()
|
||||
self.assertEqual(res['artists']['total'], current_user_followed_artists)
|
||||
|
||||
def test_current_user(self):
|
||||
c_user = self.spotify.current_user()
|
||||
user = self.spotify.user(c_user['id'])
|
||||
self.assertEqual(c_user['display_name'], user['display_name'])
|
||||
|
||||
@ -5,8 +5,9 @@ import unittest
|
||||
|
||||
import six.moves.urllib.parse as urllibparse
|
||||
|
||||
from spotipy import SpotifyOAuth
|
||||
from spotipy import SpotifyOAuth, SpotifyImplicitGrant
|
||||
from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOauthError
|
||||
from spotipy.oauth2 import SpotifyStateError
|
||||
|
||||
try:
|
||||
import unittest.mock as mock
|
||||
@ -41,6 +42,10 @@ def _make_oauth(*args, **kwargs):
|
||||
return SpotifyOAuth("CLID", "CLISEC", "REDIR", "STATE", *args, **kwargs)
|
||||
|
||||
|
||||
def _make_implicitgrantauth(*args, **kwargs):
|
||||
return SpotifyImplicitGrant("CLID", "REDIR", "STATE", *args, **kwargs)
|
||||
|
||||
|
||||
class OAuthCacheTest(unittest.TestCase):
|
||||
|
||||
@patch.multiple(SpotifyOAuth,
|
||||
@ -199,10 +204,7 @@ class TestSpotifyOAuthGetAuthResponseInteractive(unittest.TestCase):
|
||||
def test_get_auth_response_with_inconsistent_state(self, webbrowser_mock, get_user_input_mock):
|
||||
oauth = SpotifyOAuth("CLID", "CLISEC", "redir.io", state='wxyz')
|
||||
|
||||
with self.assertRaisesRegexp(
|
||||
SpotifyOauthError,
|
||||
"Received inconsistent state from OAuth server."
|
||||
):
|
||||
with self.assertRaises(SpotifyStateError):
|
||||
oauth.get_auth_response()
|
||||
|
||||
|
||||
@ -213,3 +215,121 @@ class TestSpotifyClientCredentials(unittest.TestCase):
|
||||
with self.assertRaises(SpotifyOauthError) as error:
|
||||
oauth.get_access_token()
|
||||
self.assertEqual(error.exception.error, 'invalid_client')
|
||||
|
||||
|
||||
class ImplicitGrantCacheTest(unittest.TestCase):
|
||||
|
||||
@patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT)
|
||||
@patch('spotipy.oauth2.open', create=True)
|
||||
def test_gets_from_cache_path(self, opener, is_token_expired):
|
||||
scope = "playlist-modify-private"
|
||||
path = ".cache-username"
|
||||
tok = _make_fake_token(1, 1, scope)
|
||||
|
||||
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
|
||||
is_token_expired.return_value = False
|
||||
|
||||
spot = _make_implicitgrantauth(scope, path)
|
||||
cached_tok = spot.get_cached_token()
|
||||
|
||||
opener.assert_called_with(path)
|
||||
self.assertIsNotNone(cached_tok)
|
||||
|
||||
@patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT)
|
||||
@patch('spotipy.oauth2.open', create=True)
|
||||
def test_expired_token_returns_none(self, opener, is_token_expired):
|
||||
scope = "playlist-modify-private"
|
||||
path = ".cache-username"
|
||||
expired_tok = _make_fake_token(0, None, scope)
|
||||
|
||||
token_file = _token_file(json.dumps(expired_tok, ensure_ascii=False))
|
||||
opener.return_value = token_file
|
||||
|
||||
spot = _make_implicitgrantauth(scope, path)
|
||||
cached_tok = spot.get_cached_token()
|
||||
|
||||
is_token_expired.assert_called_with(expired_tok)
|
||||
opener.assert_any_call(path)
|
||||
self.assertIsNone(cached_tok)
|
||||
|
||||
@patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT)
|
||||
@patch('spotipy.oauth2.open', create=True)
|
||||
def test_badly_scoped_token_bails(self, opener, is_token_expired):
|
||||
token_scope = "playlist-modify-public"
|
||||
requested_scope = "playlist-modify-private"
|
||||
path = ".cache-username"
|
||||
tok = _make_fake_token(1, 1, token_scope)
|
||||
|
||||
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
|
||||
is_token_expired.return_value = False
|
||||
|
||||
spot = _make_implicitgrantauth(requested_scope, path)
|
||||
cached_tok = spot.get_cached_token()
|
||||
|
||||
opener.assert_called_with(path)
|
||||
self.assertIsNone(cached_tok)
|
||||
|
||||
@patch('spotipy.oauth2.open', create=True)
|
||||
def test_saves_to_cache_path(self, opener):
|
||||
scope = "playlist-modify-private"
|
||||
path = ".cache-username"
|
||||
tok = _make_fake_token(1, 1, scope)
|
||||
|
||||
fi = _fake_file()
|
||||
opener.return_value = fi
|
||||
|
||||
spot = SpotifyImplicitGrant("CLID", "REDIR", "STATE", scope, path)
|
||||
spot._save_token_info(tok)
|
||||
|
||||
opener.assert_called_with(path, 'w')
|
||||
self.assertTrue(fi.write.called)
|
||||
|
||||
|
||||
class TestSpotifyImplicitGrant(unittest.TestCase):
|
||||
|
||||
def test_get_authorize_url_doesnt_pass_state_by_default(self):
|
||||
auth = SpotifyImplicitGrant("CLID", "REDIR")
|
||||
|
||||
url = auth.get_authorize_url()
|
||||
|
||||
parsed_url = urllibparse.urlparse(url)
|
||||
parsed_qs = urllibparse.parse_qs(parsed_url.query)
|
||||
self.assertNotIn('state', parsed_qs)
|
||||
|
||||
def test_get_authorize_url_passes_state_from_constructor(self):
|
||||
state = "STATE"
|
||||
auth = SpotifyImplicitGrant("CLID", "REDIR", state)
|
||||
|
||||
url = auth.get_authorize_url()
|
||||
|
||||
parsed_url = urllibparse.urlparse(url)
|
||||
parsed_qs = urllibparse.parse_qs(parsed_url.query)
|
||||
self.assertEqual(parsed_qs['state'][0], state)
|
||||
|
||||
def test_get_authorize_url_passes_state_from_func_call(self):
|
||||
state = "STATE"
|
||||
auth = SpotifyImplicitGrant("CLID", "REDIR", "NOT STATE")
|
||||
|
||||
url = auth.get_authorize_url(state=state)
|
||||
|
||||
parsed_url = urllibparse.urlparse(url)
|
||||
parsed_qs = urllibparse.parse_qs(parsed_url.query)
|
||||
self.assertEqual(parsed_qs['state'][0], state)
|
||||
|
||||
def test_get_authorize_url_does_not_show_dialog_by_default(self):
|
||||
auth = SpotifyImplicitGrant("CLID", "REDIR")
|
||||
|
||||
url = auth.get_authorize_url()
|
||||
|
||||
parsed_url = urllibparse.urlparse(url)
|
||||
parsed_qs = urllibparse.parse_qs(parsed_url.query)
|
||||
self.assertNotIn('show_dialog', parsed_qs)
|
||||
|
||||
def test_get_authorize_url_shows_dialog_when_requested(self):
|
||||
auth = SpotifyImplicitGrant("CLID", "REDIR", show_dialog=True)
|
||||
|
||||
url = auth.get_authorize_url()
|
||||
|
||||
parsed_url = urllibparse.urlparse(url)
|
||||
parsed_qs = urllibparse.parse_qs(parsed_url.query)
|
||||
self.assertTrue(parsed_qs['show_dialog'])
|
||||
|
||||
Loading…
Reference in New Issue
Block a user