Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor provenance level 3 check into analysis #817

Open
wants to merge 2 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
macaron.provenance package
==========================

.. automodule:: macaron.provenance
   :members:
   :undoc-members:
   :show-inheritance:

Submodules
----------

macaron.provenance.provenance\_extractor module
-----------------------------------------------

.. automodule:: macaron.provenance.provenance_extractor
   :members:
   :undoc-members:
   :show-inheritance:

macaron.provenance.provenance\_finder module
--------------------------------------------

.. automodule:: macaron.provenance.provenance_finder
   :members:
   :undoc-members:
   :show-inheritance:

macaron.provenance.provenance\_verifier module
----------------------------------------------

.. automodule:: macaron.provenance.provenance_verifier
   :members:
   :undoc-members:
   :show-inheritance:
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,6 @@ macaron.repo\_finder.commit\_finder module
:undoc-members:
:show-inheritance:

macaron.repo\_finder.provenance\_extractor module
-------------------------------------------------

.. automodule:: macaron.repo_finder.provenance_extractor
   :members:
   :undoc-members:
   :show-inheritance:

macaron.repo\_finder.provenance\_finder module
----------------------------------------------

.. automodule:: macaron.repo_finder.provenance_finder
   :members:
   :undoc-members:
   :show-inheritance:

macaron.repo\_finder.repo\_finder module
----------------------------------------

Expand Down
1 change: 1 addition & 0 deletions docs/source/pages/developers_guide/apidoc/macaron.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Subpackages
macaron.output_reporter
macaron.parsers
macaron.policy_engine
macaron.provenance
macaron.repo_finder
macaron.slsa_analyzer
macaron.vsa
Expand Down
4 changes: 4 additions & 0 deletions src/macaron/provenance/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This package contains the provenance tools for software components."""
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@
import urllib.parse

from packageurl import PackageURL
from pydriller import Git

from macaron.errors import ProvenanceError
from macaron.json_tools import JsonType, json_extract
from macaron.repo_finder.commit_finder import (
AbstractPurlType,
determine_abstract_purl_type,
extract_commit_from_version,
)
from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type
from macaron.repo_finder.repo_finder import to_domain_from_known_purl_types
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload

Expand Down Expand Up @@ -275,27 +270,18 @@ def check_if_input_repo_provenance_conflict(


def check_if_input_purl_provenance_conflict(
git_obj: Git,
repo_path_input: bool,
digest_input: bool,
provenance_repo_url: str | None,
provenance_commit_digest: str | None,
purl: PackageURL,
) -> bool:
"""Test if the input repository type PURL's repo and commit match the contents of the provenance.

Parameters
----------
git_obj: Git
The Git object.
repo_path_input: bool
True if there is a repo as input.
digest_input: str
True if there is a commit as input.
provenance_repo_url: str | None
The repo url from provenance.
provenance_commit_digest: str | None
The commit digest from provenance.
purl: PackageURL
The input repository PURL.

Expand All @@ -318,18 +304,6 @@ def check_if_input_purl_provenance_conflict(
)
return True

# Check the PURL commit against the provenance.
tromai marked this conversation as resolved.
Show resolved Hide resolved
if not digest_input and provenance_commit_digest and purl.version:
purl_commit = extract_commit_from_version(git_obj, purl.version)
if purl_commit and purl_commit != provenance_commit_digest:
logger.debug(
"The commit digest passed via purl input does not match what exists in the "
"provenance. Purl Commit: %s, Provenance Commit: %s.",
purl_commit,
provenance_commit_digest,
)
return True

return False


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,38 +107,6 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]
logger.debug("No provenance found.")
return []

def verify_provenance(self, purl: PackageURL, provenance: list[InTotoPayload]) -> bool:
    """Run the default verification for the provenance of the analysis target.

    Parameters
    ----------
    purl: PackageURL
        The PURL of the analysis target.
    provenance: list[InTotoPayload]
        The provenance payloads to verify.

    Returns
    -------
    bool
        True if the provenance could be verified; False if verification failed, the target is a
        repository type PURL, or the PURL type has no verification function.
    """
    # Do not perform default verification for repository type targets.
    if determine_abstract_purl_type(purl) == AbstractPurlType.REPOSITORY:
        return False

    # Only npm has a verification function at present.
    # TODO other verification functions go here.
    if purl.type == "npm":
        return verify_npm_provenance(purl, provenance)

    logger.debug("Provenance verification not supported for PURL type: %s", purl.type)
    return False


def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoPayload]:
"""Find and download the NPM based provenance for the passed PURL.
Expand Down Expand Up @@ -213,72 +181,6 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP
return []


def _find_subject_by_name(subjects, name: str):
    """Return the first subject whose non-empty ``name`` entry equals ``name``, or None if none matches."""
    for subject in subjects:
        subject_name = subject.get("name")
        if subject_name and subject_name == name:
            return subject
    return None


def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool:
    """Compare the unsigned payload subject digest with the signed payload digest, if available.

    The subject whose name equals the string form of ``purl`` is looked up in both payloads, and the
    provenance is considered verified only when both subjects exist and their digests are identical.

    Parameters
    ----------
    purl: PackageURL
        The PURL of the analysis target.
    provenance: list[InTotoPayload]
        The provenances to verify. Exactly two payloads are expected: the unsigned payload at index 0
        and the signed payload at index 1.

    Returns
    -------
    bool
        True if the provenance was verified, or False otherwise.
    """
    if len(provenance) != 2:
        logger.debug("Expected unsigned and signed provenance.")
        return False

    unsigned_payload, signed_payload = provenance

    signed_subjects = signed_payload.statement.get("subject")
    if not signed_subjects:
        return False

    unsigned_subjects = unsigned_payload.statement.get("subject")
    if not unsigned_subjects:
        return False

    # The target name is the same for every comparison, so build it once.
    target_name = str(purl)

    found_signed_subject = _find_subject_by_name(signed_subjects, target_name)
    if not found_signed_subject:
        return False

    found_unsigned_subject = _find_subject_by_name(unsigned_subjects, target_name)
    if not found_unsigned_subject:
        return False

    signed_digest = found_signed_subject.get("digest")
    unsigned_digest = found_unsigned_subject.get("digest")
    if not (signed_digest and unsigned_digest):
        return False

    # For signed and unsigned to match, the digests must be identical.
    if signed_digest != unsigned_digest:
        return False

    # Log a short prefix of the first digest entry only; the full values are equal at this point.
    key = next(iter(signed_digest))
    logger.debug(
        "Verified provenance against signed companion. Signed: %s, Unsigned: %s.",
        signed_digest[key][:7],
        unsigned_digest[key][:7],
    )

    return True


def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[InTotoPayload]:
"""Find and download the GAV based provenance for the passed PURL.

Expand Down Expand Up @@ -373,7 +275,9 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
return provenances[:1]


def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) -> InTotoPayload | None:
def find_provenance_from_ci(
analyze_ctx: AnalyzeContext, git_obj: Git | None, download_path: str
) -> InTotoPayload | None:
"""Try to find provenance from CI services of the repository.

Note that we stop going through the CI services once we encounter a CI service
Expand All @@ -385,9 +289,11 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) ->
Parameters
----------
analyze_ctx: AnalyzeContext
The contenxt of the ongoing analysis.
The context of the ongoing analysis.
git_obj: Git | None
The Pydriller Git object representing the repository, if any.
download_path: str
The pre-existing location to download discovered files to.

Returns
-------
Expand Down Expand Up @@ -463,9 +369,7 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) ->
ci_info["provenance_assets"].extend(provenance_assets)

# Download the provenance assets and load the provenance payloads.
download_provenances_from_github_actions_ci_service(
ci_info,
)
download_provenances_from_ci_service(ci_info, download_path)

# TODO consider how to handle multiple payloads here.
return ci_info["provenances"][0].payload if ci_info["provenances"] else None
Expand All @@ -476,56 +380,60 @@ def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) ->
return None


def download_provenances_from_ci_service(ci_info: CIInfo, download_path: str) -> None:
    """Download the provenance assets recorded in ``ci_info`` and load their payloads.

    Assets that exceed the configured ``max_download_size``, that cannot be downloaded, or whose
    payload cannot be loaded are skipped. The payloads that were loaded are stored in
    ``ci_info["provenances"]``. Nothing is downloaded if ``download_path`` is not an existing directory.

    Parameters
    ----------
    ci_info: CIInfo
        A ``CIInfo`` instance that holds the CI service object and the provenance assets to download.
    download_path: str
        The pre-existing location to download discovered files to.
    """
    ci_service = ci_info["service"]
    prov_assets = ci_info["provenance_assets"]

    if not os.path.isdir(download_path):
        logger.debug("Download location is not a valid directory.")
        return

    try:
        downloaded_provs = []
        for prov_asset in prov_assets:
            # Check the size before downloading.
            if prov_asset.size_in_bytes > defaults.getint(
                "slsa.verifier",
                "max_download_size",
                fallback=1000000,
            ):
                logger.info(
                    "Skip verifying the provenance %s: asset size too large.",
                    prov_asset.name,
                )
                continue

            provenance_filepath = os.path.join(download_path, prov_asset.name)

            if not ci_service.api_client.download_asset(
                prov_asset.url,
                provenance_filepath,
            ):
                logger.debug(
                    "Could not download the provenance %s. Skip verifying...",
                    prov_asset.name,
                )
                continue

            # Read the provenance.
            try:
                payload = load_provenance_payload(provenance_filepath)
            except LoadIntotoAttestationError as error:
                logger.error("Error loading provenance: %s", error)
                continue

            # Add the provenance file.
            downloaded_provs.append(SLSAProvenanceData(payload=payload, asset=prov_asset))

        # Persist the provenance payloads into the CIInfo object.
        ci_info["provenances"] = downloaded_provs

    except OSError as error:
        logger.error("Error while storing provenance in the download directory: %s", error)
Loading
Loading