WIP
Signed-off-by: Tyler Gu <[email protected]>
tylergu committed Sep 25, 2023
1 parent 64cab37 commit 2491819
Showing 4 changed files with 453 additions and 0 deletions.
37 changes: 37 additions & 0 deletions acto/post_process/post_chain_inputs.py
@@ -0,0 +1,37 @@


from acto.lib.operator_config import OperatorConfig
from acto.post_process.post_process import PostProcessor


class ChainInputs(PostProcessor):
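    """Chain the inputs of a previous Acto test run into self.all_inputs.

    Invalid inputs are skipped unless ignore_invalid is set, and only steps
    whose runtime result passed are kept.
    """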

    def __init__(self, testrun_dir: str, config: OperatorConfig,
                 ignore_invalid: bool = False, acto_namespace: int = 0):
        self.acto_namespace = acto_namespace
        super().__init__(testrun_dir, config)

        self.all_inputs = []
        for trial, steps in self.trial_to_steps.items():
            for step in steps.values():
                invalid, _ = step.runtime_result.is_invalid()
                # skip inputs rejected as invalid, unless asked to keep them
                if invalid and not ignore_invalid:
                    continue
                # only chain inputs whose step passed
                if not step.runtime_result.is_pass():
                    continue
                self.all_inputs.append({
                    'trial': trial,
                    'gen': step.gen,
                    'input': step.input,
                    'input_digest': step.input_digest,
                    'operator_log': step.operator_log,
                    'system_state': step.system_state,
                    'cli_output': step.cli_output,
                    'runtime_result': step.runtime_result,
                })
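
A minimal usage sketch for ChainInputs, not part of the commit; the config path and test-run directory below are hypothetical placeholders:

import json

from acto.lib.operator_config import OperatorConfig
from acto.post_process.post_chain_inputs import ChainInputs

with open("operator_config.json") as f:  # hypothetical operator config path
    config = OperatorConfig(**json.load(f))

# hypothetical test-run directory produced by a previous Acto run
chained = ChainInputs(testrun_dir="testrun-example", config=config)
for entry in chained.all_inputs:
    print(entry["trial"], entry["gen"], entry["input_digest"])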




117 changes: 117 additions & 0 deletions performance_measurement/measure_performance.py
@@ -0,0 +1,117 @@
import argparse
import dataclasses
import glob
import json
import logging
import os
import sys
from datetime import datetime

import yaml

sys.path.insert(0, os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..')))

from performance_measurement.measure_runner import MeasurementRunner

from acto import utils
from acto.common import kubernetes_client
from acto.constant import CONST
from acto.deploy import Deploy
from acto.kubectl_client.kubectl import KubectlClient
from acto.kubernetes_engine import kind
from acto.lib.operator_config import OperatorConfig
from acto.post_process.post_chain_inputs import ChainInputs
from acto.utils.preprocess import process_crd

def load_inputs_from_dir(input_dir: str) -> list:
    """Load previously generated CR files (mutated-*.yaml) from a directory, in order."""
    inputs = []
    files = sorted(glob.glob(f"{input_dir}/mutated-*.yaml"))
    logging.info(f"Loading {len(files)} inputs from {input_dir}")
    for file in files:
        with open(file, "r") as f:
            inputs.append(yaml.load(f, Loader=yaml.FullLoader))
    return inputs

if __name__ == "__main__":
    workdir_path = "testrun-%s" % datetime.now().strftime("%Y-%m-%d-%H-%M")

    parser = argparse.ArgumentParser(
        description="Collect performance measurements for Kubernetes/OpenShift operator workloads")
    parser.add_argument("--workdir",
                        dest="workdir_path",
                        type=str,
                        default=workdir_path,
                        help="Working directory")
    parser.add_argument("--input-dir",
                        dest="input_dir",
                        required=True,
                        help="The directory of the trial folder to reproduce. "
                             "CR files should have names starting with 'mutated-'")
    parser.add_argument("--config", "-c", dest="config",
                        required=True,
                        help="Operator config path")
    args = parser.parse_args()

    os.makedirs(args.workdir_path, exist_ok=True)
    logging.basicConfig(
        filename=os.path.join(args.workdir_path, 'test.log'),
        level=logging.DEBUG,
        filemode='w',
        format='%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s')
    logging.getLogger("kubernetes").setLevel(logging.ERROR)
    logging.getLogger("sh").setLevel(logging.ERROR)

    # parse the operator config
    with open(args.config, "r") as config_file:
        config = json.load(config_file)
        if "monkey_patch" in config:
            del config["monkey_patch"]
        config = OperatorConfig(**config)

    # prepare workloads by chaining all valid inputs from the previous test run
    all_inputs = ChainInputs(testrun_dir=args.input_dir, config=config).all_inputs
    os.makedirs(f"{args.workdir_path}/inputs", exist_ok=True)
    workloads = []
    for gen, chained_input in enumerate(all_inputs):
        # keep a copy of each chained CR on disk for later inspection
        with open(f"{args.workdir_path}/inputs/mutated-{gen:03d}.yaml", "w") as f:
            yaml.dump(chained_input["input"], f)
        workloads.append(chained_input["input"])

    # start the Kubernetes cluster
    kubeconfig = os.path.join(os.path.expanduser("~"), ".kube", "anvil")
    cluster = kind.Kind(acto_namespace=0)
    cluster.configure_cluster(config.num_nodes, config.kubernetes_version)
    cluster.create_cluster(name="anvil", kubeconfig=kubeconfig)

    # deploy the operator
    context_name = cluster.get_context_name("anvil")
    deploy = Deploy(config.deploy.method, config.deploy.file,
                    config.deploy.init).new()
    namespace = utils.get_yaml_existing_namespace(
        config.deploy.file) or CONST.ACTO_NAMESPACE
    deployed = deploy.deploy_with_retry(kubeconfig, context_name, namespace)
    if not deployed:
        logging.error("Operator deployment failed, aborting")
        sys.exit(1)

    # replay the workloads and record per-generation measurements
    crd = process_crd(kubernetes_client(kubeconfig, context_name),
                      KubectlClient(kubeconfig, context_name), config.crd_name)

    trial_dir = f"{args.workdir_path}/trial"
    os.makedirs(trial_dir, exist_ok=True)
    runner = MeasurementRunner(
        namespace, crd, trial_dir, kubeconfig, context_name)
    for gen, workload in enumerate(workloads):
        measurement_result = runner.run(
            workload, MeasurementRunner.wait_for_zk_spec,
            MeasurementRunner.wait_for_pod_ready, gen)
        if measurement_result is not None:
            measurement_result_file = f"{trial_dir}/measurement_result_{gen:03d}.json"
            with open(measurement_result_file, "w") as f:
                json.dump(dataclasses.asdict(measurement_result), f)

    # collect the performance data
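
The trailing comment marks an aggregation step this WIP commit leaves unimplemented. A minimal sketch, not part of the commit, that only reads back the measurement_result_*.json files written by the loop above (the fields inside each file depend on MeasurementRunner's result dataclass, so treat their contents as an assumption):

import glob
import json

def collect_measurement_results(trial_dir: str) -> list:
    """Read back the per-generation measurement_result_*.json files, in order."""
    results = []
    for path in sorted(glob.glob(f"{trial_dir}/measurement_result_*.json")):
        with open(path) as f:
            results.append(json.load(f))
    return results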
