-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
testing_utils.py
306 lines (251 loc) · 11.1 KB
/
testing_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import logging
import os
import platform
import subprocess
import tempfile
from pathlib import Path
from threading import Thread
from typing import Callable, List, Optional, cast
from collections import namedtuple
from subprocess import Popen, PIPE, TimeoutExpired
from queue import Queue
from samcli import __version__ as SAM_CLI_VERSION
import shutil
from uuid import uuid4
import boto3
import psutil
import docker
import packaging.version
# Environment-derived switches used to decide which integration tests run.
# NOTE: the os.environ.get(..., False) flags hold the raw environment string when the variable is set
# (so they are truthy/falsy values, not strict booleans).
RUNNING_ON_APPVEYOR = os.environ.get("APPVEYOR", False)
IS_WINDOWS = platform.system().lower() == "windows"
# Read from the generic "CI" environment variable.
RUNNING_ON_GITHUB_ACTIONS = os.environ.get("CI", False)
RUNNING_ON_CI = RUNNING_ON_APPVEYOR or RUNNING_ON_GITHUB_ACTIONS
# NOTE(review): despite the name, this evaluates to True when the branch is NOT "master"
# (and to False when neither branch variable is set) -- confirm against the callers before relying on it.
RUNNING_TEST_FOR_MASTER_ON_CI = (
    os.environ.get("APPVEYOR_REPO_BRANCH", os.environ.get("GITHUB_REF_NAME", "master")) != "master"
)
CI_OVERRIDE = os.environ.get("APPVEYOR_CI_OVERRIDE", False) or os.environ.get("CI_OVERRIDE", False)
RUN_BY_CANARY = os.environ.get("BY_CANARY", False)
# Tests that require Docker suffer from the Docker Hub request limit, so they are skipped on CI
# unless the run was started by the canary
SKIP_DOCKER_TESTS = RUNNING_ON_CI and not RUN_BY_CANARY
# Set to True temporarily if the integration tests require updated build images
# Build images aren't published until after the CLI is released
# The CLI integration tests thus cannot succeed if they require new build images (chicken-egg problem)
SKIP_DOCKER_BUILD = False
SKIP_DOCKER_MESSAGE = "Skipped Docker test: running on CI not in canary or new build images are required"
LOG = logging.getLogger(__name__)
# Result of the run_command* helpers: the Popen object plus the captured stdout and stderr.
CommandResult = namedtuple("CommandResult", "process stdout stderr")
# Default timeout, in seconds, for the run_command* helpers.
TIMEOUT = 600
# PYTHON_VERSION with dots replaced by hyphens (e.g. "3.11" -> "3-11"); embedded in generated stack names.
CFN_PYTHON_VERSION_SUFFIX = os.environ.get("PYTHON_VERSION", "0.0.0").replace(".", "-")
def get_sam_command():
    """Return the SAM CLI executable the tests should invoke.

    A non-empty ``SAM_WINDOWS_BINARY_PATH`` environment variable wins; otherwise "samdev" is used when
    ``SAM_CLI_DEV`` is set to a non-empty value, and "sam" in every other case.
    """
    default_command = "samdev" if os.getenv("SAM_CLI_DEV") else "sam"
    # `or` falls through to the default when the override is unset or empty
    return os.getenv("SAM_WINDOWS_BINARY_PATH") or default_command
def method_to_stack_name(method_name):
    """Build a unique stack name from a test method name.

    ``method_name`` may be a dotted path (Eg: test.integration.test_deploy_command.method_name); only the last
    component is used. Underscores become hyphens, the Python version suffix and a random hex id are appended,
    a "test-" prefix is added when the name does not already start with "test", and the result is cut to
    128 characters.
    """
    short_name = method_name.rsplit(".", 1)[-1]
    dashed_name = short_name.replace("_", "-")
    stack_name = f"{dashed_name}-{CFN_PYTHON_VERSION_SUFFIX}-{uuid4().hex}"
    prefix = "" if stack_name.startswith("test") else "test-"
    return f"{prefix}{stack_name}"[:128]
def run_command(command_list, cwd=None, env=None, timeout=TIMEOUT) -> CommandResult:
    """Run a command to completion and capture its output.

    Parameters
    ----------
    command_list : List[str]
        Program and arguments, handed to ``Popen`` as a list (no shell).
    cwd : optional
        Working directory for the child process.
    env : optional
        Environment mapping for the child process.
    timeout : optional
        Seconds to wait for the command to finish, by default ``TIMEOUT``.

    Returns
    -------
    CommandResult
        The finished ``Popen`` object together with the raw (bytes) stdout and stderr.

    Raises
    ------
    TimeoutExpired
        When the command does not finish in time. The child process is killed and reaped before
        the exception is re-raised.
    """
    LOG.info("Running command: %s", " ".join(command_list))
    process_execute = Popen(command_list, cwd=cwd, env=env, stdout=PIPE, stderr=PIPE)
    try:
        stdout_data, stderr_data = process_execute.communicate(timeout=timeout)
    except TimeoutExpired:
        LOG.error(f"Command: {command_list}, TIMED OUT")
        process_execute.kill()
        # Reap the killed child so it does not linger as a zombie and so that returncode is
        # populated before it is logged (it is still None at the moment the timeout fires).
        process_execute.wait()
        LOG.error(f"Return Code: {process_execute.returncode}")
        raise
    # Decoding happens only for logging; never let undecodable bytes fail a command that succeeded.
    LOG.info("Stdout: %s", stdout_data.decode("utf-8", errors="replace"))
    LOG.info("Stderr: %s", stderr_data.decode("utf-8", errors="replace"))
    return CommandResult(process_execute, stdout_data, stderr_data)
def run_command_with_input(command_list, stdin_input, timeout=TIMEOUT, cwd=None, env=None) -> CommandResult:
    """Run a command to completion, feeding it data on stdin, and capture its output.

    Parameters
    ----------
    command_list : List[str]
        Program and arguments, handed to ``Popen`` as a list (no shell).
    stdin_input : bytes
        Data written to the child's stdin (the pipes are opened in binary mode).
    timeout : optional
        Seconds to wait for the command to finish, by default ``TIMEOUT``.
    cwd : optional
        Working directory for the child process.
    env : optional
        Environment mapping for the child process.

    Returns
    -------
    CommandResult
        The finished ``Popen`` object together with the raw (bytes) stdout and stderr.

    Raises
    ------
    TimeoutExpired
        When the command does not finish in time. The child process is killed and reaped before
        the exception is re-raised.
    """
    LOG.info("Running command: %s", " ".join(command_list))
    LOG.info("With input: %s", stdin_input)
    process_execute = Popen(command_list, cwd=cwd, env=env, stdout=PIPE, stderr=PIPE, stdin=PIPE)
    try:
        stdout_data, stderr_data = process_execute.communicate(stdin_input, timeout=timeout)
    except TimeoutExpired:
        LOG.error(f"Command: {command_list}, TIMED OUT")
        process_execute.kill()
        # Reap the killed child so it does not linger as a zombie and so that returncode is
        # populated before it is logged (it is still None at the moment the timeout fires).
        process_execute.wait()
        LOG.error(f"Return Code: {process_execute.returncode}")
        raise
    # Decoding happens only for logging; never let undecodable bytes fail a command that succeeded.
    LOG.info("Stdout: %s", stdout_data.decode("utf-8", errors="replace"))
    LOG.info("Stderr: %s", stderr_data.decode("utf-8", errors="replace"))
    return CommandResult(process_execute, stdout_data, stderr_data)
def run_command_with_inputs(
    command_list: List[str], inputs: List[str], timeout=TIMEOUT, cwd=None, env=None
) -> CommandResult:
    """Run a command, answering its prompts with ``inputs``.

    Parameters
    ----------
    command_list : List[str]
        Program and arguments to run.
    inputs : List[str]
        Answers to feed on stdin; they are joined with newlines, terminated with a final newline
        and encoded to bytes.
    timeout : optional
        Seconds to wait for the command to finish, by default ``TIMEOUT``.
    cwd : optional
        Working directory for the child process, forwarded to ``run_command_with_input``.
    env : optional
        Environment mapping for the child process, forwarded to ``run_command_with_input``.

    Returns
    -------
    CommandResult
        Whatever ``run_command_with_input`` returns for the command.
    """
    stdin_input = ("\n".join(inputs) + "\n").encode()
    return run_command_with_input(command_list, stdin_input, timeout, cwd=cwd, env=env)
def start_persistent_process(
command_list: List[str],
cwd: Optional[str] = None,
) -> Popen:
"""Start a process with parameters that are suitable for persistent execution."""
return Popen(
command_list,
stdout=PIPE,
stderr=subprocess.STDOUT,
stdin=PIPE,
encoding="utf-8",
bufsize=1,
cwd=cwd,
)
def kill_process(process: Popen) -> None:
    """Kill a process together with all of its descendants.

    The whole process tree is collected first so that children which would otherwise be orphaned
    are killed as well.
    https://psutil.readthedocs.io/en/latest/#kill-process-tree

    Raises ValueError if any of those processes is still alive after waiting up to 10 seconds."""
    parent = psutil.Process(process.pid)
    targets = [*parent.children(recursive=True), parent]
    for target in targets:
        try:
            target.kill()
        except psutil.NoSuchProcess:
            # Already gone, nothing left to kill
            pass
    _gone, survivors = psutil.wait_procs(targets, timeout=10)
    if not survivors:
        return
    raise ValueError(f"Processes: {survivors} are still alive.")
def read_until_string(process: Popen, expected_output: str, timeout: int = 30) -> None:
    """Read output from process until a line containing expected_output shows up or the timeout is reached.

    Throws TimeoutError (naming the expected output) if it times out
    """

    def _contains_expected(line, _: List[str]) -> bool:
        # Substring match against the line that was just read; earlier lines are ignored
        return expected_output in line

    try:
        read_until(process, _contains_expected, timeout)
    except TimeoutError as ex:
        # Shown as bytes so that whitespace and non-ASCII characters are visible in the message
        expected_output_bytes = expected_output.encode("utf-8")
        raise TimeoutError(
            f"Did not get expected output after {timeout} seconds. Expected output: {expected_output_bytes!r}"
        ) from ex
def read_until(process: Popen, callback: Callable[[str, List[str]], bool], timeout: int = 5):
    """Read output from process until callback returns True or timeout is reached

    Parameters
    ----------
    process : Popen
        Process whose ``stdout`` is read line by line.
    callback : Callable[[str, List[str]], bool]
        Called when a new line is read from the process, with that line and the list of all lines
        read so far (including it). Reading stops as soon as it returns a truthy value.
    timeout : int, optional
        Seconds to wait for the callback to be satisfied. By default 5

    Raises
    ------
    TimeoutError
        Raises when timeout is reached
    ValueError
        Raises when the process output ends before callback returned True
    Exception
        Any exception raised while reading, or by callback, is re-raised in the calling thread
    """
    result_queue: Queue = Queue()

    def _read_output():
        try:
            outputs: List[str] = []
            for output in process.stdout:
                outputs.append(output)
                LOG.info(output.encode("utf-8"))
                if callback(output, outputs):
                    result_queue.put(True)
                    return
        except Exception as ex:
            # Hand the failure over to the calling thread instead of losing it in the reader thread
            result_queue.put(ex)

    # Reading blocks, so it happens in a daemon thread that the caller waits on for at most `timeout`
    reading_thread = Thread(target=_read_output, daemon=True)
    reading_thread.start()
    reading_thread.join(timeout=timeout)
    if reading_thread.is_alive():
        # The reader thread is not stopped here; it keeps consuming process.stdout in the background
        raise TimeoutError(f"Did not get expected output after {timeout} seconds.")
    if result_queue.empty():
        # The thread finished without reporting anything: stdout was exhausted without a match
        raise ValueError("Process output ended before the callback was satisfied.")
    result = result_queue.get()
    if isinstance(result, Exception):
        raise result
class FileCreator(object):
    """Creates files for tests underneath a freshly made temporary directory (``rootdir``)."""

    def __init__(self):
        self.rootdir = tempfile.mkdtemp()

    def remove_all(self):
        """Delete the temporary directory and everything in it, if it still exists."""
        if not os.path.exists(self.rootdir):
            return
        shutil.rmtree(self.rootdir)

    def create_file(self, filename, contents, mtime=None, mode="w"):
        """Write ``contents`` to a new file inside the temp dir

        ``filename`` is a relative path, e.g. "foo/bar/baz.txt", which is
        resolved against the temp dir; missing parent directories are created.

        When ``mtime`` (an epoch time) is given, both the access and the
        modification time of the file are set to it. Otherwise the
        modification time is moved 100000000 seconds into the past.

        ``mode`` is the mode the file is opened with, either ``w`` or ``wb``.

        Returns the full path to the file.
        """
        target = os.path.join(self.rootdir, filename)
        parent_dir = os.path.dirname(target)
        if not os.path.isdir(parent_dir):
            os.makedirs(parent_dir)
        with open(target, mode) as handle:
            handle.write(contents)
        # Back-date the modification time by a few years.
        written_at = os.path.getmtime(target)
        os.utime(target, (written_at, written_at - 100000000))
        if mtime is not None:
            os.utime(target, (mtime, mtime))
        return target

    def append_file(self, filename, contents):
        """Add ``contents`` to the end of a file inside the temp dir

        ``filename`` is a relative path, e.g. "foo/bar/baz.txt", which is
        resolved against the temp dir; the file and any missing parent
        directories are created when they do not exist yet.

        Returns the full path to the file.
        """
        target = os.path.join(self.rootdir, filename)
        parent_dir = os.path.dirname(target)
        if not os.path.isdir(parent_dir):
            os.makedirs(parent_dir)
        with open(target, "a") as handle:
            handle.write(contents)
        return target

    def full_path(self, filename):
        """Resolve a relative path against the temp dir.

        f.full_path('foo/bar.txt') -> /tmp/asdfasd/foo/bar.txt
        """
        return os.path.join(self.rootdir, filename)
def _get_current_account_id():
    """Return the AWS account id of the credentials in use, as reported by STS get_caller_identity."""
    return boto3.client("sts").get_caller_identity()["Account"]
def strip_nightly_installer_suffix(request: dict, metric_type: str):
    """
    Trim the reported SAM CLI version in a telemetry request, in place, to the length of SAM_CLI_VERSION.

    A nightly release version carries a suffix; cutting it off lets telemetry tests compare against the
    plain version. Only the first metric of the request is touched, and nothing happens when the request
    has no metrics or the metric has no version.
    """
    metrics = request.get("data", {}).get("metrics", [])
    if metrics:
        reported_version = metrics[0].get(metric_type, {}).get("samcliVersion", "")
        if reported_version:
            trimmed_version = reported_version[: len(SAM_CLI_VERSION)]
            request["data"]["metrics"][0][metric_type]["samcliVersion"] = trimmed_version
class UpdatableSARTemplate:
    """
    Replaces `${AWS::AccountId}` in a testing template with the id of the account the integration tests
    run in, writing the result to a temporary copy of the template. This works around SAM CLI not
    supporting the Sub intrinsic function, and avoids exposing any of our testing account ids.

    Usable as a context manager: entering calls setup(), leaving calls clean(). After setup(),
    ``updated_template_path`` is the path of the generated "template.yml".
    """

    def __init__(self, source_template_path):
        self.source_template_path = source_template_path
        self.temp_directory = tempfile.TemporaryDirectory()
        # Use the directory owned by self.temp_directory, so that clean() really removes the generated
        # template. (Taking the name of a second, throwaway TemporaryDirectory would leave the template
        # in a directory nobody cleans up.)
        self.temp_directory_path = Path(self.temp_directory.name)
        self.updated_template_path = None

    def setup(self):
        """Write a copy of the source template with the account id filled in."""
        with open(self.source_template_path, "r") as sar_template:
            updated_template_content = sar_template.read()
        updated_template_content = updated_template_content.replace("${AWS::AccountId}", _get_current_account_id())
        # The directory normally exists already; this only recreates it when setup() runs after clean()
        self.temp_directory_path.mkdir(parents=True, exist_ok=True)
        self.updated_template_path = os.path.join(self.temp_directory_path, "template.yml")
        with open(self.updated_template_path, "w") as updated_template:
            updated_template.write(updated_template_content)

    def clean(self):
        """Remove the temporary directory together with the generated template."""
        self.temp_directory.cleanup()

    def __enter__(self):
        self.setup()
        return self

    def __exit__(self, *args):
        self.clean()
def _version_gte(version1: str, version2: str) -> bool:
    """Return True when version1 is greater than or equal to version2, compared as parsed versions."""
    return packaging.version.parse(version1) >= packaging.version.parse(version2)
def get_docker_version() -> str:
    """Return the "ServerVersion" reported by the Docker daemon, or "0.0.0" when it is not reported."""
    docker_info = docker.from_env().info()
    server_version = docker_info.get("ServerVersion", "0.0.0")
    return cast(str, server_version)