-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathconftest.py
132 lines (107 loc) · 3.81 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import sys
import pytest
from cryptography.fernet import Fernet
import prefect
from prefect.engine.executors import LocalExecutor, SynchronousExecutor
from prefect.utilities import debug
if sys.version_info >= (3, 5):
from prefect.engine.executors import DaskExecutor
from distributed import Client
# ----------------
# set up executor fixtures
# so that we don't have to spin up / tear down a dask cluster
# for every test that needs a dask executor
# ----------------
@pytest.fixture(scope="session")
def mthread():
"Multi-threaded executor"
if sys.version_info >= (3, 5):
with Client(processes=False) as client:
yield DaskExecutor(client.scheduler.address)
else:
yield
@pytest.fixture()
def local():
"Local, immediate execution executor"
yield LocalExecutor()
@pytest.fixture()
def sync():
"Synchronous dask (not dask.distributed) executor"
yield SynchronousExecutor()
@pytest.fixture(scope="session")
def mproc():
"Multi-processing executor"
if sys.version_info >= (3, 5):
with Client(processes=True) as client:
yield DaskExecutor(client.scheduler.address, local_processes=True)
else:
yield
@pytest.fixture()
def _switch(mthread, local, sync, mproc):
"""
A construct needed so we can parametrize the executor fixture.
This isn't straightforward since each executor needs to be initialized
in slightly different ways.
"""
execs = dict(mthread=mthread, local=local, sync=sync, mproc=mproc)
return lambda e: execs[e]
@pytest.fixture()
def executor(request, _switch):
"""
The actual fixture that should be used in testing.
Parametrize your test by decorating:
```
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
```
or with some subset of executors that you want to use.
"""
if sys.version_info < (3, 5) and request.param in ["mthread", "mproc"]:
request.applymarker(
pytest.mark.xfail(
run=False,
reason="dask.distributed does not officially support Python 3.4",
)
)
return _switch(request.param)
def pytest_addoption(parser):
parser.addoption(
"--airflow",
action="store_true",
dest="airflow",
help="including this flag will attempt to ONLY run airflow compatibility tests",
)
parser.addoption(
"--skip-formatting",
action="store_true",
dest="formatting",
help="including this flag will skip all formatting tests",
)
def pytest_configure(config):
config.addinivalue_line(
"markers", "airflow: mark test to run only when --airflow flag is provided."
)
config.addinivalue_line(
"markers",
"formatting: mark test as formatting to skip when --skip-formatting flag is provided.",
)
def pytest_runtest_setup(item):
air_mark = item.get_marker("airflow")
# if a test IS marked as "airflow" and the airflow flag IS NOT set, skip it
if air_mark is not None and item.config.getoption("--airflow") is False:
pytest.skip(
"Airflow tests skipped by default unless --airflow flag provided to pytest."
)
# if a test IS NOT marked as airflow and the airflow flag IS set, skip it
elif air_mark is None and item.config.getoption("--airflow") is True:
pytest.skip("Non-Airflow tests skipped because --airflow flag was provided.")
formatting_mark = item.get_marker("formatting")
# if a test IS marked as "formatting" and the --skip-formatting flag IS set, skip it
if (
formatting_mark is not None
and item.config.getoption("--skip-formatting") is True
):
pytest.skip(
"Formatting tests skipped because --skip-formatting flag was provided."
)