-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprogram.py
135 lines (108 loc) · 4.36 KB
/
program.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""This script performs Authorization Code + PKCE Authentication against CONNECT data services"""
import base64
import hashlib
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import secrets
from urllib.parse import urlparse, parse_qs
import webbrowser
import requests
def get_appsettings():
"""Open and parse the appsettings.json file"""
# Try to open the configuration file
try:
with open(
'appsettings.json',
'r',
) as f:
appsettings = json.load(f)
except Exception as error:
print(f'Error: {str(error)}')
print(f'Could not open/read appsettings.json')
exit()
return appsettings
def main(test_script=None):
"""Main sample script
Performs Authorization Code + PKCE Authentication against CONNECT data services.
If test=True, will use Selenium to perform browser authentication automatically.
"""
try:
appsettings = get_appsettings()
resource = appsettings.get('Resource')
tenant_id = appsettings.get('TenantId')
client_id = appsettings.get('ClientId')
redirect_uri = 'http://localhost:5004/callback.html'
scope = 'openid ocsapi'
# Set up PKCE Verifier and Code Challenge
verifier = base64.urlsafe_b64encode(
secrets.token_bytes(32)).rstrip(b'=')
challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier).digest()).rstrip(b'=')
# Get OAuth endpoint configuration
print()
print('Step 1: Get OAuth endpoint configuration...')
endpoint = json.loads(requests.get(
resource + '/identity/.well-known/openid-configuration').content)
auth_endpoint = endpoint.get('authorization_endpoint')
token_endpoint = endpoint.get('token_endpoint')
# Set up request handler for web browser login
print()
print('Step 2: Set up server to process authorization response...')
class RequestHandler(BaseHTTPRequestHandler):
"""Handles authentication redirect uri and extracts authorization code from URL"""
code = ''
def do_GET(self):
"""Handles GET request against this temporary local server"""
# Parse out authorization code from query string in request
RequestHandler.code = parse_qs(
urlparse(self.path).query)['code'][0]
# Write response
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.end_headers()
self.wfile.write(
'<h1>You can now return to the application.</h1>'.encode())
# Set up server for web browser login
server = HTTPServer(('', 5004), RequestHandler)
# Open web browser against authorization endpoint
print()
print('Step 3: Authorize the user...')
auth_url = auth_endpoint + \
'?response_type=code&code_challenge=' + challenge.decode() + \
'&code_challenge_method=S256&client_id=' + client_id + \
'&redirect_uri=' + redirect_uri + \
'&scope=' + scope + \
'&acr_values=tenant:' + tenant_id
if test_script:
test_script(auth_url)
else:
# Open user default web browser at Auth page
webbrowser.open(auth_url)
# Wait for response in browser
print()
print('Step 4: Set server to handle one request...')
server.handle_request()
# Use authorization code to get bearer token
print()
print('Step 5: Get a token using the authorization code...')
token = requests.post(token_endpoint, [
('grant_type', 'authorization_code'),
('client_id', client_id),
('code_verifier', verifier),
('code', RequestHandler.code),
('redirect_uri', redirect_uri)])
token = json.loads(token.content).get('access_token')
print()
print('Step 6: Read the Access Token:')
print(token)
if test_script:
assert token, "Failed to obtain access token"
print()
print('Complete!')
except Exception as error:
print()
msg = "Encountered Error: {error}".format(error=error)
print(msg)
assert False, msg
if __name__ == "__main__":
main()