Skip to content

Commit

Permalink
feat: 🔨 Add indemnification level from external DB (#3311)
Browse files Browse the repository at this point in the history
Co-authored-by: Jeroen Dekkers <[email protected]>
Co-authored-by: originalsouth <[email protected]>
Co-authored-by: Jan Klopper <[email protected]>
  • Loading branch information
4 people authored Aug 19, 2024
1 parent 9e359e9 commit 6c4acac
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions boefjes/boefjes/plugins/kat_external_db/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
IP_ADDRESS_ITEM_PATH = ["address"]
DOMAIN_LIST_PATH = ["domains"]
DOMAIN_ITEM_PATH = ["name"]
INDEMNIFICATION_ITEM_PATH = ["indemnification_level"]
DEFAULT_INDEMNIFICATION_LEVEL = 3


def follow_path_in_dict(path, path_dict):
Expand All @@ -29,6 +31,18 @@ def follow_path_in_dict(path, path_dict):
return path_dict


def get_indemnification_level(path_dict):
"""Return indemnification level from metadata or default."""
try:
indemnification_level = int(follow_path_in_dict(path=INDEMNIFICATION_ITEM_PATH, path_dict=path_dict))
if 0 <= indemnification_level < 5:
return indemnification_level
raise ValueError(f"Invalid indemnificationlevel {indemnification_level}, aborting.")
except KeyError:
logging.info("No integer indemnification level found, using default.")
return DEFAULT_INDEMNIFICATION_LEVEL


def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]:
"""Yields hostnames, IPv4/6 addresses or netblocks."""
results = json.loads(raw)
Expand All @@ -37,6 +51,7 @@ def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]:

for address_item in follow_path_in_dict(path=IP_ADDRESS_LIST_PATH, path_dict=results):
interface = ip_interface(follow_path_in_dict(path=IP_ADDRESS_ITEM_PATH, path_dict=address_item))
indemnification_level = get_indemnification_level(path_dict=address_item)
address, mask_str = interface.with_prefixlen.split("/")
mask = int(mask_str)

Expand All @@ -50,7 +65,7 @@ def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]:

ip_address = address_type(address=address, network=network.reference)
yield ip_address
yield DeclaredScanProfile(reference=ip_address.reference, level=3)
yield DeclaredScanProfile(reference=ip_address.reference, level=indemnification_level)
addresses_count += 1

if mask < interface.ip.max_prefixlen:
Expand All @@ -60,15 +75,17 @@ def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]:
network=network.reference,
)
yield block
yield DeclaredScanProfile(reference=block.reference, level=3)
yield DeclaredScanProfile(reference=block.reference, level=indemnification_level)
blocks_count += 1

for hostname in follow_path_in_dict(path=DOMAIN_LIST_PATH, path_dict=results):
for hostname_data in follow_path_in_dict(path=DOMAIN_LIST_PATH, path_dict=results):
hostname = Hostname(
name=follow_path_in_dict(path=DOMAIN_ITEM_PATH, path_dict=hostname), network=network.reference
name=follow_path_in_dict(path=DOMAIN_ITEM_PATH, path_dict=hostname_data), network=network.reference
)
yield hostname
yield DeclaredScanProfile(reference=hostname.reference, level=3)
yield DeclaredScanProfile(
reference=hostname.reference, level=get_indemnification_level(path_dict=hostname_data)
)
hostnames_count += 1

logging.info(
Expand Down

0 comments on commit 6c4acac

Please sign in to comment.