diff --git a/spotipy/__init__.py b/spotipy/__init__.py index 7f3d859..bb2e201 100644 --- a/spotipy/__init__.py +++ b/spotipy/__init__.py @@ -3,3 +3,4 @@ from .client import * # noqa from .exceptions import * # noqa from .oauth2 import * # noqa from .util import * # noqa +from .scope import * # noqa diff --git a/spotipy/cache_handler.py b/spotipy/cache_handler.py index 7ae94a2..99f0720 100644 --- a/spotipy/cache_handler.py +++ b/spotipy/cache_handler.py @@ -14,33 +14,32 @@ import os from spotipy.util import CLIENT_CREDS_ENV_VARS from redis import RedisError +from abc import ABC, abstractmethod logger = logging.getLogger(__name__) -class CacheHandler(): +class CacheHandler(ABC): """ An abstraction layer for handling the caching and retrieval of authorization tokens. - Custom extensions of this class must implement get_cached_token - and save_token_to_cache methods with the same input and output - structure as the CacheHandler class. + Clients are expected to subclass this class and override the + get_cached_token and save_token_to_cache methods with the same + type signatures of this class. """ + @abstractmethod def get_cached_token(self): """ Get and return a token_info dictionary object. """ - # return token_info - raise NotImplementedError() + @abstractmethod def save_token_to_cache(self, token_info): """ Save a token_info dictionary object to the cache and return None. """ - raise NotImplementedError() - return None class CacheFileHandler(CacheHandler): diff --git a/spotipy/oauth2.py b/spotipy/oauth2.py index 1591e77..828d4f0 100644 --- a/spotipy/oauth2.py +++ b/spotipy/oauth2.py @@ -277,7 +277,11 @@ class SpotifyOAuth(SpotifyAuthBase): * client_secret: Must be supplied or set as environment variable * redirect_uri: Must be supplied or set as environment variable * state: Optional, no verification is performed - * scope: Optional, either a list of scopes or comma separated string of scopes. + * scope: Optional, either a string of scopes, or an iterable with elements of type + `Scope` or `str`. E.g., + {Scope.user_modify_playback_state, Scope.user_library_read} + + iterable of scopes or comma separated string of scopes. e.g, "playlist-read-private,playlist-read-collaborative" * cache_path: (deprecated) Optional, will otherwise be generated (takes precedence over `username`) @@ -1011,8 +1015,9 @@ class SpotifyImplicitGrant(SpotifyAuthBase): * client_id: Must be supplied or set as environment variable * redirect_uri: Must be supplied or set as environment variable * state: May be supplied, no verification is performed - * scope: Optional, either a list of scopes or comma separated string of scopes. - e.g, "playlist-read-private,playlist-read-collaborative" + * scope: Optional, either a string of scopes, or an iterable with elements of type + `Scope` or `str`. E.g., + {Scope.user_modify_playback_state, Scope.user_library_read} * cache_handler: An instance of the `CacheHandler` class to handle getting and saving cached authorization tokens. May be supplied, will otherwise use `CacheFileHandler`. diff --git a/spotipy/scope.py b/spotipy/scope.py new file mode 100644 index 0000000..79a4adc --- /dev/null +++ b/spotipy/scope.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- + +__all__ = ["Scope"] + +from enum import Enum +import re +from typing import Iterable, Set + + +class Scope(Enum): + """ + The Spotify authorization scopes + + Create a Scope from a string: + + scope = Scope("playlist-modify-private") + + Create a set of scopes: + + scopes = { + Scope.user_read_currently_playing, + Scope.playlist_read_collaborative, + Scope.playlist_modify_public + } + """ + + user_read_currently_playing = "user-read-currently-playing" + playlist_read_collaborative = "playlist-read-collaborative" + playlist_modify_private = "playlist-modify-private" + user_read_playback_position = "user-read-playback-position" + user_library_modify = "user-library-modify" + user_top_read = "user-top-read" + user_read_playback_state = "user-read-playback-state" + user_read_email = "user-read-email" + ugc_image_upload = "ugc-image-upload" + user_read_private = "user-read-private" + playlist_modify_public = "playlist-modify-public" + user_library_read = "user-library-read" + streaming = "streaming" + user_read_recently_played = "user-read-recently-played" + user_follow_read = "user-follow-read" + user_follow_modify = "user-follow-modify" + app_remote_control = "app-remote-control" + playlist_read_private = "playlist-read-private" + user_modify_playback_state = "user-modify-playback-state" + + @staticmethod + def all() -> Set['Scope']: + """Returns all of the authorization scopes""" + + return set(Scope) + + @staticmethod + def make_string(scopes: Iterable['Scope']) -> str: + """ + Converts an iterable of scopes to a space-separated string. + + * scopes: An iterable of scopes. + + returns: a space-separated string of scopes + """ + return " ".join([scope.value for scope in scopes]) + + @staticmethod + def from_string(scope_string: str) -> Set['Scope']: + """ + Converts a string of (usuallly space-separated) scopes into a + set of scopes + + Any scope-strings that do not match any of the known scopes are + ignored. + + * scope_string: a string of scopes + + returns: a set of scopes. + """ + scope_string_list = re.split(pattern=r"[^\w-]+", string=scope_string) + scopes = set() + for scope_string in sorted(scope_string_list): + try: + scope = Scope(scope_string) + scopes.add(scope) + except ValueError: + pass + return scopes diff --git a/tests/unit/test_scopes.py b/tests/unit/test_scopes.py new file mode 100644 index 0000000..dcaefe8 --- /dev/null +++ b/tests/unit/test_scopes.py @@ -0,0 +1,90 @@ +from unittest import TestCase +from spotipy.scope import Scope +from spotipy.oauth2 import SpotifyAuthBase + + +class SpotipyScopeTest(TestCase): + + @classmethod + def setUpClass(cls): + cls.auth_manager = SpotifyAuthBase(requests_session=True) + + def normalize_scope(self, scope): + return self.auth_manager._normalize_scope(scope) + + def test_empty_scope(self): + scopes = set() + scope_string = Scope.make_string(scopes) + + normalized_scope_string = self.normalize_scope(scopes) + normalized_scope_string_2 = self.normalize_scope(scope_string) + + self.assertEqual(scope_string, "") + self.assertEqual(normalized_scope_string, "") + self.assertEqual(normalized_scope_string_2, "") + + converted_scopes = Scope.from_string(scope_string) + self.assertEqual(converted_scopes, set()) + + def test_scopes(self): + scopes = { + Scope.playlist_modify_public, + Scope.playlist_read_collaborative, + Scope.user_read_playback_state, + Scope.ugc_image_upload + } + normalized_scope_string = self.normalize_scope(scopes) + scope_string = Scope.make_string(scopes) + self.assertEqual(scope_string, normalized_scope_string) + + normalized_scope_string_2 = self.normalize_scope(scope_string) + + converted_scopes = Scope.from_string(scope_string) + normalized_converted_scope = Scope.from_string(normalized_scope_string) + normalized_converted_scope_2 = Scope.from_string(normalized_scope_string_2) + self.assertEqual(scopes, converted_scopes) + self.assertEqual(scopes, normalized_converted_scope) + self.assertEqual(scopes, normalized_converted_scope_2) + + def test_single_scope(self): + scope_string = "user-modify-playback-state" + scope = Scope(scope_string) + self.assertEqual(scope, Scope.user_modify_playback_state) + self.assertEqual(scope_string, scope.value) + + def test_scope_string(self): + scope_string = ( + "user-read-currently-playing playlist-read-collaborative,user-library-read " + "playlist-read-private user-read-email" + ) + expected_scopes = { + Scope.user_read_currently_playing, + Scope.playlist_read_collaborative, + Scope.user_library_read, + Scope.playlist_read_private, + Scope.user_read_email + } + parsed_scopes = Scope.from_string(scope_string) + normalized_scope_string = self.normalize_scope(scope_string) + normalized_parsed_scopes = Scope.from_string(normalized_scope_string) + self.assertEqual(parsed_scopes, expected_scopes) + self.assertEqual(normalized_parsed_scopes, expected_scopes) + + def test_invalid_types(self): + + numbers = [1, 2, 3] + with self.assertRaises(TypeError): + self.normalize_scope(numbers) + + with self.assertRaises(TypeError): + self.normalize_scope(True) + + def test_normalize_scope(self): + + normalized_scope_string = self.normalize_scope([]) + self.assertEqual(normalized_scope_string, "") + + normalized_scope_string_2 = self.normalize_scope(()) + self.assertEqual(normalized_scope_string_2, "") + + self.assertIsNone(self.normalize_scope(None))