From 34367a739fdd9103d78a3c7772dbf469b342dae0 Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Tue, 5 Nov 2024 13:06:08 +0100 Subject: [PATCH 1/2] Create a new helper function get_argument --- aikido_zen/helpers/get_argument.py | 10 ++++ aikido_zen/helpers/get_argument_test.py | 62 +++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 aikido_zen/helpers/get_argument.py create mode 100644 aikido_zen/helpers/get_argument_test.py diff --git a/aikido_zen/helpers/get_argument.py b/aikido_zen/helpers/get_argument.py new file mode 100644 index 00000000..83e338f8 --- /dev/null +++ b/aikido_zen/helpers/get_argument.py @@ -0,0 +1,10 @@ +"""Exports get_argument""" + + +def get_argument(args, kwargs, pos, name): + """Checks kwargs and args for your argument""" + if name in kwargs: + return kwargs.get(name) + if args and len(args) > pos: + return args[pos] + return None diff --git a/aikido_zen/helpers/get_argument_test.py b/aikido_zen/helpers/get_argument_test.py new file mode 100644 index 00000000..a87e5776 --- /dev/null +++ b/aikido_zen/helpers/get_argument_test.py @@ -0,0 +1,62 @@ +import pytest +from .get_argument import get_argument + + +def test_get_argument_with_only_kwargs(): + """Test when only kwargs are provided.""" + result = get_argument((), {"arg1": "value1"}, 0, "arg1") + assert result == "value1", f"Expected 'value1', got {result}" + + +def test_get_argument_with_only_args(): + """Test when only args are provided.""" + result = get_argument(("value2",), {}, 0, "arg1") + assert result == "value2", f"Expected 'value2', got {result}" + + +def test_get_argument_with_args_and_kwargs(): + """Test when both args and kwargs are provided, with priority to kwargs.""" + result = get_argument(("value2",), {"arg1": "value1"}, 0, "arg1") + assert result == "value1", f"Expected 'value1', got {result}" + + +def test_get_argument_with_positional_index(): + """Test when args are provided and a specific position is requested.""" + result = get_argument(("value2", "value3"), {}, 1, "arg1") + assert result == "value3", f"Expected 'value3', got {result}" + + +def test_get_argument_with_positional_index_out_of_bounds(): + """Test when the positional index is out of bounds.""" + result = get_argument(("value2",), {}, 1, "arg1") + assert result is None, f"Expected None, got {result}" + + +def test_get_argument_with_none_in_kwargs(): + """Test when the argument in kwargs is None.""" + result = get_argument((), {"arg1": None}, 0, "arg1") + assert result is None, f"Expected None, got {result}" + + +def test_get_argument_with_none_in_args(): + """Test when the argument in args is None.""" + result = get_argument((None,), {}, 0, "arg1") + assert result is None, f"Expected None, got {result}" + + +def test_get_argument_with_empty_args_and_kwargs(): + """Test when both args and kwargs are empty.""" + result = get_argument((), {}, 0, "arg1") + assert result is None, f"Expected None, got {result}" + + +def test_get_argument_with_multiple_kwargs(): + """Test when multiple kwargs are provided.""" + result = get_argument((), {"arg1": "value1", "arg2": "value2"}, 0, "arg1") + assert result == "value1", f"Expected 'value1', got {result}" + + +def test_get_argument_with_positional_index_and_kwargs(): + """Test when both args and kwargs are provided, with positional index.""" + result = get_argument(("value2", "value3"), {"arg1": "value1"}, 0, "arg1") + assert result == "value1", f"Expected 'value1', got {result}" From 966b85b00acaf3b326b4324002ee0a2c736c7a78 Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Tue, 5 Nov 2024 13:06:28 +0100 Subject: [PATCH 2/2] Update subprocess to wrap constructor of subprocess.Popen --- aikido_zen/sinks/subprocess.py | 32 +++++++++++++++++------ aikido_zen/sinks/tests/subprocess_test.py | 11 +++++--- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/aikido_zen/sinks/subprocess.py b/aikido_zen/sinks/subprocess.py index 6dfda5fc..92c8b0a6 100644 --- a/aikido_zen/sinks/subprocess.py +++ b/aikido_zen/sinks/subprocess.py @@ -5,8 +5,10 @@ import copy import aikido_zen.importhook as importhook import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument -SUBPROCESS_OPERATIONS = ["call", "run", "check_call", "Popen", "check_output"] +SUBPROCESS_OPERATIONS = ["check_output"] +# check_call, call, and run all call Popen class def generate_aikido_function(op, former_func): @@ -15,25 +17,31 @@ def generate_aikido_function(op, former_func): an operation and a former function """ - def aikido_new_func(args, *arguments, op=op, former_func=former_func, **kwargs): + def aikido_new_func(*args, op=op, former_func=former_func, **kwargs): shell_enabled = kwargs.get("shell") + + position = ( + 1 if op == "Popen" else 0 + ) # If it's a constructor, first argument is self + shell_arguments = get_argument(args, kwargs, pos=position, name="args") + command = None - if isinstance(args, str): - command = args - elif hasattr(args, "__iter__"): + if isinstance(shell_arguments, str): + command = shell_arguments + elif hasattr(shell_arguments, "__iter__"): # Check if args is an iterable i.e. list, dict, tuple # If it is we join it with spaces to run the shell_injection algorithm. - command = " ".join(args) + command = " ".join(shell_arguments) # For all operations above: call, run, check_call, Popen, check_output, default value # of the shell property is False. - if command and shell_enabled == True: + if command and shell_enabled: vulns.run_vulnerability_scan( kind="shell_injection", op=f"subprocess.{op}", args=(command,), ) - return former_func(args, *arguments, **kwargs) + return former_func(*args, **kwargs) return aikido_new_func @@ -53,4 +61,12 @@ def on_subprocess_import(subprocess): generate_aikido_function(op=op, former_func=former_func), ) + # Wrap Class Popen seperately: + former_popen_constructor = copy.deepcopy(subprocess.Popen.__init__) + setattr( + getattr(modified_subprocess, "Popen"), + "__init__", # Popen is a class, modify it's constructor + generate_aikido_function(op="Popen", former_func=former_popen_constructor), + ) + return modified_subprocess diff --git a/aikido_zen/sinks/tests/subprocess_test.py b/aikido_zen/sinks/tests/subprocess_test.py index 48862734..a155725b 100644 --- a/aikido_zen/sinks/tests/subprocess_test.py +++ b/aikido_zen/sinks/tests/subprocess_test.py @@ -11,7 +11,7 @@ def test_subprocess_call(): ) as mock_run_vulnerability_scan: import subprocess - op = "subprocess.call" + op = "subprocess.Popen" subprocess.call(["ls", "-la"], shell=True) args = ("ls -la",) @@ -51,7 +51,7 @@ def test_subprocess_run(): ) as mock_run_vulnerability_scan: import subprocess - op = "subprocess.run" + op = "subprocess.Popen" subprocess.run(["ls", "-la"], shell=True) args = ("ls -la",) @@ -85,7 +85,7 @@ def test_subprocess_check_call(): ) as mock_run_vulnerability_scan: import subprocess - op = "subprocess.check_call" + op = "subprocess.Popen" subprocess.check_call(["ls", "-la"], shell=True) args = ("ls -la",) @@ -134,10 +134,13 @@ def test_subprocess_popen(): args = ("ls -la",) mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) - subprocess.Popen(args="ls -la", shell=True) + process = subprocess.Popen(args="ls -la", shell=True) args = ("ls -la",) mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) + process.kill() # Test class functions + process.pid # Access a class attribute + def test_subprocess_popen_not_called(): with patch(