Skip to content

Commit

Permalink
chore: refactor provenance level 3 check
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Selwyn-Smith <[email protected]>
  • Loading branch information
benmss committed Aug 15, 2024
1 parent f4fd86f commit c0e88a4
Show file tree
Hide file tree
Showing 15 changed files with 483 additions and 616 deletions.
4 changes: 4 additions & 0 deletions src/macaron/provenance/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This package contains the provenance tools for software components."""
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@
import urllib.parse

from packageurl import PackageURL
from pydriller import Git

from macaron.errors import ProvenanceError
from macaron.json_tools import JsonType, json_extract
from macaron.repo_finder.commit_finder import (
AbstractPurlType,
determine_abstract_purl_type,
extract_commit_from_version,
)
from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type
from macaron.repo_finder.repo_finder import to_domain_from_known_purl_types
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload

Expand Down Expand Up @@ -275,27 +270,18 @@ def check_if_input_repo_provenance_conflict(


def check_if_input_purl_provenance_conflict(
git_obj: Git,
repo_path_input: bool,
digest_input: bool,
provenance_repo_url: str | None,
provenance_commit_digest: str | None,
purl: PackageURL,
) -> bool:
"""Test if the input repository type PURL's repo and commit match the contents of the provenance.
Parameters
----------
git_obj: Git
The Git object.
repo_path_input: bool
True if there is a repo as input.
digest_input: str
True if there is a commit as input.
provenance_repo_url: str | None
The repo url from provenance.
provenance_commit_digest: str | None
The commit digest from provenance.
purl: PackageURL
The input repository PURL.
Expand All @@ -318,18 +304,6 @@ def check_if_input_purl_provenance_conflict(
)
return True

# Check the PURL commit against the provenance.
if not digest_input and provenance_commit_digest and purl.version:
purl_commit = extract_commit_from_version(git_obj, purl.version)
if purl_commit and purl_commit != provenance_commit_digest:
logger.debug(
"The commit digest passed via purl input does not match what exists in the "
"provenance. Purl Commit: %s, Provenance Commit: %s.",
purl_commit,
provenance_commit_digest,
)
return True

return False


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,38 +107,6 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]
logger.debug("No provenance found.")
return []

def verify_provenance(self, purl: PackageURL, provenance: list[InTotoPayload]) -> bool:
"""Verify the passed provenance.
Parameters
----------
purl: PackageURL
The PURL of the analysis target.
provenance: list[InTotoPayload]
The list of provenance.
Returns
-------
bool
True if the provenance could be verified, or False otherwise.
"""
if determine_abstract_purl_type(purl) == AbstractPurlType.REPOSITORY:
# Do not perform default verification for repository type targets.
return False

verification_function = None

if purl.type == "npm":
verification_function = partial(verify_npm_provenance, purl, provenance)

# TODO other verification functions go here.

if verification_function:
return verification_function()

logger.debug("Provenance verification not supported for PURL type: %s", purl.type)
return False


def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoPayload]:
"""Find and download the NPM based provenance for the passed PURL.
Expand Down Expand Up @@ -213,72 +181,6 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP
return []


def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool:
"""Compare the unsigned payload subject digest with the signed payload digest, if available.
Parameters
----------
purl: PackageURL
The PURL of the analysis target.
provenance: list[InTotoPayload]
The provenances to verify.
Returns
-------
bool
True if the provenance was verified, or False otherwise.
"""
if len(provenance) != 2:
logger.debug("Expected unsigned and signed provenance.")
return False

signed_subjects = provenance[1].statement.get("subject")
if not signed_subjects:
return False

unsigned_subjects = provenance[0].statement.get("subject")
if not unsigned_subjects:
return False

found_signed_subject = None
for signed_subject in signed_subjects:
name = signed_subject.get("name")
if name and name == str(purl):
found_signed_subject = signed_subject
break

if not found_signed_subject:
return False

found_unsigned_subject = None
for unsigned_subject in unsigned_subjects:
name = unsigned_subject.get("name")
if name and name == str(purl):
found_unsigned_subject = unsigned_subject
break

if not found_unsigned_subject:
return False

signed_digest = found_signed_subject.get("digest")
unsigned_digest = found_unsigned_subject.get("digest")
if not (signed_digest and unsigned_digest):
return False

# For signed and unsigned to match, the digests must be identical.
if signed_digest != unsigned_digest:
return False

key = list(signed_digest.keys())[0]
logger.debug(
"Verified provenance against signed companion. Signed: %s, Unsigned: %s.",
signed_digest[key][:7],
unsigned_digest[key][:7],
)

return True


def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[InTotoPayload]:
"""Find and download the GAV based provenance for the passed PURL.
Expand Down Expand Up @@ -373,7 +275,9 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
return provenances[:1]


def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) -> InTotoPayload | None:
def find_provenance_from_ci(
analyze_ctx: AnalyzeContext, git_obj: Git | None, download_path: str
) -> InTotoPayload | None:
"""Try to find provenance from CI services of the repository.
Note that we stop going through the CI services once we encounter a CI service
Expand All @@ -385,9 +289,11 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) ->
Parameters
----------
analyze_ctx: AnalyzeContext
The contenxt of the ongoing analysis.
The context of the ongoing analysis.
git_obj: Git | None
The Pydriller Git object representing the repository, if any.
download_path: str
The pre-existing location to download discovered files to.
Returns
-------
Expand Down Expand Up @@ -463,9 +369,7 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) ->
ci_info["provenance_assets"].extend(provenance_assets)

# Download the provenance assets and load the provenance payloads.
download_provenances_from_github_actions_ci_service(
ci_info,
)
download_provenances_from_ci_service(ci_info, download_path)

# TODO consider how to handle multiple payloads here.
return ci_info["provenances"][0].payload if ci_info["provenances"] else None
Expand All @@ -476,56 +380,60 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) ->
return None


def download_provenances_from_github_actions_ci_service(ci_info: CIInfo) -> None:
def download_provenances_from_ci_service(ci_info: CIInfo, download_path: str) -> None:
"""Download provenances from GitHub Actions.
Parameters
----------
ci_info: CIInfo,
A ``CIInfo`` instance that holds a GitHub Actions git service object.
download_path: str
The pre-existing location to download discovered files to.
"""
ci_service = ci_info["service"]
prov_assets = ci_info["provenance_assets"]

if not os.path.isdir(download_path):
logger.debug("Download location is not a valid directory.")
return
try:
with tempfile.TemporaryDirectory() as temp_path:
downloaded_provs = []
for prov_asset in prov_assets:
# Check the size before downloading.
if prov_asset.size_in_bytes > defaults.getint(
"slsa.verifier",
"max_download_size",
fallback=1000000,
):
logger.info(
"Skip verifying the provenance %s: asset size too large.",
prov_asset.name,
)
continue
downloaded_provs = []
for prov_asset in prov_assets:
# Check the size before downloading.
if prov_asset.size_in_bytes > defaults.getint(
"slsa.verifier",
"max_download_size",
fallback=1000000,
):
logger.info(
"Skip verifying the provenance %s: asset size too large.",
prov_asset.name,
)
continue

provenance_filepath = os.path.join(temp_path, prov_asset.name)
provenance_filepath = os.path.join(download_path, prov_asset.name)

if not ci_service.api_client.download_asset(
prov_asset.url,
provenance_filepath,
):
logger.debug(
"Could not download the provenance %s. Skip verifying...",
prov_asset.name,
)
continue
if not ci_service.api_client.download_asset(
prov_asset.url,
provenance_filepath,
):
logger.debug(
"Could not download the provenance %s. Skip verifying...",
prov_asset.name,
)
continue

# Read the provenance.
try:
payload = load_provenance_payload(provenance_filepath)
except LoadIntotoAttestationError as error:
logger.error("Error logging provenance: %s", error)
continue
# Read the provenance.
try:
payload = load_provenance_payload(provenance_filepath)
except LoadIntotoAttestationError as error:
logger.error("Error logging provenance: %s", error)
continue

# Add the provenance file.
downloaded_provs.append(SLSAProvenanceData(payload=payload, asset=prov_asset))
# Add the provenance file.
downloaded_provs.append(SLSAProvenanceData(payload=payload, asset=prov_asset))

# Persist the provenance payloads into the CIInfo object.
ci_info["provenances"] = downloaded_provs

except OSError as error:
logger.error("Error while storing provenance in the temporary directory: %s", error)
Loading

0 comments on commit c0e88a4

Please sign in to comment.