Convert docstrings to reStructuredText (#1185)

* convert docstings to reStructuredText format for client.py

* convert docstings to reStructuredText format for cache_handler.py, exceptions.py, oauth2.py, scope.py, util.py

* fix formatting and inconsistencies

* add changelog
This commit is contained in:
Niko 2025-02-02 09:20:42 +01:00 committed by GitHub
parent 6083be8b1f
commit e13b10885a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1036 additions and 780 deletions

View File

@ -25,6 +25,7 @@ Rebasing master onto v3 doesn't require a changelog update.
- Removed the `client_credentials_manager` and `oauth_manager` parameters because they are redundant. - Removed the `client_credentials_manager` and `oauth_manager` parameters because they are redundant.
- Replaced the `set_auth` and `auth_manager` properties with standard attributes. - Replaced the `set_auth` and `auth_manager` properties with standard attributes.
- Replaced string concatenations and `str.format()` with f-strings - Replaced string concatenations and `str.format()` with f-strings
- Modified docstrings to use the reStructuredText format
### Fixed ### Fixed

View File

@ -48,21 +48,31 @@ class CacheHandler(ABC):
@abstractmethod @abstractmethod
def get_cached_token(self) -> TokenInfo | None: def get_cached_token(self) -> TokenInfo | None:
"""Get and return a token_info dictionary object.""" """
Get and return a token_info dictionary object.
:return: A token_info dictionary object or None if no token is cached.
"""
@abstractmethod @abstractmethod
def save_token_to_cache(self, token_info: TokenInfo) -> None: def save_token_to_cache(self, token_info: TokenInfo) -> None:
"""Save a token_info dictionary object to the cache and return None.""" """
Save a token_info dictionary object to the cache.
:param token_info: A token_info dictionary object to be cached.
"""
class CacheFileHandler(CacheHandler): class CacheFileHandler(CacheHandler):
"""Read and write cached Spotify authorization tokens as json files on disk.""" """
Read and write cached Spotify authorization tokens as json files on disk.
"""
def __init__( def __init__(
self, self,
cache_path: str | None = None, cache_path: str | None = None,
username: str | None = None, username: str | None = None,
encoder_cls: type[JSONEncoder] | None = None, encoder_cls: type[JSONEncoder] | None = None,
) -> None: ) -> None:
""" """
Initialize CacheFileHandler instance. Initialize CacheFileHandler instance.
@ -82,7 +92,11 @@ class CacheFileHandler(CacheHandler):
self.cache_path = cache_path self.cache_path = cache_path
def get_cached_token(self) -> TokenInfo | None: def get_cached_token(self) -> TokenInfo | None:
"""Get cached token from file.""" """
Get cached token from file.
:return: A token_info dictionary object or None if no token is cached.
"""
token_info: TokenInfo | None = None token_info: TokenInfo | None = None
try: try:
@ -100,7 +114,11 @@ class CacheFileHandler(CacheHandler):
return token_info return token_info
def save_token_to_cache(self, token_info: TokenInfo) -> None: def save_token_to_cache(self, token_info: TokenInfo) -> None:
"""Save token cache to file.""" """
Save token cache to file.
:param token_info: A token_info dictionary object to be cached.
"""
try: try:
f = open(self.cache_path, "w") f = open(self.cache_path, "w")
f.write(json.dumps(token_info, cls=self.encoder_cls)) f.write(json.dumps(token_info, cls=self.encoder_cls))
@ -110,22 +128,32 @@ class CacheFileHandler(CacheHandler):
class MemoryCacheHandler(CacheHandler): class MemoryCacheHandler(CacheHandler):
"""Cache handler that stores the token non-persistently as an instance attribute.""" """
Cache handler that stores the token non-persistently as an instance attribute.
"""
def __init__(self, token_info: TokenInfo | None = None) -> None: def __init__(self, token_info: TokenInfo | None = None) -> None:
""" """
Initialize MemoryCacheHandler instance. Initialize MemoryCacheHandler instance.
:param token_info: Optional initial cached token :param token_info: Optional initial cached token.
""" """
self.token_info = token_info self.token_info = token_info
def get_cached_token(self) -> TokenInfo | None: def get_cached_token(self) -> TokenInfo | None:
"""Retrieve the cached token from the instance.""" """
Retrieve the cached token from the instance.
:return: A token_info dictionary object or None if no token is cached.
"""
return self.token_info return self.token_info
def save_token_to_cache(self, token_info: TokenInfo) -> None: def save_token_to_cache(self, token_info: TokenInfo) -> None:
"""Cache the token in this instance.""" """
Cache the token in this instance.
:param token_info: A token_info dictionary object to be cached.
"""
self.token_info = token_info self.token_info = token_info
@ -139,13 +167,18 @@ class DjangoSessionCacheHandler(CacheHandler):
def __init__(self, request): def __init__(self, request):
""" """
Parameters: Initialize DjangoSessionCacheHandler instance.
* request: HttpRequest object provided by Django for every
incoming request :param request: HttpRequest object provided by Django for every incoming request.
""" """
self.request = request self.request = request
def get_cached_token(self): def get_cached_token(self):
"""
Retrieve the cached token from the Django session.
:return: A token_info dictionary object or None if no token is cached.
"""
token_info = None token_info = None
try: try:
token_info = self.request.session["token_info"] token_info = self.request.session["token_info"]
@ -155,6 +188,11 @@ class DjangoSessionCacheHandler(CacheHandler):
return token_info return token_info
def save_token_to_cache(self, token_info): def save_token_to_cache(self, token_info):
"""
Cache the token in the Django session.
:param token_info: A token_info dictionary object to be cached.
"""
try: try:
self.request.session["token_info"] = token_info self.request.session["token_info"] = token_info
except Exception as e: except Exception as e:
@ -164,13 +202,23 @@ class DjangoSessionCacheHandler(CacheHandler):
class FlaskSessionCacheHandler(CacheHandler): class FlaskSessionCacheHandler(CacheHandler):
""" """
A cache handler that stores the token info in the session framework A cache handler that stores the token info in the session framework
provided by flask. provided by Flask.
""" """
def __init__(self, session): def __init__(self, session):
"""
Initialize FlaskSessionCacheHandler instance.
:param session: Flask session object.
"""
self.session = session self.session = session
def get_cached_token(self): def get_cached_token(self):
"""
Retrieve the cached token from the Flask session.
:return: A token_info dictionary object or None if no token is cached.
"""
token_info = None token_info = None
try: try:
token_info = self.session["token_info"] token_info = self.session["token_info"]
@ -180,6 +228,11 @@ class FlaskSessionCacheHandler(CacheHandler):
return token_info return token_info
def save_token_to_cache(self, token_info): def save_token_to_cache(self, token_info):
"""
Cache the token in the Flask session.
:param token_info: A token_info dictionary object to be cached.
"""
try: try:
self.session["token_info"] = token_info self.session["token_info"] = token_info
except Exception as e: except Exception as e:
@ -187,20 +240,26 @@ class FlaskSessionCacheHandler(CacheHandler):
class RedisCacheHandler(CacheHandler): class RedisCacheHandler(CacheHandler):
"""A cache handler that stores the token info in the Redis.""" """
A cache handler that stores the token info in Redis.
"""
def __init__(self, redis_obj: redis.client.Redis, key: str | None = None) -> None: def __init__(self, redis_obj: redis.client.Redis, key: str | None = None) -> None:
""" """
Initialize RedisCacheHandler instance. Initialize RedisCacheHandler instance.
:param redis: The Redis object to function as the cache :param redis_obj: The Redis object to function as the cache.
:param key: (Optional) The key to used to store the token in the cache :param key: (Optional) The key used to store the token in the cache.
""" """
self.redis = redis_obj self.redis = redis_obj
self.key = key or "token_info" self.key = key or "token_info"
def get_cached_token(self) -> TokenInfo | None: def get_cached_token(self) -> TokenInfo | None:
"""Fetch cache token from the Redis.""" """
Fetch cached token from Redis.
:return: A token_info dictionary object or None if no token is cached.
"""
token_info = None token_info = None
try: try:
token_info = self.redis.get(self.key) token_info = self.redis.get(self.key)
@ -212,7 +271,11 @@ class RedisCacheHandler(CacheHandler):
return token_info return token_info
def save_token_to_cache(self, token_info: TokenInfo) -> None: def save_token_to_cache(self, token_info: TokenInfo) -> None:
"""Cache token in the Redis.""" """
Cache token in Redis.
:param token_info: A token_info dictionary object to be cached.
"""
try: try:
self.redis.set(self.key, json.dumps(token_info)) self.redis.set(self.key, json.dumps(token_info))
except RedisError as e: except RedisError as e:
@ -220,21 +283,26 @@ class RedisCacheHandler(CacheHandler):
class MemcacheCacheHandler(CacheHandler): class MemcacheCacheHandler(CacheHandler):
"""A Cache handler that stores the token info in Memcache using the pymemcache client """
A cache handler that stores the token info in Memcache using the pymemcache client.
""" """
def __init__(self, memcache, key=None) -> None: def __init__(self, memcache, key=None) -> None:
""" """
Parameters: Initialize MemcacheCacheHandler instance.
* memcache: memcache client object provided by pymemcache
(https://pymemcache.readthedocs.io/en/latest/getting_started.html) :param memcache: Memcache client object provided by pymemcache.
* key: May be supplied, will otherwise be generated :param key: (Optional) The key used to store the token in the cache.
(takes precedence over `token_info`)
""" """
self.memcache = memcache self.memcache = memcache
self.key = key or "token_info" self.key = key or "token_info"
def get_cached_token(self): def get_cached_token(self):
"""
Fetch cached token from Memcache.
:return: A token_info dictionary object or None if no token is cached.
"""
from pymemcache import MemcacheError from pymemcache import MemcacheError
try: try:
@ -245,6 +313,11 @@ class MemcacheCacheHandler(CacheHandler):
logger.warning(f"Error getting token to cache: {e}") logger.warning(f"Error getting token to cache: {e}")
def save_token_to_cache(self, token_info): def save_token_to_cache(self, token_info):
"""
Cache token in Memcache.
:param token_info: A token_info dictionary object to be cached.
"""
from pymemcache import MemcacheError from pymemcache import MemcacheError
try: try:

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,15 @@ class SpotifyBaseException(Exception):
class SpotifyException(SpotifyBaseException): class SpotifyException(SpotifyBaseException):
"""
Exception raised for Spotify API errors.
:param http_status: The HTTP status code returned by the API.
:param code: The specific error code returned by the API.
:param msg: The error message returned by the API.
:param reason: (Optional) The reason for the error.
:param headers: (Optional) The headers returned by the API.
"""
def __init__(self, http_status, code, msg, reason=None, headers=None): def __init__(self, http_status, code, msg, reason=None, headers=None):
self.http_status = http_status self.http_status = http_status
@ -22,7 +31,13 @@ class SpotifyException(SpotifyBaseException):
class SpotifyOauthError(SpotifyBaseException): class SpotifyOauthError(SpotifyBaseException):
""" Error during Auth Code or Implicit Grant flow """ """
Exception raised for errors during Auth Code or Implicit Grant flow.
:param message: The error message.
:param error: (Optional) The specific error code.
:param error_description: (Optional) A description of the error.
"""
def __init__(self, message, error=None, error_description=None, *args, **kwargs): def __init__(self, message, error=None, error_description=None, *args, **kwargs):
self.error = error self.error = error
@ -32,7 +47,15 @@ class SpotifyOauthError(SpotifyBaseException):
class SpotifyStateError(SpotifyOauthError): class SpotifyStateError(SpotifyOauthError):
""" The state sent and state received were different """ """
Exception raised when the state sent and state received are different.
:param local_state: The state sent.
:param remote_state: The state received.
:param message: (Optional) The error message.
:param error: (Optional) The specific error code.
:param error_description: (Optional) A description of the error.
"""
def __init__(self, local_state=None, remote_state=None, message=None, def __init__(self, local_state=None, remote_state=None, message=None,
error=None, error_description=None, *args, **kwargs): error=None, error_description=None, *args, **kwargs):

View File

@ -28,6 +28,13 @@ logger = logging.getLogger(__name__)
def _make_authorization_headers(client_id, client_secret): def _make_authorization_headers(client_id, client_secret):
"""
Create authorization headers for Spotify API requests.
:param client_id: The client ID provided by Spotify.
:param client_secret: The client secret provided by Spotify.
:return: A dictionary containing the authorization headers.
"""
auth_header = base64.b64encode( auth_header = base64.b64encode(
f"{client_id}:{client_secret}".encode("ascii") f"{client_id}:{client_secret}".encode("ascii")
) )
@ -35,6 +42,14 @@ def _make_authorization_headers(client_id, client_secret):
def _ensure_value(value, env_key): def _ensure_value(value, env_key):
"""
Ensure that a value is provided, either directly or via an environment variable.
:param value: The value to check.
:param env_key: The key for the environment variable to check if the value is not provided.
:return: The value or the value from the environment variable.
:raises SpotifyOauthError: If neither the value nor the environment variable is set.
"""
env_val = CLIENT_CREDS_ENV_VARS[env_key] env_val = CLIENT_CREDS_ENV_VARS[env_key]
_val = value or os.getenv(env_val) _val = value or os.getenv(env_val)
if _val is None: if _val is None:
@ -44,6 +59,11 @@ def _ensure_value(value, env_key):
class SpotifyAuthBase: class SpotifyAuthBase:
"""
Base class for Spotify authentication.
:param requests_session: A Requests session object or a boolean value to create one.
"""
def __init__(self, requests_session): def __init__(self, requests_session):
if isinstance(requests_session, requests.Session): if isinstance(requests_session, requests.Session):
@ -59,6 +79,10 @@ class SpotifyAuthBase:
Accepts a string of scopes, or an iterable with elements of type Accepts a string of scopes, or an iterable with elements of type
`Scope` or `str` and returns a space-separated string of scopes. `Scope` or `str` and returns a space-separated string of scopes.
Returns `None` if the argument is `None`. Returns `None` if the argument is `None`.
:param scope: A string or iterable of scopes.
:return: A space-separated string of scopes or `None`.
:raises TypeError: If the scope is not a string or iterable.
""" """
# TODO: do we need to sort the scopes? # TODO: do we need to sort the scopes?
@ -71,7 +95,7 @@ class SpotifyAuthBase:
if isinstance(scope, Iterable): if isinstance(scope, Iterable):
# Assume all of the iterable's elements are of the same type. # Assume all the iterable's elements are of the same type.
# If the iterable is empty, then return None. # If the iterable is empty, then return None.
first_element = next(iter(scope), None) first_element = next(iter(scope), None)
@ -120,11 +144,24 @@ class SpotifyAuthBase:
@staticmethod @staticmethod
def is_token_expired(token_info): def is_token_expired(token_info):
"""
Check if the token is expired.
:param token_info: The token information.
:return: `True` if the token is expired, `False` otherwise.
"""
now = int(time.time()) now = int(time.time())
return token_info["expires_at"] - now < 60 return token_info["expires_at"] - now < 60
@staticmethod @staticmethod
def _is_scope_subset(needle_scope, haystack_scope): def _is_scope_subset(needle_scope, haystack_scope):
"""
Check if one scope is a subset of another.
:param needle_scope: The scope to check.
:param haystack_scope: The scope to check against.
:return: `True` if `needle_scope` is a subset of `haystack_scope`, `False` otherwise.
"""
needle_scope = set(needle_scope.split()) if needle_scope else set() needle_scope = set(needle_scope.split()) if needle_scope else set()
haystack_scope = ( haystack_scope = (
set(haystack_scope.split()) if haystack_scope else set() set(haystack_scope.split()) if haystack_scope else set()
@ -132,6 +169,12 @@ class SpotifyAuthBase:
return needle_scope <= haystack_scope return needle_scope <= haystack_scope
def _handle_oauth_error(self, http_error): def _handle_oauth_error(self, http_error):
"""
Handle OAuth errors.
:param http_error: The HTTP error.
:raises SpotifyOauthError: If an OAuth error occurs.
"""
response = http_error.response response = http_error.response
try: try:
error_payload = response.json() error_payload = response.json()
@ -158,16 +201,19 @@ class SpotifyAuthBase:
class SpotifyClientCredentials(SpotifyAuthBase): class SpotifyClientCredentials(SpotifyAuthBase):
"""
Implements Client Credentials Flow for Spotify's OAuth implementation.
"""
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token" OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
def __init__( def __init__(
self, self,
client_id=None, client_id=None,
client_secret=None, client_secret=None,
cache_handler=None, cache_handler=None,
proxies=None, proxies=None,
requests_session=True, requests_session=True,
requests_timeout=None requests_timeout=None
): ):
""" """
Creates a Client Credentials Flow Manager. Creates a Client Credentials Flow Manager.
@ -176,26 +222,20 @@ class SpotifyClientCredentials(SpotifyAuthBase):
Only endpoints that do not access user information can be accessed. Only endpoints that do not access user information can be accessed.
This means that endpoints that require authorization scopes cannot be accessed. This means that endpoints that require authorization scopes cannot be accessed.
The advantage, however, of this authorization flow is that it does not require any The advantage, however, of this authorization flow is that it does not require any
user interaction user interaction.
You can either provide a client_id and client_secret to the
constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET
environment variables
Parameters:
* client_id: Must be supplied or set as environment variable
* client_secret: Must be supplied or set as environment variable
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
* proxies: Optional, proxy for the requests library to route through
* requests_session: A Requests session object or a true value to create one.
A false value disables sessions.
It should generally be a good idea to keep sessions enabled
for performance reasons (connection pooling).
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
:param client_id: Must be supplied or set as environment variable.
:param client_secret: Must be supplied or set as environment variable.
:param cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
:param proxies: Optional, proxy for the requests library to route through.
:param requests_session: A Requests session object or a true value to create one.
A false value disables sessions.
It should generally be a good idea to keep sessions enabled
for performance reasons (connection pooling).
:param requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds.
""" """
super().__init__(requests_session) super().__init__(requests_session)
@ -215,12 +255,12 @@ class SpotifyClientCredentials(SpotifyAuthBase):
def get_access_token(self, check_cache=True): def get_access_token(self, check_cache=True):
""" """
If a valid access token is in memory, returns it If a valid access token is in memory, returns it.
Else fetches a new token and returns it Else fetches a new token and returns it.
Parameters: :param check_cache: If true, checks for a locally stored token
- check_cache - if true, checks for a locally stored token before requesting a new token.
before requesting a new token. :return: The access token.
""" """
if check_cache: if check_cache:
@ -234,7 +274,12 @@ class SpotifyClientCredentials(SpotifyAuthBase):
return token_info["access_token"] return token_info["access_token"]
def _request_access_token(self): def _request_access_token(self):
"""Gets client credentials access token """ """
Gets client credentials access token.
:return: The token information.
:raises SpotifyOauthError: If an OAuth error occurs.
"""
payload = {"grant_type": "client_credentials"} payload = {"grant_type": "client_credentials"}
headers = _make_authorization_headers( headers = _make_authorization_headers(
@ -261,8 +306,10 @@ class SpotifyClientCredentials(SpotifyAuthBase):
def _add_custom_values_to_token_info(self, token_info): def _add_custom_values_to_token_info(self, token_info):
""" """
Store some values that aren't directly provided by a Web API Store some values that aren't directly provided by a Web API response.
response.
:param token_info: The token information.
:return: The token information with additional values.
""" """
token_info["expires_at"] = int(time.time()) + token_info["expires_in"] token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
return token_info return token_info
@ -290,29 +337,27 @@ class SpotifyOAuth(SpotifyAuthBase):
open_browser=True open_browser=True
): ):
""" """
Creates a SpotifyOAuth object Creates a SpotifyOAuth object.
Parameters: :param client_id: Must be supplied or set as environment variable.
* client_id: Must be supplied or set as environment variable :param client_secret: Must be supplied or set as environment variable.
* client_secret: Must be supplied or set as environment variable :param redirect_uri: Must be supplied or set as environment variable.
* redirect_uri: Must be supplied or set as environment variable :param state: Optional, no verification is performed.
* state: Optional, no verification is performed :param scope: Optional, either a string of scopes, or an iterable with elements of type
* scope: Optional, either a string of scopes, or an iterable with elements of type `Scope` or `str`. E.g., {Scope.user_modify_playback_state, Scope.user_library_read}.
`Scope` or `str`. E.g., :param cache_handler: An instance of the `CacheHandler` class to handle
{Scope.user_modify_playback_state, Scope.user_library_read} getting and saving cached authorization tokens.
* cache_handler: An instance of the `CacheHandler` class to handle Optional, will otherwise use `CacheFileHandler`.
getting and saving cached authorization tokens. :param proxies: Optional, proxy for the requests library to route through.
Optional, will otherwise use `CacheFileHandler`. :param show_dialog: Optional, interpreted as boolean.
* proxies: Optional, proxy for the requests library to route through :param requests_session: A Requests session object or a true value to create one.
* show_dialog: Optional, interpreted as boolean A false value disables sessions.
* requests_session: A Requests session object or a true value to create one. It should generally be a good idea to keep sessions enabled
A false value disables sessions. for performance reasons (connection pooling).
It should generally be a good idea to keep sessions enabled :param requests_timeout: Optional, tell Requests to stop waiting for a response after
for performance reasons (connection pooling). a given number of seconds.
* requests_timeout: Optional, tell Requests to stop waiting for a response after :param open_browser: Optional, whether the web browser should be opened to
a given number of seconds authorize a user.
* open_browser: Optional, whether the web browser should be opened to
authorize a user
""" """
super().__init__(requests_session) super().__init__(requests_session)
@ -338,6 +383,12 @@ class SpotifyOAuth(SpotifyAuthBase):
self.open_browser = open_browser self.open_browser = open_browser
def validate_token(self, token_info): def validate_token(self, token_info):
"""
Validate the token information.
:param token_info: The token information.
:return: The validated token information or None if invalid.
"""
if token_info is None: if token_info is None:
return None return None
@ -355,7 +406,11 @@ class SpotifyOAuth(SpotifyAuthBase):
return token_info return token_info
def get_authorize_url(self, state=None): def get_authorize_url(self, state=None):
""" Gets the URL to use to authorize this app """
Get the URL to use to authorize this app.
:param state: Optional, the state parameter.
:return: The authorization URL.
""" """
payload = { payload = {
"client_id": self.client_id, "client_id": self.client_id,
@ -376,16 +431,24 @@ class SpotifyOAuth(SpotifyAuthBase):
return f"{self.OAUTH_AUTHORIZE_URL}?{urlparams}" return f"{self.OAUTH_AUTHORIZE_URL}?{urlparams}"
def parse_response_code(self, url): def parse_response_code(self, url):
""" Parse the response code in the given response url """
Parse the response code in the given response URL.
Parameters: :param url: The response URL.
- url - the response url :return: The response code.
""" """
_, code = self.parse_auth_response_url(url) _, code = self.parse_auth_response_url(url)
return url if code is None else code return url if code is None else code
@staticmethod @staticmethod
def parse_auth_response_url(url): def parse_auth_response_url(url):
"""
Parse the authorization response URL.
:param url: The response URL.
:return: A tuple containing the state and code.
:raises SpotifyOauthError: If an error occurs.
"""
query_s = urlparse(url).query query_s = urlparse(url).query
form = dict(parse_qsl(query_s)) form = dict(parse_qsl(query_s))
if "error" in form: if "error" in form:
@ -407,6 +470,12 @@ class SpotifyOAuth(SpotifyAuthBase):
logger.error(f"Please navigate here: {auth_url}") logger.error(f"Please navigate here: {auth_url}")
def _get_auth_response_interactive(self, open_browser=False): def _get_auth_response_interactive(self, open_browser=False):
"""
Get the authorization response interactively.
:param open_browser: Whether to open the browser.
:return: The authorization code.
"""
if open_browser: if open_browser:
self._open_auth_url() self._open_auth_url()
prompt = "Enter the URL you were redirected to: " prompt = "Enter the URL you were redirected to: "
@ -423,6 +492,13 @@ class SpotifyOAuth(SpotifyAuthBase):
return code return code
def _get_auth_response_local_server(self, redirect_port): def _get_auth_response_local_server(self, redirect_port):
"""
Get the authorization response using a local server.
:param redirect_port: The port on which to start the server.
:return: The authorization code.
:raises SpotifyOauthError: If an error occurs.
"""
server = start_local_http_server(redirect_port) server = start_local_http_server(redirect_port)
self._open_auth_url() self._open_auth_url()
server.handle_request() server.handle_request()
@ -437,6 +513,12 @@ class SpotifyOAuth(SpotifyAuthBase):
raise SpotifyOauthError("Server listening on localhost has not been accessed") raise SpotifyOauthError("Server listening on localhost has not been accessed")
def get_auth_response(self, open_browser=None): def get_auth_response(self, open_browser=None):
"""
Get the authorization response.
:param open_browser: Whether to open the browser.
:return: The authorization code.
"""
logger.info('User authentication requires interaction with your ' logger.info('User authentication requires interaction with your '
'web browser. Once you enter your credentials and ' 'web browser. Once you enter your credentials and '
'give authorization, you will be redirected to ' 'give authorization, you will be redirected to '
@ -467,17 +549,24 @@ class SpotifyOAuth(SpotifyAuthBase):
return self._get_auth_response_interactive(open_browser=open_browser) return self._get_auth_response_interactive(open_browser=open_browser)
def get_authorization_code(self, response=None): def get_authorization_code(self, response=None):
"""
Get the authorization code.
:param response: The response URL.
:return: The authorization code.
"""
if response: if response:
return self.parse_response_code(response) return self.parse_response_code(response)
return self.get_auth_response() return self.get_auth_response()
def get_access_token(self, code=None, check_cache=True): def get_access_token(self, code=None, check_cache=True):
""" Gets the access token for the app given the code """
Get the access token for the app given the code.
Parameters: :param code: The response code.
- code - the response code :param check_cache: If true, checks for a locally stored token
- check_cache - if true, checks for a locally stored token before requesting a new token.
before requesting a new token :return: The access token.
""" """
if check_cache: if check_cache:
token_info = self.validate_token(self.cache_handler.get_cached_token()) token_info = self.validate_token(self.cache_handler.get_cached_token())
@ -521,6 +610,13 @@ class SpotifyOAuth(SpotifyAuthBase):
self._handle_oauth_error(http_error) self._handle_oauth_error(http_error)
def refresh_access_token(self, refresh_token): def refresh_access_token(self, refresh_token):
"""
Refresh the access token.
:param refresh_token: The refresh token.
:return: The new token information.
:raises SpotifyOauthError: If an error occurs.
"""
payload = { payload = {
"refresh_token": refresh_token, "refresh_token": refresh_token,
"grant_type": "refresh_token", "grant_type": "refresh_token",
@ -551,8 +647,10 @@ class SpotifyOAuth(SpotifyAuthBase):
def _add_custom_values_to_token_info(self, token_info): def _add_custom_values_to_token_info(self, token_info):
""" """
Store some values that aren't directly provided by a Web API Store some values that aren't directly provided by a Web API response.
response.
:param token_info: The token information.
:return: The token information with additional values.
""" """
token_info["expires_at"] = int(time.time()) + token_info["expires_in"] token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
token_info["scope"] = self.scope token_info["scope"] = self.scope
@ -560,7 +658,8 @@ class SpotifyOAuth(SpotifyAuthBase):
class SpotifyPKCE(SpotifyAuthBase): class SpotifyPKCE(SpotifyAuthBase):
""" Implements PKCE Authorization Flow for client apps """
Implements PKCE Authorization Flow for client apps.
This auth manager enables *user and non-user* endpoints with only This auth manager enables *user and non-user* endpoints with only
a client ID, redirect URI, and username. When the app requests a client ID, redirect URI, and username. When the app requests
@ -568,56 +667,43 @@ class SpotifyPKCE(SpotifyAuthBase):
authorize the new client app. After authorizing the app, the client authorize the new client app. After authorizing the app, the client
app is then given both access and refresh tokens. This is the app is then given both access and refresh tokens. This is the
preferred way of authorizing a mobile/desktop client. preferred way of authorizing a mobile/desktop client.
""" """
OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize" OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token" OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
def __init__( def __init__(
self, self,
client_id=None, client_id=None,
redirect_uri=None, redirect_uri=None,
state=None, state=None,
scope=None, scope=None,
cache_handler=None, cache_handler=None,
proxies=None, proxies=None,
requests_timeout=None, requests_timeout=None,
requests_session=True, requests_session=True,
open_browser=True open_browser=True
): ):
""" """
Creates Auth Manager with the PKCE Auth flow. Creates Auth Manager with the PKCE Auth flow.
Parameters: :param client_id: Must be supplied or set as environment variable.
* client_id: Must be supplied or set as environment variable :param redirect_uri: Must be supplied or set as environment variable.
* redirect_uri: Must be supplied or set as environment variable :param state: Optional, no verification is performed.
* state: Optional, no verification is performed :param scope: Optional, either a string of scopes, or an iterable with elements of type
* scope: Optional, either a string of scopes, or an iterable with elements of type `Scope` or `str`. E.g., {Scope.user_modify_playback_state, Scope.user_library_read}.
`Scope` or `str`. E.g., :param cache_handler: An instance of the `CacheHandler` class to handle
{Scope.user_modify_playback_state, Scope.user_library_read} getting and saving cached authorization tokens.
* cache_path: (deprecated) Optional, will otherwise be generated Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `username`) :param proxies: Optional, proxy for the requests library to route through.
* username: (deprecated) Optional or set as environment variable :param requests_timeout: Optional, tell Requests to stop waiting for a response after
(will set `cache_path` to `.cache-{username}`) a given number of seconds.
* proxies: Optional, proxy for the requests library to route through :param requests_session: A Requests session object or a true value to create one.
* requests_timeout: Optional, tell Requests to stop waiting for a response after A false value disables sessions.
a given number of seconds It should generally be a good idea to keep sessions enabled
* requests_session: A Requests session for performance reasons (connection pooling).
* open_browser: Optional, whether the web browser should be opened to :param open_browser: Optional, whether the web browser should be opened to
authorize a user authorize a user.
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
* proxies: Optional, proxy for the requests library to route through
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
* requests_session: A Requests session object or a true value to create one.
A false value disables sessions.
It should generally be a good idea to keep sessions enabled
for performance reasons (connection pooling).
* open_browser: Optional, whether the web browser should be opened to
authorize a user
""" """
super().__init__(requests_session) super().__init__(requests_session)
@ -644,9 +730,12 @@ class SpotifyPKCE(SpotifyAuthBase):
self.open_browser = open_browser self.open_browser = open_browser
def _get_code_verifier(self): def _get_code_verifier(self):
""" Spotify PCKE code verifier - See step 1 of the reference guide below """
Spotify PCKE code verifier - See step 1 of the reference guide below
Reference: Reference:
https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce
:return: A code verifier string.
""" """
# Range (33,96) is used to select between 44-128 base64 characters for the # Range (33,96) is used to select between 44-128 base64 characters for the
# next operation. The range looks weird because base64 is 6 bytes # next operation. The range looks weird because base64 is 6 bytes
@ -658,9 +747,12 @@ class SpotifyPKCE(SpotifyAuthBase):
return secrets.token_urlsafe(length) return secrets.token_urlsafe(length)
def _get_code_challenge(self): def _get_code_challenge(self):
""" Spotify PCKE code challenge - See step 1 of the reference guide below """
Spotify PCKE code challenge - See step 1 of the reference guide below
Reference: Reference:
https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce
:return: A code challenge string.
""" """
import base64 import base64
import hashlib import hashlib
@ -669,7 +761,12 @@ class SpotifyPKCE(SpotifyAuthBase):
return code_challenge.replace('=', '') return code_challenge.replace('=', '')
def get_authorize_url(self, state=None): def get_authorize_url(self, state=None):
""" Gets the URL to use to authorize this app """ """
Get the URL to use to authorize this app.
:param state: Optional, the state parameter.
:return: The authorization URL.
"""
if not self.code_challenge: if not self.code_challenge:
self.get_pkce_handshake_parameters() self.get_pkce_handshake_parameters()
payload = { payload = {
@ -755,11 +852,23 @@ class SpotifyPKCE(SpotifyAuthBase):
return code return code
def get_authorization_code(self, response=None): def get_authorization_code(self, response=None):
"""
Get the authorization code.
:param response: The response URL.
:return: The authorization code.
"""
if response: if response:
return self.parse_response_code(response) return self.parse_response_code(response)
return self._get_auth_response() return self._get_auth_response()
def validate_token(self, token_info): def validate_token(self, token_info):
"""
Validate the token information.
:param token_info: The token information.
:return: The validated token information or None if invalid.
"""
if token_info is None: if token_info is None:
return None return None
@ -778,27 +887,33 @@ class SpotifyPKCE(SpotifyAuthBase):
def _add_custom_values_to_token_info(self, token_info): def _add_custom_values_to_token_info(self, token_info):
""" """
Store some values that aren't directly provided by a Web API Store some values that aren't directly provided by a Web API response.
response.
:param token_info: The token information.
:return: The token information with additional values.
""" """
token_info["expires_at"] = int(time.time()) + token_info["expires_in"] token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
return token_info return token_info
def get_pkce_handshake_parameters(self): def get_pkce_handshake_parameters(self):
"""
Generate PKCE handshake parameters.
"""
self.code_verifier = self._get_code_verifier() self.code_verifier = self._get_code_verifier()
self.code_challenge = self._get_code_challenge() self.code_challenge = self._get_code_challenge()
def get_access_token(self, code=None, check_cache=True): def get_access_token(self, code=None, check_cache=True):
""" Gets the access token for the app """
Get the access token for the app.
If the code is not given and no cached token is used, an If the code is not given and no cached token is used, an
authentication window will be shown to the user to get a new authentication window will be shown to the user to get a new
code. code.
Parameters: :param code: The response code from authentication.
- code - the response code from authentication :param check_cache: If true, checks for a locally stored token
- check_cache - if true, checks for a locally stored token before requesting a new token.
before requesting a new token :return: The access token.
""" """
if check_cache: if check_cache:
@ -844,6 +959,13 @@ class SpotifyPKCE(SpotifyAuthBase):
self._handle_oauth_error(http_error) self._handle_oauth_error(http_error)
def refresh_access_token(self, refresh_token): def refresh_access_token(self, refresh_token):
"""
Refresh the access token.
:param refresh_token: The refresh token.
:return: The new token information.
:raises SpotifyOauthError: If an error occurs.
"""
payload = { payload = {
"refresh_token": refresh_token, "refresh_token": refresh_token,
"grant_type": "refresh_token", "grant_type": "refresh_token",
@ -874,21 +996,42 @@ class SpotifyPKCE(SpotifyAuthBase):
self._handle_oauth_error(http_error) self._handle_oauth_error(http_error)
def parse_response_code(self, url): def parse_response_code(self, url):
""" Parse the response code in the given response url """
Parse the response code in the given response URL.
Parameters: :param url: The response URL.
- url - the response url :return: The response code.
""" """
_, code = self.parse_auth_response_url(url) _, code = self.parse_auth_response_url(url)
return url if code is None else code return url if code is None else code
@staticmethod @staticmethod
def parse_auth_response_url(url): def parse_auth_response_url(url):
"""
Parse the authorization response URL.
:param url: The response URL.
:return: A tuple containing the state and code.
:raises SpotifyOauthError: If an error occurs.
"""
return SpotifyOAuth.parse_auth_response_url(url) return SpotifyOAuth.parse_auth_response_url(url)
class RequestHandler(BaseHTTPRequestHandler): class RequestHandler(BaseHTTPRequestHandler):
"""
Handles HTTP GET requests for the local server used in OAuth authentication.
This handler processes the OAuth redirect response, extracting the authorization
code or error from the URL and sending an appropriate HTML response back to the client.
"""
def do_GET(self): def do_GET(self):
"""
Handle GET requests.
Parses the URL to extract the state and authorization code, or an error if present.
Sends an HTML response indicating the authentication status.
"""
self.server.auth_code = self.server.error = None self.server.auth_code = self.server.error = None
try: try:
state, auth_code = SpotifyOAuth.parse_auth_response_url(self.path) state, auth_code = SpotifyOAuth.parse_auth_response_url(self.path)
@ -931,6 +1074,13 @@ window.close()
def start_local_http_server(port, handler=RequestHandler): def start_local_http_server(port, handler=RequestHandler):
"""
Start a local HTTP server to handle OAuth redirects.
:param port: The port on which to start the server.
:param handler: The request handler class to use.
:return: An instance of the HTTPServer.
"""
server = HTTPServer(("127.0.0.1", port), handler) server = HTTPServer(("127.0.0.1", port), handler)
server.allow_reuse_address = True server.allow_reuse_address = True
server.auth_code = None server.auth_code = None

View File

@ -46,8 +46,11 @@ class Scope(Enum):
@staticmethod @staticmethod
def all() -> Set['Scope']: def all() -> Set['Scope']:
"""Returns all of the authorization scopes""" """
Returns all the authorization scopes.
:return: A set of all scopes.
"""
return set(Scope) return set(Scope)
@staticmethod @staticmethod
@ -55,24 +58,20 @@ class Scope(Enum):
""" """
Converts an iterable of scopes to a space-separated string. Converts an iterable of scopes to a space-separated string.
* scopes: An iterable of scopes. :param scopes: An iterable of scopes.
:return: A space-separated string of scopes.
returns: a space-separated string of scopes
""" """
return " ".join([scope.value for scope in scopes]) return " ".join([scope.value for scope in scopes])
@staticmethod @staticmethod
def from_string(scope_string: str) -> Set['Scope']: def from_string(scope_string: str) -> Set['Scope']:
""" """
Converts a string of (usuallly space-separated) scopes into a Converts a string of (usually space-separated) scopes into a set of scopes.
set of scopes
Any scope-strings that do not match any of the known scopes are Any scope-strings that do not match any of the known scopes are ignored.
ignored.
* scope_string: a string of scopes :param scope_string: A string of scopes.
:return: A set of scopes.
returns: a set of scopes.
""" """
scope_string_list = re.split(pattern=r"[^\w-]+", string=scope_string) scope_string_list = re.split(pattern=r"[^\w-]+", string=scope_string)
scopes = set() scopes = set()

View File

@ -21,11 +21,12 @@ CLIENT_CREDS_ENV_VARS = {
def get_host_port(netloc): def get_host_port(netloc):
""" Split the network location string into host and port and returns a tuple """
where the host is a string and the the port is an integer. Split the network location string into host and port and return a tuple
where the host is a string and the port is an integer.
Parameters: :param netloc: A string representing the network location.
- netloc - a string representing the network location. :return: A tuple containing the host and port.
""" """
if ":" in netloc: if ":" in netloc:
host, port = netloc.split(":", 1) host, port = netloc.split(":", 1)
@ -38,13 +39,14 @@ def get_host_port(netloc):
def normalize_scope(scope): def normalize_scope(scope):
"""Normalize the scope to verify that it is a list or tuple. A string """
Normalize the scope to verify that it is a list or tuple. A string
input will split the string by commas to create a list of scopes. input will split the string by commas to create a list of scopes.
A list or tuple input is used directly. A list or tuple input is used directly.
Parameters: :param scope: A string representing scopes separated by commas, or a list/tuple of scopes.
- scope - a string representing scopes separated by commas, :return: A space-separated string of scopes.
or a list/tuple of scopes. :raises TypeError: If the scope is not a string, list, or tuple.
""" """
if scope: if scope:
if isinstance(scope, str): if isinstance(scope, str):