-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
33 changed files
with
999 additions
and
613 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,3 @@ | ||
DEFAULT_REALMS = [ | ||
'master' | ||
] | ||
|
||
DEFAULT_CLIENTS = [ | ||
'account', | ||
'admin-cli', | ||
|
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
|
||
|
||
class VoidFlag: | ||
|
||
def __init__(self, **kwargs): | ||
self.has_vuln = False | ||
super().__init__(**kwargs) | ||
|
||
def set_vuln(self): | ||
assert not hasattr(super(), 'set_vuln') | ||
|
||
|
||
class VulnFlag(VoidFlag): | ||
|
||
def __init__(self, has_vuln=False, **kwargs): | ||
self.has_vuln = has_vuln | ||
super().__init__(**kwargs) | ||
|
||
def set_vuln(self): | ||
self.has_vuln = True | ||
super().set_vuln() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,90 @@ | ||
from typing import List | ||
import re | ||
from typing import List, Dict, Any, Sized | ||
|
||
from keycloak_scanner.logging.printlogger import PrintLogger | ||
from keycloak_scanner.logging.vuln_flag import VulnFlag | ||
from keycloak_scanner.scanners.scanner import Scanner | ||
|
||
|
||
class MasterScanner: | ||
def to_camel_case(text: str): | ||
return re.sub('([a-z]+)([A-Z])', r'\1_\2', text).lower() | ||
|
||
def __init__(self, scans: List[Scanner]): | ||
|
||
class DuplicateResultException(Exception): | ||
pass | ||
|
||
|
||
class ScanResults(PrintLogger): | ||
|
||
def __init__(self, previous_deps: Dict[str, Any], **kwargs): | ||
if previous_deps is None: | ||
previous_deps = {} | ||
self.results: Dict[str, Any] = previous_deps | ||
super().__init__(**kwargs) | ||
|
||
def add(self, result: Any): | ||
key = to_camel_case(result.__class__.__name__) | ||
if key in self.results: | ||
raise DuplicateResultException(result) | ||
super().verbose(f'new result with key: {key} ({result})') | ||
self.results[key] = result | ||
|
||
def __repr__(self): | ||
return repr(self.results) | ||
|
||
|
||
class NoneResultException(Exception): | ||
pass | ||
|
||
|
||
class ScanStatus: | ||
|
||
def __init__(self, has_error=False, has_vulns=False): | ||
self.has_error = has_error | ||
self.has_vulns = has_vulns | ||
|
||
|
||
class MasterScanner(PrintLogger): | ||
|
||
def __init__(self, scans: List[Scanner], previous_deps: Dict[str, Any] = None, verbose=False, **kwargs): | ||
if previous_deps is None: | ||
previous_deps = {} | ||
self.scans = scans | ||
self.scan_properties = {} | ||
self.results = ScanResults(previous_deps, verbose=verbose) | ||
super().__init__(verbose=verbose, **kwargs) | ||
|
||
def start(self) -> ScanStatus: | ||
|
||
has_errors = False | ||
vf = VulnFlag() | ||
|
||
for scanner in self.scans: | ||
|
||
super().info(f'Start scanner {scanner.name()}...') | ||
|
||
try: | ||
|
||
result, has_vuln = scanner.perform(**self.results.results) | ||
|
||
if has_vuln.has_vuln: | ||
vf.set_vuln() | ||
|
||
if result is None: | ||
super().warn(f'None result for scanner {scanner.name()}') | ||
raise NoneResultException() | ||
|
||
if isinstance(result, Sized) and len(result) == 0: | ||
super().warn(f'Result of {scanner.name()} as no results (void list), subsequent scans can be void too.') | ||
|
||
self.results.add(result) | ||
|
||
except TypeError as e: | ||
print(f'Missing dependency for {scanner.__class__.__name__}: ({str(e)}). ' | ||
f'A required previous scanner as fail.') | ||
has_errors = True | ||
|
||
except Exception as e: | ||
print(f'Failed scan : {scanner.__class__.__name__}: ({str(e)}). ') | ||
has_errors = True | ||
|
||
def start(self): | ||
for scan in self.scans: | ||
scan.perform(scan_properties=self.scan_properties) | ||
return ScanStatus(has_errors, vf.has_vuln) |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,71 @@ | ||
from typing import List | ||
|
||
from keycloak_scanner.properties import add_list | ||
from keycloak_scanner.logging.vuln_flag import VulnFlag | ||
from keycloak_scanner.scanners.realm_scanner import Realms | ||
from keycloak_scanner.scanners.scanner import Scanner | ||
from keycloak_scanner.scanners.scanner_pieces import Need2 | ||
from keycloak_scanner.scanners.well_known_scanner import WellKnownDict | ||
|
||
URL_PATTERN = '{}/auth/realms/{}/{}' | ||
|
||
|
||
class ClientScanner(Scanner): | ||
class Client: | ||
|
||
def __init__(self, name: str, url: str, auth_endpoint: str = None): | ||
self.name = name | ||
self.url = url | ||
self.auth_endpoint = auth_endpoint | ||
|
||
def __repr__(self): | ||
return f"Client('{self.name}', '{self.url}', '{self.auth_endpoint}')" | ||
|
||
def __eq__(self, other): | ||
if isinstance(other, Client): | ||
return self.name == other.name and self.url == other.url and self.auth_endpoint == other.auth_endpoint | ||
return NotImplemented | ||
|
||
|
||
class Clients(List[Client]): | ||
pass | ||
|
||
|
||
class ClientScanner(Need2[Realms, WellKnownDict], Scanner[Clients]): | ||
|
||
def __init__(self, clients: List[str], **kwargs): | ||
self.clients = clients | ||
super().__init__(**kwargs) | ||
|
||
def perform(self, scan_properties): | ||
|
||
realms = scan_properties['realms'].keys() | ||
def perform(self, realms: Realms, well_known_dict: WellKnownDict, **kwargs) -> (Clients, VulnFlag): | ||
|
||
scan_properties['clients'] = {} | ||
result: Clients = Clients() | ||
|
||
for realm in realms: | ||
for client in self.clients: | ||
url = URL_PATTERN.format(super().base_url(), realm, client) | ||
r = super().session().get(url) | ||
for client_name in self.clients: | ||
url = URL_PATTERN.format(super().base_url(), realm.name, client_name) | ||
|
||
if r.status_code != 200: | ||
url = scan_properties['wellknowns'][realm]['authorization_endpoint'] | ||
r = super().session().get(url, params={'client_id': client}, allow_redirects=False) | ||
try: | ||
r = super().session().get(url) | ||
r.raise_for_status() | ||
|
||
except Exception as e: | ||
super().info('f [ClientScanner]: {e}') | ||
url = None | ||
|
||
try: | ||
|
||
auth_url = well_known_dict[realm.name].json['authorization_endpoint'] | ||
|
||
r = super().session().get(auth_url, params={'client_id': client_name}, allow_redirects=False) | ||
if r.status_code == 302: | ||
super().info('Find a client for realm {}: {}'.format(realm, client)) | ||
add_list(scan_properties['clients'], realm, client) | ||
super().info('Find a client for realm {}: {}'.format(realm.name, client_name)) | ||
result.append(Client(name=client_name, url=url, auth_endpoint=auth_url)) | ||
else: | ||
super().verbose('client {} seems to not exists'.format(client)) | ||
else: | ||
super().info('Find a client for realm {}: {} ({})'.format(realm, client, url)) | ||
add_list(scan_properties['clients'], realm, client) | ||
super().verbose('client {} seems to not exists'.format(client_name)) | ||
|
||
except KeyError as e: | ||
print(f'realm {realm.name}\'s wellknown doesn\t existsor do not have "authorization_endpoint". ({well_known_dict})') | ||
auth_url = None | ||
|
||
result.append(Client(name=client_name, auth_endpoint=auth_url, url=url)) | ||
|
||
return result, VulnFlag(False) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.