Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more coverage for path traversal: #248

Merged
merged 13 commits into from
Nov 5, 2024
10 changes: 9 additions & 1 deletion aikido_zen/context/init_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import pytest
import pickle
import json
from aikido_zen.context import Context, get_current_context
from aikido_zen.context import Context, get_current_context, current_context


@pytest.fixture(autouse=True)
def run_around_tests():
yield
# Make sure to reset context after every test so it does not
# interfere with other tests
current_context.set(None)


def test_get_current_context_no_context():
Expand Down
26 changes: 8 additions & 18 deletions aikido_zen/sinks/os.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@
"open",
]

# os.path.func(...) functions, can have a filename and destination.
OS_PATH_FUNCTIONS = [
"realpath",
"getsize",
]
# os.path.join(path, *paths) is not wrapped
# os.path.realpath, os.path.abspath aren't wrapped, since they use os.path.join
OS_PATH_FUNCTIONS = ["getsize", "join", "expanduser", "expandvars"]

# os.makedirs() is not wrapped since it uses os.mkdir() which we wrap
# os.path.exists() and functions alike are not wrapped for performance reasons.
# We also don't wrap the stat library : https://docs.python.org/3/library/stat.html
Expand All @@ -43,18 +40,11 @@ def generate_aikido_function(op, former_func):
"""

def aikido_new_func(*args, op=op, former_func=former_func, **kwargs):
if len(args) > 0 and isinstance(
args[0], (str, bytes, PurePath)
): # args[0] : filename
vulns.run_vulnerability_scan(
kind="path_traversal", op=f"os.{op}", args=(args[0],)
)
if len(args) > 1 and isinstance(
args[1], (str, bytes, PurePath)
): # args[1] : Could be dst folder
vulns.run_vulnerability_scan(
kind="path_traversal", op=f"os.{op}", args=(args[1],)
)
for arg in args:
if isinstance(arg, (str, bytes, PurePath)):
vulns.run_vulnerability_scan(
kind="path_traversal", op=f"os.{op}", args=(arg,)
)
return former_func(*args, **kwargs)

return aikido_new_func
Expand Down
6 changes: 4 additions & 2 deletions aikido_zen/sinks/tests/builtins_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ def test_open():
with pytest.raises(FileNotFoundError):
open(path)
args = (path,)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)
# Need to use assert_any_call, since python 3.12 it uses os.path.join
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)

path2 = PurePath("./test", "/test.py")
with pytest.raises(FileNotFoundError):
open(path2)
args = (path2,)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)
# Need to use assert_any_call, since python 3.12 it uses os.path.join
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)


def test_open_with_builtins_import():
Expand Down
8 changes: 5 additions & 3 deletions aikido_zen/sinks/tests/motor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ async def test_replace_one(db):
await dogs.replace_one(filter, repl)

called_with = mock_run_vulnerability_scan.call_args[1]
assert called_with["args"][0] == filter
assert called_with["op"] == "pymongo.collection.Collection.replace_one"
assert called_with["kind"] == "nosql_injection"
mock_run_vulnerability_scan.assert_any_call(
args=({"dog_name": "test"},),
op="pymongo.collection.Collection.replace_one",
kind="nosql_injection",
)


@pytest.mark.asyncio
Expand Down
4 changes: 2 additions & 2 deletions aikido_zen/sinks/tests/os_system_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ def test_osdotpopen_commands():
os.popen("Test command")

args = ("Test command",)
mock_run_vulnerability_scan.assert_called_with(
mock_run_vulnerability_scan.assert_any_call(
kind=kind, op="subprocess.Popen", args=args
)

os.popen("ls -la | grep 'test'")
args = ("ls -la | grep 'test'",)
mock_run_vulnerability_scan.assert_called_with(
mock_run_vulnerability_scan.assert_any_call(
kind=kind, op="subprocess.Popen", args=args
)
147 changes: 139 additions & 8 deletions aikido_zen/sinks/tests/os_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,159 @@ def test_ospath_commands():
import os

os.path.realpath("test/test2")
op = "os.path.realpath"
op = "os.path.join"
args = ("test/test2",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

with pytest.raises(FileNotFoundError):
os.path.getsize("aqkqjefbkqlleq_qkvfjksaicuaviel")
op = "os.path.getsize"
args = ("aqkqjefbkqlleq_qkvfjksaicuaviel",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)
op = "os.path.getsize"
args = ("aqkqjefbkqlleq_qkvfjksaicuaviel",)
# Need to use assert_any_call, since python 3.12 it uses os.path.join
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)

os.path.realpath(b"te2st/test2")
op = "os.path.realpath"
op = "os.path.join"
args = (b"te2st/test2",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

path1 = Path("./test", "test2", "test3")
os.path.realpath(path1)
path1 = Path("./", "../", "test/../test2")
with pytest.raises(FileNotFoundError):
os.path.getsize(path1)

op = "os.path.realpath"
op = "os.path.getsize"
args = (path1,)
# Need to use assert_any_call, since python 3.12 it uses os.path.join
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)


def test_ospath_command_absolute_path():
with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
import os

os.path.abspath("../test/test2")
op = "os.path.join"
args = ("../test/test2",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

path1 = Path("./test", "test2", "test3")
os.path.abspath(path1)

op = "os.path.join"
args = ("test/test2/test3",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_ospath_expanduser():
with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
import os

os.path.expanduser("../test/test2")
op = "os.path.expanduser"
args = ("../test/test2",)
# Need to use assert_any_call, since python 3.12 it uses os.path.join
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)

path1 = Path("./test", "test2", "test3")
os.path.expanduser(path1)

op = "os.path.expanduser"
args = (path1,)
# Need to use assert_any_call, since python 3.12 it uses os.path.join
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)


def test_ospath_expandvars():
with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
import os

os.path.expandvars("../test/test2")
op = "os.path.expandvars"
args = ("../test/test2",)
# Need to use assert_any_call, since python 3.12 it uses os.path.join
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)

path1 = Path("./test", "test2", "test3")
os.path.expandvars(path1)

op = "os.path.expandvars"
args = (path1,)
# Need to use assert_any_call, since python 3.12 it uses os.path.join
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)


def test_ospath_join():
with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
import os

os.path.join("../", "/etc/passwd", "..")
op = "os.path.join"
args1 = ("../",)
args2 = ("/etc/passwd",)
args3 = ("..",)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args1)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args2)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args3)


@patch("aikido_zen.vulnerabilities.run_vulnerability_scan")
def test_ospath_join_bytes(mock_run_vulnerability_scan):
import os

op = "os.path.join"

# Test with bytes arguments
os.path.join(b"../", b"/etc/passwd", b"..")
args1 = (b"../",)
args2 = (b"/etc/passwd",)
args3 = (b"..",)

mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args1)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args2)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args3)


@patch("aikido_zen.vulnerabilities.run_vulnerability_scan")
def test_ospath_join_path_objects(mock_run_vulnerability_scan):
import os

op = "os.path.join"

# Test with Path objects
os.path.join(Path("../"), Path("/etc/passwd"), Path(".."))
args1 = (Path("../"),)
args2 = (Path("/etc/passwd"),)
args3 = (Path(".."),)

mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args1)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args2)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args3)


@patch("aikido_zen.vulnerabilities.run_vulnerability_scan")
def test_ospath_join_mixed_paths(mock_run_vulnerability_scan):
import os

op = "os.path.join"

# Test with mixed strings and Path objects
os.path.join("../", Path("/etc/passwd"), "..")
args1 = ("../",)
args2 = (Path("/etc/passwd"),)
args3 = ("..",)

mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args1)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args2)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args3)


def test_os_commands():
with patch(
Expand Down
13 changes: 11 additions & 2 deletions aikido_zen/sinks/tests/requests_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import pytest
from aikido_zen.context import Context
from aikido_zen.thread.thread_cache import ThreadCache
from aikido_zen.context import Context, current_context
from aikido_zen.thread.thread_cache import ThreadCache, threadlocal_storage
from aikido_zen.errors import AikidoSSRF
from aikido_zen.background_process.comms import reset_comms
import aikido_zen.sinks.socket
Expand All @@ -17,6 +17,15 @@
CROSS_DOMAIN_TEST_DOMAIN_TWICE = "http://firewallssrfredirects-env-2.eba-7ifve22q.eu-north-1.elasticbeanstalk.com/ssrf-test-domain-twice"


@pytest.fixture(autouse=True)
def run_around_tests():
yield
# Make sure to reset context and cache after every test so it does not
# interfere with other tests
current_context.set(None)
setattr(threadlocal_storage, "cache", None)


def set_context_and_lifecycle(url):
wsgi_request = {
"REQUEST_METHOD": "GET",
Expand Down
Loading
Loading