mirror of
https://github.com/spotipy-dev/spotipy.git
synced 2026-06-19 09:13:53 +00:00
oauth2.py: add local http server to serve redirects (#243)
When spotify redirects the browser to localhost, serve the page with a simple http server implemented by HTTPServer. By this way, we can obtain the tokens the oauth service want so provide us, and no need to input the user the redirect URL. This method only works when the application in spotify developer dashboard is configured to redirect the requests to localhost. To use this method it is required to specify the redirect_url pointing to localhost with simple http, to a non-privilegized port (unless the script is run as root).
This commit is contained in:
parent
7a06046e2b
commit
acdcfcecf8
@ -17,11 +17,13 @@ import time
|
|||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from spotipy.util import CLIENT_CREDS_ENV_VARS
|
from spotipy.util import CLIENT_CREDS_ENV_VARS, get_host_port
|
||||||
|
|
||||||
# Workaround to support both python 2 & 3
|
# Workaround to support both python 2 & 3
|
||||||
import six
|
import six
|
||||||
import six.moves.urllib.parse as urllibparse
|
import six.moves.urllib.parse as urllibparse
|
||||||
|
from six.moves.BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
|
||||||
|
from six.moves.urllib_parse import urlparse, parse_qsl
|
||||||
|
|
||||||
|
|
||||||
class SpotifyOauthError(Exception):
|
class SpotifyOauthError(Exception):
|
||||||
@ -294,6 +296,42 @@ class SpotifyOAuth(SpotifyAuthBase):
|
|||||||
def _make_authorization_headers(self):
|
def _make_authorization_headers(self):
|
||||||
return _make_authorization_headers(self.client_id, self.client_secret)
|
return _make_authorization_headers(self.client_id, self.client_secret)
|
||||||
|
|
||||||
|
def _open_auth_url(self):
|
||||||
|
auth_url = self.get_authorize_url()
|
||||||
|
|
||||||
|
try:
|
||||||
|
import webbrowser
|
||||||
|
webbrowser.open(auth_url)
|
||||||
|
print("Opened %s in your browser" % auth_url)
|
||||||
|
except webbrowser.Error:
|
||||||
|
print("Please navigate here: %s" % auth_url)
|
||||||
|
|
||||||
|
def _get_auth_response_interactive(self):
|
||||||
|
self._open_auth_url()
|
||||||
|
print("")
|
||||||
|
print("")
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = raw_input("Enter the URL you were redirected to: ")
|
||||||
|
except NameError:
|
||||||
|
response = input("Enter the URL you were redirected to: ")
|
||||||
|
print("")
|
||||||
|
print("")
|
||||||
|
return self.parse_response_code(response)
|
||||||
|
|
||||||
|
def _get_auth_response_local_server(self, redirect_port):
|
||||||
|
|
||||||
|
with start_local_http_server(redirect_port) as server:
|
||||||
|
self._open_auth_url()
|
||||||
|
server.handle_request()
|
||||||
|
|
||||||
|
if server.auth_code is not None:
|
||||||
|
return server.auth_code
|
||||||
|
elif server.error is not None:
|
||||||
|
raise SpotifyOauthError("Received error from OAuth server: {}".format(server.error))
|
||||||
|
else:
|
||||||
|
raise SpotifyOauthError("Server listening on localhost has not been accessed")
|
||||||
|
|
||||||
def get_auth_response(self):
|
def get_auth_response(self):
|
||||||
print(
|
print(
|
||||||
"""
|
"""
|
||||||
@ -301,28 +339,22 @@ class SpotifyOAuth(SpotifyAuthBase):
|
|||||||
User authentication requires interaction with your
|
User authentication requires interaction with your
|
||||||
web browser. Once you enter your credentials and
|
web browser. Once you enter your credentials and
|
||||||
give authorization, you will be redirected to
|
give authorization, you will be redirected to
|
||||||
a url. Paste that url you were directed to to
|
a url.
|
||||||
complete the authorization.
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
auth_url = self.get_authorize_url()
|
|
||||||
try:
|
|
||||||
import webbrowser
|
|
||||||
|
|
||||||
webbrowser.open(auth_url)
|
redirect_info = urlparse(self.redirect_uri)
|
||||||
print("Opened %s in your browser" % auth_url)
|
redirect_host, redirect_port = get_host_port(redirect_info.netloc)
|
||||||
except BaseException:
|
|
||||||
print("Please navigate here: %s" % auth_url)
|
if redirect_host in ("127.0.0.1", "localhost") and redirect_info.scheme == "http":
|
||||||
print("")
|
return self._get_auth_response_local_server(redirect_port)
|
||||||
print("")
|
else:
|
||||||
try:
|
print("""
|
||||||
response = raw_input("Enter the URL you were redirected to: ")
|
Paste that url you were directed to to
|
||||||
except NameError:
|
complete the authorization.
|
||||||
response = input("Enter the URL you were redirected to: ")
|
""")
|
||||||
print("")
|
return self._get_auth_response_interactive()
|
||||||
print("")
|
|
||||||
return response
|
|
||||||
|
|
||||||
def get_authorization_code(self, response=None):
|
def get_authorization_code(self, response=None):
|
||||||
return self.parse_response_code(response or self.get_auth_response())
|
return self.parse_response_code(response or self.get_auth_response())
|
||||||
@ -432,3 +464,46 @@ class SpotifyOAuth(SpotifyAuthBase):
|
|||||||
|
|
||||||
def _warn(self, msg):
|
def _warn(self, msg):
|
||||||
print("warning:" + msg, file=sys.stderr)
|
print("warning:" + msg, file=sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
|
class RequestHandler(BaseHTTPRequestHandler):
|
||||||
|
def do_GET(self):
|
||||||
|
query_s = urlparse(self.path).query
|
||||||
|
form = dict(parse_qsl(query_s))
|
||||||
|
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "text/html")
|
||||||
|
self.end_headers()
|
||||||
|
|
||||||
|
if "code" in form:
|
||||||
|
self.server.auth_code = form["code"]
|
||||||
|
self.server.error = None
|
||||||
|
status = "successful"
|
||||||
|
elif "error" in form:
|
||||||
|
self.server.error = form["error"]
|
||||||
|
self.server.auth_code = None
|
||||||
|
status = "failed ({})".format(form["error"])
|
||||||
|
else:
|
||||||
|
self._write("<html><body><h1>Invalid request</h1></body></html>")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._write("""<html>
|
||||||
|
<body>
|
||||||
|
<h1>Authentication status: {}</h1>
|
||||||
|
Now you can close this window or tab.
|
||||||
|
</body>
|
||||||
|
</html>""".format(status))
|
||||||
|
|
||||||
|
def _write(self, text):
|
||||||
|
return self.wfile.write(text.encode("utf-8"))
|
||||||
|
|
||||||
|
def log_message(self, format, *args):
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def start_local_http_server(port, handler=RequestHandler):
|
||||||
|
server = HTTPServer(("127.0.0.1", port), handler)
|
||||||
|
server.allow_reuse_address = True
|
||||||
|
server.auth_code = None
|
||||||
|
server.error = None
|
||||||
|
return server
|
||||||
|
|||||||
@ -87,8 +87,7 @@ def prompt_for_user_token(
|
|||||||
token_info = sp_oauth.get_cached_token()
|
token_info = sp_oauth.get_cached_token()
|
||||||
|
|
||||||
if not token_info:
|
if not token_info:
|
||||||
url = sp_oauth.get_auth_response()
|
code = sp_oauth.get_auth_response()
|
||||||
code = sp_oauth.parse_response_code(url)
|
|
||||||
token = sp_oauth.get_access_token(code, as_dict=False)
|
token = sp_oauth.get_access_token(code, as_dict=False)
|
||||||
else:
|
else:
|
||||||
return token_info["access_token"]
|
return token_info["access_token"]
|
||||||
@ -98,3 +97,14 @@ def prompt_for_user_token(
|
|||||||
return token
|
return token
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_host_port(netloc, default_port=80):
|
||||||
|
if ":" in netloc:
|
||||||
|
host, port = netloc.split(":", 1)
|
||||||
|
port = int(port)
|
||||||
|
else:
|
||||||
|
host = netloc
|
||||||
|
port = default_port
|
||||||
|
|
||||||
|
return (host, port)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user