Skip to content

Commit

Permalink
Adjust format to black
Browse files Browse the repository at this point in the history
  • Loading branch information
pitbulk committed Jun 23, 2024
1 parent 7d8184a commit 260b78d
Show file tree
Hide file tree
Showing 34 changed files with 2,729 additions and 3,378 deletions.
7 changes: 4 additions & 3 deletions demo-django/demo/urls.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from django.urls import re_path
from django.contrib import admin
from .views import attrs, index, metadata

admin.autodiscover()

urlpatterns = [
re_path(r'^$', index, name='index'),
re_path(r'^attrs/$', attrs, name='attrs'),
re_path(r'^metadata/$', metadata, name='metadata'),
re_path(r"^$", index, name="index"),
re_path(r"^attrs/$", attrs, name="attrs"),
re_path(r"^metadata/$", metadata, name="metadata"),
]
98 changes: 48 additions & 50 deletions demo-django/demo/views.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from django.conf import settings
from django.urls import reverse
from django.http import (HttpResponse, HttpResponseRedirect,
HttpResponseServerError)
from django.http import HttpResponse, HttpResponseRedirect, HttpResponseServerError
from django.shortcuts import render

from onelogin.saml2.auth import OneLogin_Saml2_Auth
Expand All @@ -17,13 +16,13 @@ def init_saml_auth(req):
def prepare_django_request(request):
# If server is behind proxys or balancers use the HTTP_X_FORWARDED fields
result = {
'https': 'on' if request.is_secure() else 'off',
'http_host': request.META['HTTP_HOST'],
'script_name': request.META['PATH_INFO'],
'get_data': request.GET.copy(),
"https": "on" if request.is_secure() else "off",
"http_host": request.META["HTTP_HOST"],
"script_name": request.META["PATH_INFO"],
"get_data": request.GET.copy(),
# Uncomment if using ADFS as IdP, https://github.com/onelogin/python-saml/pull/144
# 'lowercase_urlencoding': True,
'post_data': request.POST.copy()
"post_data": request.POST.copy(),
}
return result

Expand All @@ -38,61 +37,61 @@ def index(request):
attributes = False
paint_logout = False

if 'sso' in req['get_data']:
if "sso" in req["get_data"]:
return HttpResponseRedirect(auth.login())
# If AuthNRequest ID need to be stored in order to later validate it, do instead
# sso_built_url = auth.login()
# request.session['AuthNRequestID'] = auth.get_last_request_id()
# return HttpResponseRedirect(sso_built_url)
elif 'sso2' in req['get_data']:
return_to = OneLogin_Saml2_Utils.get_self_url(req) + reverse('attrs')
elif "sso2" in req["get_data"]:
return_to = OneLogin_Saml2_Utils.get_self_url(req) + reverse("attrs")
return HttpResponseRedirect(auth.login(return_to))
elif 'slo' in req['get_data']:
elif "slo" in req["get_data"]:
name_id = session_index = name_id_format = name_id_nq = name_id_spnq = None
if 'samlNameId' in request.session:
name_id = request.session['samlNameId']
if 'samlSessionIndex' in request.session:
session_index = request.session['samlSessionIndex']
if 'samlNameIdFormat' in request.session:
name_id_format = request.session['samlNameIdFormat']
if 'samlNameIdNameQualifier' in request.session:
name_id_nq = request.session['samlNameIdNameQualifier']
if 'samlNameIdSPNameQualifier' in request.session:
name_id_spnq = request.session['samlNameIdSPNameQualifier']
if "samlNameId" in request.session:
name_id = request.session["samlNameId"]
if "samlSessionIndex" in request.session:
session_index = request.session["samlSessionIndex"]
if "samlNameIdFormat" in request.session:
name_id_format = request.session["samlNameIdFormat"]
if "samlNameIdNameQualifier" in request.session:
name_id_nq = request.session["samlNameIdNameQualifier"]
if "samlNameIdSPNameQualifier" in request.session:
name_id_spnq = request.session["samlNameIdSPNameQualifier"]

return HttpResponseRedirect(auth.logout(name_id=name_id, session_index=session_index, nq=name_id_nq, name_id_format=name_id_format, spnq=name_id_spnq))
# If LogoutRequest ID need to be stored in order to later validate it, do instead
# slo_built_url = auth.logout(name_id=name_id, session_index=session_index)
# request.session['LogoutRequestID'] = auth.get_last_request_id()
# return HttpResponseRedirect(slo_built_url)
elif 'acs' in req['get_data']:
elif "acs" in req["get_data"]:
request_id = None
if 'AuthNRequestID' in request.session:
request_id = request.session['AuthNRequestID']
if "AuthNRequestID" in request.session:
request_id = request.session["AuthNRequestID"]

auth.process_response(request_id=request_id)
errors = auth.get_errors()
not_auth_warn = not auth.is_authenticated()

if not errors:
if 'AuthNRequestID' in request.session:
del request.session['AuthNRequestID']
request.session['samlUserdata'] = auth.get_attributes()
request.session['samlNameId'] = auth.get_nameid()
request.session['samlNameIdFormat'] = auth.get_nameid_format()
request.session['samlNameIdNameQualifier'] = auth.get_nameid_nq()
request.session['samlNameIdSPNameQualifier'] = auth.get_nameid_spnq()
request.session['samlSessionIndex'] = auth.get_session_index()
if 'RelayState' in req['post_data'] and OneLogin_Saml2_Utils.get_self_url(req) != req['post_data']['RelayState']:
if "AuthNRequestID" in request.session:
del request.session["AuthNRequestID"]
request.session["samlUserdata"] = auth.get_attributes()
request.session["samlNameId"] = auth.get_nameid()
request.session["samlNameIdFormat"] = auth.get_nameid_format()
request.session["samlNameIdNameQualifier"] = auth.get_nameid_nq()
request.session["samlNameIdSPNameQualifier"] = auth.get_nameid_spnq()
request.session["samlSessionIndex"] = auth.get_session_index()
if "RelayState" in req["post_data"] and OneLogin_Saml2_Utils.get_self_url(req) != req["post_data"]["RelayState"]:
# To avoid 'Open Redirect' attacks, before execute the redirection confirm
# the value of the req['post_data']['RelayState'] is a trusted URL.
return HttpResponseRedirect(auth.redirect_to(req['post_data']['RelayState']))
return HttpResponseRedirect(auth.redirect_to(req["post_data"]["RelayState"]))
elif auth.get_settings().is_debug_active():
error_reason = auth.get_last_error_reason()
elif 'sls' in req['get_data']:
elif "sls" in req["get_data"]:
request_id = None
if 'LogoutRequestID' in request.session:
request_id = request.session['LogoutRequestID']
if "LogoutRequestID" in request.session:
request_id = request.session["LogoutRequestID"]
dscb = lambda: request.session.flush()
url = auth.process_slo(request_id=request_id, delete_session_cb=dscb)
errors = auth.get_errors()
Expand All @@ -106,26 +105,25 @@ def index(request):
elif auth.get_settings().is_debug_active():
error_reason = auth.get_last_error_reason()

if 'samlUserdata' in request.session:
if "samlUserdata" in request.session:
paint_logout = True
if len(request.session['samlUserdata']) > 0:
attributes = request.session['samlUserdata'].items()
if len(request.session["samlUserdata"]) > 0:
attributes = request.session["samlUserdata"].items()

return render(request, 'index.html', {'errors': errors, 'error_reason': error_reason, 'not_auth_warn': not_auth_warn, 'success_slo': success_slo,
'attributes': attributes, 'paint_logout': paint_logout})
return render(
request, "index.html", {"errors": errors, "error_reason": error_reason, "not_auth_warn": not_auth_warn, "success_slo": success_slo, "attributes": attributes, "paint_logout": paint_logout}
)


def attrs(request):
paint_logout = False
attributes = False

if 'samlUserdata' in request.session:
if "samlUserdata" in request.session:
paint_logout = True
if len(request.session['samlUserdata']) > 0:
attributes = request.session['samlUserdata'].items()
return render(request, 'attrs.html',
{'paint_logout': paint_logout,
'attributes': attributes})
if len(request.session["samlUserdata"]) > 0:
attributes = request.session["samlUserdata"].items()
return render(request, "attrs.html", {"paint_logout": paint_logout, "attributes": attributes})


def metadata(request):
Expand All @@ -137,7 +135,7 @@ def metadata(request):
errors = saml_settings.validate_metadata(metadata)

if len(errors) == 0:
resp = HttpResponse(content=metadata, content_type='text/xml')
resp = HttpResponse(content=metadata, content_type="text/xml")
else:
resp = HttpResponseServerError(content=', '.join(errors))
resp = HttpResponseServerError(content=", ".join(errors))
return resp
2 changes: 2 additions & 0 deletions demo-django/demo/wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
"""

import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "demo.settings")

from django.core.wsgi import get_wsgi_application # noqa: E402

application = get_wsgi_application()
116 changes: 53 additions & 63 deletions demo-flask/index.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,35 @@
import os

from flask import (Flask, request, render_template, redirect, session,
make_response)
from flask import Flask, request, render_template, redirect, session, make_response

from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.utils import OneLogin_Saml2_Utils


app = Flask(__name__)
app.config['SECRET_KEY'] = 'onelogindemopytoolkit'
app.config['SAML_PATH'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'saml')
app.config["SECRET_KEY"] = "onelogindemopytoolkit"
app.config["SAML_PATH"] = os.path.join(os.path.dirname(os.path.abspath(__file__)), "saml")


def init_saml_auth(req):
auth = OneLogin_Saml2_Auth(req, custom_base_path=app.config['SAML_PATH'])
auth = OneLogin_Saml2_Auth(req, custom_base_path=app.config["SAML_PATH"])
return auth


def prepare_flask_request(request):
# If server is behind proxys or balancers use the HTTP_X_FORWARDED fields
return {
'https': 'on' if request.scheme == 'https' else 'off',
'http_host': request.host,
'script_name': request.path,
'get_data': request.args.copy(),
"https": "on" if request.scheme == "https" else "off",
"http_host": request.host,
"script_name": request.path,
"get_data": request.args.copy(),
# Uncomment if using ADFS as IdP, https://github.com/onelogin/python-saml/pull/144
# 'lowercase_urlencoding': True,
'post_data': request.form.copy()
"post_data": request.form.copy(),
}


@app.route('/', methods=['GET', 'POST'])
@app.route("/", methods=["GET", "POST"])
def index():
req = prepare_flask_request(request)
auth = init_saml_auth(req)
Expand All @@ -41,57 +40,57 @@ def index():
attributes = False
paint_logout = False

if 'sso' in request.args:
if "sso" in request.args:
return redirect(auth.login())
# If AuthNRequest ID need to be stored in order to later validate it, do instead
# sso_built_url = auth.login()
# request.session['AuthNRequestID'] = auth.get_last_request_id()
# return redirect(sso_built_url)
elif 'sso2' in request.args:
return_to = '%sattrs/' % request.host_url
elif "sso2" in request.args:
return_to = "%sattrs/" % request.host_url
return redirect(auth.login(return_to))
elif 'slo' in request.args:
elif "slo" in request.args:
name_id = session_index = name_id_format = name_id_nq = name_id_spnq = None
if 'samlNameId' in session:
name_id = session['samlNameId']
if 'samlSessionIndex' in session:
session_index = session['samlSessionIndex']
if 'samlNameIdFormat' in session:
name_id_format = session['samlNameIdFormat']
if 'samlNameIdNameQualifier' in session:
name_id_nq = session['samlNameIdNameQualifier']
if 'samlNameIdSPNameQualifier' in session:
name_id_spnq = session['samlNameIdSPNameQualifier']
if "samlNameId" in session:
name_id = session["samlNameId"]
if "samlSessionIndex" in session:
session_index = session["samlSessionIndex"]
if "samlNameIdFormat" in session:
name_id_format = session["samlNameIdFormat"]
if "samlNameIdNameQualifier" in session:
name_id_nq = session["samlNameIdNameQualifier"]
if "samlNameIdSPNameQualifier" in session:
name_id_spnq = session["samlNameIdSPNameQualifier"]

return redirect(auth.logout(name_id=name_id, session_index=session_index, nq=name_id_nq, name_id_format=name_id_format, spnq=name_id_spnq))
elif 'acs' in request.args:
elif "acs" in request.args:
request_id = None
if 'AuthNRequestID' in session:
request_id = session['AuthNRequestID']
if "AuthNRequestID" in session:
request_id = session["AuthNRequestID"]

auth.process_response(request_id=request_id)
errors = auth.get_errors()
not_auth_warn = not auth.is_authenticated()
if len(errors) == 0:
if 'AuthNRequestID' in session:
del session['AuthNRequestID']
session['samlUserdata'] = auth.get_attributes()
session['samlNameId'] = auth.get_nameid()
session['samlNameIdFormat'] = auth.get_nameid_format()
session['samlNameIdNameQualifier'] = auth.get_nameid_nq()
session['samlNameIdSPNameQualifier'] = auth.get_nameid_spnq()
session['samlSessionIndex'] = auth.get_session_index()
if "AuthNRequestID" in session:
del session["AuthNRequestID"]
session["samlUserdata"] = auth.get_attributes()
session["samlNameId"] = auth.get_nameid()
session["samlNameIdFormat"] = auth.get_nameid_format()
session["samlNameIdNameQualifier"] = auth.get_nameid_nq()
session["samlNameIdSPNameQualifier"] = auth.get_nameid_spnq()
session["samlSessionIndex"] = auth.get_session_index()
self_url = OneLogin_Saml2_Utils.get_self_url(req)
if 'RelayState' in request.form and self_url != request.form['RelayState']:
if "RelayState" in request.form and self_url != request.form["RelayState"]:
# To avoid 'Open Redirect' attacks, before execute the redirection confirm
# the value of the request.form['RelayState'] is a trusted URL.
return redirect(auth.redirect_to(request.form['RelayState']))
return redirect(auth.redirect_to(request.form["RelayState"]))
elif auth.get_settings().is_debug_active():
error_reason = auth.get_last_error_reason()
elif 'sls' in request.args:
elif "sls" in request.args:
request_id = None
if 'LogoutRequestID' in session:
request_id = session['LogoutRequestID']
if "LogoutRequestID" in session:
request_id = session["LogoutRequestID"]
dscb = lambda: session.clear()
url = auth.process_slo(request_id=request_id, delete_session_cb=dscb)
errors = auth.get_errors()
Expand All @@ -105,37 +104,28 @@ def index():
elif auth.get_settings().is_debug_active():
error_reason = auth.get_last_error_reason()

if 'samlUserdata' in session:
if "samlUserdata" in session:
paint_logout = True
if len(session['samlUserdata']) > 0:
attributes = session['samlUserdata'].items()
if len(session["samlUserdata"]) > 0:
attributes = session["samlUserdata"].items()

return render_template(
'index.html',
errors=errors,
error_reason=error_reason,
not_auth_warn=not_auth_warn,
success_slo=success_slo,
attributes=attributes,
paint_logout=paint_logout
)
return render_template("index.html", errors=errors, error_reason=error_reason, not_auth_warn=not_auth_warn, success_slo=success_slo, attributes=attributes, paint_logout=paint_logout)


@app.route('/attrs/')
@app.route("/attrs/")
def attrs():
paint_logout = False
attributes = False

if 'samlUserdata' in session:
if "samlUserdata" in session:
paint_logout = True
if len(session['samlUserdata']) > 0:
attributes = session['samlUserdata'].items()
if len(session["samlUserdata"]) > 0:
attributes = session["samlUserdata"].items()

return render_template('attrs.html', paint_logout=paint_logout,
attributes=attributes)
return render_template("attrs.html", paint_logout=paint_logout, attributes=attributes)


@app.route('/metadata/')
@app.route("/metadata/")
def metadata():
req = prepare_flask_request(request)
auth = init_saml_auth(req)
Expand All @@ -145,11 +135,11 @@ def metadata():

if len(errors) == 0:
resp = make_response(metadata, 200)
resp.headers['Content-Type'] = 'text/xml'
resp.headers["Content-Type"] = "text/xml"
else:
resp = make_response(', '.join(errors), 500)
resp = make_response(", ".join(errors), 500)
return resp


if __name__ == "__main__":
app.run(host='0.0.0.0', port=8000, debug=True)
app.run(host="0.0.0.0", port=8000, debug=True)
4 changes: 2 additions & 2 deletions demo-tornado/Settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

BASE_DIR = os.path.dirname(__file__)

SAML_PATH = os.path.join(BASE_DIR, 'saml')
TEMPLATE_PATH = os.path.join(BASE_DIR, 'templates')
SAML_PATH = os.path.join(BASE_DIR, "saml")
TEMPLATE_PATH = os.path.join(BASE_DIR, "templates")
Loading

0 comments on commit 260b78d

Please sign in to comment.