* Added an exception clause that catches `FileNotFoundError` and logs a debug message in `SpotifyOAuth.get_cached_token`, `SpotifyPKCE.get_cached_token` and `SpotifyImplicitGrant.get_cached_token`.

* Changed docs for `auth` parameter of `Spotify.init` to `access token` instead of `authorization token`. In issue #599, a user confused the access token with the authorization code.

* Updated CHANGELOG.md

* Removed `FileNotFoundError` because it does not exist in python 2.7 (*sigh*) and replaced it with a call to `os.path.exists`.

* Replaced ` os.path.exists` with `error.errno == errno.ENOENT` to supress errors when the cache file does not exist.

* Changed docs for `search` to mention that you can provide multiple multiple types to search for. The query parameters of requests are now logged. Added log messages for when the access token and refresh tokens are retrieved and when they are refreshed. Other small grammar fixes.

* Removed duplicate word "multiple" from CHANGELOG

* * Fixed the bugs in `SpotifyOAuth.refresh_access_token` and `SpotifyPKCE.refresh_access_token` which raised the incorrect exception upon receiving an error response from the server. This addresses #645.
* Fixed a bug in `RequestHandler.do_GET` in which the non-existent `state` attribute of  `SpotifyOauthError` is accessed. This bug occurs when the user clicks "cancel" in the permissions dialog that opens in the browser.
* Cleaned up the documentation for `SpotifyClientCredentials.__init__`, `SpotifyOAuth.__init__`, and `SpotifyPKCE.__init__`.

* Removed unneeded import

* Added cache handler to `SpotifyClientCredentials` and fixed a bug in refresh tokens methods that raised the wrong exception (#655)

* Added an exception clause that catches `FileNotFoundError` and logs a debug message in `SpotifyOAuth.get_cached_token`, `SpotifyPKCE.get_cached_token` and `SpotifyImplicitGrant.get_cached_token`.

* Changed docs for `auth` parameter of `Spotify.init` to `access token` instead of `authorization token`. In issue #599, a user confused the access token with the authorization code.

* Updated CHANGELOG.md

* Removed `FileNotFoundError` because it does not exist in python 2.7 (*sigh*) and replaced it with a call to `os.path.exists`.

* Replaced ` os.path.exists` with `error.errno == errno.ENOENT` to supress errors when the cache file does not exist.

* Changed docs for `search` to mention that you can provide multiple multiple types to search for. The query parameters of requests are now logged. Added log messages for when the access token and refresh tokens are retrieved and when they are refreshed. Other small grammar fixes.

* Removed duplicate word "multiple" from CHANGELOG

* * Fixed the bugs in `SpotifyOAuth.refresh_access_token` and `SpotifyPKCE.refresh_access_token` which raised the incorrect exception upon receiving an error response from the server. This addresses #645.
* Fixed a bug in `RequestHandler.do_GET` in which the non-existent `state` attribute of  `SpotifyOauthError` is accessed. This bug occurs when the user clicks "cancel" in the permissions dialog that opens in the browser.
* Cleaned up the documentation for `SpotifyClientCredentials.__init__`, `SpotifyOAuth.__init__`, and `SpotifyPKCE.__init__`.

* Removed unneeded import

Co-authored-by: Stéphane Bruckert <stephane.bruckert@gmail.com>

* Made `CacheHandler` an abstract base class

Added:

* `Scope` - An enum which contains all of the authorization scopes (see [here](https://github.com/plamere/spotipy/issues/652#issuecomment-797461311)).

* Added the following endpoints
    * `Spotify.current_user_saved_episodes`
    * `Spotify.current_user_saved_episodes_add`
    * `Spotify.current_user_saved_episodes_delete`
    * `Spotify.current_user_saved_episodes_contains`
    * `Spotify.available_markets

* Fixed formatting issues. Removed python 2.7 from github workflows.

* Added python 3.9 to github workflows. The type hints for set now uses the generic typing.Set instead of builtins.set.

* Changed f-string to percent-formatted string.

* Fixed the duplicate "###Changed" section in the change log.

Co-authored-by: Stéphane Bruckert <stephane.bruckert@gmail.com>
This commit is contained in:
Peter Schorn 2021-04-10 06:37:22 -05:00 committed by GitHub
parent f7ae328501
commit 07fec53288
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 543 additions and 214 deletions

View File

@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [2.7, 3.5, 3.6, 3.7, 3.8]
python-version: [3.5, 3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2

View File

@ -5,27 +5,52 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased [3.0.0-alpha]
While this is unreleased, please only add v3 features here.
Rebasing master onto v3 doesn't require a changelog update.
### Added
* `Scope` - An enum which contains all of the authorization scopes (see [here](https://github.com/plamere/spotipy/issues/652#issuecomment-797461311)).
* Added the following endpoints
* `Spotify.current_user_saved_episodes`
* `Spotify.current_user_saved_episodes_add`
* `Spotify.current_user_saved_episodes_delete`
* `Spotify.current_user_saved_episodes_contains`
* `Spotify.available_markets`
### Changed
Modified the return structure of the `audio_features` function (wrapping the [Get Audio Features for Several Tracks](https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-several-audio-features) API) to conform to the return structure of similar APIs, such as:
- [Get Several Tracks](https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-several-tracks)
- [Get Multiple Artists](https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-multiple-artists)
- [Get Multiple Albums](https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-multiple-albums)
The functions wrapping these APIs do not unwrap the single key JSON response, and this is currently the only function that does this.
* Made `CacheHandler` an abstract base class
* Modified the return structure of the `audio_features` function (wrapping the [Get Audio Features for Several Tracks](https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-several-audio-features) API) to conform to the return structure of the similar methods listed below. The functions wrapping these APIs do not unwrap the single key JSON response, and this is currently the only function that does this.
* [Get Several Tracks](https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-several-tracks)
* [Get Multiple Artists](https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-multiple-artists)
* [Get Multiple Albums](https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-multiple-albums)
## Unreleased [2.x.x]
### Added
- Enabled using both short and long IDs for playlist_change_details
- Added a cache handler to `SpotifyClientCredentials`
### Changed
- Add support for a list of scopes rather than just a comma separated string of scopes
### Fixed
* Fixed the bugs in `SpotifyOAuth.refresh_access_token` and `SpotifyPKCE.refresh_access_token` which raised the incorrect exception upon receiving an error response from the server. This addresses #645.
* Fixed a bug in `RequestHandler.do_GET` in which the non-existent `state` attribute of `SpotifyOauthError` is accessed. This bug occurs when the user clicks "cancel" in the permissions dialog that opens in the browser.
* Cleaned up the documentation for `SpotifyClientCredentials.__init__`, `SpotifyOAuth.__init__`, and `SpotifyPKCE.__init__`.
## [2.17.1] - 2021-02-28
### Fixed

View File

@ -3,3 +3,4 @@ from .client import * # noqa
from .exceptions import * # noqa
from .oauth2 import * # noqa
from .util import * # noqa
from .scope import * # noqa

View File

@ -1,35 +1,36 @@
# -*- coding: utf-8 -*-
__all__ = ['CacheHandler', 'CacheFileHandler']
import errno
import json
import logging
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
class CacheHandler():
class CacheHandler(ABC):
"""
An abstraction layer for handling the caching and retrieval of
authorization tokens.
Custom extensions of this class must implement get_cached_token
and save_token_to_cache methods with the same input and output
structure as the CacheHandler class.
Clients are expected to subclass this class and override the
get_cached_token and save_token_to_cache methods with the same
type signatures of this class.
"""
@abstractmethod
def get_cached_token(self):
"""
Get and return a token_info dictionary object.
"""
# return token_info
raise NotImplementedError()
@abstractmethod
def save_token_to_cache(self, token_info):
"""
Save a token_info dictionary object to the cache and return None.
"""
raise NotImplementedError()
return None
class CacheFileHandler(CacheHandler):

View File

@ -1171,16 +1171,184 @@ class Spotify(object):
"""
return self._get("me/player/currently-playing")
def current_user_saved_tracks(self, limit=20, offset=0):
def current_user_saved_albums(self, limit=20, offset=0, market=None):
""" Gets a list of the albums saved in the current authorized user's
"Your Music" library
Parameters:
- limit - the number of albums to return
- offset - the index of the first album to return
- market - an ISO 3166-1 alpha-2 country code.
"""
return self._get("me/albums", limit=limit, offset=offset, market=market)
def current_user_saved_albums_add(self, albums=[]):
""" Add one or more albums to the current user's
"Your Music" library.
Parameters:
- albums - a list of album URIs, URLs or IDs
"""
alist = [self._get_id("album", a) for a in albums]
return self._put("me/albums?ids=" + ",".join(alist))
def current_user_saved_albums_delete(self, albums=[]):
""" Remove one or more albums from the current user's
"Your Music" library.
Parameters:
- albums - a list of album URIs, URLs or IDs
"""
alist = [self._get_id("album", a) for a in albums]
return self._delete("me/albums/?ids=" + ",".join(alist))
def current_user_saved_albums_contains(self, albums=[]):
""" Check if one or more albums is already saved in
the current Spotify users Your Music library.
Parameters:
- albums - a list of album URIs, URLs or IDs
"""
alist = [self._get_id("album", a) for a in albums]
return self._get("me/albums/contains?ids=" + ",".join(alist))
def current_user_saved_tracks(self, limit=20, offset=0, market=None):
""" Gets a list of the tracks saved in the current authorized user's
"Your Music" library
Parameters:
- limit - the number of tracks to return
- offset - the index of the first track to return
- market - an ISO 3166-1 alpha-2 country code
"""
return self._get("me/tracks", limit=limit, offset=offset)
return self._get("me/tracks", limit=limit, offset=offset, market=market)
def current_user_saved_tracks_add(self, tracks=None):
""" Add one or more tracks to the current user's
"Your Music" library.
Parameters:
- tracks - a list of track URIs, URLs or IDs
"""
tlist = []
if tracks is not None:
tlist = [self._get_id("track", t) for t in tracks]
return self._put("me/tracks/?ids=" + ",".join(tlist))
def current_user_saved_tracks_delete(self, tracks=None):
""" Remove one or more tracks from the current user's
"Your Music" library.
Parameters:
- tracks - a list of track URIs, URLs or IDs
"""
tlist = []
if tracks is not None:
tlist = [self._get_id("track", t) for t in tracks]
return self._delete("me/tracks/?ids=" + ",".join(tlist))
def current_user_saved_tracks_contains(self, tracks=None):
""" Check if one or more tracks is already saved in
the current Spotify users Your Music library.
Parameters:
- tracks - a list of track URIs, URLs or IDs
"""
tlist = []
if tracks is not None:
tlist = [self._get_id("track", t) for t in tracks]
return self._get("me/tracks/contains?ids=" + ",".join(tlist))
def current_user_saved_episodes(self, limit=20, offset=0, market=None):
""" Gets a list of the episodes saved in the current authorized user's
"Your Music" library
Parameters:
- limit - the number of episodes to return
- offset - the index of the first episode to return
- market - an ISO 3166-1 alpha-2 country code
"""
return self._get("me/episodes", limit=limit, offset=offset, market=market)
def current_user_saved_episodes_add(self, episodes=None):
""" Add one or more episodes to the current user's
"Your Music" library.
Parameters:
- episodes - a list of episode URIs, URLs or IDs
"""
elist = []
if episodes is not None:
elist = [self._get_id("episode", e) for e in episodes]
return self._put("me/episodes/?ids=" + ",".join(elist))
def current_user_saved_episodes_delete(self, episodes=None):
""" Remove one or more episodes from the current user's
"Your Music" library.
Parameters:
- episodes - a list of episode URIs, URLs or IDs
"""
elist = []
if episodes is not None:
elist = [self._get_id("episode", e) for e in episodes]
return self._delete("me/episodes/?ids=" + ",".join(elist))
def current_user_saved_episodes_contains(self, episodes=None):
""" Check if one or more episodes is already saved in
the current Spotify users Your Music library.
Parameters:
- episodes - a list of episode URIs, URLs or IDs
"""
elist = []
if episodes is not None:
elist = [self._get_id("episode", e) for e in episodes]
return self._get("me/episodes/contains?ids=" + ",".join(elist))
def current_user_saved_shows(self, limit=20, offset=0, market=None):
""" Gets a list of the shows saved in the current authorized user's
"Your Music" library
Parameters:
- limit - the number of shows to return
- offset - the index of the first show to return
- market - an ISO 3166-1 alpha-2 country code
"""
return self._get("me/shows", limit=limit, offset=offset, market=market)
def current_user_saved_shows_add(self, shows=[]):
""" Add one or more albums to the current user's
"Your Music" library.
Parameters:
- shows - a list of show URIs, URLs or IDs
"""
slist = [self._get_id("show", s) for s in shows]
return self._put("me/shows?ids=" + ",".join(slist))
def current_user_saved_shows_delete(self, shows=[]):
""" Remove one or more shows from the current user's
"Your Music" library.
Parameters:
- shows - a list of show URIs, URLs or IDs
"""
slist = [self._get_id("show", s) for s in shows]
return self._delete("me/shows/?ids=" + ",".join(slist))
def current_user_saved_shows_contains(self, shows=[]):
""" Check if one or more shows is already saved in
the current Spotify users Your Music library.
Parameters:
- shows - a list of show URIs, URLs or IDs
"""
slist = [self._get_id("show", s) for s in shows]
return self._get("me/shows/contains?ids=" + ",".join(slist))
def current_user_followed_artists(self, limit=20, after=None):
""" Gets a list of the artists followed by the current authorized user
@ -1225,42 +1393,6 @@ class Spotify(object):
"me/following/contains", ids=",".join(idlist), type="user"
)
def current_user_saved_tracks_delete(self, tracks=None):
""" Remove one or more tracks from the current user's
"Your Music" library.
Parameters:
- tracks - a list of track URIs, URLs or IDs
"""
tlist = []
if tracks is not None:
tlist = [self._get_id("track", t) for t in tracks]
return self._delete("me/tracks/?ids=" + ",".join(tlist))
def current_user_saved_tracks_contains(self, tracks=None):
""" Check if one or more tracks is already saved in
the current Spotify users Your Music library.
Parameters:
- tracks - a list of track URIs, URLs or IDs
"""
tlist = []
if tracks is not None:
tlist = [self._get_id("track", t) for t in tracks]
return self._get("me/tracks/contains?ids=" + ",".join(tlist))
def current_user_saved_tracks_add(self, tracks=None):
""" Add one or more tracks to the current user's
"Your Music" library.
Parameters:
- tracks - a list of track URIs, URLs or IDs
"""
tlist = []
if tracks is not None:
tlist = [self._get_id("track", t) for t in tracks]
return self._put("me/tracks/?ids=" + ",".join(tlist))
def current_user_top_artists(
self, limit=20, offset=0, time_range="medium_term"
):
@ -1310,86 +1442,6 @@ class Spotify(object):
before=before,
)
def current_user_saved_albums(self, limit=20, offset=0):
""" Gets a list of the albums saved in the current authorized user's
"Your Music" library
Parameters:
- limit - the number of albums to return
- offset - the index of the first album to return
"""
return self._get("me/albums", limit=limit, offset=offset)
def current_user_saved_albums_contains(self, albums=[]):
""" Check if one or more albums is already saved in
the current Spotify users Your Music library.
Parameters:
- albums - a list of album URIs, URLs or IDs
"""
alist = [self._get_id("album", a) for a in albums]
return self._get("me/albums/contains?ids=" + ",".join(alist))
def current_user_saved_albums_add(self, albums=[]):
""" Add one or more albums to the current user's
"Your Music" library.
Parameters:
- albums - a list of album URIs, URLs or IDs
"""
alist = [self._get_id("album", a) for a in albums]
return self._put("me/albums?ids=" + ",".join(alist))
def current_user_saved_albums_delete(self, albums=[]):
""" Remove one or more albums from the current user's
"Your Music" library.
Parameters:
- albums - a list of album URIs, URLs or IDs
"""
alist = [self._get_id("album", a) for a in albums]
return self._delete("me/albums/?ids=" + ",".join(alist))
def current_user_saved_shows(self, limit=50, offset=0):
""" Gets a list of the shows saved in the current authorized user's
"Your Music" library
Parameters:
- limit - the number of shows to return
- offset - the index of the first show to return
"""
return self._get("me/shows", limit=limit, offset=offset)
def current_user_saved_shows_contains(self, shows=[]):
""" Check if one or more shows is already saved in
the current Spotify users Your Music library.
Parameters:
- shows - a list of show URIs, URLs or IDs
"""
slist = [self._get_id("show", s) for s in shows]
return self._get("me/shows/contains?ids=" + ",".join(slist))
def current_user_saved_shows_add(self, shows=[]):
""" Add one or more albums to the current user's
"Your Music" library.
Parameters:
- shows - a list of show URIs, URLs or IDs
"""
slist = [self._get_id("show", s) for s in shows]
return self._put("me/shows?ids=" + ",".join(slist))
def current_user_saved_shows_delete(self, shows=[]):
""" Remove one or more shows from the current user's
"Your Music" library.
Parameters:
- shows - a list of show URIs, URLs or IDs
"""
slist = [self._get_id("show", s) for s in shows]
return self._delete("me/shows/?ids=" + ",".join(slist))
def user_follow_artists(self, ids=[]):
""" Follow one or more artists
Parameters:
@ -1824,6 +1876,14 @@ class Spotify(object):
return self._post(endpoint)
def available_markets(self):
""" Get the list of markets where Spotify is available.
Returns a list of the countries in which Spotify is available, identified by their
ISO 3166-1 alpha-2 country code with additional country codes for special territories.
"""
return self._get("markets")
def _append_device_id(self, path, device_id):
""" Append device ID to API path.

View File

@ -24,8 +24,10 @@ from six.moves.BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from six.moves.urllib_parse import parse_qsl, urlparse
from spotipy.cache_handler import CacheFileHandler, CacheHandler
from spotipy.exceptions import SpotifyException
from spotipy.util import CLIENT_CREDS_ENV_VARS, get_host_port, normalize_scope
from spotipy.util import CLIENT_CREDS_ENV_VARS, get_host_port
from spotipy.scope import Scope
from typing import Iterable
import re
logger = logging.getLogger(__name__)
@ -73,6 +75,7 @@ def _ensure_value(value, env_key):
class SpotifyAuthBase(object):
def __init__(self, requests_session):
if isinstance(requests_session, requests.Session):
self._session = requests_session
@ -84,7 +87,40 @@ class SpotifyAuthBase(object):
self._session = api
def _normalize_scope(self, scope):
return normalize_scope(scope)
"""
Accepts a string of scopes, or an iterable with elements of type
`Scope` or `str` and returns a space-separated string of scopes.
Returns `None` if the argument is `None`.
"""
# TODO: do we need to sort the scopes?
if isinstance(scope, str):
# allow for any separator(s) between the scopes other than a word
# character or a hyphen
scopes = re.split(pattern=r"[^\w-]+", string=scope)
return " ".join(sorted(scopes))
if isinstance(scope, Iterable):
# Assume all of the iterable's elements are of the same type.
# If the iterable is empty, then return None.
first_element = next(iter(scope), None)
if isinstance(first_element, str):
return " ".join(sorted(scope))
if isinstance(first_element, Scope):
return Scope.make_string(scope)
if first_element is None:
return ""
elif scope is None:
return None
raise TypeError(
"Unsupported type for scopes: %s. Expected either a string of scopes, or "
"an Iterable with elements of type `Scope` or `str`." % type(scope)
)
@property
def client_id(self):
@ -139,27 +175,57 @@ class SpotifyAuthBase(object):
class SpotifyClientCredentials(SpotifyAuthBase):
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
def __init__(self,
client_id=None,
client_secret=None,
proxies=None,
requests_session=True,
requests_timeout=None):
def __init__(
self,
client_id=None,
client_secret=None,
proxies=None,
requests_session=True,
requests_timeout=None,
cache_handler=None
):
"""
Creates a Client Credentials Flow Manager.
The Client Credentials flow is used in server-to-server authentication.
Only endpoints that do not access user information can be accessed.
This means that endpoints that require authorization scopes cannot be accessed.
The advantage, however, of this authorization flow is that it does not require any
user interaction
You can either provide a client_id and client_secret to the
constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET
environment variables
Parameters:
* client_id: Must be supplied or set as environment variable
* client_secret: Must be supplied or set as environment variable
* proxies: Optional, proxy for the requests library to route through
* requests_session: A Requests session
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
"""
super(SpotifyClientCredentials, self).__init__(requests_session)
self.client_id = client_id
self.client_secret = client_secret
self.token_info = None
self.proxies = proxies
self.requests_timeout = requests_timeout
if cache_handler:
assert issubclass(cache_handler.__class__, CacheHandler), \
"cache_handler must be a subclass of CacheHandler: " + str(type(cache_handler)) \
+ " != " + str(CacheHandler)
self.cache_handler = cache_handler
else:
self.cache_handler = CacheFileHandler()
def get_access_token(self, as_dict=True):
def get_access_token(self, as_dict=True, check_cache=True):
"""
If a valid access token is in memory, returns it
Else feches a new token and returns it
@ -179,13 +245,15 @@ class SpotifyClientCredentials(SpotifyAuthBase):
stacklevel=2,
)
if self.token_info and not self.is_token_expired(self.token_info):
return self.token_info if as_dict else self.token_info["access_token"]
if check_cache:
token_info = self.cache_handler.get_cached_token()
if token_info and not self.is_token_expired(token_info):
return token_info if as_dict else token_info["access_token"]
token_info = self._request_access_token()
token_info = self._add_custom_values_to_token_info(token_info)
self.token_info = token_info
return self.token_info["access_token"]
self.cache_handler.save_token_to_cache(token_info)
return token_info if as_dict else token_info["access_token"]
def _request_access_token(self):
"""Gets client credentials access token """
@ -258,22 +326,27 @@ class SpotifyOAuth(SpotifyAuthBase):
* client_secret: Must be supplied or set as environment variable
* redirect_uri: Must be supplied or set as environment variable
* state: Optional, no verification is performed
* scope: Optional, either a list of scopes or comma separated string of scopes.
* scope: Optional, either a string of scopes, or an iterable with elements of type
`Scope` or `str`. E.g.,
{Scope.user_modify_playback_state, Scope.user_library_read}
iterable of scopes or comma separated string of scopes.
e.g, "playlist-read-private,playlist-read-collaborative"
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
* cache_path: (deprecated) Optional, will otherwise be generated
(takes precedence over `username`)
* username: (deprecated) Optional or set as environment variable
(will set `cache_path` to `.cache-{username}`)
* show_dialog: Optional, interpreted as boolean
* proxies: Optional, proxy for the requests library to route through
* show_dialog: Optional, interpreted as boolean
* requests_session: A Requests session
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
* open_browser: Optional, whether or not the web browser should be opened to
authorize a user
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
"""
super(SpotifyOAuth, self).__init__(requests_session)
@ -414,7 +487,7 @@ class SpotifyOAuth(SpotifyAuthBase):
if server.auth_code is not None:
return server.auth_code
elif server.error is not None:
raise SpotifyOauthError("Received error from OAuth server: {}".format(server.error))
raise server.error
else:
raise SpotifyOauthError("Server listening on localhost has not been accessed")
@ -432,7 +505,7 @@ class SpotifyOAuth(SpotifyAuthBase):
open_browser = self.open_browser
if (
(open_browser or self.open_browser)
open_browser
and redirect_host in ("127.0.0.1", "localhost")
and redirect_info.scheme == "http"
):
@ -539,20 +612,13 @@ class SpotifyOAuth(SpotifyAuthBase):
timeout=self.requests_timeout,
)
try:
response.raise_for_status()
except BaseException:
logger.error('Couldn\'t refresh token. Response Status Code: %s '
'Reason: %s', response.status_code, response.reason)
message = "Couldn't refresh token: code:%d reason:%s" % (
response.status_code,
response.reason,
)
raise SpotifyException(response.status_code,
-1,
message,
headers)
if response.status_code != 200:
error_payload = response.json()
raise SpotifyOauthError(
'error: {0}, error_description: {1}'.format(
error_payload['error'], error_payload['error_description']),
error=error_payload['error'],
error_description=error_payload['error_description'])
token_info = response.json()
token_info = self._add_custom_values_to_token_info(token_info)
@ -623,25 +689,25 @@ class SpotifyPKCE(SpotifyAuthBase):
Parameters:
* client_id: Must be supplied or set as environment variable
* client_secret: Must be supplied or set as environment variable
* redirect_uri: Must be supplied or set as environment variable
* state: Optional, no verification is performed
* scope: Optional, either a list of scopes or comma separated string of scopes.
e.g, "playlist-read-private,playlist-read-collaborative"
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
* scope: Optional, either a string of scopes, or an iterable with elements of type
`Scope` or `str`. E.g.,
{Scope.user_modify_playback_state, Scope.user_library_read}
* cache_path: (deprecated) Optional, will otherwise be generated
(takes precedence over `username`)
* username: (deprecated) Optional or set as environment variable
(will set `cache_path` to `.cache-{username}`)
* show_dialog: Optional, interpreted as boolean
* proxies: Optional, proxy for the requests library to route through
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
* requests_session: A Requests session
* open_browser: Optional, thether or not the web browser should be opened to
authorize a user
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
"""
super(SpotifyPKCE, self).__init__(requests_session)
@ -921,20 +987,13 @@ class SpotifyPKCE(SpotifyAuthBase):
timeout=self.requests_timeout,
)
try:
response.raise_for_status()
except BaseException:
logger.error('Couldn\'t refresh token. Response Status Code: %s '
'Reason: %s', response.status_code, response.reason)
message = "Couldn't refresh token: code:%d reason:%s" % (
response.status_code,
response.reason,
)
raise SpotifyException(response.status_code,
-1,
message,
headers)
if response.status_code != 200:
error_payload = response.json()
raise SpotifyOauthError(
'error: {0}, error_description: {1}'.format(
error_payload['error'], error_payload['error_description']),
error=error_payload['error'],
error_description=error_payload['error_description'])
token_info = response.json()
token_info = self._add_custom_values_to_token_info(token_info)
@ -1033,8 +1092,9 @@ class SpotifyImplicitGrant(SpotifyAuthBase):
* client_id: Must be supplied or set as environment variable
* redirect_uri: Must be supplied or set as environment variable
* state: May be supplied, no verification is performed
* scope: Optional, either a list of scopes or comma separated string of scopes.
e.g, "playlist-read-private,playlist-read-collaborative"
* scope: Optional, either a string of scopes, or an iterable with elements of type
`Scope` or `str`. E.g.,
{Scope.user_modify_playback_state, Scope.user_library_read}
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
May be supplied, will otherwise use `CacheFileHandler`.
@ -1246,9 +1306,8 @@ class RequestHandler(BaseHTTPRequestHandler):
state, auth_code = SpotifyOAuth.parse_auth_response_url(self.path)
self.server.state = state
self.server.auth_code = auth_code
except SpotifyOauthError as err:
self.server.state = err.state
self.server.error = err.error
except SpotifyOauthError as error:
self.server.error = error
self.send_response(200)
self.send_header("Content-Type", "text/html")

85
spotipy/scope.py Normal file
View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
__all__ = ["Scope"]
from enum import Enum
import re
from typing import Iterable, Set
class Scope(Enum):
"""
The Spotify authorization scopes
Create a Scope from a string:
scope = Scope("playlist-modify-private")
Create a set of scopes:
scopes = {
Scope.user_read_currently_playing,
Scope.playlist_read_collaborative,
Scope.playlist_modify_public
}
"""
user_read_currently_playing = "user-read-currently-playing"
playlist_read_collaborative = "playlist-read-collaborative"
playlist_modify_private = "playlist-modify-private"
user_read_playback_position = "user-read-playback-position"
user_library_modify = "user-library-modify"
user_top_read = "user-top-read"
user_read_playback_state = "user-read-playback-state"
user_read_email = "user-read-email"
ugc_image_upload = "ugc-image-upload"
user_read_private = "user-read-private"
playlist_modify_public = "playlist-modify-public"
user_library_read = "user-library-read"
streaming = "streaming"
user_read_recently_played = "user-read-recently-played"
user_follow_read = "user-follow-read"
user_follow_modify = "user-follow-modify"
app_remote_control = "app-remote-control"
playlist_read_private = "playlist-read-private"
user_modify_playback_state = "user-modify-playback-state"
@staticmethod
def all() -> Set['Scope']:
"""Returns all of the authorization scopes"""
return set(Scope)
@staticmethod
def make_string(scopes: Iterable['Scope']) -> str:
"""
Converts an iterable of scopes to a space-separated string.
* scopes: An iterable of scopes.
returns: a space-separated string of scopes
"""
return " ".join([scope.value for scope in scopes])
@staticmethod
def from_string(scope_string: str) -> Set['Scope']:
"""
Converts a string of (usuallly space-separated) scopes into a
set of scopes
Any scope-strings that do not match any of the known scopes are
ignored.
* scope_string: a string of scopes
returns: a set of scopes.
"""
scope_string_list = re.split(pattern=r"[^\w-]+", string=scope_string)
scopes = set()
for scope_string in sorted(scope_string_list):
try:
scope = Scope(scope_string)
scopes.add(scope)
except ValueError:
pass
return scopes

View File

@ -7,7 +7,6 @@ __all__ = ["CLIENT_CREDS_ENV_VARS", "prompt_for_user_token"]
import logging
import os
import warnings
import spotipy
LOGGER = logging.getLogger(__name__)
@ -117,19 +116,3 @@ def get_host_port(netloc):
port = None
return host, port
def normalize_scope(scope):
if scope:
if isinstance(scope, str):
scopes = scope.split(',')
elif isinstance(scope, list) or isinstance(scope, tuple):
scopes = scope
else:
raise Exception(
"Unsupported scope value, please either provide a list of scopes, "
"or a string of scopes separated by commas"
)
return " ".join(sorted(scopes))
else:
return None

View File

@ -364,3 +364,9 @@ class AuthTestSpotipy(unittest.TestCase):
self.assertNotIsInstance(with_no_session._session, requests.Session)
user = with_no_session.user(user="akx")
self.assertEqual(user["uri"], "spotify:user:akx")
def test_available_markets(self):
markets = self.spotify.available_markets()["markets"]
self.assertTrue(isinstance(markets, list))
self.assertIn("US", markets)
self.assertIn("GB", markets)

View File

@ -209,6 +209,10 @@ class SpotipyLibraryApiTests(unittest.TestCase):
"http://open.spotify.com/track/3cySlItpiPiIAzU3NyHCJf"]
cls.album_ids = ["spotify:album:6kL09DaURb7rAoqqaA51KU",
"spotify:album:6RTzC0rDbvagTSJLlY7AKl"]
cls.episode_ids = [
"spotify:episode:3OEdPEYB69pfXoBrhvQYeC",
"spotify:episode:5LEFdZ9pYh99wSz7Go2D0g"
]
cls.username = os.getenv(CCEV['client_username'])
scope = (
@ -267,6 +271,21 @@ class SpotipyLibraryApiTests(unittest.TestCase):
resp = self.spotify.current_user_saved_albums_contains(self.album_ids)
self.assertEqual(resp, [False, False])
def test_current_user_saved_episodes(self):
# Add
self.spotify.current_user_saved_episodes_add(self.episode_ids)
episodes = self.spotify.current_user_saved_episodes(market="US")
self.assertGreaterEqual(len(episodes['items']), 2)
# Contains
resp = self.spotify.current_user_saved_episodes_contains(self.episode_ids)
self.assertEqual(resp, [True, True])
# Remove
self.spotify.current_user_saved_episodes_delete(self.episode_ids)
resp = self.spotify.current_user_saved_episodes_contains(self.episode_ids)
self.assertEqual(resp, [False, False])
class SpotipyUserApiTests(unittest.TestCase):
@classmethod

90
tests/unit/test_scopes.py Normal file
View File

@ -0,0 +1,90 @@
from unittest import TestCase
from spotipy.scope import Scope
from spotipy.oauth2 import SpotifyAuthBase
class SpotipyScopeTest(TestCase):
@classmethod
def setUpClass(cls):
cls.auth_manager = SpotifyAuthBase(requests_session=True)
def normalize_scope(self, scope):
return self.auth_manager._normalize_scope(scope)
def test_empty_scope(self):
scopes = set()
scope_string = Scope.make_string(scopes)
normalized_scope_string = self.normalize_scope(scopes)
normalized_scope_string_2 = self.normalize_scope(scope_string)
self.assertEqual(scope_string, "")
self.assertEqual(normalized_scope_string, "")
self.assertEqual(normalized_scope_string_2, "")
converted_scopes = Scope.from_string(scope_string)
self.assertEqual(converted_scopes, set())
def test_scopes(self):
scopes = {
Scope.playlist_modify_public,
Scope.playlist_read_collaborative,
Scope.user_read_playback_state,
Scope.ugc_image_upload
}
normalized_scope_string = self.normalize_scope(scopes)
scope_string = Scope.make_string(scopes)
self.assertEqual(scope_string, normalized_scope_string)
normalized_scope_string_2 = self.normalize_scope(scope_string)
converted_scopes = Scope.from_string(scope_string)
normalized_converted_scope = Scope.from_string(normalized_scope_string)
normalized_converted_scope_2 = Scope.from_string(normalized_scope_string_2)
self.assertEqual(scopes, converted_scopes)
self.assertEqual(scopes, normalized_converted_scope)
self.assertEqual(scopes, normalized_converted_scope_2)
def test_single_scope(self):
scope_string = "user-modify-playback-state"
scope = Scope(scope_string)
self.assertEqual(scope, Scope.user_modify_playback_state)
self.assertEqual(scope_string, scope.value)
def test_scope_string(self):
scope_string = (
"user-read-currently-playing playlist-read-collaborative,user-library-read "
"playlist-read-private user-read-email"
)
expected_scopes = {
Scope.user_read_currently_playing,
Scope.playlist_read_collaborative,
Scope.user_library_read,
Scope.playlist_read_private,
Scope.user_read_email
}
parsed_scopes = Scope.from_string(scope_string)
normalized_scope_string = self.normalize_scope(scope_string)
normalized_parsed_scopes = Scope.from_string(normalized_scope_string)
self.assertEqual(parsed_scopes, expected_scopes)
self.assertEqual(normalized_parsed_scopes, expected_scopes)
def test_invalid_types(self):
numbers = [1, 2, 3]
with self.assertRaises(TypeError):
self.normalize_scope(numbers)
with self.assertRaises(TypeError):
self.normalize_scope(True)
def test_normalize_scope(self):
normalized_scope_string = self.normalize_scope([])
self.assertEqual(normalized_scope_string, "")
normalized_scope_string_2 = self.normalize_scope(())
self.assertEqual(normalized_scope_string_2, "")
self.assertIsNone(self.normalize_scope(None))