-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathplugin.py
238 lines (173 loc) · 6.46 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import contextlib
import os
import re
import subprocess
import time
import timeit
import attr
import pytest
class PytestDockerError(Exception):
"""Base pytest-docker exception."""
class ServiceTimeoutError(PytestDockerError):
"""
Timed out while waiting for Docker service(s).
"""
def execute(command, success_codes=(0,)):
"""Run a shell command."""
try:
output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
status = 0
except subprocess.CalledProcessError as error:
output = error.output or b""
status = error.returncode
command = error.cmd
if status not in success_codes:
raise Exception(
'Command {} returned {}: """{}""".'.format(
command, status, output.decode("utf-8")
)
)
return output
def get_docker_ip():
# When talking to the Docker daemon via a UNIX socket, route all TCP
# traffic to docker containers via the TCP loopback interface.
docker_host = os.environ.get("DOCKER_HOST", "").strip()
if not docker_host or docker_host.startswith("unix://"):
return "127.0.0.1"
match = re.match(r"^tcp://(.+?):\d+$", docker_host)
if not match:
raise ValueError('Invalid value for DOCKER_HOST: "%s".' % (docker_host,))
return match.group(1)
@pytest.fixture(scope="session")
def docker_ip():
"""Determine the IP address for TCP connections to Docker containers."""
return get_docker_ip()
@attr.s(frozen=True)
class Services:
_docker_compose = attr.ib()
_services = attr.ib(init=False, default=attr.Factory(dict))
def port_for(self, service, container_port):
"""Return the "host" port for `service` and `container_port`.
E.g. If the service is defined like this:
version: '2'
services:
httpbin:
build: .
ports:
- "8000:80"
this method will return 8000 for container_port=80.
"""
# Lookup in the cache.
cache = self._services.get(service, {}).get(container_port, None)
if cache is not None:
return cache
output = self._docker_compose.execute("port %s %d" % (service, container_port))
endpoint = output.strip().decode("utf-8")
if not endpoint:
raise ValueError(
'Could not detect port for "%s:%d".' % (service, container_port)
)
# This handles messy output that might contain warnings or other text
if len(endpoint.split("\n")) > 1:
endpoint = endpoint.split("\n")[-1]
# Usually, the IP address here is 0.0.0.0, so we don't use it.
match = int(endpoint.split(":", 1)[1])
# Store it in cache in case we request it multiple times.
self._services.setdefault(service, {})[container_port] = match
return match
def wait_until_responsive(self, check, timeout, pause, clock=timeit.default_timer):
"""Wait until a service is responsive."""
ref = clock()
now = ref
while (now - ref) < timeout:
if check():
return
time.sleep(pause)
now = clock()
raise ServiceTimeoutError("Timeout reached while waiting on service!")
def str_to_list(arg):
if isinstance(arg, (list, tuple)):
return arg
return [arg]
@attr.s(frozen=True)
class DockerComposeExecutor:
_compose_command = attr.ib()
_compose_files = attr.ib(converter=str_to_list)
_compose_project_name = attr.ib()
def execute(self, subcommand):
command = self._compose_command
for compose_file in self._compose_files:
command += ' -f "{}"'.format(compose_file)
command += ' -p "{}" {}'.format(self._compose_project_name, subcommand)
return execute(command)
@pytest.fixture(scope="session")
def docker_compose_command():
"""Docker Compose command to use, it could be either `docker-compose`
for Docker Compose v1 or `docker compose` for Docker Compose
v2."""
return "docker-compose"
@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
"""Get an absolute path to the `docker-compose.yml` file. Override this
fixture in your tests if you need a custom location."""
return os.path.join(str(pytestconfig.rootdir), "tests", "docker-compose.yml")
@pytest.fixture(scope="session")
def docker_compose_project_name():
"""Generate a project name using the current process PID. Override this
fixture in your tests if you need a particular project name."""
return "pytest{}".format(os.getpid())
def get_cleanup_command():
return "down -v"
@pytest.fixture(scope="session")
def docker_cleanup():
"""Get the docker_compose command to be executed for test clean-up actions.
Override this fixture in your tests if you need to change clean-up actions.
Returning anything that would evaluate to False will skip this command."""
return get_cleanup_command()
def get_setup_command():
return "up --build -d"
@pytest.fixture(scope="session")
def docker_setup():
"""Get the docker_compose command to be executed for test setup actions.
Override this fixture in your tests if you need to change setup actions.
Returning anything that would evaluate to False will skip this command."""
return get_setup_command()
@contextlib.contextmanager
def get_docker_services(
docker_compose_command,
docker_compose_file,
docker_compose_project_name,
docker_setup,
docker_cleanup,
):
docker_compose = DockerComposeExecutor(
docker_compose_command, docker_compose_file, docker_compose_project_name
)
# setup containers.
if docker_setup:
docker_compose.execute(docker_setup)
try:
# Let test(s) run.
yield Services(docker_compose)
finally:
# Clean up.
if docker_cleanup:
docker_compose.execute(docker_cleanup)
@pytest.fixture(scope="session")
def docker_services(
docker_compose_command,
docker_compose_file,
docker_compose_project_name,
docker_setup,
docker_cleanup,
):
"""Start all services from a docker compose file (`docker-compose up`).
After test are finished, shutdown all services (`docker-compose down`)."""
with get_docker_services(
docker_compose_command,
docker_compose_file,
docker_compose_project_name,
docker_setup,
docker_cleanup,
) as docker_service:
yield docker_service