Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subprocess.Popen bugfix: Regard it as a class and monkeypatch constructor #250

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions aikido_zen/helpers/get_argument.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Exports get_argument"""


def get_argument(args, kwargs, pos, name):
"""Checks kwargs and args for your argument"""
if name in kwargs:
return kwargs.get(name)
if args and len(args) > pos:
return args[pos]
return None
62 changes: 62 additions & 0 deletions aikido_zen/helpers/get_argument_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import pytest
from .get_argument import get_argument


def test_get_argument_with_only_kwargs():
"""Test when only kwargs are provided."""
result = get_argument((), {"arg1": "value1"}, 0, "arg1")
assert result == "value1", f"Expected 'value1', got {result}"


def test_get_argument_with_only_args():
"""Test when only args are provided."""
result = get_argument(("value2",), {}, 0, "arg1")
assert result == "value2", f"Expected 'value2', got {result}"


def test_get_argument_with_args_and_kwargs():
"""Test when both args and kwargs are provided, with priority to kwargs."""
result = get_argument(("value2",), {"arg1": "value1"}, 0, "arg1")
assert result == "value1", f"Expected 'value1', got {result}"


def test_get_argument_with_positional_index():
"""Test when args are provided and a specific position is requested."""
result = get_argument(("value2", "value3"), {}, 1, "arg1")
assert result == "value3", f"Expected 'value3', got {result}"


def test_get_argument_with_positional_index_out_of_bounds():
"""Test when the positional index is out of bounds."""
result = get_argument(("value2",), {}, 1, "arg1")
assert result is None, f"Expected None, got {result}"


def test_get_argument_with_none_in_kwargs():
"""Test when the argument in kwargs is None."""
result = get_argument((), {"arg1": None}, 0, "arg1")
assert result is None, f"Expected None, got {result}"


def test_get_argument_with_none_in_args():
"""Test when the argument in args is None."""
result = get_argument((None,), {}, 0, "arg1")
assert result is None, f"Expected None, got {result}"


def test_get_argument_with_empty_args_and_kwargs():
"""Test when both args and kwargs are empty."""
result = get_argument((), {}, 0, "arg1")
assert result is None, f"Expected None, got {result}"


def test_get_argument_with_multiple_kwargs():
"""Test when multiple kwargs are provided."""
result = get_argument((), {"arg1": "value1", "arg2": "value2"}, 0, "arg1")
assert result == "value1", f"Expected 'value1', got {result}"


def test_get_argument_with_positional_index_and_kwargs():
"""Test when both args and kwargs are provided, with positional index."""
result = get_argument(("value2", "value3"), {"arg1": "value1"}, 0, "arg1")
assert result == "value1", f"Expected 'value1', got {result}"
32 changes: 24 additions & 8 deletions aikido_zen/sinks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import copy
import aikido_zen.importhook as importhook
import aikido_zen.vulnerabilities as vulns
from aikido_zen.helpers.get_argument import get_argument

SUBPROCESS_OPERATIONS = ["call", "run", "check_call", "Popen", "check_output"]
SUBPROCESS_OPERATIONS = ["check_output"]
# check_call, call, and run all call Popen class


def generate_aikido_function(op, former_func):
Expand All @@ -15,25 +17,31 @@ def generate_aikido_function(op, former_func):
an operation and a former function
"""

def aikido_new_func(args, *arguments, op=op, former_func=former_func, **kwargs):
def aikido_new_func(*args, op=op, former_func=former_func, **kwargs):
shell_enabled = kwargs.get("shell")

position = (
1 if op == "Popen" else 0
) # If it's a constructor, first argument is self
shell_arguments = get_argument(args, kwargs, pos=position, name="args")

command = None
if isinstance(args, str):
command = args
elif hasattr(args, "__iter__"):
if isinstance(shell_arguments, str):
command = shell_arguments
elif hasattr(shell_arguments, "__iter__"):
# Check if args is an iterable i.e. list, dict, tuple
# If it is we join it with spaces to run the shell_injection algorithm.
command = " ".join(args)
command = " ".join(shell_arguments)

# For all operations above: call, run, check_call, Popen, check_output, default value
# of the shell property is False.
if command and shell_enabled == True:
if command and shell_enabled:
vulns.run_vulnerability_scan(
kind="shell_injection",
op=f"subprocess.{op}",
args=(command,),
)
return former_func(args, *arguments, **kwargs)
return former_func(*args, **kwargs)

return aikido_new_func

Expand All @@ -53,4 +61,12 @@ def on_subprocess_import(subprocess):
generate_aikido_function(op=op, former_func=former_func),
)

# Wrap Class Popen seperately:
former_popen_constructor = copy.deepcopy(subprocess.Popen.__init__)
setattr(
getattr(modified_subprocess, "Popen"),
"__init__", # Popen is a class, modify it's constructor
generate_aikido_function(op="Popen", former_func=former_popen_constructor),
)

return modified_subprocess
11 changes: 7 additions & 4 deletions aikido_zen/sinks/tests/subprocess_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_subprocess_call():
) as mock_run_vulnerability_scan:
import subprocess

op = "subprocess.call"
op = "subprocess.Popen"

subprocess.call(["ls", "-la"], shell=True)
args = ("ls -la",)
Expand Down Expand Up @@ -51,7 +51,7 @@ def test_subprocess_run():
) as mock_run_vulnerability_scan:
import subprocess

op = "subprocess.run"
op = "subprocess.Popen"

subprocess.run(["ls", "-la"], shell=True)
args = ("ls -la",)
Expand Down Expand Up @@ -85,7 +85,7 @@ def test_subprocess_check_call():
) as mock_run_vulnerability_scan:
import subprocess

op = "subprocess.check_call"
op = "subprocess.Popen"

subprocess.check_call(["ls", "-la"], shell=True)
args = ("ls -la",)
Expand Down Expand Up @@ -134,10 +134,13 @@ def test_subprocess_popen():
args = ("ls -la",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

subprocess.Popen(args="ls -la", shell=True)
process = subprocess.Popen(args="ls -la", shell=True)
args = ("ls -la",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

process.kill() # Test class functions
process.pid # Access a class attribute


def test_subprocess_popen_not_called():
with patch(
Expand Down
Loading