Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIK-3833 Don't send over data if "started" event has not been sent yet #258

Merged
merged 6 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, block, api, token, serverless):
self.hostnames = Hostnames(200)
self.conf = ServiceConfig(
endpoints=[],
last_updated_at=get_unixtime_ms(),
last_updated_at=-1, # Has not been updated yet
blocked_uids=[],
bypassed_ips=[],
received_any_stats=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def on_start(connection_manager):
connection_manager.timeout_in_sec,
)
if not res.get("success", True):
# Update config time even in failure :
connection_manager.conf.last_updated_at = get_unixtime_ms()
logger.error("Failed to communicate with Aikido Server : %s", res["error"])
else:
connection_manager.update_service_config(res)
Expand Down
16 changes: 9 additions & 7 deletions aikido_zen/background_process/commands/sync_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ def process_sync_data(connection_manager, data, conn, queue=None):
update_route_info(route["apispec"], existing_route)

connection_manager.statistics.requests["total"] += data.get("reqs", 0)

return {
"routes": dict(connection_manager.routes.routes),
"endpoints": connection_manager.conf.endpoints,
"bypassed_ips": connection_manager.conf.bypassed_ips,
"blocked_uids": connection_manager.conf.blocked_uids,
}
if connection_manager.conf.last_updated_at > 0:
# Only report data if the config has been fetched.
return {
"routes": dict(connection_manager.routes.routes),
"endpoints": connection_manager.conf.endpoints,
"bypassed_ips": connection_manager.conf.bypassed_ips,
"blocked_uids": connection_manager.conf.blocked_uids,
}
return {}
49 changes: 49 additions & 0 deletions aikido_zen/background_process/commands/sync_data_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from multiprocessing.forkserver import connect_to_new_process

import pytest
from unittest.mock import MagicMock
from .sync_data import process_sync_data
Expand All @@ -12,6 +14,7 @@ def setup_connection_manager():
connection_manager.conf.endpoints = ["endpoint1", "endpoint2"]
connection_manager.conf.bypassed_ips = ["192.168.1.1"]
connection_manager.conf.blocked_uids = ["user1", "user2"]
connection_manager.conf.last_updated_at = 200
connection_manager.statistics.requests = {"total": 0} # Initialize total requests
return connection_manager

Expand Down Expand Up @@ -64,6 +67,52 @@ def test_process_sync_data_initialization(setup_connection_manager):
assert result["blocked_uids"] == connection_manager.conf.blocked_uids


def test_process_sync_data_with_last_updated_at_below_zero(setup_connection_manager):
"""Test the initialization of routes and hits update."""
connection_manager = setup_connection_manager
connection_manager.conf.last_updated_at = -1
data = {
"current_routes": {
"route1": {
"method": "GET",
"path": "/api/v1/resource",
"hits_delta_since_sync": 5,
"apispec": {"info": "API spec for resource"},
},
"route2": {
"method": "POST",
"path": "/api/v1/resource",
"hits_delta_since_sync": 3,
"apispec": {"info": "API spec for resource"},
},
},
"reqs": 10, # Total requests to be added
}

result = process_sync_data(connection_manager, data, None)

# Check that routes were initialized correctly
assert len(connection_manager.routes) == 2
assert (
connection_manager.routes.get({"method": "GET", "route": "/api/v1/resource"})[
"hits"
]
== 5
)
assert (
connection_manager.routes.get({"method": "POST", "route": "/api/v1/resource"})[
"hits"
]
== 3
)

# Check that the total requests were updated
assert connection_manager.statistics.requests["total"] == 10

# Check that the return value is correct
assert result == {}


def test_process_sync_data_existing_route(setup_connection_manager):
"""Test updating an existing route's hit count."""
connection_manager = setup_connection_manager
Expand Down
12 changes: 7 additions & 5 deletions aikido_zen/thread/thread_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import aikido_zen.background_process.comms as comms
import aikido_zen.helpers.get_current_unixtime_ms as t
from aikido_zen.background_process.routes import Routes
from aikido_zen.helpers.logging import logger

THREAD_CONFIG_TTL_MS = 60 * 1000 # Time-To-Live is 60 seconds for the thread cache

Expand Down Expand Up @@ -72,18 +73,19 @@ def renew(self):
)

self.reset()
if res["success"]:
if isinstance(res["data"]["bypassed_ips"], set):
if res["success"] and res["data"]:
if isinstance(res["data"].get("bypassed_ips"), set):
self.bypassed_ips = res["data"]["bypassed_ips"]
if isinstance(res["data"]["endpoints"], list):
if isinstance(res["data"].get("endpoints"), list):
self.endpoints = res["data"]["endpoints"]
if isinstance(res["data"]["routes"], dict):
if isinstance(res["data"].get("routes"), dict):
self.routes.routes = res["data"]["routes"]
for route in self.routes.routes.values():
route["hits_delta_since_sync"] = 0
if isinstance(res["data"]["blocked_uids"], set):
if isinstance(res["data"].get("blocked_uids"), set):
self.blocked_uids = res["data"]["blocked_uids"]
self.last_renewal = t.get_unixtime_ms(monotonic=True)
logger.debug("Renewed thread cache")

def increment_stats(self):
"""Increments the requests"""
Expand Down
19 changes: 19 additions & 0 deletions end2end/flask_mysql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,22 @@ def test_dangerous_response_without_firewall():
res = requests.post(base_url_nofw + "/create", data={'dog_name': dog_name})
assert res.status_code == 200

def test_ratelimiting_1_route():
# First request :
res = requests.get(base_url_fw + "/test_ratelimiting_1")
assert res.status_code == 200
# Second request :
res = requests.get(base_url_fw + "/test_ratelimiting_1")
assert res.status_code == 200
# Third request :
res = requests.get(base_url_fw + "/test_ratelimiting_1")
assert res.status_code == 429
# Fourth request :
res = requests.get(base_url_fw + "/test_ratelimiting_1")
assert res.status_code == 429

time.sleep(5) # Wait until window expires

# Fifth request :
res = requests.get(base_url_fw + "/test_ratelimiting_1")
assert res.status_code == 200
16 changes: 15 additions & 1 deletion end2end/server/mock_aikido_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,21 @@
responses = {
"config": {
"receivedAnyStats": False,
"success": True
"success": True,
"endpoints": [
{
"route": "/test_ratelimiting_1",
"method": "*",
"forceProtectionOff": False,
"rateLimiting": {
"enabled": True,
"maxRequests": 2,
"windowSizeInMS": 1000 * 5,
},
"graphql": False,
}
],
"blockedUserIds": [],
},
"configUpdatedAt": {},
}
Expand Down
5 changes: 5 additions & 0 deletions sample-apps/flask-mysql/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,8 @@ def make_request():
def execute_command_get(command):
result = subprocess.run(command, capture_output=True, text=True, shell=True)
return str(result.stdout)

# End2End Test route :
@app.route("/test_ratelimiting_1", methods=["GET"])
def test_ratelimiting_1():
return "OK"