From c0e88a46d6c89fc4b18971a17de588b1cda20f93 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Thu, 15 Aug 2024 13:00:46 +1000 Subject: [PATCH] chore: refactor provenance level 3 check Signed-off-by: Ben Selwyn-Smith --- src/macaron/provenance/__init__.py | 4 + .../provenance_extractor.py | 28 +- .../provenance_finder.py | 182 ++------ src/macaron/provenance/provenance_verifier.py | 341 +++++++++++++++ src/macaron/repo_finder/__init__.py | 4 +- src/macaron/slsa_analyzer/analyze_context.py | 3 + src/macaron/slsa_analyzer/analyzer.py | 83 ++-- .../checks/provenance_l3_check.py | 390 +----------------- .../checks/provenance_verified_check.py | 34 +- .../cases/urllib3_expectation_dir/policy.dl | 3 + .../cases/urllib3_expectation_file/policy.dl | 3 + .../urllib3_invalid_expectation/policy.dl | 3 + tests/provenance/__init__.py | 2 + .../test_provenance_extractor.py | 2 +- .../test_provenance_finder.py | 17 +- 15 files changed, 483 insertions(+), 616 deletions(-) create mode 100644 src/macaron/provenance/__init__.py rename src/macaron/{repo_finder => provenance}/provenance_extractor.py (92%) rename src/macaron/{repo_finder => provenance}/provenance_finder.py (75%) create mode 100644 src/macaron/provenance/provenance_verifier.py create mode 100644 tests/provenance/__init__.py rename tests/{repo_finder => provenance}/test_provenance_extractor.py (99%) rename tests/{repo_finder => provenance}/test_provenance_finder.py (92%) diff --git a/src/macaron/provenance/__init__.py b/src/macaron/provenance/__init__.py new file mode 100644 index 000000000..ea53b485d --- /dev/null +++ b/src/macaron/provenance/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This package contains the provenance tools for software components.""" diff --git a/src/macaron/repo_finder/provenance_extractor.py b/src/macaron/provenance/provenance_extractor.py similarity index 92% rename from src/macaron/repo_finder/provenance_extractor.py rename to src/macaron/provenance/provenance_extractor.py index 5c3307c58..1c5c8f28c 100644 --- a/src/macaron/repo_finder/provenance_extractor.py +++ b/src/macaron/provenance/provenance_extractor.py @@ -6,15 +6,10 @@ import urllib.parse from packageurl import PackageURL -from pydriller import Git from macaron.errors import ProvenanceError from macaron.json_tools import JsonType, json_extract -from macaron.repo_finder.commit_finder import ( - AbstractPurlType, - determine_abstract_purl_type, - extract_commit_from_version, -) +from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type from macaron.repo_finder.repo_finder import to_domain_from_known_purl_types from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload @@ -275,27 +270,18 @@ def check_if_input_repo_provenance_conflict( def check_if_input_purl_provenance_conflict( - git_obj: Git, repo_path_input: bool, - digest_input: bool, provenance_repo_url: str | None, - provenance_commit_digest: str | None, purl: PackageURL, ) -> bool: """Test if the input repository type PURL's repo and commit match the contents of the provenance. Parameters ---------- - git_obj: Git - The Git object. repo_path_input: bool True if there is a repo as input. - digest_input: str - True if there is a commit as input. provenance_repo_url: str | None The repo url from provenance. - provenance_commit_digest: str | None - The commit digest from provenance. purl: PackageURL The input repository PURL. @@ -318,18 +304,6 @@ def check_if_input_purl_provenance_conflict( ) return True - # Check the PURL commit against the provenance. - if not digest_input and provenance_commit_digest and purl.version: - purl_commit = extract_commit_from_version(git_obj, purl.version) - if purl_commit and purl_commit != provenance_commit_digest: - logger.debug( - "The commit digest passed via purl input does not match what exists in the " - "provenance. Purl Commit: %s, Provenance Commit: %s.", - purl_commit, - provenance_commit_digest, - ) - return True - return False diff --git a/src/macaron/repo_finder/provenance_finder.py b/src/macaron/provenance/provenance_finder.py similarity index 75% rename from src/macaron/repo_finder/provenance_finder.py rename to src/macaron/provenance/provenance_finder.py index 5f065900e..d76eec298 100644 --- a/src/macaron/repo_finder/provenance_finder.py +++ b/src/macaron/provenance/provenance_finder.py @@ -107,38 +107,6 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload] logger.debug("No provenance found.") return [] - def verify_provenance(self, purl: PackageURL, provenance: list[InTotoPayload]) -> bool: - """Verify the passed provenance. - - Parameters - ---------- - purl: PackageURL - The PURL of the analysis target. - provenance: list[InTotoPayload] - The list of provenance. - - Returns - ------- - bool - True if the provenance could be verified, or False otherwise. - """ - if determine_abstract_purl_type(purl) == AbstractPurlType.REPOSITORY: - # Do not perform default verification for repository type targets. - return False - - verification_function = None - - if purl.type == "npm": - verification_function = partial(verify_npm_provenance, purl, provenance) - - # TODO other verification functions go here. - - if verification_function: - return verification_function() - - logger.debug("Provenance verification not supported for PURL type: %s", purl.type) - return False - def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoPayload]: """Find and download the NPM based provenance for the passed PURL. @@ -213,72 +181,6 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP return [] -def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool: - """Compare the unsigned payload subject digest with the signed payload digest, if available. - - Parameters - ---------- - purl: PackageURL - The PURL of the analysis target. - provenance: list[InTotoPayload] - The provenances to verify. - - Returns - ------- - bool - True if the provenance was verified, or False otherwise. - """ - if len(provenance) != 2: - logger.debug("Expected unsigned and signed provenance.") - return False - - signed_subjects = provenance[1].statement.get("subject") - if not signed_subjects: - return False - - unsigned_subjects = provenance[0].statement.get("subject") - if not unsigned_subjects: - return False - - found_signed_subject = None - for signed_subject in signed_subjects: - name = signed_subject.get("name") - if name and name == str(purl): - found_signed_subject = signed_subject - break - - if not found_signed_subject: - return False - - found_unsigned_subject = None - for unsigned_subject in unsigned_subjects: - name = unsigned_subject.get("name") - if name and name == str(purl): - found_unsigned_subject = unsigned_subject - break - - if not found_unsigned_subject: - return False - - signed_digest = found_signed_subject.get("digest") - unsigned_digest = found_unsigned_subject.get("digest") - if not (signed_digest and unsigned_digest): - return False - - # For signed and unsigned to match, the digests must be identical. - if signed_digest != unsigned_digest: - return False - - key = list(signed_digest.keys())[0] - logger.debug( - "Verified provenance against signed companion. Signed: %s, Unsigned: %s.", - signed_digest[key][:7], - unsigned_digest[key][:7], - ) - - return True - - def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[InTotoPayload]: """Find and download the GAV based provenance for the passed PURL. @@ -373,7 +275,9 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[ return provenances[:1] -def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) -> InTotoPayload | None: +def find_provenance_from_ci( + analyze_ctx: AnalyzeContext, git_obj: Git | None, download_path: str +) -> InTotoPayload | None: """Try to find provenance from CI services of the repository. Note that we stop going through the CI services once we encounter a CI service @@ -385,9 +289,11 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) -> Parameters ---------- analyze_ctx: AnalyzeContext - The contenxt of the ongoing analysis. + The context of the ongoing analysis. git_obj: Git | None The Pydriller Git object representing the repository, if any. + download_path: str + The pre-existing location to download discovered files to. Returns ------- @@ -463,9 +369,7 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) -> ci_info["provenance_assets"].extend(provenance_assets) # Download the provenance assets and load the provenance payloads. - download_provenances_from_github_actions_ci_service( - ci_info, - ) + download_provenances_from_ci_service(ci_info, download_path) # TODO consider how to handle multiple payloads here. return ci_info["provenances"][0].payload if ci_info["provenances"] else None @@ -476,56 +380,60 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) -> return None -def download_provenances_from_github_actions_ci_service(ci_info: CIInfo) -> None: +def download_provenances_from_ci_service(ci_info: CIInfo, download_path: str) -> None: """Download provenances from GitHub Actions. Parameters ---------- ci_info: CIInfo, A ``CIInfo`` instance that holds a GitHub Actions git service object. + download_path: str + The pre-existing location to download discovered files to. """ ci_service = ci_info["service"] prov_assets = ci_info["provenance_assets"] - + if not os.path.isdir(download_path): + logger.debug("Download location is not a valid directory.") + return try: - with tempfile.TemporaryDirectory() as temp_path: - downloaded_provs = [] - for prov_asset in prov_assets: - # Check the size before downloading. - if prov_asset.size_in_bytes > defaults.getint( - "slsa.verifier", - "max_download_size", - fallback=1000000, - ): - logger.info( - "Skip verifying the provenance %s: asset size too large.", - prov_asset.name, - ) - continue + downloaded_provs = [] + for prov_asset in prov_assets: + # Check the size before downloading. + if prov_asset.size_in_bytes > defaults.getint( + "slsa.verifier", + "max_download_size", + fallback=1000000, + ): + logger.info( + "Skip verifying the provenance %s: asset size too large.", + prov_asset.name, + ) + continue - provenance_filepath = os.path.join(temp_path, prov_asset.name) + provenance_filepath = os.path.join(download_path, prov_asset.name) - if not ci_service.api_client.download_asset( - prov_asset.url, - provenance_filepath, - ): - logger.debug( - "Could not download the provenance %s. Skip verifying...", - prov_asset.name, - ) - continue + if not ci_service.api_client.download_asset( + prov_asset.url, + provenance_filepath, + ): + logger.debug( + "Could not download the provenance %s. Skip verifying...", + prov_asset.name, + ) + continue - # Read the provenance. - try: - payload = load_provenance_payload(provenance_filepath) - except LoadIntotoAttestationError as error: - logger.error("Error logging provenance: %s", error) - continue + # Read the provenance. + try: + payload = load_provenance_payload(provenance_filepath) + except LoadIntotoAttestationError as error: + logger.error("Error logging provenance: %s", error) + continue - # Add the provenance file. - downloaded_provs.append(SLSAProvenanceData(payload=payload, asset=prov_asset)) + # Add the provenance file. + downloaded_provs.append(SLSAProvenanceData(payload=payload, asset=prov_asset)) # Persist the provenance payloads into the CIInfo object. ci_info["provenances"] = downloaded_provs + except OSError as error: logger.error("Error while storing provenance in the temporary directory: %s", error) diff --git a/src/macaron/provenance/provenance_verifier.py b/src/macaron/provenance/provenance_verifier.py new file mode 100644 index 000000000..06cd55f4e --- /dev/null +++ b/src/macaron/provenance/provenance_verifier.py @@ -0,0 +1,341 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module contains methods for verifying provenance files.""" +import glob +import hashlib +import logging +import os +import subprocess # nosec B404 +import tarfile +import zipfile +from functools import partial +from pathlib import Path + +from packageurl import PackageURL + +from macaron.config.defaults import defaults +from macaron.config.global_config import global_config +from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type +from macaron.slsa_analyzer.analyze_context import AnalyzeContext +from macaron.slsa_analyzer.asset import AssetLocator +from macaron.slsa_analyzer.ci_service import BaseCIService +from macaron.slsa_analyzer.git_url import get_repo_dir_name +from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV01Payload, v01 +from macaron.slsa_analyzer.specs.ci_spec import CIInfo + +logger: logging.Logger = logging.getLogger(__name__) + + +def verify_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool: + """Verify the passed provenance. + + Parameters + ---------- + purl: PackageURL + The PURL of the analysis target. + provenance: list[InTotoPayload] + The list of provenance. + + Returns + ------- + bool + True if the provenance could be verified, or False otherwise. + """ + if determine_abstract_purl_type(purl) == AbstractPurlType.REPOSITORY: + # Do not perform default verification for repository type targets. + return False + + verification_function = None + + if purl.type == "npm": + verification_function = partial(verify_npm_provenance, purl, provenance) + + # TODO other verification functions go here. + + if verification_function: + return verification_function() + + logger.debug("Provenance verification not supported for PURL type: %s", purl.type) + return False + + +def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool: + """Compare the unsigned payload subject digest with the signed payload digest, if available. + + Parameters + ---------- + purl: PackageURL + The PURL of the analysis target. + provenance: list[InTotoPayload] + The provenances to verify. + + Returns + ------- + bool + True if the provenance was verified, or False otherwise. + """ + if len(provenance) != 2: + logger.debug("Expected unsigned and signed provenance.") + return False + + signed_subjects = provenance[1].statement.get("subject") + if not signed_subjects: + return False + + unsigned_subjects = provenance[0].statement.get("subject") + if not unsigned_subjects: + return False + + found_signed_subject = None + for signed_subject in signed_subjects: + name = signed_subject.get("name") + if name and name == str(purl): + found_signed_subject = signed_subject + break + + if not found_signed_subject: + return False + + found_unsigned_subject = None + for unsigned_subject in unsigned_subjects: + name = unsigned_subject.get("name") + if name and name == str(purl): + found_unsigned_subject = unsigned_subject + break + + if not found_unsigned_subject: + return False + + signed_digest = found_signed_subject.get("digest") + unsigned_digest = found_unsigned_subject.get("digest") + if not (signed_digest and unsigned_digest): + return False + + # For signed and unsigned to match, the digests must be identical. + if signed_digest != unsigned_digest: + return False + + key = list(signed_digest.keys())[0] + logger.debug( + "Verified provenance against signed companion. Signed: %s, Unsigned: %s.", + signed_digest[key][:7], + unsigned_digest[key][:7], + ) + + return True + + +def verify_ci_provenance(analyze_ctx: AnalyzeContext, ci_info: CIInfo, download_path: str) -> bool: + """Try to verify the CI provenance in terms of SLSA level 3 requirements. + + Involves running the SLSA verifier. + + Parameters + ---------- + analyze_ctx: AnalyzeContext + The context of the analysis. + ci_info: CIInfo + A ``CIInfo`` instance that holds a GitHub Actions git service object. + download_path: str + The location to search for downloaded files. + + Returns + ------- + bool + True if the provenance could be verified. + """ + # TODO: During verification, we need to fetch the workflow and verify that it's not + # using self-hosted runners, custom containers or services, etc. + ci_service = ci_info["service"] + for provenance in ci_info["provenances"]: + if not isinstance(provenance.payload, InTotoV01Payload): + logger.debug("Cannot verify provenance type: %s", type(provenance.payload)) + continue + + all_assets = ci_info["release"]["assets"] + + # Iterate through the subjects and verify. + for subject in provenance.payload.statement["subject"]: + sub_asset = _find_subject_asset(subject, all_assets, download_path, ci_service) + + if not sub_asset: + logger.debug("Sub asset not found for: %s.", provenance.payload.statement["subject"]) + return False + if not Path(download_path, sub_asset["name"]).is_file(): + if "size" in sub_asset and sub_asset["size"] > defaults.getint( + "slsa.verifier", "max_download_size", fallback=1000000 + ): + logger.debug("Sub asset too large to verify: %s", sub_asset["name"]) + return False + if "url" in sub_asset and not ci_service.api_client.download_asset( + sub_asset["url"], os.path.join(download_path, sub_asset["name"]) + ): + logger.debug("Sub asset not found: %s", sub_asset["name"]) + return False + + sub_verified = _verify_slsa( + analyze_ctx.macaron_path, + download_path, + provenance.asset, + sub_asset["name"], + analyze_ctx.component.repository.remote_path, + ) + + if not sub_verified: + return False + + if sub_verified: + logger.info("Successfully verified sub asset: %s", sub_asset["name"]) + + return True + + +def _find_subject_asset( + subject: v01.InTotoV01Subject, + all_assets: list[dict[str, str]], + download_path: str, + ci_service: BaseCIService, +) -> dict | None: + """Find the artifacts that appear in the provenance subject. + + The artifacts can be directly found as a release asset or in an archive file. + """ + sub_asset = next( + (item for item in all_assets if item["name"] == os.path.basename(subject["name"])), + None, + ) + + if sub_asset: + return sub_asset + + extracted_artifact = glob.glob(os.path.join(download_path, "**", os.path.basename(subject["name"])), recursive=True) + for artifact_path in extracted_artifact: + try: + with open(artifact_path, "rb") as file: + if hashlib.sha256(file.read()).hexdigest() == subject["digest"]["sha256"]: + return {"name": str(Path(artifact_path).relative_to(download_path))} + except OSError as error: + logger.error("Error in check: %s", error) + continue + + for item in all_assets: + item_path = os.path.join(download_path, item["name"]) + # Make sure to download an archive just once. + if not Path(item_path).is_file(): + # TODO: check that it's not too large. + if not ci_service.api_client.download_asset(item["url"], item_path): + logger.info("Could not download artifact %s. Skip verifying...", os.path.basename(item_path)) + break + + if _extract_archive(file_path=item_path, temp_path=download_path): + return _find_subject_asset(subject, all_assets, download_path, ci_service) + + return None + + +def _extract_archive(file_path: str, temp_path: str) -> bool: + """Extract the archive file to the temporary path. + + Returns + ------- + bool + Returns True if successful. + """ + + def _validate_path_traversal(path: str) -> bool: + """Check for path traversal attacks.""" + if path.startswith("/") or ".." in path: + logger.debug("Found suspicious path in the archive file: %s.", path) + return False + try: + # Check if there are any symbolic links. + if os.path.realpath(path): + return True + except OSError as error: + logger.debug("Failed to extract artifact from archive file: %s", error) + return False + return False + + try: + if zipfile.is_zipfile(file_path): + with zipfile.ZipFile(file_path, "r") as zip_file: + members = (path for path in zip_file.namelist() if _validate_path_traversal(path)) + zip_file.extractall(temp_path, members=members) # nosec B202:tarfile_unsafe_members + return True + elif tarfile.is_tarfile(file_path): + with tarfile.open(file_path, mode="r:gz") as tar_file: + members_tarinfo = ( + tarinfo for tarinfo in tar_file.getmembers() if _validate_path_traversal(tarinfo.name) + ) + tar_file.extractall(temp_path, members=members_tarinfo) # nosec B202:tarfile_unsafe_members + return True + except (tarfile.TarError, zipfile.BadZipFile, zipfile.LargeZipFile, OSError, ValueError) as error: + logger.info(error) + + return False + + +def _verify_slsa( + macaron_path: str, download_path: str, prov_asset: AssetLocator, asset_name: str, repository_url: str +) -> bool: + """Run SLSA verifier to verify the artifact.""" + source_path = get_repo_dir_name(repository_url, sanitize=False) + if not source_path: + logger.error("Invalid repository source path to verify: %s.", repository_url) + return False + + errors: list[str] = [] + verified = False + cmd = [ + os.path.join(macaron_path, "bin/slsa-verifier"), + "verify-artifact", + os.path.join(download_path, asset_name), + "--provenance-path", + os.path.join(download_path, prov_asset.name), + "--source-uri", + source_path, + ] + + try: + verifier_output = subprocess.run( # nosec B603 + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + check=True, + cwd=download_path, + timeout=defaults.getint("slsa.verifier", "timeout", fallback=120), + ) + + output = verifier_output.stdout.decode("utf-8") + verified = "PASSED: Verified SLSA provenance" in output + + log_path = os.path.join(global_config.build_log_path, f"{os.path.basename(source_path)}.slsa_verifier.log") + with open(log_path, mode="a", encoding="utf-8") as log_file: + logger.info("Storing SLSA verifier output for %s to %s", asset_name, log_path) + log_file.writelines( + [f"SLSA verifier output for cmd: {' '.join(cmd)}\n", output, "--------------------------------\n"] + ) + + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as error: + logger.error(error) + errors.append(error.output.decode("utf-8")) + except OSError as error: + logger.error(error) + errors.append(str(error)) + + if errors: + verified = False + try: + error_log_path = os.path.join( + global_config.build_log_path, f"{os.path.basename(source_path)}.slsa_verifier.errors" + ) + with open(error_log_path, mode="a", encoding="utf-8") as log_file: + logger.info("Storing SLSA verifier log for%s to %s", asset_name, error_log_path) + log_file.write(f"SLSA verifier output for cmd: {' '.join(cmd)}\n") + log_file.writelines(errors) + log_file.write("--------------------------------\n") + except OSError as error: + logger.error(error) + + return verified diff --git a/src/macaron/repo_finder/__init__.py b/src/macaron/repo_finder/__init__.py index c406a64cc..a3558d431 100644 --- a/src/macaron/repo_finder/__init__.py +++ b/src/macaron/repo_finder/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. -"""This package contains the dependency resolvers for Java projects.""" +"""This package contains the repository and commit finding tools for software components.""" diff --git a/src/macaron/slsa_analyzer/analyze_context.py b/src/macaron/slsa_analyzer/analyze_context.py index e54363f98..bf1647478 100644 --- a/src/macaron/slsa_analyzer/analyze_context.py +++ b/src/macaron/slsa_analyzer/analyze_context.py @@ -52,6 +52,8 @@ class ChecksOutputs(TypedDict): """The commit digest extracted from provenance, if applicable.""" provenance_verified: bool """True if the provenance exists and has been verified against a signed companion provenance.""" + provenance_l3_verified: bool + """True if the provenance exists and has been verified to the SLSA level 3 standard.""" class AnalyzeContext: @@ -106,6 +108,7 @@ def __init__( provenance_repo_url=None, provenance_commit_digest=None, provenance_verified=False, + provenance_l3_verified=False, ) @property diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 904d68c14..bfac6232c 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -6,6 +6,7 @@ import os import re import sys +import tempfile from datetime import datetime, timezone from pathlib import Path from typing import Any, NamedTuple @@ -34,14 +35,15 @@ ) from macaron.output_reporter.reporter import FileReporter from macaron.output_reporter.results import Record, Report, SCMStatus -from macaron.repo_finder import repo_finder -from macaron.repo_finder.commit_finder import find_commit -from macaron.repo_finder.provenance_extractor import ( +from macaron.provenance.provenance_extractor import ( check_if_input_purl_provenance_conflict, check_if_input_repo_provenance_conflict, extract_repo_and_commit_from_provenance, ) -from macaron.repo_finder.provenance_finder import ProvenanceFinder, find_provenance_from_ci +from macaron.provenance.provenance_finder import ProvenanceFinder, find_provenance_from_ci +from macaron.provenance.provenance_verifier import verify_ci_provenance, verify_provenance +from macaron.repo_finder import repo_finder +from macaron.repo_finder.commit_finder import find_commit from macaron.slsa_analyzer import git_url from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.asset import VirtualReleaseAsset @@ -329,11 +331,10 @@ def run_single( if provenances: provenance_payload = provenances[0] if defaults.getboolean("analyzer", "verify_provenance"): - provenance_is_verified = provenance_finder.verify_provenance(parsed_purl, provenances) + provenance_is_verified = verify_provenance(parsed_purl, provenances) # Try to extract the repository URL and commit digest from the Provenance, if it exists. repo_path_input: str | None = config.get_value("path") - digest_input: str | None = config.get_value("digest") provenance_repo_url = provenance_commit_digest = None if provenance_payload: try: @@ -341,10 +342,8 @@ def run_single( provenance_payload ) except ProvenanceError as error: - logger.debug("Failed to extract repo or commit from provenance: %s", error) - - # Try to validate the input repo against provenance contents. - if provenance_repo_url and check_if_input_repo_provenance_conflict(repo_path_input, provenance_repo_url): + logger.debug("Failed to extract from provenance: %s", error) + if check_if_input_repo_provenance_conflict(repo_path_input, provenance_repo_url): return Record( record_id=repo_id, description="Input mismatch between repo and provenance.", @@ -378,18 +377,15 @@ def run_single( ) # Check if only one of the repo or digest came from direct input. - if git_obj and (provenance_repo_url or provenance_commit_digest) and parsed_purl: + if parsed_purl: if check_if_input_purl_provenance_conflict( - git_obj, bool(repo_path_input), - bool(digest_input), provenance_repo_url, - provenance_commit_digest, parsed_purl, ): return Record( record_id=repo_id, - description="Input mismatch between repo/commit (purl) and provenance.", + description="Input mismatch between repo (purl) and provenance.", pre_config=config, status=SCMStatus.ANALYSIS_FAILED, ) @@ -438,32 +434,37 @@ def run_single( if not provenance_payload: # Look for provenance using the CI. - provenance_payload = find_provenance_from_ci(analyze_ctx, git_obj) - # If found, verify analysis target against new provenance - if provenance_payload: - # If repository URL was not provided as input, check the one found during analysis. - if not repo_path_input and component.repository: - repo_path_input = component.repository.remote_path - - # Extract the digest and repository URL from provenance. - provenance_repo_url = provenance_commit_digest = None - try: - provenance_repo_url, provenance_commit_digest = extract_repo_and_commit_from_provenance( - provenance_payload - ) - except ProvenanceError as error: - logger.debug("Failed to extract repo or commit from provenance: %s", error) - - # Try to validate the input repo against provenance contents. - if provenance_repo_url and check_if_input_repo_provenance_conflict( - repo_path_input, provenance_repo_url - ): - return Record( - record_id=repo_id, - description="Input mismatch between repo/commit and provenance.", - pre_config=config, - status=SCMStatus.ANALYSIS_FAILED, - ) + with tempfile.TemporaryDirectory() as temp_dir: + provenance_payload = find_provenance_from_ci(analyze_ctx, git_obj, temp_dir) + # If found, validate analysis target against new provenance. + if provenance_payload: + # If repository URL was not provided as input, check the one found during analysis. + if not repo_path_input and component.repository: + repo_path_input = component.repository.remote_path + provenance_repo_url = provenance_commit_digest = None + try: + provenance_repo_url, provenance_commit_digest = extract_repo_and_commit_from_provenance( + provenance_payload + ) + except ProvenanceError as error: + logger.debug("Failed to extract from provenance: %s", error) + + if check_if_input_repo_provenance_conflict(repo_path_input, provenance_repo_url): + return Record( + record_id=repo_id, + description="Input mismatch between repo/commit and provenance.", + pre_config=config, + status=SCMStatus.ANALYSIS_FAILED, + ) + + # Also try to verify CI provenance contents. + verified = [] + for ci_info in analyze_ctx.dynamic_data["ci_services"]: + verified.append(verify_ci_provenance(analyze_ctx, ci_info, temp_dir)) + if not verified: + break + if verified and all(verified): + analyze_ctx.dynamic_data["provenance_l3_verified"] = True analyze_ctx.dynamic_data["provenance"] = provenance_payload if provenance_payload: diff --git a/src/macaron/slsa_analyzer/checks/provenance_l3_check.py b/src/macaron/slsa_analyzer/checks/provenance_l3_check.py index 76029d461..e133e8f29 100644 --- a/src/macaron/slsa_analyzer/checks/provenance_l3_check.py +++ b/src/macaron/slsa_analyzer/checks/provenance_l3_check.py @@ -3,40 +3,16 @@ """This module implements a check to verify a target repo has intoto provenance level 3.""" -import glob -import hashlib -import json -import logging -import os -import subprocess # nosec B404 -import tarfile -import tempfile -import zipfile -from dataclasses import dataclass -from enum import Enum -from pathlib import Path -from typing import NamedTuple - from sqlalchemy import ForeignKey from sqlalchemy.orm import Mapped, mapped_column -from macaron.config.defaults import defaults -from macaron.config.global_config import global_config -from macaron.database.table_definitions import CheckFacts, HashDigest, Provenance, ReleaseArtifact +from macaron.database.table_definitions import CheckFacts from macaron.slsa_analyzer.analyze_context import AnalyzeContext -from macaron.slsa_analyzer.asset import AssetLocator from macaron.slsa_analyzer.checks.base_check import BaseCheck from macaron.slsa_analyzer.checks.check_result import CheckResultData, CheckResultType, Confidence -from macaron.slsa_analyzer.ci_service.base_ci_service import BaseCIService, NoneCIService -from macaron.slsa_analyzer.git_url import get_repo_dir_name -from macaron.slsa_analyzer.provenance.intoto import InTotoV01Payload, v01 -from macaron.slsa_analyzer.provenance.intoto.errors import InTotoAttestationError, UnsupportedInTotoVersionError -from macaron.slsa_analyzer.provenance.loader import load_provenance_payload from macaron.slsa_analyzer.registry import registry from macaron.slsa_analyzer.slsa_req import ReqName -logger: logging.Logger = logging.getLogger(__name__) - class ProvenanceL3VerifiedFacts(CheckFacts): """The ORM mapping for justifications in provenance_l3 check.""" @@ -51,40 +27,6 @@ class ProvenanceL3VerifiedFacts(CheckFacts): } -class _VerifyArtifactResultType(Enum): - """Result of attempting to verify an asset.""" - - # slsa-verifier succeeded and the artifact passed verification - PASSED = "verify passed" - # slsa-verifier succeeded and the artifact failed verification - FAILED = "verify failed" - # An error occurred running slsa-verifier or downloading the artifact - ERROR = "verify error" - # The artifact was unable to be downloaded because the url was missing or malformed - NO_DOWNLOAD = "unable to download asset" - # The artifact was unable to be downloaded because the file was too large - TOO_LARGE = "asset file too large to download" - - def is_skip(self) -> bool: - """Return whether the verification was skipped.""" - return self in (_VerifyArtifactResultType.NO_DOWNLOAD, _VerifyArtifactResultType.TOO_LARGE) - - def is_fail(self) -> bool: - """Return whether the verification failed.""" - return self in (_VerifyArtifactResultType.FAILED, _VerifyArtifactResultType.ERROR) - - -@dataclass -class _VerifyArtifactResult: - """Dataclass storing the result of verifying a single asset.""" - - result: _VerifyArtifactResultType - artifact_name: str - - def __str__(self) -> str: - return f"{str(self.result.value)} : {self.artifact_name}" - - class ProvenanceL3Check(BaseCheck): """This Check checks whether the target repo has SLSA provenance level 3.""" @@ -111,161 +53,6 @@ def __init__(self) -> None: result_on_skip=CheckResultType.FAILED, ) - def _size_large(self, asset_size: int) -> bool: - """Check the size of the asset.""" - return asset_size > defaults.getint("slsa.verifier", "max_download_size", fallback=1000000) - - def _verify_slsa( - self, macaron_path: str, temp_path: str, prov_asset: AssetLocator, asset_name: str, repository_url: str - ) -> _VerifyArtifactResult: - """Run SLSA verifier to verify the artifact.""" - source_path = get_repo_dir_name(repository_url, sanitize=False) - if not source_path: - logger.error("Invalid repository source path to verify: %s.", repository_url) - return _VerifyArtifactResult(_VerifyArtifactResultType.NO_DOWNLOAD, asset_name) - - errors: list[str] = [] - result: _VerifyArtifactResult - cmd = [ - os.path.join(macaron_path, "bin/slsa-verifier"), - "verify-artifact", - os.path.join(temp_path, asset_name), - "--provenance-path", - os.path.join(temp_path, prov_asset.name), - "--source-uri", - source_path, - ] - - try: - verifier_output = subprocess.run( # nosec B603 - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - check=True, - cwd=temp_path, - timeout=defaults.getint("slsa.verifier", "timeout", fallback=120), - ) - - output = verifier_output.stdout.decode("utf-8") - if "PASSED: Verified SLSA provenance" in output: - result = _VerifyArtifactResult(_VerifyArtifactResultType.PASSED, asset_name) - else: - result = _VerifyArtifactResult(_VerifyArtifactResultType.FAILED, asset_name) - - log_path = os.path.join(global_config.build_log_path, f"{os.path.basename(source_path)}.slsa_verifier.log") - with open(log_path, mode="a", encoding="utf-8") as log_file: - logger.info("Storing SLSA verifier output for %s to %s", asset_name, log_path) - log_file.writelines( - [f"SLSA verifier output for cmd: {' '.join(cmd)}\n", output, "--------------------------------\n"] - ) - - except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as error: - logger.error(error) - errors.append(error.output.decode("utf-8")) - except OSError as error: - logger.error(error) - errors.append(str(error)) - - if errors: - result = _VerifyArtifactResult(result=_VerifyArtifactResultType.ERROR, artifact_name=asset_name) - try: - error_log_path = os.path.join( - global_config.build_log_path, f"{os.path.basename(source_path)}.slsa_verifier.errors" - ) - with open(error_log_path, mode="a", encoding="utf-8") as log_file: - logger.info("Storing SLSA verifier log for%s to %s", asset_name, error_log_path) - log_file.write(f"SLSA verifier output for cmd: {' '.join(cmd)}\n") - log_file.writelines(errors) - log_file.write("--------------------------------\n") - except OSError as error: - logger.error(error) - - return result - - def _extract_archive(self, file_path: str, temp_path: str) -> bool: - """Extract the archive file to the temporary path. - - Returns - ------- - bool - Returns True if successful. - """ - - def _validate_path_traversal(path: str) -> bool: - """Check for path traversal attacks.""" - if path.startswith("/") or ".." in path: - logger.debug("Found suspicious path in the archive file: %s.", path) - return False - try: - # Check if there are any symbolic links. - if os.path.realpath(path): - return True - except OSError as error: - logger.debug("Failed to extract artifact from archive file: %s", error) - return False - return False - - try: - if zipfile.is_zipfile(file_path): - with zipfile.ZipFile(file_path, "r") as zip_file: - members = (path for path in zip_file.namelist() if _validate_path_traversal(path)) - zip_file.extractall(temp_path, members=members) # nosec B202:tarfile_unsafe_members - return True - elif tarfile.is_tarfile(file_path): - with tarfile.open(file_path, mode="r:gz") as tar_file: - members_tarinfo = ( - tarinfo for tarinfo in tar_file.getmembers() if _validate_path_traversal(tarinfo.name) - ) - tar_file.extractall(temp_path, members=members_tarinfo) # nosec B202:tarfile_unsafe_members - return True - except (tarfile.TarError, zipfile.BadZipFile, zipfile.LargeZipFile, OSError, ValueError) as error: - logger.info(error) - - return False - - def _find_asset( - self, - subject: v01.InTotoV01Subject, - all_assets: list[dict[str, str]], - temp_path: str, - ci_service: BaseCIService, - ) -> dict | None: - """Find the artifacts that appear in the provenance subject. - - The artifacts can be directly found as a release asset or in an archive file. - """ - sub_asset = next( - (item for item in all_assets if item["name"] == os.path.basename(subject["name"])), - None, - ) - - if sub_asset: - return sub_asset - - extracted_artifact = glob.glob(os.path.join(temp_path, "**", os.path.basename(subject["name"])), recursive=True) - for artifact_path in extracted_artifact: - try: - with open(artifact_path, "rb") as file: - if hashlib.sha256(file.read()).hexdigest() == subject["digest"]["sha256"]: - return {"name": str(Path(artifact_path).relative_to(temp_path))} - except OSError as error: - logger.error("Error in check %s: %s", self.check_info.check_id, error) - continue - - for item in all_assets: - item_path = os.path.join(temp_path, item["name"]) - # Make sure to download an archive just once. - if not Path(item_path).is_file(): - # TODO: check that it's not too large. - if not ci_service.api_client.download_asset(item["url"], item_path): - logger.info("Could not download artifact %s. Skip verifying...", os.path.basename(item_path)) - break - - if self._extract_archive(file_path=item_path, temp_path=temp_path): - return self._find_asset(subject, all_assets, temp_path, ci_service) - - return None - def run_check(self, ctx: AnalyzeContext) -> CheckResultData: """Implement the check in this method. @@ -279,180 +66,11 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: CheckResultData The result of the check. """ - # TODO: During verification, we need to fetch the workflow and verify that it's not - # using self-hosted runners, custom containers or services, etc. - - class Feedback(NamedTuple): - """Store feedback item.""" - - #: The CI service name. - ci_service_name: str - - #: The provenance asset url. - prov_asset_url: str - - #: The verification result. - verify_result: _VerifyArtifactResult - - all_feedback: list[Feedback] = [] - ci_services = ctx.dynamic_data["ci_services"] - result_tables: list[CheckFacts] = [] - - for ci_info in ci_services: - ci_service = ci_info["service"] - - # Checking if a CI service is discovered for this repo. - if isinstance(ci_service, NoneCIService): - continue - - # Checking if we have found a release for the repo. - if not ci_info["release"] or "assets" not in ci_info["release"]: - logger.info("Could not find any release assets for the repository.") - break - - # Checking if we have found a SLSA provenance for the repo. - if not ci_info["provenance_assets"]: - logger.info("Could not find SLSA provenances.") - break - - prov_assets = ci_info["provenance_assets"] - all_assets = ci_info["release"]["assets"] - - # Download and verify the artifacts if they are not large. - # Create a temporary directory and automatically remove it when we are done. - try: - with tempfile.TemporaryDirectory() as temp_path: - downloaded_provs = [] - for prov_asset in prov_assets: - # Check the size before downloading. - if self._size_large(prov_asset.size_in_bytes): - logger.info("Skip verifying the provenance %s: asset size too large.", prov_asset.name) - continue - - if not ci_service.api_client.download_asset( - prov_asset.url, os.path.join(temp_path, prov_asset.name) - ): - logger.info("Could not download the provenance %s. Skip verifying...", prov_asset.name) - continue - - # Read the provenance. - provenance_payload = load_provenance_payload( - os.path.join(temp_path, prov_asset.name), - ) - - if not isinstance(provenance_payload, InTotoV01Payload): - raise UnsupportedInTotoVersionError( - f"The provenance asset '{prov_asset.name}' is under an unsupported in-toto version." - ) - - # Add the provenance file. - downloaded_provs.append(provenance_payload.statement) - - # Output provenance - prov = Provenance() - # TODO: fix commit reference for provenance when release/artifact as an analysis entrypoint is - # implemented ensure the provenance commit matches the actual release analyzed - prov.version = "0.2" - prov.release_commit_sha = "" - prov.provenance_json = json.dumps(provenance_payload.statement) - prov.release_tag = ci_info["release"]["tag_name"] - prov.component = ctx.component - - # Iterate through the subjects and verify. - for subject in provenance_payload.statement["subject"]: - sub_asset = self._find_asset(subject, all_assets, temp_path, ci_service) - - result: None | _VerifyArtifactResult = None - for _ in range(1): - if not sub_asset: - result = _VerifyArtifactResult( - result=_VerifyArtifactResultType.NO_DOWNLOAD, artifact_name=subject["name"] - ) - break - if not Path(temp_path, sub_asset["name"]).is_file(): - if "size" in sub_asset and self._size_large(sub_asset["size"]): - result = _VerifyArtifactResult( - result=_VerifyArtifactResultType.TOO_LARGE, - artifact_name=sub_asset["name"], - ) - break - if "url" in sub_asset and not ci_service.api_client.download_asset( - sub_asset["url"], os.path.join(temp_path, sub_asset["name"]) - ): - result = _VerifyArtifactResult( - result=_VerifyArtifactResultType.NO_DOWNLOAD, - artifact_name=sub_asset["name"], - ) - break - - result = self._verify_slsa( - ctx.macaron_path, - temp_path, - prov_asset, - sub_asset["name"], - ctx.component.repository.remote_path, - ) - - if result: - if result.result.is_skip(): - logger.info("Skipped verifying artifact: %s", result.result) - if result.result.is_fail(): - logger.info("Error verifying artifact: %s", result.result) - if result.result == _VerifyArtifactResultType.FAILED: - logger.info("Failed verifying artifact: %s", result.result) - if result.result == _VerifyArtifactResultType.PASSED: - logger.info("Successfully verified artifact: %s", result.result) - - all_feedback.append( - Feedback( - ci_service_name=ci_service.name, - prov_asset_url=prov_asset.url, - verify_result=result, - ) - ) - - # Store artifact information result to database. - artifact = ReleaseArtifact() - artifact.name = subject["name"] - artifact.slsa_verified = result.result == _VerifyArtifactResultType.PASSED - artifact.provenance = prov # pylint: disable=protected-access - - for k, val in subject["digest"].items(): - digest = HashDigest() - digest.digest_algorithm = k - digest.digest = val - # Foreign key relation. - digest.artifact = artifact - - except (OSError, InTotoAttestationError) as error: - logger.error(" %s: %s.", self.check_info.check_id, error) - return CheckResultData( - result_tables=result_tables, - result_type=CheckResultType.FAILED, - ) - result_value = CheckResultType.FAILED - if all_feedback: - failed = [ - feedback - for feedback in all_feedback - if feedback.verify_result.result == _VerifyArtifactResultType.FAILED - ] - - skipped = [ - feedback - for feedback in all_feedback - if feedback.verify_result.result - not in [_VerifyArtifactResultType.FAILED, _VerifyArtifactResultType.PASSED] - ] - - if failed or skipped: - result_value = CheckResultType.FAILED - else: - result_tables.append(ProvenanceL3VerifiedFacts(confidence=Confidence.HIGH)) - result_value = CheckResultType.PASSED - return CheckResultData(result_tables=result_tables, result_type=result_value) + if ctx.dynamic_data["provenance_l3_verified"]: + result_tables.append(ProvenanceL3VerifiedFacts(confidence=Confidence.HIGH)) + result_value = CheckResultType.PASSED return CheckResultData(result_tables=result_tables, result_type=result_value) diff --git a/src/macaron/slsa_analyzer/checks/provenance_verified_check.py b/src/macaron/slsa_analyzer/checks/provenance_verified_check.py index 4bcbc3a4c..6d762dbcf 100644 --- a/src/macaron/slsa_analyzer/checks/provenance_verified_check.py +++ b/src/macaron/slsa_analyzer/checks/provenance_verified_check.py @@ -68,7 +68,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: The result of the check. """ if ctx.dynamic_data["is_inferred_prov"] or not ctx.dynamic_data["provenance"]: - # Provenance is not available. + # 0. Provenance is not available. return CheckResultData( result_tables=[ProvenanceVerifiedFacts(build_level=0, confidence=Confidence.HIGH)], result_type=CheckResultType.FAILED, @@ -79,22 +79,20 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: if predicate: build_type = json_extract(predicate, ["buildType"], str) - if not ctx.dynamic_data["provenance_verified"]: - # Provenance is not verified. + if ( + build_type == "https://github.com/slsa-framework/slsa-github-generator/generic@v1" + and ctx.dynamic_data["provenance_l3_verified"] + ): + # 3. Provenance is created by the SLSA GitHub generator and verified. return CheckResultData( result_tables=[ - ProvenanceVerifiedFacts( - build_level=1, - build_type=build_type, - confidence=Confidence.HIGH, - ) + ProvenanceVerifiedFacts(build_level=3, build_type=build_type, confidence=Confidence.HIGH) ], - result_type=CheckResultType.FAILED, + result_type=CheckResultType.PASSED, ) - if build_type != "https://github.com/slsa-framework/slsa-github-generator/generic@v1": - # Provenance is verified but the build service does not isolate generation in the control plane from the - # untrusted build process. + if ctx.dynamic_data["provenance_verified"]: + # 2. Provenance is verified. return CheckResultData( result_tables=[ ProvenanceVerifiedFacts( @@ -106,10 +104,16 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: result_type=CheckResultType.PASSED, ) - # Provenance is created by the SLSA GitHub generator and verified. + # 1. Provenance is not verified. return CheckResultData( - result_tables=[ProvenanceVerifiedFacts(build_level=3, build_type=build_type, confidence=Confidence.HIGH)], - result_type=CheckResultType.PASSED, + result_tables=[ + ProvenanceVerifiedFacts( + build_level=1, + build_type=build_type, + confidence=Confidence.HIGH, + ) + ], + result_type=CheckResultType.FAILED, ) diff --git a/tests/integration/cases/urllib3_expectation_dir/policy.dl b/tests/integration/cases/urllib3_expectation_dir/policy.dl index 048252508..21353ca4f 100644 --- a/tests/integration/cases/urllib3_expectation_dir/policy.dl +++ b/tests/integration/cases/urllib3_expectation_dir/policy.dl @@ -13,6 +13,9 @@ Policy("test_policy", component_id, "") :- check_passed(component_id, "mcn_provenance_derived_commit_1"), check_passed(component_id, "mcn_provenance_derived_repo_1"), check_passed(component_id, "mcn_provenance_expectation_1"), + check_passed(component_id, "mcn_provenance_verified_1"), + provenance_verified_check(_, build_level, _), + build_level = 3, check_failed(component_id, "mcn_infer_artifact_pipeline_1"), check_failed(component_id, "mcn_provenance_witness_level_one_1"), check_failed(component_id, "mcn_trusted_builder_level_three_1"), diff --git a/tests/integration/cases/urllib3_expectation_file/policy.dl b/tests/integration/cases/urllib3_expectation_file/policy.dl index 79bfae7ee..d940b04ff 100644 --- a/tests/integration/cases/urllib3_expectation_file/policy.dl +++ b/tests/integration/cases/urllib3_expectation_file/policy.dl @@ -13,6 +13,9 @@ Policy("test_policy", component_id, "") :- check_passed(component_id, "mcn_provenance_derived_repo_1"), check_passed(component_id, "mcn_provenance_expectation_1"), check_passed(component_id, "mcn_provenance_level_three_1"), + check_passed(component_id, "mcn_provenance_verified_1"), + provenance_verified_check(_, build_level, _), + build_level = 3, check_failed(component_id, "mcn_infer_artifact_pipeline_1"), check_failed(component_id, "mcn_provenance_witness_level_one_1"), check_failed(component_id, "mcn_trusted_builder_level_three_1"), diff --git a/tests/integration/cases/urllib3_invalid_expectation/policy.dl b/tests/integration/cases/urllib3_invalid_expectation/policy.dl index e8a017826..c4c7fc3d2 100644 --- a/tests/integration/cases/urllib3_invalid_expectation/policy.dl +++ b/tests/integration/cases/urllib3_invalid_expectation/policy.dl @@ -12,6 +12,9 @@ Policy("test_policy", component_id, "") :- check_passed(component_id, "mcn_provenance_derived_commit_1"), check_passed(component_id, "mcn_provenance_derived_repo_1"), check_passed(component_id, "mcn_provenance_level_three_1"), + check_passed(component_id, "mcn_provenance_verified_1"), + provenance_verified_check(_, build_level, _), + build_level = 3, check_failed(component_id, "mcn_infer_artifact_pipeline_1"), check_failed(component_id, "mcn_provenance_witness_level_one_1"), check_failed(component_id, "mcn_trusted_builder_level_three_1"), diff --git a/tests/provenance/__init__.py b/tests/provenance/__init__.py new file mode 100644 index 000000000..c8a50abb7 --- /dev/null +++ b/tests/provenance/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. diff --git a/tests/repo_finder/test_provenance_extractor.py b/tests/provenance/test_provenance_extractor.py similarity index 99% rename from tests/repo_finder/test_provenance_extractor.py rename to tests/provenance/test_provenance_extractor.py index 0fc2460ef..d8e5df7e1 100644 --- a/tests/repo_finder/test_provenance_extractor.py +++ b/tests/provenance/test_provenance_extractor.py @@ -9,7 +9,7 @@ from macaron.errors import ProvenanceError from macaron.json_tools import JsonType, json_extract -from macaron.repo_finder.provenance_extractor import ( +from macaron.provenance.provenance_extractor import ( check_if_repository_purl_and_url_match, extract_repo_and_commit_from_provenance, ) diff --git a/tests/repo_finder/test_provenance_finder.py b/tests/provenance/test_provenance_finder.py similarity index 92% rename from tests/repo_finder/test_provenance_finder.py rename to tests/provenance/test_provenance_finder.py index 3426fed0d..37e890a6f 100644 --- a/tests/repo_finder/test_provenance_finder.py +++ b/tests/provenance/test_provenance_finder.py @@ -14,7 +14,7 @@ from pydriller import Git from macaron.code_analyzer.call_graph import BaseNode, CallGraph -from macaron.repo_finder.provenance_finder import find_gav_provenance, find_npm_provenance, find_provenance_from_ci +from macaron.provenance.provenance_finder import find_gav_provenance, find_npm_provenance, find_provenance_from_ci from macaron.slsa_analyzer.ci_service import BaseCIService, CircleCI, GitHubActions, GitLabCI, Jenkins, Travis from macaron.slsa_analyzer.git_service.api_client import GhAPIClient from macaron.slsa_analyzer.package_registry import JFrogMavenRegistry, NPMRegistry @@ -165,8 +165,9 @@ def test_provenance_on_unsupported_ci(macaron_path: Path, service: BaseCIService ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir="") ctx.dynamic_data["ci_services"] = [ci_info] - provenance = find_provenance_from_ci(ctx, None) - assert provenance is None + with tempfile.TemporaryDirectory() as temp_dir: + provenance = find_provenance_from_ci(ctx, None, temp_dir) + assert provenance is None def test_provenance_on_supported_ci(macaron_path: Path, test_dir: Path) -> None: @@ -190,13 +191,15 @@ def test_provenance_on_supported_ci(macaron_path: Path, test_dir: Path) -> None: # Test with a valid setup. git_obj = MockGit() - provenance = find_provenance_from_ci(ctx, git_obj) - assert provenance + with tempfile.TemporaryDirectory() as temp_dir: + provenance = find_provenance_from_ci(ctx, git_obj, temp_dir) + assert provenance # Test with a repo that doesn't have any accepted provenance. api_client.release = {"assets": [{"name": "attestation.intoto", "url": "URL", "size": 10}]} - provenance = find_provenance_from_ci(ctx, MockGit()) - assert provenance is None + with tempfile.TemporaryDirectory() as temp_dir: + provenance = find_provenance_from_ci(ctx, MockGit(), temp_dir) + assert provenance is None def test_provenance_available_on_npm_registry(