From 69dab0b5166608c9399693de36953d22480454ed Mon Sep 17 00:00:00 2001 From: Yeison Vargas Date: Fri, 19 Jan 2024 20:03:46 -0500 Subject: [PATCH] fix: add back the license legacy cmd (#498) --- safety/cli.py | 65 ++++++++++++++- safety/cli_util.py | 2 +- safety/errors.py | 4 +- safety/reqs_scan.py | 69 --------------- safety/safety.py | 175 +-------------------------------------- safety/scan/constants.py | 2 + safety/util.py | 8 ++ tests/test_cli.py | 25 ++++++ tests/test_safety.py | 2 +- 9 files changed, 104 insertions(+), 248 deletions(-) delete mode 100644 safety/reqs_scan.py diff --git a/safety/cli.py b/safety/cli.py index a2c3768e..3e8bcad0 100644 --- a/safety/cli.py +++ b/safety/cli.py @@ -21,7 +21,7 @@ from safety.alerts import alert from safety.auth import auth, inject_session, proxy_options, auth_options from safety.auth.models import Organization -from safety.scan.constants import CLI_MAIN_INTRODUCTION, CLI_DEBUG_HELP, CLI_DISABLE_OPTIONAL_TELEMETRY_DATA_HELP, \ +from safety.scan.constants import CLI_LICENSES_COMMAND_HELP, CLI_MAIN_INTRODUCTION, CLI_DEBUG_HELP, CLI_DISABLE_OPTIONAL_TELEMETRY_DATA_HELP, \ DEFAULT_EPILOG, DEFAULT_SPINNER, CLI_CHECK_COMMAND_HELP, CLI_CHECK_UPDATES_HELP, CLI_CONFIGURE_HELP, CLI_GENERATE_HELP, \ CLI_CONFIGURE_PROXY_TIMEOUT, CLI_CONFIGURE_PROXY_REQUIRED, CLI_CONFIGURE_ORGANIZATION_ID, CLI_CONFIGURE_ORGANIZATION_NAME, \ CLI_CONFIGURE_SAVE_TO_SYSTEM, CLI_CONFIGURE_PROXY_HOST_HELP, CLI_CONFIGURE_PROXY_PORT_HELP, CLI_CONFIGURE_PROXY_PROTOCOL_HELP, \ @@ -308,6 +308,69 @@ def check(ctx, db, full_report, stdin, files, cache, ignore, ignore_unpinned_req output_exception(exception, exit_code_output=exit_code) +def clean_license_command(f): + """ + Main entry point for validation. + """ + @wraps(f) + def inner(ctx, *args, **kwargs): + # TODO: Remove this soon, for now it keeps a legacy behavior + kwargs.pop("key", None) + kwargs.pop('proxy_protocol', None) + kwargs.pop('proxy_host', None) + kwargs.pop('proxy_port', None) + + return f(ctx, *args, **kwargs) + + return inner + + +@cli.command(cls=SafetyCLILegacyCommand, utility_command=True, help=CLI_LICENSES_COMMAND_HELP) +@proxy_options +@auth_options(stage=False) +@click.option("--db", default="", + help="Path to a local license database. Default: empty") +@click.option('--output', "-o", type=click.Choice(['screen', 'text', 'json', 'bare'], case_sensitive=False), + default='screen') +@click.option("--cache", default=0, + help='Whether license database file should be cached.' + 'Default: 0 seconds') +@click.option("files", "--file", "-r", multiple=True, type=click.File(), + help="Read input from one (or multiple) requirement files. Default: empty") +@click.pass_context +@clean_license_command +def license(ctx, db, output, cache, files): + """ + Find the open source licenses used by your Python dependencies. + """ + LOG.info('Running license command') + packages = get_packages(files, False) + licenses_db = {} + + SafetyContext().params = ctx.params + + try: + licenses_db = safety.get_licenses(session=ctx.obj.auth.client, db_mirror=db, cached=cache, + telemetry=ctx.obj.config.telemetry_enabled) + except SafetyError as e: + LOG.exception('Expected SafetyError happened: %s', e) + output_exception(e, exit_code_output=False) + except Exception as e: + LOG.exception('Unexpected Exception happened: %s', e) + exception = e if isinstance(e, SafetyException) else SafetyException(info=e) + output_exception(exception, exit_code_output=False) + + filtered_packages_licenses = get_packages_licenses(packages=packages, licenses_db=licenses_db) + + announcements = [] + if not db: + announcements = safety.get_announcements(session=ctx.obj.auth.client, telemetry=ctx.obj.config.telemetry_enabled) + + output_report = SafetyFormatter(output=output).render_licenses(announcements, filtered_packages_licenses) + + click.secho(output_report, nl=True) + + @cli.command(cls=SafetyCLILegacyCommand, utility_command=True, help=CLI_GENERATE_HELP) @click.option("--path", default=".", help=CLI_GENERATE_PATH) @click.argument('name', required=True) diff --git a/safety/cli_util.py b/safety/cli_util.py index abac0e96..02692396 100644 --- a/safety/cli_util.py +++ b/safety/cli_util.py @@ -540,7 +540,7 @@ def invoke(self, ctx): # Workaround for legacy check options, that now are global options subcommand_args = set(args) PROXY_HOST_OPTIONS = set(["--proxy-host", "-ph"]) - if "check" in ctx.protected_args and (bool(PROXY_HOST_OPTIONS.intersection(subcommand_args) or "--key" in subcommand_args)) : + if "check" in ctx.protected_args or "license" in ctx.protected_args and (bool(PROXY_HOST_OPTIONS.intersection(subcommand_args) or "--key" in subcommand_args)) : proxy_options, key = self.parse_legacy_args(args) if proxy_options: ctx.params.update(proxy_options) diff --git a/safety/errors.py b/safety/errors.py index c50d789f..35a8bce6 100644 --- a/safety/errors.py +++ b/safety/errors.py @@ -81,10 +81,10 @@ def get_exit_code(self): class InvalidCredentialError(DatabaseFetchError): - def __init__(self, credential: Optional[str] = None, message="Your authentication credential '{credential}' is invalid. See {link}.", reason=None): + def __init__(self, credential: Optional[str] = None, message="Your authentication credential{credential}is invalid. See {link}.", reason=None): self.credential = credential self.link = 'https://bit.ly/3OY2wEI' - self.message = message.format(credential=self.credential, link=self.link) if self.credential else message.format(link=self.link) + self.message = message.format(credential=f" '{self.credential}' ", link=self.link) if self.credential else message.format(credential=' ', link=self.link) info = f" Reason: {reason}" self.message = self.message + (info if reason else "") super().__init__(self.message) diff --git a/safety/reqs_scan.py b/safety/reqs_scan.py deleted file mode 100644 index 8e5ba1ed..00000000 --- a/safety/reqs_scan.py +++ /dev/null @@ -1,69 +0,0 @@ -# Python code to search for requirements files -import os -import random - -def scan_file(root, filename): - if filename.endswith('requirements.txt'): - file_name_randomizer = round(random.random()*100000000) - filepath = f"{root}/{str(filename)}" - print(f" Scanning {filepath} using safety check") - os.system(f"safety check -r {filepath} --cache 100 --output json >> {save_results_path}/{file_name_randomizer}-scan.json") - -def check_file_name(root, filename): - if filename.endswith('requirements.txt') or filename.endswith('pyproject.toml') or filename.endswith('poetry.lock') or filename.endswith('Pipfile') or filename.endswith('Pipfile.lock'): - print (f"found requirements file: {root}/{str(filename)}") - scan_file(root, filename) - -# From https://gist.github.com/TheMatt2/faf5ca760c61a267412c46bb977718fa -def walklevel(path, depth = 1, deny_list = []): - """It works just like os.walk, but you can pass it a level parameter - that indicates how deep the recursion will go. - If depth is 1, the current directory is listed. - If depth is 0, nothing is returned. - If depth is -1 (or less than 0), the full depth is walked. - """ - - # If depth is negative, just walk - # Not using yield from for python2 compat - # and copy dirs to keep consistant behavior for depth = -1 and depth = inf - if depth < 0: - for root, dirs, files in os.walk(path): - yield root, dirs[:], files - return - elif depth == 0: - return - - # path.count(os.path.sep) is safe because - # - On Windows "\\" is never allowed in the name of a file or directory - # - On UNIX "/" is never allowed in the name of a file or directory - # - On MacOS a literal "/" is quitely translated to a ":" so it is still - # safe to count "/". - base_depth = path.rstrip(os.path.sep).count(os.path.sep) - for root, dirs, files in os.walk(path): - for idx, directory in enumerate(dirs): - if f"{root}{directory}" in deny_list: - print(f"Not scanning {root}{directory}") - del dirs[idx] - yield root, dirs[:], files - cur_depth = root.count(os.path.sep) - if base_depth + depth <= cur_depth: - del dirs[:] - - - -# This is to get the directory that the program -# is currently running in. -current_path = os.path.dirname(os.path.realpath(__file__)) -save_results_path = f"{current_path}/.demo_safety_scan_results" -starting_path = "/" -print(f"Scanning from {starting_path}") - -folder_depth_limit = 8 -paths_excluded_list = ["/System"] - -# Create folder for scan results -os.system(f"mkdir {save_results_path}") - -for root, dirs, files in walklevel(starting_path, folder_depth_limit, paths_excluded_list): - for file in files: - check_file_name(root, file) diff --git a/safety/safety.py b/safety/safety.py index 72543bd9..86aa172f 100644 --- a/safety/safety.py +++ b/safety/safety.py @@ -986,11 +986,7 @@ def review(*, report=None, params=None): @sync_safety_context def get_licenses(*, session=None, db_mirror=False, cached=0, telemetry=True): - # key = key if key else os.environ.get("SAFETY_API_KEY", False) - key = session.api_key - - if not key and not db_mirror: - raise InvalidCredentialError(message="The API-KEY was not provided.") + if db_mirror: mirrors = [db_mirror] else: @@ -1175,172 +1171,3 @@ def save_report(path: str, default_name: str, report: str): with open(save_at, 'w+') as report_file: report_file.write(report) - - -import os -import subprocess - -def walklevel(path, depth = 1, deny_list = []): - """It works just like os.walk, but you can pass it a level parameter - that indicates how deep the recursion will go. - If depth is 1, the current directory is listed. - If depth is 0, nothing is returned. - If depth is -1 (or less than 0), the full depth is walked. - """ - - # If depth is negative, just walk - # Not using yield from for python2 compat - # and copy dirs to keep consistant behavior for depth = -1 and depth = inf - if depth < 0: - for root, dirs, files in os.walk(path): - yield root, dirs[:], files - return - elif depth == 0: - return - - # path.count(os.path.sep) is safe because - # - On Windows "\\" is never allowed in the name of a file or directory - # - On UNIX "/" is never allowed in the name of a file or directory - # - On MacOS a literal "/" is quitely translated to a ":" so it is still - # safe to count "/". - base_depth = path.rstrip(os.path.sep).count(os.path.sep) - for root, dirs, files in os.walk(path): - for idx, directory in enumerate(dirs): - if f"{root}{directory}" in deny_list: - # print(f"Not scanning {root}{directory}") - del dirs[idx] - yield root, dirs[:], files - cur_depth = root.count(os.path.sep) - if base_depth + depth <= cur_depth: - del dirs[:] - -def scan_directory(directory, timeout=None, max_depth=0, current_path=None): - virtual_envs = [] - python_interpreters = [] - requirements_files = [] - - deny_list = { - # Windows directories - "C:\\Windows", - "C:\\Program Files", - "C:\\Program Files (x86)", - "C:\\ProgramData", - - # Linux and macOS directories - "/usr", - "/usr/local", - "/opt", - "/var", - "/etc", - "/Library", - "/System", - "/Applications", - "~/Library", - "/proc", - "/dev" - } - - go_up = '' - - for root, dirs, files in walklevel(directory, max_depth, deny_list): - # Skip symbolic links, /proc, and /dev directories - # if os.path.islink(root) or root.startswith('/proc') or root.startswith('/dev'): - # continue - - status = f'Scanning: {root.strip()}...' - found = f'Python items found: {len(python_interpreters) + len(requirements_files)}' - status_pad = ' ' * (get_terminal_size().columns - len(status)) - found_pad = ' ' * (get_terminal_size().columns - len(found)) - - click.echo('{}{}\r{}'.format(go_up, f"{status}{status_pad}\n", f"{found}{found_pad}"), nl=False, err=True) - - if not go_up: - go_up = "\033[F" - - # Look for Python interpreters and requirements - for file_name in files: - file_path = os.path.join(root, file_name) - - if file_name.endswith('requirements.txt'): - file_name_randomizer = round(random.random()*100000000) - filepath = f"{root}/{str(file_name)}" - requirements_files.append(file_path) - os.system(f"safety check -r {filepath} --cache 100 --output json >> {current_path}/requirements/{file_name_randomizer}-scan.json") - - if file_name.startswith('python'): - try: - - p = file_path - - # Check if the path is a symbolic link - # if os.path.islink(file_path): - # p = os.path.realpath(file_path) - # if os.path.islink(p): - # p = os.path.realpath(p) - # if os.path.islink(p): - # p = os.path.realpath(p) - - output = subprocess.check_output( - [file_path, '--version', '--version'], - stderr=subprocess.STDOUT, timeout=timeout) - result = output.decode('utf-8') - - if result.startswith('Python'): - output = subprocess.check_output([p, '-m', 'pip', 'freeze'], stderr=subprocess.STDOUT, timeout=timeout) - requirements += output.decode('utf-8') + '\n' - - # Command to run - cmd = ['safety', 'check', '--cache', '100', '--output', 'json' '--stdin'] - - if not requirements: - continue - - # click.secho("Safety check is running...") - # Run the command and pass the string as stdin - result = subprocess.run(cmd, input=requirements, text=True, capture_output=True) - file_name_randomizer = round(random.random()*100000000) - with open(f"{current_path}/environments/{file_name_randomizer}-scan.json", "w") as outfile: - outfile.write(result.stdout) - - - # output = subprocess.check_output( - # [file_path, '--version', '--version'], - # stderr=subprocess.STDOUT, timeout=timeout) - # result = output.decode('utf-8') - - # if result.startswith('Python'): - # parts = result.split() - # # Extract the version number - # version = parts[1] - - # # Extract the date and time - # date = re.findall(r'\(.*?\)', result)[0].strip('()') - - # # Extract the compiler information - # compiler = re.findall(r'\[.*?\]', result)[0].strip('[]') - - python_interpreters.append(file_path) - except Exception: - pass - - requirements = "" - - click.secho(f"Results are save in {current_path}/...") - - # for interp in python_interpreters: - # output = subprocess.check_output( - # [interp, '-m', 'pip', 'freeze'], stderr=subprocess.STDOUT, timeout=timeout) - # requirements += output.decode('utf-8') + '\n' - - # # Command to run - # cmd = ['safety', 'check', '--stdin'] - - # if not requirements: - # click.secho("No packages found.") - - # click.secho("Safety check is running...") - # # Run the command and pass the string as stdin - # result = subprocess.run(cmd, input=requirements, text=True, capture_output=True) - - # click.secho(result.stdout) - # click.secho(result.stderr) diff --git a/safety/scan/constants.py b/safety/scan/constants.py index e499f7cd..adc91a91 100644 --- a/safety/scan/constants.py +++ b/safety/scan/constants.py @@ -25,6 +25,8 @@ CLI_CHECK_COMMAND_HELP = "\\[deprecated] Find vulnerabilities at target files or enviroments. Now replaced by [bold]safety scan[/bold], and will be unsupported beyond 1 May 2024." \ "\n[bold]Example: safety check -r requirements.txt[/bold]" +CLI_LICENSES_COMMAND_HELP = "\\[deprecated] Find licenses at target files or enviroments. This command will be replaced by [bold]safety scan[/bold], and will be unsupported beyond 1 May 2024." \ +"\n[bold]Example: safety license -r requirements.txt[/bold]" CLI_ALERT_COMMAND_HELP = "\\[deprecated] Create GitHub pull requests or GitHub issues using a `safety check` json report file. Being replaced by newer features." \ diff --git a/safety/util.py b/safety/util.py index b720b044..feef3747 100644 --- a/safety/util.py +++ b/safety/util.py @@ -771,6 +771,14 @@ def get_packages_licenses(*, packages=None, licenses_db=None): pkg_name = canonicalize_name(pkg.name) # packages may have different licenses depending their version. pkg_licenses = packages_licenses_db.get(pkg_name, []) + if not pkg.version: + for req in pkg.requirements: + if is_pinned_requirement(req.specifier): + pkg.version = next(iter(req.specifier)).version + break + + if not pkg.version: + continue version_requested = parse_version(pkg.version) license_id = None license_name = None diff --git a/tests/test_cli.py b/tests/test_cli.py index d8ac7215..54b02ef8 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -483,3 +483,28 @@ def test_basic_html_output_pass(self): self.assertIn("remediations-suggested", result.stdout) self.assertIn("Use API Key", result.stdout) + + @patch('safety.safety.fetch_database_url') + def test_license_with_file(self, fetch_database_url): + licenses_db = { + "licenses": { + "BSD-3-Clause": 2 + }, + "packages": { + "django": [ + { + "start_version": "0.0", + "license_id": 2 + } + ] + } + } + + mock = Mock() + mock.return_value = licenses_db + + dirname = os.path.dirname(__file__) + test_filename = os.path.join(dirname, "reqs_4.txt") + result = self.runner.invoke(cli.cli, ['license', '--key', 'foo', '--file', test_filename]) + print(result.stdout) + self.assertEqual(result.exit_code, 0) \ No newline at end of file diff --git a/tests/test_safety.py b/tests/test_safety.py index 086c5e81..3fda4cf6 100644 --- a/tests/test_safety.py +++ b/tests/test_safety.py @@ -237,7 +237,7 @@ def test_get_packages_licenses_without_api_key(self): telemetry=False ) db_generic_exception = error.exception - self.assertEqual(str(db_generic_exception), 'The API-KEY was not provided.') + self.assertEqual(str(db_generic_exception), 'Your authentication credential is invalid. See https://bit.ly/3OY2wEI.') def test_get_packages_licenses_with_invalid_api_key(self):