Auto-refresh AuthCode flow token. (#435)

* Auto-refresh AuthCode flow token.

* Reformatted.

* Reformatted.

* Removed invalid syntax.

* Removed abstract class SpotifyAuthManager.

* Fix typo on docstrings.

* Optionally try to fetch main input parameters from environment.

Implements the capability of trying to fetch the following parameters from the environment, when they're not directly passed to the authorization handler.
The affected parameters are: client_id, client_secret, redirect_uri.
An SpotifyOauthError is raised if no value gets found.

* Removed f-string for Python2 compatibility.

* Fix line-too-long.

* Remove useless import.

* Add username to docstring.

* Remove redundant return.

* Fix empty lines print statement for backward compatibility with Python2.

* Update simple4 example.

* Set optional 'as_dict' parameter on OAuth 'get_access_token'.

* Update changelog.

Co-authored-by: Stéphane Bruckert <stephane.bruckert@gmail.com>
This commit is contained in:
Stefano 2020-02-10 11:20:02 +01:00 committed by GitHub
parent 3b0e8febc4
commit 4515446a65
5 changed files with 715 additions and 437 deletions

View File

@ -11,6 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Support for `playlist_cover_image` - Support for `playlist_cover_image`
- Support `after` and `before` parameter in `current_user_recently_played` - Support `after` and `before` parameter in `current_user_recently_played`
- CI for unit tests - CI for unit tests
- Automatic `token` refresh
- `auth_manager` and `oauth_manager` optional parameters added to `Spotify`'s init.
- Optional `username` parameter to be passed to SpotifyOAuth, to infer a `cache_path` automatically
- Optional `as_dict` parameter to control `SpotifyOAuth`'s `get_access_token` output type. However, this is going to be deprecated in the future, and the method will always return a token string
### Changed
- Both `SpotifyClientCredentials` and `SpotifyOAuth` inherit from a common `SpotifyAuthBase` which handles common parameters and logics.
## [2.7.1] - 2020-01-20 ## [2.7.1] - 2020-01-20

12
examples/simple4.py Normal file
View File

@ -0,0 +1,12 @@
import spotipy
import os
from pprint import pprint
def main():
spotify = spotipy.Spotify(auth_manager=spotipy.SpotifyOAuth())
me = spotify.me()
pprint(me)
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@ -3,10 +3,10 @@
from __future__ import print_function from __future__ import print_function
__all__ = [ __all__ = [
'is_token_expired', "is_token_expired",
'SpotifyClientCredentials', "SpotifyClientCredentials",
'SpotifyOAuth', "SpotifyOAuth",
'SpotifyOauthError' "SpotifyOauthError",
] ]
import base64 import base64
@ -14,8 +14,10 @@ import json
import os import os
import sys import sys
import time import time
import warnings
import requests import requests
from spotipy.util import CLIENT_CREDS_ENV_VARS
# Workaround to support both python 2 & 3 # Workaround to support both python 2 & 3
import six import six
@ -28,19 +30,56 @@ class SpotifyOauthError(Exception):
def _make_authorization_headers(client_id, client_secret): def _make_authorization_headers(client_id, client_secret):
auth_header = base64.b64encode( auth_header = base64.b64encode(
six.text_type( six.text_type(client_id + ":" + client_secret).encode("ascii")
'{}:{}'.format(client_id, client_secret) )
).encode('ascii')) return {"Authorization": "Basic %s" % auth_header.decode("ascii")}
return {'Authorization': 'Basic %s' % auth_header.decode('ascii')}
def is_token_expired(token_info): def is_token_expired(token_info):
now = int(time.time()) now = int(time.time())
return token_info['expires_at'] - now < 60 return token_info["expires_at"] - now < 60
class SpotifyClientCredentials(object): def _ensure_value(value, env_key):
OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token' env_val = CLIENT_CREDS_ENV_VARS[env_key]
_val = value or os.getenv(env_val)
if _val is None:
msg = "No %s. Pass it or set a %s environment variable." % (
env_key,
env_val,
)
raise SpotifyOauthError(msg)
return _val
class SpotifyAuthBase(object):
@property
def client_id(self):
return self._client_id
@client_id.setter
def client_id(self, val):
self._client_id = _ensure_value(val, "client_id")
@property
def client_secret(self):
return self._client_secret
@client_secret.setter
def client_secret(self, val):
self._client_secret = _ensure_value(val, "client_secret")
@property
def redirect_uri(self):
return self._redirect_uri
@redirect_uri.setter
def redirect_uri(self, val):
self._redirect_uri = _ensure_value(val, "redirect_uri")
class SpotifyClientCredentials(SpotifyAuthBase):
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
def __init__(self, client_id=None, client_secret=None, proxies=None): def __init__(self, client_id=None, client_secret=None, proxies=None):
""" """
@ -48,17 +87,6 @@ class SpotifyClientCredentials(object):
constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET
environment variables environment variables
""" """
if not client_id:
client_id = os.getenv('SPOTIPY_CLIENT_ID')
if not client_secret:
client_secret = os.getenv('SPOTIPY_CLIENT_SECRET')
if not client_id:
raise SpotifyOauthError('No client id')
if not client_secret:
raise SpotifyOauthError('No client secret')
self.client_id = client_id self.client_id = client_id
self.client_secret = client_secret self.client_secret = client_secret
@ -71,23 +99,28 @@ class SpotifyClientCredentials(object):
Else feches a new token and returns it Else feches a new token and returns it
""" """
if self.token_info and not self.is_token_expired(self.token_info): if self.token_info and not self.is_token_expired(self.token_info):
return self.token_info['access_token'] return self.token_info["access_token"]
token_info = self._request_access_token() token_info = self._request_access_token()
token_info = self._add_custom_values_to_token_info(token_info) token_info = self._add_custom_values_to_token_info(token_info)
self.token_info = token_info self.token_info = token_info
return self.token_info['access_token'] return self.token_info["access_token"]
def _request_access_token(self): def _request_access_token(self):
"""Gets client credentials access token """ """Gets client credentials access token """
payload = {'grant_type': 'client_credentials'} payload = {"grant_type": "client_credentials"}
headers = _make_authorization_headers( headers = _make_authorization_headers(
self.client_id, self.client_secret) self.client_id, self.client_secret
)
response = requests.post(self.OAUTH_TOKEN_URL, data=payload, response = requests.post(
headers=headers, verify=True, self.OAUTH_TOKEN_URL,
proxies=self.proxies) data=payload,
headers=headers,
verify=True,
proxies=self.proxies,
)
if response.status_code != 200: if response.status_code != 200:
raise SpotifyOauthError(response.reason) raise SpotifyOauthError(response.reason)
token_info = response.json() token_info = response.json()
@ -101,21 +134,30 @@ class SpotifyClientCredentials(object):
Store some values that aren't directly provided by a Web API Store some values that aren't directly provided by a Web API
response. response.
""" """
token_info['expires_at'] = int(time.time()) + token_info['expires_in'] token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
return token_info return token_info
class SpotifyOAuth(object): class SpotifyOAuth(SpotifyAuthBase):
''' """
Implements Authorization Code Flow for Spotify's OAuth implementation. Implements Authorization Code Flow for Spotify's OAuth implementation.
''' """
OAUTH_AUTHORIZE_URL = 'https://accounts.spotify.com/authorize' OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token' OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
def __init__(self, client_id, client_secret, redirect_uri, def __init__(
state=None, scope=None, cache_path=None, proxies=None): self,
''' client_id=None,
client_secret=None,
redirect_uri=None,
state=None,
scope=None,
cache_path=None,
username=None,
proxies=None,
):
"""
Creates a SpotifyOAuth object Creates a SpotifyOAuth object
Parameters: Parameters:
@ -125,20 +167,32 @@ class SpotifyOAuth(object):
- state - security state - state - security state
- scope - the desired scope of the request - scope - the desired scope of the request
- cache_path - path to location to save tokens - cache_path - path to location to save tokens
''' - username - username of current client
"""
self.client_id = client_id self.client_id = client_id
self.client_secret = client_secret self.client_secret = client_secret
self.redirect_uri = redirect_uri self.redirect_uri = redirect_uri
self.state = state self.state = state
self.cache_path = cache_path self.cache_path = cache_path
self.username = username or os.getenv(
CLIENT_CREDS_ENV_VARS["client_username"]
)
self.scope = self._normalize_scope(scope) self.scope = self._normalize_scope(scope)
self.proxies = proxies self.proxies = proxies
def get_cached_token(self): def get_cached_token(self):
''' Gets a cached auth token """ Gets a cached auth token
''' """
token_info = None token_info = None
if not self.cache_path and self.username:
self.cache_path = ".cache-" + str(self.username)
elif not self.cache_path and not self.username:
raise SpotifyOauthError(
"You must either set a cache_path or a username."
)
if self.cache_path: if self.cache_path:
try: try:
f = open(self.cache_path) f = open(self.cache_path)
@ -147,13 +201,15 @@ class SpotifyOAuth(object):
token_info = json.loads(token_info_string) token_info = json.loads(token_info_string)
# if scopes don't match, then bail # if scopes don't match, then bail
if 'scope' not in token_info or not self._is_scope_subset( if "scope" not in token_info or not self._is_scope_subset(
self.scope, token_info['scope']): self.scope, token_info["scope"]
):
return None return None
if self.is_token_expired(token_info): if self.is_token_expired(token_info):
token_info = self.refresh_access_token( token_info = self.refresh_access_token(
token_info['refresh_token']) token_info["refresh_token"]
)
except IOError: except IOError:
pass pass
@ -162,7 +218,7 @@ class SpotifyOAuth(object):
def _save_token_info(self, token_info): def _save_token_info(self, token_info):
if self.cache_path: if self.cache_path:
try: try:
f = open(self.cache_path, 'w') f = open(self.cache_path, "w")
f.write(json.dumps(token_info)) f.write(json.dumps(token_info))
f.close() f.close()
except IOError: except IOError:
@ -171,8 +227,9 @@ class SpotifyOAuth(object):
def _is_scope_subset(self, needle_scope, haystack_scope): def _is_scope_subset(self, needle_scope, haystack_scope):
needle_scope = set(needle_scope.split()) if needle_scope else set() needle_scope = set(needle_scope.split()) if needle_scope else set()
haystack_scope = set( haystack_scope = (
haystack_scope.split()) if haystack_scope else set() set(haystack_scope.split()) if haystack_scope else set()
)
return needle_scope <= haystack_scope return needle_scope <= haystack_scope
def is_token_expired(self, token_info): def is_token_expired(self, token_info):
@ -181,17 +238,19 @@ class SpotifyOAuth(object):
def get_authorize_url(self, state=None, show_dialog=False): def get_authorize_url(self, state=None, show_dialog=False):
""" Gets the URL to use to authorize this app """ Gets the URL to use to authorize this app
""" """
payload = {'client_id': self.client_id, payload = {
'response_type': 'code', "client_id": self.client_id,
'redirect_uri': self.redirect_uri} "response_type": "code",
"redirect_uri": self.redirect_uri,
}
if self.scope: if self.scope:
payload['scope'] = self.scope payload["scope"] = self.scope
if state is None: if state is None:
state = self.state state = self.state
if state is not None: if state is not None:
payload['state'] = state payload["state"] = state
if show_dialog: if show_dialog:
payload['show_dialog'] = True payload["show_dialog"] = True
urlparams = urllibparse.urlencode(payload) urlparams = urllibparse.urlencode(payload)
@ -212,70 +271,138 @@ class SpotifyOAuth(object):
def _make_authorization_headers(self): def _make_authorization_headers(self):
return _make_authorization_headers(self.client_id, self.client_secret) return _make_authorization_headers(self.client_id, self.client_secret)
def get_access_token(self, code): def get_auth_response(self):
print(
"""
User authentication requires interaction with your
web browser. Once you enter your credentials and
give authorization, you will be redirected to
a url. Paste that url you were directed to to
complete the authorization.
"""
)
auth_url = self.get_authorize_url()
try:
import webbrowser
webbrowser.open(auth_url)
print("Opened %s in your browser" % auth_url)
except BaseException:
print("Please navigate here: %s" % auth_url)
print("")
print("")
try:
response = raw_input("Enter the URL you were redirected to: ")
except NameError:
response = input("Enter the URL you were redirected to: ")
print("")
print("")
return response
def get_authorization_code(self, response=None):
return self.parse_response_code(response or self.get_auth_response())
def get_access_token(self, code=None, as_dict=True):
""" Gets the access token for the app given the code """ Gets the access token for the app given the code
Parameters: Parameters:
- code - the response code - code - the response code
- as_dict - a boolean indicating if returning the access token
as a token_info dictionary, otherwise it will be returned
as a string.
""" """
if as_dict:
print("")
warnings.warn(
"You're using 'as_dict = True'."
"get_access_token will return the token string directly in future "
"versions. Please adjust your code accordingly, or use "
"get_cached_token instead.",
DeprecationWarning,
stacklevel=2,
)
print("")
token_info = self.get_cached_token()
if token_info is not None:
if is_token_expired(token_info):
token_info = self.refresh_access_token(
token_info["refresh_token"]
)
return token_info if as_dict else token_info["access_token"]
payload = {'redirect_uri': self.redirect_uri, payload = {
'code': code, "redirect_uri": self.redirect_uri,
'grant_type': 'authorization_code'} "code": code or self.get_authorization_code(),
"grant_type": "authorization_code",
}
if self.scope: if self.scope:
payload['scope'] = self.scope payload["scope"] = self.scope
if self.state: if self.state:
payload['state'] = self.state payload["state"] = self.state
headers = self._make_authorization_headers() headers = self._make_authorization_headers()
response = requests.post(self.OAUTH_TOKEN_URL, data=payload, response = requests.post(
headers=headers, verify=True, self.OAUTH_TOKEN_URL,
proxies=self.proxies) data=payload,
headers=headers,
verify=True,
proxies=self.proxies,
)
if response.status_code != 200: if response.status_code != 200:
raise SpotifyOauthError(response.reason) raise SpotifyOauthError(response.reason)
token_info = response.json() token_info = response.json()
token_info = self._add_custom_values_to_token_info(token_info) token_info = self._add_custom_values_to_token_info(token_info)
self._save_token_info(token_info) self._save_token_info(token_info)
return token_info return token_info if as_dict else token_info["access_token"]
def _normalize_scope(self, scope): def _normalize_scope(self, scope):
if scope: if scope:
scopes = sorted(scope.split()) scopes = sorted(scope.split())
return ' '.join(scopes) return " ".join(scopes)
else: else:
return None return None
def refresh_access_token(self, refresh_token): def refresh_access_token(self, refresh_token):
payload = {'refresh_token': refresh_token, payload = {
'grant_type': 'refresh_token'} "refresh_token": refresh_token,
"grant_type": "refresh_token",
}
headers = self._make_authorization_headers() headers = self._make_authorization_headers()
response = requests.post(self.OAUTH_TOKEN_URL, data=payload, response = requests.post(
headers=headers, proxies=self.proxies) self.OAUTH_TOKEN_URL,
data=payload,
headers=headers,
proxies=self.proxies,
)
if response.status_code != 200: if response.status_code != 200:
if False: # debugging code if False: # debugging code
print('headers', headers) print("headers", headers)
print('request', response.url) print("request", response.url)
self._warn("couldn't refresh token: code:%d reason:%s" self._warn(
% (response.status_code, response.reason)) "couldn't refresh token: code:%d reason:%s"
% (response.status_code, response.reason)
)
return None return None
token_info = response.json() token_info = response.json()
token_info = self._add_custom_values_to_token_info(token_info) token_info = self._add_custom_values_to_token_info(token_info)
if 'refresh_token' not in token_info: if "refresh_token" not in token_info:
token_info['refresh_token'] = refresh_token token_info["refresh_token"] = refresh_token
self._save_token_info(token_info) self._save_token_info(token_info)
return token_info return token_info
def _add_custom_values_to_token_info(self, token_info): def _add_custom_values_to_token_info(self, token_info):
''' """
Store some values that aren't directly provided by a Web API Store some values that aren't directly provided by a Web API
response. response.
''' """
token_info['expires_at'] = int(time.time()) + token_info['expires_in'] token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
token_info['scope'] = self.scope token_info["scope"] = self.scope
return token_info return token_info
def _warn(self, msg): def _warn(self, msg):
print('warning:' + msg, file=sys.stderr) print("warning:" + msg, file=sys.stderr)

View File

@ -4,29 +4,30 @@
from __future__ import print_function from __future__ import print_function
__all__ = [ __all__ = ["CLIENT_CREDS_ENV_VARS", "prompt_for_user_token"]
'CLIENT_CREDS_ENV_VARS',
'prompt_for_user_token'
]
import os import os
from . import oauth2
import spotipy import spotipy
CLIENT_CREDS_ENV_VARS = { CLIENT_CREDS_ENV_VARS = {
'client_id': 'SPOTIPY_CLIENT_ID', "client_id": "SPOTIPY_CLIENT_ID",
'client_secret': 'SPOTIPY_CLIENT_SECRET', "client_secret": "SPOTIPY_CLIENT_SECRET",
'client_username': 'SPOTIPY_CLIENT_USERNAME', "client_username": "SPOTIPY_CLIENT_USERNAME",
'redirect_uri': 'SPOTIPY_REDIRECT_URI' "redirect_uri": "SPOTIPY_REDIRECT_URI",
} }
def prompt_for_user_token(username, scope=None, client_id=None, def prompt_for_user_token(
client_secret=None, redirect_uri=None, username,
cache_path=None): scope=None,
''' prompts the user to login if necessary and returns client_id=None,
client_secret=None,
redirect_uri=None,
cache_path=None,
oauth_manager=None,
):
""" prompts the user to login if necessary and returns
the user token suitable for use with the spotipy.Spotify the user token suitable for use with the spotipy.Spotify
constructor constructor
@ -38,35 +39,44 @@ def prompt_for_user_token(username, scope=None, client_id=None,
- client_secret - the client secret of your app - client_secret - the client secret of your app
- redirect_uri - the redirect URI of your app - redirect_uri - the redirect URI of your app
- cache_path - path to location to save tokens - cache_path - path to location to save tokens
- oauth_manager - Oauth manager object.
''' """
if not oauth_manager:
if not client_id:
client_id = os.getenv("SPOTIPY_CLIENT_ID")
if not client_id: if not client_secret:
client_id = os.getenv('SPOTIPY_CLIENT_ID') client_secret = os.getenv("SPOTIPY_CLIENT_SECRET")
if not client_secret: if not redirect_uri:
client_secret = os.getenv('SPOTIPY_CLIENT_SECRET') redirect_uri = os.getenv("SPOTIPY_REDIRECT_URI")
if not redirect_uri: if not client_id:
redirect_uri = os.getenv('SPOTIPY_REDIRECT_URI') print(
"""
You need to set your Spotify API credentials.
You can do this by setting environment variables like so:
if not client_id: export SPOTIPY_CLIENT_ID='your-spotify-client-id'
print(''' export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret'
You need to set your Spotify API credentials. You can do this by export SPOTIPY_REDIRECT_URI='your-app-redirect-url'
setting environment variables like so:
export SPOTIPY_CLIENT_ID='your-spotify-client-id' Get your credentials at
export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret' https://developer.spotify.com/my-applications
export SPOTIPY_REDIRECT_URI='your-app-redirect-url' """
)
raise spotipy.SpotifyException(550, -1, "no credentials set")
Get your credentials at cache_path = cache_path or ".cache-" + username
https://developer.spotify.com/my-applications
''')
raise spotipy.SpotifyException(550, -1, 'no credentials set')
cache_path = cache_path or ".cache-" + username sp_oauth = oauth_manager or spotipy.SpotifyOAuth(
sp_oauth = oauth2.SpotifyOAuth(client_id, client_secret, redirect_uri, client_id,
scope=scope, cache_path=cache_path) client_secret,
redirect_uri,
scope=scope,
cache_path=cache_path,
)
# try to get a valid token for this user, from the cache, # try to get a valid token for this user, from the cache,
# if not in the cache, the create a new (this will send # if not in the cache, the create a new (this will send
@ -75,37 +85,14 @@ def prompt_for_user_token(username, scope=None, client_id=None,
token_info = sp_oauth.get_cached_token() token_info = sp_oauth.get_cached_token()
if not token_info: if not token_info:
print(''' url = sp_oauth.get_auth_response()
code = sp_oauth.parse_response_code(url)
token = sp_oauth.get_access_token(code, as_dict=False)
else:
return token_info["access_token"]
User authentication requires interaction with your
web browser. Once you enter your credentials and
give authorization, you will be redirected to
a url. Paste that url you were directed to to
complete the authorization.
''')
auth_url = sp_oauth.get_authorize_url()
try:
import webbrowser
webbrowser.open(auth_url)
print("Opened %s in your browser" % auth_url)
except BaseException:
print("Please navigate here: %s" % auth_url)
print()
print()
try:
response = raw_input("Enter the URL you were redirected to: ")
except NameError:
response = input("Enter the URL you were redirected to: ")
print()
print()
code = sp_oauth.parse_response_code(response)
token_info = sp_oauth.get_access_token(code)
# Auth'ed API request # Auth'ed API request
if token_info: if token:
return token_info['access_token'] return token
else: else:
return None return None