Skip to content

Commit

Permalink
Update subprocess to wrap constructor of subprocess.Popen
Browse files Browse the repository at this point in the history
  • Loading branch information
Wout Feys authored and bitterpanda63 committed Nov 5, 2024
1 parent c724fea commit 11ebd81
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
32 changes: 24 additions & 8 deletions aikido_zen/sinks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import copy
import aikido_zen.importhook as importhook
import aikido_zen.vulnerabilities as vulns
from aikido_zen.helpers.get_argument import get_argument

SUBPROCESS_OPERATIONS = ["call", "run", "check_call", "Popen", "check_output"]
SUBPROCESS_OPERATIONS = ["check_output"]
# check_call, call, and run all call Popen class


def generate_aikido_function(op, former_func):
Expand All @@ -15,25 +17,31 @@ def generate_aikido_function(op, former_func):
an operation and a former function
"""

def aikido_new_func(args, *arguments, op=op, former_func=former_func, **kwargs):
def aikido_new_func(*args, op=op, former_func=former_func, **kwargs):
shell_enabled = kwargs.get("shell")

position = (
1 if op == "Popen" else 0
) # If it's a constructor, first argument is self
shell_arguments = get_argument(args, kwargs, pos=position, name="args")

command = None
if isinstance(args, str):
command = args
elif hasattr(args, "__iter__"):
if isinstance(shell_arguments, str):
command = shell_arguments
elif hasattr(shell_arguments, "__iter__"):
# Check if args is an iterable i.e. list, dict, tuple
# If it is we join it with spaces to run the shell_injection algorithm.
command = " ".join(args)
command = " ".join(shell_arguments)

# For all operations above: call, run, check_call, Popen, check_output, default value
# of the shell property is False.
if command and shell_enabled == True:
if command and shell_enabled:
vulns.run_vulnerability_scan(
kind="shell_injection",
op=f"subprocess.{op}",
args=(command,),
)
return former_func(args, *arguments, **kwargs)
return former_func(*args, **kwargs)

return aikido_new_func

Expand All @@ -53,4 +61,12 @@ def on_subprocess_import(subprocess):
generate_aikido_function(op=op, former_func=former_func),
)

# Wrap Class Popen seperately:
former_popen_constructor = copy.deepcopy(subprocess.Popen.__init__)
setattr(
getattr(modified_subprocess, "Popen"),
"__init__", # Popen is a class, modify it's constructor
generate_aikido_function(op="Popen", former_func=former_popen_constructor),
)

return modified_subprocess
11 changes: 7 additions & 4 deletions aikido_zen/sinks/tests/subprocess_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_subprocess_call():
) as mock_run_vulnerability_scan:
import subprocess

op = "subprocess.call"
op = "subprocess.Popen"

subprocess.call(["ls", "-la"], shell=True)
args = ("ls -la",)
Expand Down Expand Up @@ -51,7 +51,7 @@ def test_subprocess_run():
) as mock_run_vulnerability_scan:
import subprocess

op = "subprocess.run"
op = "subprocess.Popen"

subprocess.run(["ls", "-la"], shell=True)
args = ("ls -la",)
Expand Down Expand Up @@ -85,7 +85,7 @@ def test_subprocess_check_call():
) as mock_run_vulnerability_scan:
import subprocess

op = "subprocess.check_call"
op = "subprocess.Popen"

subprocess.check_call(["ls", "-la"], shell=True)
args = ("ls -la",)
Expand Down Expand Up @@ -134,10 +134,13 @@ def test_subprocess_popen():
args = ("ls -la",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

subprocess.Popen(args="ls -la", shell=True)
process = subprocess.Popen(args="ls -la", shell=True)
args = ("ls -la",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

process.kill() # Test class functions
process.pid # Access a class attribute


def test_subprocess_popen_not_called():
with patch(
Expand Down

0 comments on commit 11ebd81

Please sign in to comment.