Renamed the auth parameter of Spotify.__init__ to access_token for better clarity.

Removed the `client_credentials_manager` and `oauth_manager` parameters because they are redundant.

Replaced the `set_auth` and `auth_manager` properties with standard attributes.

Removed the following deprecated methods from `Spotify`:
* `playlist_tracks`
* `user_playlist`
* `user_playlist_tracks`
* `user_playlist_change_details`
* `user_playlist_unfollow`
* `user_playlist_add_tracks`
* `user_playlist_replace_tracks`
* `user_playlist_reorder_tracks`
* `user_playlist_remove_all_occurrences_of_tracks`
* `user_playlist_remove_specific_occurrences_of_tracks`
* `user_playlist_follow_playlist`
* `user_playlist_is_following`

Removed the deprecated `as_dict` parameter from the `get_access_token` method of `SpotifyOAuth` and `SpotifyPKCE`.

Removed the deprecated `get_cached_token` and `_save_token_info` methods of `SpotifyOAuth` and `SpotifyPKCE`.

Removed `SpotifyImplicitGrant`.

Removed `prompt_for_user_token`.
This commit is contained in:
Peter-Schorn 2021-04-14 12:40:08 -05:00 committed by dieser-niko
parent 68ee2d6de8
commit c8e045891b
34 changed files with 202 additions and 1196 deletions

View File

@ -7,7 +7,7 @@ import spotipy
logger = logging.getLogger('examples.artist_albums')
logging.basicConfig(level='INFO')
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
def get_args():

View File

@ -69,6 +69,6 @@ def main():
if __name__ == '__main__':
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
main()

View File

@ -8,8 +8,8 @@ from spotipy.oauth2 import SpotifyClientCredentials
logger = logging.getLogger('examples.artist_recommendations')
logging.basicConfig(level='INFO')
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
def get_args():

View File

@ -7,8 +7,8 @@ import time
import sys
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
if len(sys.argv) > 1:
tid = sys.argv[1]

View File

@ -7,8 +7,8 @@ import time
import sys
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
sp.trace = False
if len(sys.argv) > 1:

View File

@ -7,8 +7,8 @@ import time
import sys
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
sp.trace = True
if len(sys.argv) > 1:

View File

@ -2,8 +2,8 @@ from spotipy.oauth2 import SpotifyClientCredentials
import spotipy
from pprint import pprint
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
search_str = 'Muse'
result = sp.search(search_str)

View File

@ -1,10 +0,0 @@
import spotipy
import spotipy.util as util
from pprint import pprint
while True:
username = input("Type the Spotify user ID to use: ")
token = util.prompt_for_user_token(username, show_dialog=True)
sp = spotipy.Spotify(token)
pprint(sp.me())

View File

@ -4,7 +4,7 @@ from pprint import pprint
from time import sleep
scope = "user-read-playback-state,user-modify-playback-state"
sp = spotipy.Spotify(client_credentials_manager=SpotifyOAuth(scope=scope))
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope))
# Shows playing devices
res = sp.devices()

View File

@ -6,7 +6,7 @@ import spotipy
PlaylistExample = '37i9dQZEVXbMDoHDwVN2tF'
# create spotipy client
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
# load the first 100 songs
tracks = []

View File

@ -2,7 +2,7 @@ from spotipy.oauth2 import SpotifyClientCredentials
import spotipy
from pprint import pprint
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
pl_id = 'spotify:playlist:5RIbzhG2QqdkaP24iXLnZX'
offset = 0

View File

@ -2,8 +2,8 @@ from spotipy.oauth2 import SpotifyClientCredentials
import spotipy
import json
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
playlist_id = 'spotify:user:spotifycharts:playlist:37i9dQZEVXbJiZcmkrIHGU'
results = sp.playlist(playlist_id)

View File

@ -10,6 +10,6 @@ if len(sys.argv) > 1:
else:
search_str = 'Radiohead'
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
result = sp.search(search_str)
pprint.pprint(result)

View File

@ -10,6 +10,6 @@ if len(sys.argv) > 1:
else:
urn = 'spotify:album:5yTx83u3qerZF7GRJu7eFk'
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
album = sp.album(urn)
pprint(album)

View File

@ -10,7 +10,7 @@ if len(sys.argv) > 1:
else:
urn = 'spotify:artist:3jOstUTkEu2JkjvRdBA5Gu'
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
artist = sp.artist(urn)
pprint(artist)

View File

@ -10,7 +10,7 @@ if len(sys.argv) > 1:
else:
urn = 'spotify:artist:3jOstUTkEu2JkjvRdBA5Gu'
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
response = sp.artist_top_tracks(urn)
for track in response['tracks']:

View File

@ -9,8 +9,8 @@ if len(sys.argv) > 1:
else:
artist_name = 'weezer'
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
result = sp.search(q='artist:' + artist_name, type='artist')
try:
name = result['artists']['items'][0]['name']

View File

@ -10,7 +10,7 @@ if len(sys.argv) > 1:
else:
urn = 'spotify:track:0Svkvt5I79wficMFgaqEQJ'
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
track = sp.track(urn)
pprint(track)

View File

@ -15,8 +15,8 @@ if __name__ == '__main__':
file = sys.stdin
tids = file.read().split()
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
for start in range(0, len(tids), max_tracks_per_call):
results = sp.tracks(tids[start: start + max_tracks_per_call])
for track in results['tracks']:

View File

@ -10,8 +10,8 @@ if len(sys.argv) > 1:
else:
username = 'plamere'
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
sp.trace = True
user = sp.user(username)
pprint.pprint(user)

View File

@ -3,8 +3,8 @@ import spotipy
birdy_uri = 'spotify:artist:2WX2uTcsvV5OnS0inACecP'
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
results = sp.artist_albums(birdy_uri, album_type='album')
albums = results['items']

View File

@ -3,8 +3,8 @@ import spotipy
lz_uri = 'spotify:artist:36QJpDe2go2KgaRleHCDTp'
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
results = sp.artist_top_tracks(lz_uri)

View File

@ -1,8 +1,8 @@
from spotipy.oauth2 import SpotifyClientCredentials
import spotipy
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
results = sp.search(q='weezer', limit=20)
for i, t in enumerate(results['tracks']['items']):

View File

@ -4,7 +4,7 @@ import sys
from spotipy.oauth2 import SpotifyClientCredentials
import spotipy
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
if len(sys.argv) > 1:
name = ' '.join(sys.argv[1:])

View File

@ -9,8 +9,8 @@ import random
usage: python title_chain.py [song name]
'''
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
skiplist = {'dm', 'remix'}

View File

@ -6,8 +6,8 @@ from spotipy.oauth2 import SpotifyClientCredentials
import spotipy
import sys
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
if len(sys.argv) > 1:
artist_name = ' '.join(sys.argv[1:])

View File

@ -6,8 +6,8 @@ import sys
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
client_credentials_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
user = 'spotify'

View File

@ -125,11 +125,9 @@ class Spotify:
def __init__(
self,
auth=None,
requests_session=True,
client_credentials_manager=None,
oauth_manager=None,
access_token=None,
auth_manager=None,
requests_session=True,
proxies=None,
requests_timeout=5,
status_forcelist=None,
@ -141,19 +139,16 @@ class Spotify:
"""
Creates a Spotify API client.
:param auth: An access token (optional)
:param access_token: An access token (optional). If not None, then this parameter
will override the auth_manager parameter. Prefer `auth_manager` over this parameter
because otherwise you cannot refresh the `access_token`.
:param auth_manager:
SpotifyOauth, SpotifyClientCredentials, or SpotifyPKCE object
:param requests_session:
A Requests session object or a truthy value to create one.
A falsy value disables sessions.
A Requests session object or a true value to create one.
A false value disables sessions.
It should generally be a good idea to keep sessions enabled
for performance reasons (connection pooling).
:param client_credentials_manager:
SpotifyClientCredentials object
:param oauth_manager:
SpotifyOAuth object
:param auth_manager:
SpotifyOauth, SpotifyClientCredentials,
or SpotifyImplicitGrant object
:param proxies:
Definition of proxies (optional).
See Requests doc https://2.python-requests.org/en/master/user/advanced/#proxies
@ -173,17 +168,23 @@ class Spotify:
The language parameter advertises what language the user prefers to see.
See ISO-639-1 language code: https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes
"""
if access_token is not None and auth_manager is not None:
warnings.warn(
"Either `access_token` or `auth_manager` should be provided, "
"not both. `auth_manager` will be ignored.",
UserWarning
)
self.prefix = "https://api.spotify.com/v1/"
self._auth = auth
self.client_credentials_manager = client_credentials_manager
self.oauth_manager = oauth_manager
self.access_token = access_token
self.auth_manager = auth_manager
self.proxies = proxies
self.requests_timeout = requests_timeout
self.status_forcelist = status_forcelist or self.default_retry_codes
self.backoff_factor = backoff_factor
self.retries = retries
self.status_retries = status_retries
self.backoff_factor = backoff_factor
self.language = language
if isinstance(requests_session, requests.Session):
@ -194,21 +195,6 @@ class Spotify:
else: # Use the Requests API module as a "session".
self._session = requests.api
def set_auth(self, auth):
self._auth = auth
@property
def auth_manager(self):
return self._auth_manager
@auth_manager.setter
def auth_manager(self, auth_manager):
if auth_manager is not None:
self._auth_manager = auth_manager
else:
self._auth_manager = (
self.client_credentials_manager or self.oauth_manager
)
def __del__(self):
"""Make sure the connection (pool) gets closed"""
@ -234,12 +220,12 @@ class Spotify:
self._session.mount('https://', adapter)
def _auth_headers(self):
if self._auth:
return {"Authorization": f"Bearer {self._auth}"}
if self.access_token:
return {"Authorization": "Bearer {0}".format(self.access_token)}
if not self.auth_manager:
return {}
try:
token = self.auth_manager.get_access_token(as_dict=False)
token = self.auth_manager.get_access_token()
except TypeError:
token = self.auth_manager.get_access_token()
return {"Authorization": f"Bearer {token}"}
@ -670,34 +656,6 @@ class Spotify:
additional_types=",".join(additional_types),
)
def playlist_tracks(
self,
playlist_id,
fields=None,
limit=100,
offset=0,
market=None,
additional_types=("track",)
):
""" Get full details of the tracks of a playlist.
Parameters:
- playlist_id - the playlist ID, URI or URL
- fields - which fields to return
- limit - the maximum number of tracks to return
- offset - the index of the first track to return
- market - an ISO 3166-1 alpha-2 country code.
- additional_types - list of item types to return.
valid types are: track and episode
"""
warnings.warn(
"You should use `playlist_items(playlist_id, ...,"
"additional_types=('track',))` instead",
DeprecationWarning,
)
return self.playlist_items(playlist_id, fields, limit, offset,
market, additional_types)
def playlist_items(
self,
playlist_id,
@ -752,55 +710,6 @@ class Spotify:
content_type="image/jpeg",
)
def user_playlist(self, user, playlist_id=None, fields=None, market=None):
warnings.warn(
"You should use `playlist(playlist_id)` instead",
DeprecationWarning,
)
""" Gets a single playlist of a user
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- fields - which fields to return
"""
if playlist_id is None:
return self._get(f"users/{user}/starred")
return self.playlist(playlist_id, fields=fields, market=market)
def user_playlist_tracks(
self,
user=None,
playlist_id=None,
fields=None,
limit=100,
offset=0,
market=None,
):
warnings.warn(
"You should use `playlist_tracks(playlist_id)` instead",
DeprecationWarning,
)
""" Get full details of the tracks of a playlist owned by a user.
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- fields - which fields to return
- limit - the maximum number of tracks to return
- offset - the index of the first track to return
- market - an ISO 3166-1 alpha-2 country code.
"""
return self.playlist_tracks(
playlist_id,
limit=limit,
offset=offset,
fields=fields,
market=market,
)
def user_playlists(self, user, limit=50, offset=0):
""" Gets playlists of a user
@ -832,233 +741,6 @@ class Spotify:
return self._post(f"users/{user}/playlists", payload=data)
def user_playlist_change_details(
self,
user,
playlist_id,
name=None,
public=None,
collaborative=None,
description=None,
):
""" This function is no longer in use, please use the recommended function in the warning!
Changes a playlist's name and/or public/private state
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- name - optional name of the playlist
- public - optional is the playlist public
- collaborative - optional is the playlist collaborative
- description - optional description of the playlist
"""
warnings.warn(
"You should use `playlist_change_details(playlist_id, ...)` instead",
DeprecationWarning,
)
return self.playlist_change_details(playlist_id, name, public,
collaborative, description)
def user_playlist_unfollow(self, user, playlist_id):
""" This function is no longer in use, please use the recommended function in the warning!
Unfollows (deletes) a playlist for a user
Parameters:
- user - the id of the user
- name - the name of the playlist
"""
warnings.warn(
"You should use `current_user_unfollow_playlist(playlist_id)` instead",
DeprecationWarning,
)
return self.current_user_unfollow_playlist(playlist_id)
def user_playlist_add_tracks(
self, user, playlist_id, tracks, position=None
):
""" This function is no longer in use, please use the recommended function in the warning!
Adds tracks to a playlist
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- tracks - a list of track URIs, URLs or IDs
- position - the position to add the tracks
"""
warnings.warn(
"You should use `playlist_add_items(playlist_id, tracks)` instead",
DeprecationWarning,
)
tracks = [self._get_uri("track", tid) for tid in tracks]
return self.playlist_add_items(playlist_id, tracks, position)
def user_playlist_add_episodes(
self, user, playlist_id, episodes, position=None
):
""" This function is no longer in use, please use the recommended function in the warning!
Adds episodes to a playlist
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- episodes - a list of track URIs, URLs or IDs
- position - the position to add the episodes
"""
warnings.warn(
"You should use `playlist_add_items(playlist_id, episodes)` instead",
DeprecationWarning,
)
episodes = [self._get_uri("episode", tid) for tid in episodes]
return self.playlist_add_items(playlist_id, episodes, position)
def user_playlist_replace_tracks(self, user, playlist_id, tracks):
""" This function is no longer in use, please use the recommended function in the warning!
Replace all tracks in a playlist for a user
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- tracks - the list of track ids to add to the playlist
"""
warnings.warn(
"You should use `playlist_replace_items(playlist_id, tracks)` instead",
DeprecationWarning,
)
return self.playlist_replace_items(playlist_id, tracks)
def user_playlist_reorder_tracks(
self,
user,
playlist_id,
range_start,
insert_before,
range_length=1,
snapshot_id=None,
):
""" This function is no longer in use, please use the recommended function in the warning!
Reorder tracks in a playlist from a user
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- range_start - the position of the first track to be reordered
- range_length - optional the number of tracks to be reordered
(default: 1)
- insert_before - the position where the tracks should be
inserted
- snapshot_id - optional playlist's snapshot ID
"""
warnings.warn(
"You should use `playlist_reorder_items(playlist_id, ...)` instead",
DeprecationWarning,
)
return self.playlist_reorder_items(playlist_id, range_start,
insert_before, range_length,
snapshot_id)
def user_playlist_remove_all_occurrences_of_tracks(
self, user, playlist_id, tracks, snapshot_id=None
):
""" This function is no longer in use, please use the recommended function in the warning!
Removes all occurrences of the given tracks from the given playlist
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- tracks - the list of track ids to remove from the playlist
- snapshot_id - optional id of the playlist snapshot
"""
warnings.warn(
"You should use `playlist_remove_all_occurrences_of_items"
"(playlist_id, tracks)` instead",
DeprecationWarning,
)
return self.playlist_remove_all_occurrences_of_items(playlist_id,
tracks,
snapshot_id)
def user_playlist_remove_specific_occurrences_of_tracks(
self, user, playlist_id, tracks, snapshot_id=None
):
""" This function is no longer in use, please use the recommended function in the warning!
Removes all occurrences of the given tracks from the given playlist
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- tracks - an array of objects containing Spotify URIs of the
tracks to remove with their current positions in the
playlist. For example:
[ { "uri":"4iV5W9uYEdYUVa79Axb7Rh", "positions":[2] },
{ "uri":"1301WleyT98MSxVHPZCA6M", "positions":[7] } ]
- snapshot_id - optional id of the playlist snapshot
"""
warnings.warn(
"You should use `playlist_remove_specific_occurrences_of_items"
"(playlist_id, tracks)` instead",
DeprecationWarning,
)
plid = self._get_id("playlist", playlist_id)
ftracks = []
for tr in tracks:
ftracks.append(
{
"uri": self._get_uri("track", tr["uri"]),
"positions": tr["positions"],
}
)
payload = {"tracks": ftracks}
if snapshot_id:
payload["snapshot_id"] = snapshot_id
return self._delete(
f"users/{user}/playlists/{plid}/tracks", payload=payload
)
def user_playlist_follow_playlist(self, playlist_owner_id, playlist_id):
""" This function is no longer in use, please use the recommended function in the warning!
Add the current authenticated user as a follower of a playlist.
Parameters:
- playlist_owner_id - the user id of the playlist owner
- playlist_id - the id of the playlist
"""
warnings.warn(
"You should use `current_user_follow_playlist(playlist_id)` instead",
DeprecationWarning,
)
return self.current_user_follow_playlist(playlist_id)
def user_playlist_is_following(
self, playlist_owner_id, playlist_id, user_ids
):
""" This function is no longer in use, please use the recommended function in the warning!
Check to see if the given users are following the given playlist
Parameters:
- playlist_owner_id - the user id of the playlist owner
- playlist_id - the id of the playlist
- user_ids - the ids of the users that you want to check to see
if they follow the playlist. Maximum: 5 ids.
"""
warnings.warn(
"You should use `playlist_is_following(playlist_id, user_ids)` instead",
DeprecationWarning,
)
return self.playlist_is_following(playlist_id, user_ids)
def playlist_change_details(
self,
playlist_id,

View File

@ -3,7 +3,6 @@ __all__ = [
"SpotifyOAuth",
"SpotifyOauthError",
"SpotifyStateError",
"SpotifyImplicitGrant",
"SpotifyPKCE"
]
@ -11,7 +10,6 @@ import base64
import logging
import os
import time
import warnings
import webbrowser
import requests
@ -82,9 +80,6 @@ class SpotifyAuthBase:
@staticmethod
def _get_user_input(prompt):
try:
return raw_input(prompt)
except NameError:
return input(prompt)
@staticmethod
@ -133,10 +128,10 @@ class SpotifyClientCredentials(SpotifyAuthBase):
self,
client_id=None,
client_secret=None,
cache_handler=None,
proxies=None,
requests_session=True,
requests_timeout=None,
cache_handler=None
requests_timeout=None
):
"""
Creates a Client Credentials Flow Manager.
@ -154,14 +149,16 @@ class SpotifyClientCredentials(SpotifyAuthBase):
Parameters:
* client_id: Must be supplied or set as environment variable
* client_secret: Must be supplied or set as environment variable
* proxies: Optional, proxy for the requests library to route through
* requests_session: A Requests session
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
* proxies: Optional, proxy for the requests library to route through
* requests_session: A Requests session object or a true value to create one.
A false value disables sessions.
It should generally be a good idea to keep sessions enabled
for performance reasons (connection pooling).
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
"""
@ -179,35 +176,25 @@ class SpotifyClientCredentials(SpotifyAuthBase):
else:
self.cache_handler = CacheFileHandler()
def get_access_token(self, as_dict=True, check_cache=True):
def get_access_token(self, check_cache=True):
"""
If a valid access token is in memory, returns it
Else fetches a new token and returns it
Parameters:
- as_dict - a boolean indicating if returning the access token
as a token_info dictionary, otherwise it will be returned
as a string.
- check_cache - if true, checks for a locally stored token
before requesting a new token.
"""
if as_dict:
warnings.warn(
"You're using 'as_dict = True'."
"get_access_token will return the token string directly in future "
"versions. Please adjust your code accordingly, or use "
"get_cached_token instead.",
DeprecationWarning,
stacklevel=2,
)
if check_cache:
token_info = self.cache_handler.get_cached_token()
if token_info and not self.is_token_expired(token_info):
return token_info if as_dict else token_info["access_token"]
return token_info["access_token"]
token_info = self._request_access_token()
token_info = self._add_custom_values_to_token_info(token_info)
self.cache_handler.save_token_to_cache(token_info)
return token_info if as_dict else token_info["access_token"]
return token_info["access_token"]
def _request_access_token(self):
"""Gets client credentials access token """
@ -260,14 +247,12 @@ class SpotifyOAuth(SpotifyAuthBase):
redirect_uri=None,
state=None,
scope=None,
cache_path=None,
username=None,
cache_handler=None,
proxies=None,
show_dialog=False,
requests_session=True,
requests_timeout=None,
open_browser=True,
cache_handler=None
open_browser=True
):
"""
Creates a SpotifyOAuth object
@ -280,24 +265,19 @@ class SpotifyOAuth(SpotifyAuthBase):
* scope: Optional, either a string of scopes, or an iterable with elements of type
`Scope` or `str`. E.g.,
{Scope.user_modify_playback_state, Scope.user_library_read}
iterable of scopes or comma separated string of scopes.
e.g, "playlist-read-private,playlist-read-collaborative"
* cache_path: (deprecated) Optional, will otherwise be generated
(takes precedence over `username`)
* username: (deprecated) Optional or set as environment variable
(will set `cache_path` to `.cache-{username}`)
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
* proxies: Optional, proxy for the requests library to route through
* show_dialog: Optional, interpreted as boolean
* requests_session: A Requests session
* requests_session: A Requests session object or a true value to create one.
A false value disables sessions.
It should generally be a good idea to keep sessions enabled
for performance reasons (connection pooling).
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
* open_browser: Optional, whether the web browser should be opened to
authorize a user
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
"""
super().__init__(requests_session)
@ -307,33 +287,14 @@ class SpotifyOAuth(SpotifyAuthBase):
self.redirect_uri = redirect_uri
self.state = state
self.scope = self._normalize_scope(scope)
if username or cache_path:
warnings.warn("Specifying cache_path or username as arguments to SpotifyOAuth " +
"will be deprecated. Instead, please create a CacheFileHandler " +
"instance with the desired cache_path and username and pass it " +
"to SpotifyOAuth as the cache_handler. For example:\n\n" +
"\tfrom spotipy.oauth2 import CacheFileHandler\n" +
"\thandler = CacheFileHandler(cache_path=cache_path, " +
"username=username)\n" +
"\tsp = spotipy.SpotifyOAuth(client_id, client_secret, " +
"redirect_uri," +
" cache_handler=handler)",
DeprecationWarning
)
if cache_handler:
warnings.warn("A cache_handler has been specified along with a cache_path or " +
"username. The cache_path and username arguments will be ignored.")
if cache_handler:
assert issubclass(cache_handler.__class__, CacheHandler), \
"cache_handler must be a subclass of CacheHandler: " + str(type(cache_handler)) \
+ " != " + str(CacheHandler)
self.cache_handler = cache_handler
else:
username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
self.cache_handler = CacheFileHandler(
username=username,
cache_path=cache_path
)
self.cache_handler = CacheFileHandler()
self.proxies = proxies
self.requests_timeout = requests_timeout
self.show_dialog = show_dialog
@ -475,24 +436,14 @@ class SpotifyOAuth(SpotifyAuthBase):
return self.parse_response_code(response)
return self.get_auth_response()
def get_access_token(self, code=None, as_dict=True, check_cache=True):
def get_access_token(self, code=None, check_cache=True):
""" Gets the access token for the app given the code
Parameters:
- code - the response code
- as_dict - a boolean indicating if returning the access token
as a token_info dictionary, otherwise it will be returned
as a string.
- check_cache - if true, checks for a locally stored token
before requesting a new token
"""
if as_dict:
warnings.warn(
"You're using 'as_dict = True'."
"get_access_token will return the token string directly in future "
"versions. Please adjust your code accordingly, or use "
"get_cached_token instead.",
DeprecationWarning,
stacklevel=2,
)
if check_cache:
token_info = self.validate_token(self.cache_handler.get_cached_token())
if token_info is not None:
@ -500,7 +451,7 @@ class SpotifyOAuth(SpotifyAuthBase):
token_info = self.refresh_access_token(
token_info["refresh_token"]
)
return token_info if as_dict else token_info["access_token"]
return token_info["access_token"]
payload = {
"redirect_uri": self.redirect_uri,
@ -532,7 +483,7 @@ class SpotifyOAuth(SpotifyAuthBase):
token_info = response.json()
token_info = self._add_custom_values_to_token_info(token_info)
self.cache_handler.save_token_to_cache(token_info)
return token_info if as_dict else token_info["access_token"]
return token_info["access_token"]
except requests.exceptions.HTTPError as http_error:
self._handle_oauth_error(http_error)
@ -576,26 +527,6 @@ class SpotifyOAuth(SpotifyAuthBase):
token_info["scope"] = self.scope
return token_info
def get_cached_token(self):
warnings.warn("Calling get_cached_token directly on the SpotifyOAuth object will be " +
"deprecated. Instead, please specify a CacheFileHandler instance as " +
"the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
"get_cached_token method. You can replace:\n\tsp.get_cached_token()" +
"\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
DeprecationWarning
)
return self.validate_token(self.cache_handler.get_cached_token())
def _save_token_info(self, token_info):
warnings.warn("Calling _save_token_info directly on the SpotifyOAuth object will be " +
"deprecated. Instead, please specify a CacheFileHandler instance as " +
"the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
"save_token_to_cache method.",
DeprecationWarning
)
self.cache_handler.save_token_to_cache(token_info)
return None
class SpotifyPKCE(SpotifyAuthBase):
""" Implements PKCE Authorization Flow for client apps
@ -612,18 +543,18 @@ class SpotifyPKCE(SpotifyAuthBase):
OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
def __init__(self,
def __init__(
self,
client_id=None,
redirect_uri=None,
state=None,
scope=None,
cache_path=None,
username=None,
cache_handler=None,
proxies=None,
requests_timeout=None,
requests_session=True,
open_browser=True,
cache_handler=None):
open_browser=True
):
"""
Creates Auth Manager with the PKCE Auth flow.
@ -631,22 +562,21 @@ class SpotifyPKCE(SpotifyAuthBase):
* client_id: Must be supplied or set as environment variable
* redirect_uri: Must be supplied or set as environment variable
* state: Optional, no verification is performed
* scope: Optional, either a list of scopes or comma separated string of scopes.
e.g, "playlist-read-private,playlist-read-collaborative"
* cache_path: (deprecated) Optional, will otherwise be generated
(takes precedence over `username`)
* username: (deprecated) Optional or set as environment variable
(will set `cache_path` to `.cache-{username}`)
* proxies: Optional, proxy for the requests library to route through
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
* requests_session: A Requests session
* open_browser: Optional, whether the web browser should be opened to
authorize a user
* scope: Optional, either a string of scopes, or an iterable with elements of type
`Scope` or `str`. E.g.,
{Scope.user_modify_playback_state, Scope.user_library_read}
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
Optional, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
* proxies: Optional, proxy for the requests library to route through
* requests_timeout: Optional, tell Requests to stop waiting for a response after
a given number of seconds
* requests_session: A Requests session object or a true value to create one.
A false value disables sessions.
It should generally be a good idea to keep sessions enabled
for performance reasons (connection pooling).
* open_browser: Optional, thether or not the web browser should be opened to
authorize a user
"""
super().__init__(requests_session)
@ -654,31 +584,12 @@ class SpotifyPKCE(SpotifyAuthBase):
self.redirect_uri = redirect_uri
self.state = state
self.scope = self._normalize_scope(scope)
if username or cache_path:
warnings.warn("Specifying cache_path or username as arguments to SpotifyPKCE " +
"will be deprecated. Instead, please create a CacheFileHandler " +
"instance with the desired cache_path and username and pass it " +
"to SpotifyPKCE as the cache_handler. For example:\n\n" +
"\tfrom spotipy.oauth2 import CacheFileHandler\n" +
"\thandler = CacheFileHandler(cache_path=cache_path, " +
"username=username)\n" +
"\tsp = spotipy.SpotifyImplicitGrant(client_id, client_secret, " +
"redirect_uri, cache_handler=handler)",
DeprecationWarning
)
if cache_handler:
warnings.warn("A cache_handler has been specified along with a cache_path or " +
"username. The cache_path and username arguments will be ignored.")
if cache_handler:
assert issubclass(type(cache_handler), CacheHandler), \
"type(cache_handler): " + str(type(cache_handler)) + " != " + str(CacheHandler)
self.cache_handler = cache_handler
else:
username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
self.cache_handler = CacheFileHandler(
username=username,
cache_path=cache_path
)
self.cache_handler = CacheFileHandler()
self.proxies = proxies
self.requests_timeout = requests_timeout
@ -941,285 +852,6 @@ class SpotifyPKCE(SpotifyAuthBase):
def parse_auth_response_url(url):
return SpotifyOAuth.parse_auth_response_url(url)
def get_cached_token(self):
warnings.warn("Calling get_cached_token directly on the SpotifyPKCE object will be " +
"deprecated. Instead, please specify a CacheFileHandler instance as " +
"the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
"get_cached_token method. You can replace:\n\tsp.get_cached_token()" +
"\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
DeprecationWarning
)
return self.validate_token(self.cache_handler.get_cached_token())
def _save_token_info(self, token_info):
warnings.warn("Calling _save_token_info directly on the SpotifyOAuth object will be " +
"deprecated. Instead, please specify a CacheFileHandler instance as " +
"the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
"save_token_to_cache method.",
DeprecationWarning
)
self.cache_handler.save_token_to_cache(token_info)
return None
class SpotifyImplicitGrant(SpotifyAuthBase):
""" Implements Implicit Grant Flow for client apps
This auth manager enables *user and non-user* endpoints with only
a client secret, redirect uri, and username. The user will need to
copy and paste a URI from the browser every hour.
Security Warning
-----------------
The OAuth standard no longer recommends the Implicit Grant Flow for
client-side code. Spotify has implemented the OAuth-suggested PKCE
extension that removes the need for a client secret in the
Authentication Code flow. Use the SpotifyPKCE auth manager instead
of SpotifyImplicitGrant.
SpotifyPKCE contains all the functionality of
SpotifyImplicitGrant, plus automatic response retrieval and
refreshable tokens. Only a few replacements need to be made:
* get_auth_response()['access_token'] ->
get_access_token(get_authorization_code())
* get_auth_response() ->
get_access_token(get_authorization_code()); get_cached_token()
* parse_response_token(url)['access_token'] ->
get_access_token(parse_response_code(url))
* parse_response_token(url) ->
get_access_token(parse_response_code(url)); get_cached_token()
The security concern in the Implicit Grant flow is that the token is
returned in the URL and can be intercepted through the browser. A
request with an authorization code and proof of origin could not be
easily intercepted without a compromised network.
"""
OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
def __init__(self,
client_id=None,
redirect_uri=None,
state=None,
scope=None,
cache_path=None,
username=None,
show_dialog=False,
cache_handler=None):
""" Creates Auth Manager using the Implicit Grant flow
**See help(SpotifyImplicitGrant) for full Security Warning**
Parameters
----------
* client_id: Must be supplied or set as environment variable
* redirect_uri: Must be supplied or set as environment variable
* state: May be supplied, no verification is performed
* scope: Optional, either a string of scopes, or an iterable with elements of type
`Scope` or `str`. E.g.,
{Scope.user_modify_playback_state, Scope.user_library_read}
* cache_handler: An instance of the `CacheHandler` class to handle
getting and saving cached authorization tokens.
May be supplied, will otherwise use `CacheFileHandler`.
(takes precedence over `cache_path` and `username`)
* cache_path: (deprecated) May be supplied, will otherwise be generated
(takes precedence over `username`)
* username: (deprecated) May be supplied or set as environment variable
(will set `cache_path` to `.cache-{username}`)
* show_dialog: Interpreted as boolean
"""
logger.warning("The OAuth standard no longer recommends the Implicit "
"Grant Flow for client-side code. Use the SpotifyPKCE "
"auth manager instead of SpotifyImplicitGrant. For "
"more details and a guide to switching, see "
"help(SpotifyImplicitGrant).")
self.client_id = client_id
self.redirect_uri = redirect_uri
self.state = state
if username or cache_path:
warnings.warn("Specifying cache_path or username as arguments to " +
"SpotifyImplicitGrant will be deprecated. Instead, please create " +
"a CacheFileHandler instance with the desired cache_path and " +
"username and pass it to SpotifyImplicitGrant as the " +
"cache_handler. For example:\n\n" +
"\tfrom spotipy.oauth2 import CacheFileHandler\n" +
"\thandler = CacheFileHandler(cache_path=cache_path, " +
"username=username)\n" +
"\tsp = spotipy.SpotifyImplicitGrant(client_id, client_secret, " +
"redirect_uri, cache_handler=handler)",
DeprecationWarning
)
if cache_handler:
warnings.warn("A cache_handler has been specified along with a cache_path or " +
"username. The cache_path and username arguments will be ignored.")
if cache_handler:
assert issubclass(type(cache_handler), CacheHandler), \
"type(cache_handler): " + str(type(cache_handler)) + " != " + str(CacheHandler)
self.cache_handler = cache_handler
else:
username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
self.cache_handler = CacheFileHandler(
username=username,
cache_path=cache_path
)
self.scope = self._normalize_scope(scope)
self.show_dialog = show_dialog
self._session = None # As to not break inherited __del__
def validate_token(self, token_info):
if token_info is None:
return None
# if scopes don't match, then bail
if "scope" not in token_info or not self._is_scope_subset(
self.scope, token_info["scope"]
):
return None
if self.is_token_expired(token_info):
return None
return token_info
def get_access_token(self,
state=None,
response=None,
check_cache=True):
""" Gets Auth Token from cache (preferred) or user interaction
Parameters
----------
* state: May be given, overrides (without changing) self.state
* response: URI with token, can break expiration checks
* check_cache: Interpreted as boolean
"""
if check_cache:
token_info = self.validate_token(self.cache_handler.get_cached_token())
if not (token_info is None or self.is_token_expired(token_info)):
return token_info["access_token"]
if response:
token_info = self.parse_response_token(response)
else:
token_info = self.get_auth_response(state)
token_info = self._add_custom_values_to_token_info(token_info)
self.cache_handler.save_token_to_cache(token_info)
return token_info["access_token"]
def get_authorize_url(self, state=None):
""" Gets the URL to use to authorize this app """
payload = {
"client_id": self.client_id,
"response_type": "token",
"redirect_uri": self.redirect_uri,
}
if self.scope:
payload["scope"] = self.scope
if state is None:
state = self.state
if state is not None:
payload["state"] = state
if self.show_dialog:
payload["show_dialog"] = True
urlparams = urllibparse.urlencode(payload)
return f"{self.OAUTH_AUTHORIZE_URL}?{urlparams}"
def parse_response_token(self, url, state=None):
""" Parse the response code in the given response url """
remote_state, token, t_type, exp_in = self.parse_auth_response_url(url)
if state is None:
state = self.state
if state is not None and remote_state != state:
raise SpotifyStateError(state, remote_state)
return {"access_token": token, "token_type": t_type,
"expires_in": exp_in, "state": state}
@staticmethod
def parse_auth_response_url(url):
url_components = urlparse(url)
fragment_s = url_components.fragment
query_s = url_components.query
form = dict(i.split('=') for i
in (fragment_s or query_s or url).split('&'))
if "error" in form:
raise SpotifyOauthError(f"Received error from auth server: {form['error']}",
state=form["state"])
if "expires_in" in form:
form["expires_in"] = int(form["expires_in"])
return tuple(form.get(param) for param in ["state", "access_token",
"token_type", "expires_in"])
def _open_auth_url(self, state=None):
auth_url = self.get_authorize_url(state)
try:
webbrowser.open(auth_url)
logger.info("Opened %s in your browser", auth_url)
except webbrowser.Error:
logger.error("Please navigate here: %s", auth_url)
def get_auth_response(self, state=None):
""" Gets a new auth **token** with user interaction """
logger.info('User authentication requires interaction with your '
'web browser. Once you enter your credentials and '
'give authorization, you will be redirected to '
'a url. Paste that url you were directed to to '
'complete the authorization.')
redirect_info = urlparse(self.redirect_uri)
redirect_host, redirect_port = get_host_port(redirect_info.netloc)
# Implicit Grant tokens are returned in a hash fragment
# which is only available to the browser. Therefore, interactive
# URL retrieval is required.
if (redirect_host in ("127.0.0.1", "localhost")
and redirect_info.scheme == "http" and redirect_port):
logger.warning('Using a local redirect URI with a '
'port, likely expecting automatic '
'retrieval. Due to technical limitations, '
'the authentication token cannot be '
'automatically retrieved and must be '
'copied and pasted.')
self._open_auth_url(state)
logger.info('Paste that url you were directed to in order to '
'complete the authorization')
response = SpotifyImplicitGrant._get_user_input("Enter the URL you "
"were redirected to: ")
return self.parse_response_token(response, state)
def _add_custom_values_to_token_info(self, token_info):
"""
Store some values that aren't directly provided by a Web API
response.
"""
token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
token_info["scope"] = self.scope
return token_info
def get_cached_token(self):
warnings.warn("Calling get_cached_token directly on the SpotifyImplicitGrant " +
"object will be deprecated. Instead, please specify a " +
"CacheFileHandler instance as the cache_handler in SpotifyOAuth " +
"and use the CacheFileHandler's get_cached_token method. " +
"You can replace:\n\tsp.get_cached_token()" +
"\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
DeprecationWarning
)
return self.validate_token(self.cache_handler.get_cached_token())
def _save_token_info(self, token_info):
warnings.warn("Calling _save_token_info directly on the SpotifyImplicitGrant " +
"object will be deprecated. Instead, please specify a " +
"CacheFileHandler instance as the cache_handler in SpotifyOAuth " +
"and use the CacheFileHandler's save_token_to_cache method.",
DeprecationWarning
)
self.cache_handler.save_token_to_cache(token_info)
return None
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):

View File

@ -2,7 +2,7 @@ from __future__ import annotations
""" Shows a user's playlists. This needs to be authenticated via OAuth. """
__all__ = ["CLIENT_CREDS_ENV_VARS", "prompt_for_user_token"]
__all__ = ["CLIENT_CREDS_ENV_VARS"]
import logging
import os
@ -23,91 +23,6 @@ CLIENT_CREDS_ENV_VARS = {
}
def prompt_for_user_token(
username=None,
scope=None,
client_id=None,
client_secret=None,
redirect_uri=None,
cache_path=None,
oauth_manager=None,
show_dialog=False
):
warnings.warn(
"'prompt_for_user_token' is deprecated."
"Use the following instead: "
" auth_manager=SpotifyOAuth(scope=scope)"
" spotipy.Spotify(auth_manager=auth_manager)",
DeprecationWarning
)
"""Prompt the user to login if necessary and returns a user token
suitable for use with the spotipy.Spotify constructor.
Parameters:
- username - the Spotify username. (optional)
- scope - the desired scope of the request. (optional)
- client_id - the client ID of your app. (required)
- client_secret - the client secret of your app. (required)
- redirect_uri - the redirect URI of your app. (required)
- cache_path - path to location to save tokens. (required)
- oauth_manager - OAuth manager object. (optional)
- show_dialog - If True, a login prompt always shows or defaults to False. (optional)
"""
if not oauth_manager:
if not client_id:
client_id = os.getenv("SPOTIPY_CLIENT_ID")
if not client_secret:
client_secret = os.getenv("SPOTIPY_CLIENT_SECRET")
if not redirect_uri:
redirect_uri = os.getenv("SPOTIPY_REDIRECT_URI")
if not client_id:
LOGGER.warning(
"""
You need to set your Spotify API credentials.
You can do this by setting environment variables like so:
export SPOTIPY_CLIENT_ID='your-spotify-client-id'
export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret'
export SPOTIPY_REDIRECT_URI='your-app-redirect-url'
Get your credentials at
https://developer.spotify.com/my-applications
"""
)
raise spotipy.SpotifyException(550, -1, "no credentials set")
sp_oauth = oauth_manager or spotipy.SpotifyOAuth(
client_id,
client_secret,
redirect_uri,
scope=scope,
cache_path=cache_path,
username=username,
show_dialog=show_dialog
)
# try to get a valid token for this user, from the cache,
# if not in the cache, then create a new (this will send
# the user to a web page where they can authorize this app)
token_info = sp_oauth.validate_token(sp_oauth.cache_handler.get_cached_token())
if not token_info:
code = sp_oauth.get_auth_response()
token = sp_oauth.get_access_token(code, as_dict=False)
else:
return token_info["access_token"]
# Auth'ed API request
if token:
return token
else:
return None
def get_host_port(netloc):
""" Split the network location string into host and port and returns a tuple
where the host is a string and the the port is an integer.
@ -123,53 +38,3 @@ def get_host_port(netloc):
port = None
return host, port
def normalize_scope(scope):
"""Normalize the scope to verify that it is a list or tuple. A string
input will split the string by commas to create a list of scopes.
A list or tuple input is used directly.
Parameters:
- scope - a string representing scopes separated by commas,
or a list/tuple of scopes.
"""
if scope:
if isinstance(scope, str):
scopes = scope.split(',')
elif isinstance(scope, list) or isinstance(scope, tuple):
scopes = scope
else:
raise Exception(
"Unsupported scope value, please either provide a list of scopes, "
"or a string of scopes separated by commas."
)
return " ".join(sorted(scopes))
else:
return None
class Retry(urllib3.Retry):
"""
Custom class for printing a warning when a rate/request limit is reached.
"""
def increment(
self,
method: str | None = None,
url: str | None = None,
response: urllib3.BaseHTTPResponse | None = None,
error: Exception | None = None,
_pool: urllib3.connectionpool.ConnectionPool | None = None,
_stacktrace: TracebackType | None = None,
) -> urllib3.Retry:
if response:
retry_header = response.headers.get("Retry-After")
if self.is_retry(method, response.status, bool(retry_header)):
logging.warning("Your application has reached a rate/request limit. "
f"Retry will occur after: {retry_header}")
return super().increment(method,
url,
response=response,
error=error,
_pool=_pool,
_stacktrace=_stacktrace)

View File

@ -70,7 +70,8 @@ class AuthTestSpotipy(unittest.TestCase):
@classmethod
def setUpClass(self):
self.spotify = Spotify(
client_credentials_manager=SpotifyClientCredentials())
auth_manager=SpotifyClientCredentials()
)
self.spotify.trace = False
def test_audio_analysis(self):
@ -341,9 +342,9 @@ class AuthTestSpotipy(unittest.TestCase):
self.assertTrue(find_album())
def test_search_timeout(self):
client_credentials_manager = SpotifyClientCredentials()
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(requests_timeout=0.01,
client_credentials_manager=client_credentials_manager)
auth_manager=auth_manager)
# depending on the timing or bandwidth, this raises a timeout or connection error
self.assertRaises((requests.exceptions.Timeout, requests.exceptions.ConnectionError),
@ -352,17 +353,15 @@ class AuthTestSpotipy(unittest.TestCase):
@unittest.skip("flaky test, need a better method to test retries")
def test_max_retries_reached_get(self):
spotify_no_retry = Spotify(
client_credentials_manager=SpotifyClientCredentials(),
auth_manager=SpotifyClientCredentials(),
retries=0)
i = 0
while i < 100:
for i in range(100):
try:
spotify_no_retry.search(q='foo')
except SpotifyException as e:
self.assertIsInstance(e, SpotifyException)
self.assertEqual(e.http_status, 429)
return
i += 1
self.fail()
def test_album_search(self):
@ -460,7 +459,7 @@ class AuthTestSpotipy(unittest.TestCase):
sess = requests.Session()
sess.headers["user-agent"] = "spotipy-test"
with_custom_session = spotipy.Spotify(
client_credentials_manager=SpotifyClientCredentials(),
auth_manager=SpotifyClientCredentials(),
requests_session=sess)
self.assertTrue(
with_custom_session.user(
@ -469,7 +468,7 @@ class AuthTestSpotipy(unittest.TestCase):
def test_force_no_requests_session(self):
with_no_session = spotipy.Spotify(
client_credentials_manager=SpotifyClientCredentials(),
auth_manager=SpotifyClientCredentials(),
requests_session=False)
self.assertNotIsInstance(with_no_session._session, requests.Session)
user = with_no_session.user(user="akx")

View File

@ -2,15 +2,30 @@ import os
from spotipy import (
CLIENT_CREDS_ENV_VARS as CCEV,
prompt_for_user_token,
Spotify,
SpotifyException,
SpotifyImplicitGrant,
SpotifyPKCE
SpotifyOAuth,
SpotifyPKCE,
CacheFileHandler
)
import unittest
from tests import helpers
def _make_spotify(scopes=None, retries=None):
retries = retries or Spotify.max_retries
auth_manager = SpotifyOAuth(
scope=scopes,
cache_handler=CacheFileHandler()
)
spotify = Spotify(
auth_manager=auth_manager,
retries=retries
)
return spotify
class SpotipyPlaylistApiTest(unittest.TestCase):
@classmethod
@ -48,10 +63,10 @@ class SpotipyPlaylistApiTest(unittest.TestCase):
'user-read-playback-state'
)
token = prompt_for_user_token(cls.username, scope=scope)
cls.spotify = _make_spotify(scopes=scope)
cls.spotify_no_retry = _make_spotify(scopes=scope, retries=0)
cls.spotify = Spotify(auth=token)
cls.spotify_no_retry = Spotify(auth=token, retries=0)
cls.new_playlist_name = 'spotipy-playlist-test'
cls.new_playlist = helpers.get_spotify_playlist(
cls.spotify, cls.new_playlist_name, cls.username) or \
@ -201,17 +216,6 @@ class SpotipyPlaylistApiTest(unittest.TestCase):
return
self.fail()
def test_deprecated_starred(self):
pl = self.spotify.user_playlist(self.username)
self.assertTrue(pl["tracks"] is None)
self.assertTrue(pl["owner"] is None)
def test_deprecated_user_playlist(self):
# Test without user due to change from
# https://developer.spotify.com/community/news/2018/06/12/changes-to-playlist-uris/
pl = self.spotify.user_playlist(None, self.new_playlist['id'])
self.assertEqual(pl["tracks"]["total"], 0)
class SpotipyLibraryApiTests(unittest.TestCase):
@classmethod
@ -240,10 +244,7 @@ class SpotipyLibraryApiTests(unittest.TestCase):
'ugc-image-upload '
'user-read-playback-state'
)
token = prompt_for_user_token(cls.username, scope=scope)
cls.spotify = Spotify(auth=token)
cls.spotify = _make_spotify(scopes=scope)
def test_track_bad_id(self):
with self.assertRaises(SpotifyException):
@ -330,9 +331,7 @@ class SpotipyUserApiTests(unittest.TestCase):
'user-read-playback-state'
)
token = prompt_for_user_token(cls.username, scope=scope)
cls.spotify = Spotify(auth=token)
cls.spotify = _make_spotify(scopes=scope)
def test_basic_user_profile(self):
user = self.spotify.user(self.username)
@ -360,9 +359,7 @@ class SpotipyUserApiTests(unittest.TestCase):
class SpotipyBrowseApiTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
username = os.getenv(CCEV['client_username'])
token = prompt_for_user_token(username)
cls.spotify = Spotify(auth=token)
cls.spotify = _make_spotify()
def test_category(self):
rock_cat_id = '0JQ5DAqbMKFDXXwE9BDJAr'
@ -455,9 +452,7 @@ class SpotipyFollowApiTests(unittest.TestCase):
'user-read-playback-state'
)
token = prompt_for_user_token(cls.username, scope=scope)
cls.spotify = Spotify(auth=token)
cls.spotify = _make_spotify(scopes=scope)
def test_current_user_follows(self):
response = self.spotify.current_user_followed_artists()
@ -510,9 +505,7 @@ class SpotipyPlayerApiTests(unittest.TestCase):
'user-read-playback-state'
)
token = prompt_for_user_token(cls.username, scope=scope)
cls.spotify = Spotify(auth=token)
cls.spotify = _make_spotify(scopes=scope)
def test_devices(self):
# No devices playing by default
@ -526,23 +519,6 @@ class SpotipyPlayerApiTests(unittest.TestCase):
# not much more to test if account is inactive and has no recently played tracks
class SpotipyImplicitGrantTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
scope = (
'user-follow-read '
'user-follow-modify '
)
auth_manager = SpotifyImplicitGrant(scope=scope,
cache_path=".cache-implicittest")
cls.spotify = Spotify(auth_manager=auth_manager)
def test_current_user(self):
c_user = self.spotify.current_user()
user = self.spotify.user(c_user['id'])
self.assertEqual(c_user['display_name'], user['display_name'])
class SpotifyPKCETests(unittest.TestCase):
@classmethod
@ -551,7 +527,8 @@ class SpotifyPKCETests(unittest.TestCase):
'user-follow-read '
'user-follow-modify '
)
auth_manager = SpotifyPKCE(scope=scope, cache_path=".cache-pkcetest")
cache_handler = CacheFileHandler(cache_path=".cache-pkcetest")
auth_manager = SpotifyPKCE(scope=scope, cache_handler=cache_handler)
cls.spotify = Spotify(auth_manager=auth_manager)
def test_current_user(self):

View File

@ -5,10 +5,11 @@ import unittest
import unittest.mock as mock
import urllib.parse as urllibparse
from spotipy import SpotifyOAuth, SpotifyImplicitGrant, SpotifyPKCE
from spotipy.cache_handler import MemoryCacheHandler
from spotipy import SpotifyOAuth, SpotifyPKCE
from spotipy.cache_handler import CacheHandler
from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOauthError
from spotipy.oauth2 import SpotifyStateError
from spotipy import CacheFileHandler
patch = mock.patch
DEFAULT = mock.DEFAULT
@ -38,10 +39,6 @@ def _make_oauth(*args, **kwargs):
return SpotifyOAuth("CLID", "CLISEC", "REDIR", "STATE", *args, **kwargs)
def _make_implicitgrantauth(*args, **kwargs):
return SpotifyImplicitGrant("CLID", "REDIR", "STATE", *args, **kwargs)
def _make_pkceauth(*args, **kwargs):
return SpotifyPKCE("CLID", "REDIR", "STATE", *args, **kwargs)
@ -60,13 +57,12 @@ class OAuthCacheTest(unittest.TestCase):
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
is_token_expired.return_value = False
spot = _make_oauth(scope, path)
cache_handler = CacheFileHandler(cache_path=path)
spot = _make_oauth(scope, cache_handler=cache_handler)
cached_tok = spot.validate_token(spot.cache_handler.get_cached_token())
cached_tok_legacy = spot.get_cached_token()
opener.assert_called_with(path)
self.assertIsNotNone(cached_tok)
self.assertIsNotNone(cached_tok_legacy)
self.assertEqual(refresh_access_token.call_count, 0)
@patch.multiple(SpotifyOAuth,
@ -83,7 +79,8 @@ class OAuthCacheTest(unittest.TestCase):
opener.return_value = token_file
refresh_access_token.return_value = fresh_tok
spot = _make_oauth(scope, path)
cache_handler = CacheFileHandler(cache_path=path)
spot = _make_oauth(scope, cache_handler=cache_handler)
spot.validate_token(spot.cache_handler.get_cached_token())
is_token_expired.assert_called_with(expired_tok)
@ -103,7 +100,9 @@ class OAuthCacheTest(unittest.TestCase):
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
is_token_expired.return_value = False
spot = _make_oauth(requested_scope, path)
cache_handler = CacheFileHandler(cache_path=path)
spot = _make_oauth(requested_scope, cache_handler=cache_handler)
cached_tok = spot.validate_token(spot.cache_handler.get_cached_token())
opener.assert_called_with(path)
@ -119,27 +118,13 @@ class OAuthCacheTest(unittest.TestCase):
fi = _fake_file()
opener.return_value = fi
spot = SpotifyOAuth("CLID", "CLISEC", "REDIR", "STATE", scope, path)
cache_handler = CacheFileHandler(cache_path=path)
spot = SpotifyOAuth("CLID", "CLISEC", "REDIR", "STATE", scope, cache_handler=cache_handler)
spot.cache_handler.save_token_to_cache(tok)
opener.assert_called_with(path, 'w')
self.assertTrue(fi.write.called)
@patch('spotipy.cache_handler.open', create=True)
def test_saves_to_cache_path_legacy(self, opener):
scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, scope)
fi = _fake_file()
opener.return_value = fi
spot = SpotifyOAuth("CLID", "CLISEC", "REDIR", "STATE", scope, path)
spot._save_token_info(tok)
opener.assert_called_with(path, 'w')
self.assertTrue(fi.write.called)
def test_cache_handler(self):
scope = "playlist-modify-private"
tok = _make_fake_token(1, 1, scope)
@ -243,142 +228,6 @@ class TestSpotifyClientCredentials(unittest.TestCase):
oauth.get_access_token(check_cache=False)
self.assertEqual(error.exception.error, 'invalid_client')
class ImplicitGrantCacheTest(unittest.TestCase):
@patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT)
@patch('spotipy.cache_handler.open', create=True)
def test_gets_from_cache_path(self, opener, is_token_expired):
scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, scope)
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
is_token_expired.return_value = False
spot = _make_implicitgrantauth(scope, path)
cached_tok = spot.cache_handler.get_cached_token()
cached_tok_legacy = spot.get_cached_token()
opener.assert_called_with(path)
self.assertIsNotNone(cached_tok)
self.assertIsNotNone(cached_tok_legacy)
@patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT)
@patch('spotipy.cache_handler.open', create=True)
def test_expired_token_returns_none(self, opener, is_token_expired):
scope = "playlist-modify-private"
path = ".cache-username"
expired_tok = _make_fake_token(0, None, scope)
token_file = _token_file(json.dumps(expired_tok, ensure_ascii=False))
opener.return_value = token_file
spot = _make_implicitgrantauth(scope, path)
cached_tok = spot.validate_token(spot.cache_handler.get_cached_token())
is_token_expired.assert_called_with(expired_tok)
opener.assert_any_call(path)
self.assertIsNone(cached_tok)
@patch.object(SpotifyImplicitGrant, "is_token_expired", DEFAULT)
@patch('spotipy.cache_handler.open', create=True)
def test_badly_scoped_token_bails(self, opener, is_token_expired):
token_scope = "playlist-modify-public"
requested_scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, token_scope)
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
is_token_expired.return_value = False
spot = _make_implicitgrantauth(requested_scope, path)
cached_tok = spot.validate_token(spot.cache_handler.get_cached_token())
opener.assert_called_with(path)
self.assertIsNone(cached_tok)
@patch('spotipy.cache_handler.open', create=True)
def test_saves_to_cache_path(self, opener):
scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, scope)
fi = _fake_file()
opener.return_value = fi
spot = SpotifyImplicitGrant("CLID", "REDIR", "STATE", scope, path)
spot.cache_handler.save_token_to_cache(tok)
opener.assert_called_with(path, 'w')
self.assertTrue(fi.write.called)
@patch('spotipy.cache_handler.open', create=True)
def test_saves_to_cache_path_legacy(self, opener):
scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, scope)
fi = _fake_file()
opener.return_value = fi
spot = SpotifyImplicitGrant("CLID", "REDIR", "STATE", scope, path)
spot._save_token_info(tok)
opener.assert_called_with(path, 'w')
self.assertTrue(fi.write.called)
class TestSpotifyImplicitGrant(unittest.TestCase):
def test_get_authorize_url_doesnt_pass_state_by_default(self):
auth = SpotifyImplicitGrant("CLID", "REDIR")
url = auth.get_authorize_url()
parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertNotIn('state', parsed_qs)
def test_get_authorize_url_passes_state_from_constructor(self):
state = "STATE"
auth = SpotifyImplicitGrant("CLID", "REDIR", state)
url = auth.get_authorize_url()
parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertEqual(parsed_qs['state'][0], state)
def test_get_authorize_url_passes_state_from_func_call(self):
state = "STATE"
auth = SpotifyImplicitGrant("CLID", "REDIR", "NOT STATE")
url = auth.get_authorize_url(state=state)
parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertEqual(parsed_qs['state'][0], state)
def test_get_authorize_url_does_not_show_dialog_by_default(self):
auth = SpotifyImplicitGrant("CLID", "REDIR")
url = auth.get_authorize_url()
parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertNotIn('show_dialog', parsed_qs)
def test_get_authorize_url_shows_dialog_when_requested(self):
auth = SpotifyImplicitGrant("CLID", "REDIR", show_dialog=True)
url = auth.get_authorize_url()
parsed_url = urllibparse.urlparse(url)
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertTrue(parsed_qs['show_dialog'])
class SpotifyPKCECacheTest(unittest.TestCase):
@patch.multiple(SpotifyPKCE,
@ -393,13 +242,12 @@ class SpotifyPKCECacheTest(unittest.TestCase):
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
is_token_expired.return_value = False
spot = _make_pkceauth(scope, path)
cache_handler = CacheFileHandler(cache_path=path)
spot = _make_pkceauth(scope, cache_handler=cache_handler)
cached_tok = spot.cache_handler.get_cached_token()
cached_tok_legacy = spot.get_cached_token()
opener.assert_called_with(path)
self.assertIsNotNone(cached_tok)
self.assertIsNotNone(cached_tok_legacy)
self.assertEqual(refresh_access_token.call_count, 0)
@patch.multiple(SpotifyPKCE,
@ -416,7 +264,8 @@ class SpotifyPKCECacheTest(unittest.TestCase):
opener.return_value = token_file
refresh_access_token.return_value = fresh_tok
spot = _make_pkceauth(scope, path)
cache_handler = CacheFileHandler(cache_path=path)
spot = _make_pkceauth(scope, cache_handler=cache_handler)
spot.validate_token(spot.cache_handler.get_cached_token())
is_token_expired.assert_called_with(expired_tok)
@ -436,7 +285,8 @@ class SpotifyPKCECacheTest(unittest.TestCase):
opener.return_value = _token_file(json.dumps(tok, ensure_ascii=False))
is_token_expired.return_value = False
spot = _make_pkceauth(requested_scope, path)
cache_handler = CacheFileHandler(cache_path=path)
spot = _make_pkceauth(requested_scope, cache_handler=cache_handler)
cached_tok = spot.validate_token(spot.cache_handler.get_cached_token())
opener.assert_called_with(path)
@ -452,27 +302,13 @@ class SpotifyPKCECacheTest(unittest.TestCase):
fi = _fake_file()
opener.return_value = fi
spot = SpotifyPKCE("CLID", "REDIR", "STATE", scope, path)
cache_handler = CacheFileHandler(cache_path=path)
spot = SpotifyPKCE("CLID", "REDIR", "STATE", scope, cache_handler=cache_handler)
spot.cache_handler.save_token_to_cache(tok)
opener.assert_called_with(path, 'w')
self.assertTrue(fi.write.called)
@patch('spotipy.cache_handler.open', create=True)
def test_saves_to_cache_path_legacy(self, opener):
scope = "playlist-modify-private"
path = ".cache-username"
tok = _make_fake_token(1, 1, scope)
fi = _fake_file()
opener.return_value = fi
spot = SpotifyPKCE("CLID", "REDIR", "STATE", scope, path)
spot._save_token_info(tok)
opener.assert_called_with(path, 'w')
self.assertTrue(fi.write.called)
class TestSpotifyPKCE(unittest.TestCase):

View File

@ -88,3 +88,28 @@ class SpotipyScopeTest(TestCase):
self.assertEqual(normalized_scope_string_2, "")
self.assertIsNone(self.normalize_scope(None))
def test_all_scopes(self):
expected_scopes = {
Scope.user_read_currently_playing,
Scope.playlist_read_collaborative,
Scope.playlist_modify_private,
Scope.user_read_playback_position,
Scope.user_library_modify,
Scope.user_top_read,
Scope.user_read_playback_state,
Scope.user_read_email,
Scope.ugc_image_upload,
Scope.user_read_private,
Scope.playlist_modify_public,
Scope.user_library_read,
Scope.streaming,
Scope.user_read_recently_played,
Scope.user_follow_read,
Scope.user_follow_modify,
Scope.app_remote_control,
Scope.playlist_read_private,
Scope.user_modify_playback_state,
}
self.assertEqual(expected_scopes, Scope.all())