Check the state (#509)

* - Verify that the state received alongside the authorization code is consistent with the one sent
- Refactor URL parsing for the local server way and the interactive way
- Add tests for interactive way

* Resurrect public methods parse_response_code and get_authorization_code

* Use new method parse_oatuh_response_url for parse_response_code implementation.
This commit is contained in:
foobuzz 2020-06-20 22:48:19 +02:00 committed by GitHub
parent d7ebc611b2
commit ed136d15df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 20 deletions

View File

@ -322,11 +322,24 @@ class SpotifyOAuth(SpotifyAuthBase):
Parameters:
- url - the response url
"""
url_split = url.split("?code=")
if len(url_split) <= 1:
_, code, _ = self.parse_oauth_response_url(url)
if code is None:
return url
else:
return url_split[1].split("&")[0]
return code
@staticmethod
def parse_oauth_response_url(url):
query_s = urlparse(url).query
form = dict(parse_qsl(query_s))
return tuple(form.get(param) for param in ['state', 'code', 'error'])
@staticmethod
def _get_user_input(prompt):
try:
return raw_input(prompt)
except NameError:
return input(prompt)
def _make_authorization_headers(self):
return _make_authorization_headers(self.client_id, self.client_secret)
@ -341,18 +354,20 @@ class SpotifyOAuth(SpotifyAuthBase):
def _get_auth_response_interactive(self):
self._open_auth_url()
try:
response = raw_input("Enter the URL you were redirected to: ")
except NameError:
response = input("Enter the URL you were redirected to: ")
return self.parse_response_code(response)
response = SpotifyOAuth._get_user_input("Enter the URL you were redirected to: ")
state, code, _ = SpotifyOAuth.parse_oauth_response_url(response)
if self.state is not None and self.state != state:
raise SpotifyOauthError("Received inconsistent state from OAuth server.")
return code
def _get_auth_response_local_server(self, redirect_port):
server = start_local_http_server(redirect_port)
self._open_auth_url()
server.handle_request()
if self.state is not None and server.state != self.state:
raise SpotifyOauthError("Received inconsistent state from OAuth server.")
if server.auth_code is not None:
return server.auth_code
elif server.error is not None:
@ -420,7 +435,7 @@ class SpotifyOAuth(SpotifyAuthBase):
payload = {
"redirect_uri": self.redirect_uri,
"code": code or self.get_authorization_code(),
"code": code or self.get_auth_response(),
"grant_type": "authorization_code",
}
if self.scope:
@ -507,21 +522,19 @@ class SpotifyOAuth(SpotifyAuthBase):
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
query_s = urlparse(self.path).query
form = dict(parse_qsl(query_s))
state, auth_code, error = SpotifyOAuth.parse_oauth_response_url(self.path)
self.server.state = state
self.server.auth_code = auth_code
self.server.error = error
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.end_headers()
if "code" in form:
self.server.auth_code = form["code"]
self.server.error = None
if self.server.auth_code:
status = "successful"
elif "error" in form:
self.server.error = form["error"]
self.server.auth_code = None
status = "failed ({})".format(form["error"])
elif self.server.error:
status = "failed ({})".format(self.server.error)
else:
self._write("<html><body><h1>Invalid request</h1></body></html>")
return

View File

@ -26,6 +26,7 @@ def prompt_for_user_token(
client_id=None,
client_secret=None,
redirect_uri=None,
state=None,
cache_path=None,
oauth_manager=None,
show_dialog=False
@ -84,6 +85,7 @@ def prompt_for_user_token(
client_id,
client_secret,
redirect_uri,
state=state,
scope=scope,
cache_path=cache_path,
show_dialog=show_dialog

View File

@ -119,7 +119,7 @@ class OAuthCacheTest(unittest.TestCase):
self.assertTrue(fi.write.called)
class TestSpotifyOAuth(unittest.TestCase):
class TestSpotifyOAuthGetAuthorizeUrl(unittest.TestCase):
def test_get_authorize_url_doesnt_pass_state_by_default(self):
oauth = SpotifyOAuth("CLID", "CLISEC", "REDIR")
@ -168,6 +168,46 @@ class TestSpotifyOAuth(unittest.TestCase):
parsed_qs = urllibparse.parse_qs(parsed_url.query)
self.assertTrue(parsed_qs['show_dialog'])
class TestSpotifyOAuthGetAuthResponseInteractive(unittest.TestCase):
@patch('spotipy.oauth2.webbrowser')
@patch(
'spotipy.oauth2.SpotifyOAuth._get_user_input',
return_value="redir.io?code=abcde"
)
def test_get_auth_response_without_state(self, webbrowser_mock, get_user_input_mock):
oauth = SpotifyOAuth("CLID", "CLISEC", "redir.io")
code = oauth.get_auth_response()
self.assertEqual(code, "abcde")
@patch('spotipy.oauth2.webbrowser')
@patch(
'spotipy.oauth2.SpotifyOAuth._get_user_input',
return_value="redir.io?code=abcde&state=wxyz"
)
def test_get_auth_response_with_consistent_state(self, webbrowser_mock, get_user_input_mock):
oauth = SpotifyOAuth("CLID", "CLISEC", "redir.io", state='wxyz')
code = oauth.get_auth_response()
self.assertEqual(code, "abcde")
@patch('spotipy.oauth2.webbrowser')
@patch(
'spotipy.oauth2.SpotifyOAuth._get_user_input',
return_value="redir.io?code=abcde&state=someotherstate"
)
def test_get_auth_response_with_inconsistent_state(self, webbrowser_mock, get_user_input_mock):
oauth = SpotifyOAuth("CLID", "CLISEC", "redir.io", state='wxyz')
with self.assertRaisesRegexp(
SpotifyOauthError,
"Received inconsistent state from OAuth server."
):
oauth.get_auth_response()
class TestSpotifyClientCredentials(unittest.TestCase):
def test_spotify_client_credentials_get_access_token(self):
oauth = SpotifyClientCredentials(client_id='ID', client_secret='SECRET')
with self.assertRaises(SpotifyOauthError) as error: