Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable webauthn plugin for security keys #1528

Merged
merged 10 commits into from
Jun 6, 2024
71 changes: 71 additions & 0 deletions google/oauth2/challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,22 @@
from google.auth import _helpers
from google.auth import exceptions

from google.oauth2.webauthn_types import (
PublicKeyCredentialDescriptor,
AuthenticationExtensionsClientInputs,
GetRequest,
GetResponse
)


from google.oauth2 import webauthn_handler, webauthn_handler_factory


REAUTH_ORIGIN = "https://accounts.google.com"
SAML_CHALLENGE_MESSAGE = (
"Please run `gcloud auth login` to complete reauthentication with SAML."
)
WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout


def get_user_password(text):
Expand Down Expand Up @@ -110,6 +121,17 @@ def is_locally_eligible(self):

@_helpers.copy_docstring(ReauthChallenge)
def obtain_challenge_input(self, metadata):
# Check if there is an available Webauthn Handler, if not use pyu2f
try:
factory = webauthn_handler_factory.WebauthnHandlerFactory()
webauthn_handler = factory.get_handler()
if webauthn_handler is not None:
return self._obtain_challenge_input_webauthn(metadata, webauthn_handler)
except Exception as e:
# Attempt pyu2f if exception in webauthn flow
# traceback.print_exc()
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
pass

try:
import pyu2f.convenience.authenticator # type: ignore
import pyu2f.errors # type: ignore
Expand Down Expand Up @@ -173,6 +195,55 @@ def obtain_challenge_input(self, metadata):
sys.stderr.write("No security key found.\n")
return None

def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler):
sk = metadata["securityKey"]
challenges = sk["challenges"]
application_id = sk["applicationId"]
relying_party_id = sk["relyingPartyId"]

allow_credentials = []
for challenge in challenges:
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
key_handle = self._urlsafe_b64recode(challenge['keyHandle'])
# TODO: do we need to set transports
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
allow_credentials.append(
PublicKeyCredentialDescriptor(id = key_handle))

extension = AuthenticationExtensionsClientInputs(
appid = application_id)

get_request = GetRequest(
origin = REAUTH_ORIGIN,
rpid = relying_party_id,
challenge = self._urlsafe_b64recode(challenges[0]['challenge']),
timeout_ms = WEBAUTHN_TIMEOUT_MS,
allow_credentials = allow_credentials,
user_verification = 'required',
extensions = extension
)

try:
sys.stderr.write('Please insert and touch your security key\n')
get_response = webauthn_handler.get(get_request)
except Exception as e:
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
sys.stderr.write("Webauthn Error: {}.\n".format(e))
raise e

response = {
'clientData': get_response.response.client_data_json,
'authenticatorData': get_response.response.authenticator_data,
'signatureData': get_response.response.signature,
'applicationId': application_id,
'keyHandle': get_response.id,
'securityKeyReplyType': 2
}
return {"securityKey": response}

def _urlsafe_b64recode(self, s):
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
"""Converts standard b64 encoded string to url safe b64 encoded string
with no padding."""
b = base64.urlsafe_b64decode(s)
return base64.urlsafe_b64encode(b).decode().rstrip('=')


class SamlChallenge(ReauthChallenge):
"""Challenge that asks the users to browse to their ID Providers.
Expand Down
17 changes: 17 additions & 0 deletions google/oauth2/webauthn_handler_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import sys

from typing import List

from google.oauth2.webauthn_handler import WebAuthnHandler, PluginHandler

class WebauthnHandlerFactory:
handlers: List[WebAuthnHandler]

def __init__(self):
self.handlers = [PluginHandler()]

def get_handler(self) -> WebAuthnHandler:
for handler in self.handlers:
if handler.is_available():
return handler
return None
111 changes: 111 additions & 0 deletions tests/oauth2/test_challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Tests for the reauth module."""

import base64
import os
import sys

import mock
Expand All @@ -23,6 +24,13 @@

from google.auth import exceptions
from google.oauth2 import challenges
from google.oauth2.webauthn_types import (
AuthenticatorAssertionResponse,
GetRequest,
GetResponse,
PublicKeyCredentialDescriptor,
AuthenticationExtensionsClientInputs
)


def test_get_user_password():
Expand Down Expand Up @@ -54,6 +62,8 @@ def test_security_key():

# Test the case that security key challenge is passed with applicationId and
# relyingPartyId the same.
os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)

with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
with mock.patch(
"pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
Expand All @@ -70,6 +80,18 @@ def test_security_key():
print_callback=sys.stderr.write,
)

# Test the case that webauthn plugin is available
os.environ["GOOGLE_AUTH_WEBAUTHN_PLUGIN"] = "plugin"

with mock.patch("google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn",
return_value={"securityKey": "security key response"}):

assert challenge.obtain_challenge_input(metadata) == {
"securityKey": "security key response"
}
os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)


# Test the case that security key challenge is passed with applicationId and
# relyingPartyId different, first call works.
metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id"
Expand Down Expand Up @@ -173,6 +195,95 @@ def test_security_key():
assert excinfo.match(r"pyu2f dependency is required")


def test_security_key_webauthn():
metadata = {
"status": "READY",
"challengeId": 2,
"challengeType": "SECURITY_KEY",
"securityKey": {
"applicationId": "security_key_application_id",
"challenges": [
{
"keyHandle": "some_key",
"challenge": base64.urlsafe_b64encode(
"some_challenge".encode("ascii")
).decode("ascii"),
}
],
"relyingPartyId": "security_key_application_id",
},
}

challenge = challenges.SecurityKeyChallenge()

sk = metadata["securityKey"]
sk_challenges = sk["challenges"]

application_id = sk["applicationId"]
relying_party_id = sk["relyingPartyId"]

allow_credentials = []
for sk_challenge in sk_challenges:
allow_credentials.append(
PublicKeyCredentialDescriptor(id = sk_challenge['keyHandle']))

extension = AuthenticationExtensionsClientInputs(
appid = application_id)

get_request = GetRequest(
origin = challenges.REAUTH_ORIGIN,
rpid = application_id,
challenge = challenge._urlsafe_b64recode(sk_challenge['challenge']),
timeout_ms = challenges.WEBAUTHN_TIMEOUT_MS,
allow_credentials = allow_credentials,
user_verification = 'required',
extensions = extension
)

assertion_resp = AuthenticatorAssertionResponse(
client_data_json="clientDataJSON",
authenticator_data="authenticatorData",
signature="signature",
user_handle="userHandle",
)
get_response = GetResponse(
id="id",
response=assertion_resp,
authenticator_attachment="authenticatorAttachment",
client_extension_results="clientExtensionResults",
)
response = {
'clientData': get_response.response.client_data_json,
'authenticatorData': get_response.response.authenticator_data,
'signatureData': get_response.response.signature,
'applicationId': "security_key_application_id",
'keyHandle': get_response.id,
'securityKeyReplyType': 2
}

mock_handler = mock.Mock()
mock_handler.get.return_value = get_response

# Test success case
assert challenge._obtain_challenge_input_webauthn(metadata, mock_handler) == {
"securityKey": response
}
mock_handler.get.assert_called_with(get_request)

# Test exceptions
mock_handler.get.side_effect = exceptions.MalformedError
with pytest.raises(exceptions.MalformedError):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)

mock_handler.get.side_effect = exceptions.InvalidResource
with pytest.raises(exceptions.InvalidResource):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)

mock_handler.get.side_effect = exceptions.ReauthFailError
with pytest.raises(exceptions.ReauthFailError):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)


@mock.patch("getpass.getpass", return_value="foo")
def test_password_challenge(getpass_mock):
challenge = challenges.PasswordChallenge()
Expand Down
33 changes: 33 additions & 0 deletions tests/oauth2/test_webauthn_handler_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import json
import struct

import mock
import pytest # type: ignore

from google.auth import exceptions
from google.oauth2 import webauthn_handler
from google.oauth2 import webauthn_handler_factory
from google.oauth2 import webauthn_types


@pytest.fixture
def os_get_stub():
with mock.patch.object(
webauthn_handler.os.environ,
"get",
return_value="gcloud_webauthn_plugin",
name="fake os.environ.get",
) as mock_os_environ_get:
yield mock_os_environ_get

# Check that get_handler returns a value when env is set,
# that type is PluginHandler, and that no value is returned
# if env not set.
def test_WebauthHandlerFactory_get(os_get_stub):
factory = webauthn_handler_factory.WebauthnHandlerFactory()
assert factory.get_handler() is not None

assert isinstance(factory.get_handler(), webauthn_handler.PluginHandler)

os_get_stub.return_value = None
assert factory.get_handler() is None