forked from ethereum/web3.py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conftest.py
116 lines (83 loc) · 2.64 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import pytest
import time
import warnings
import pytest_asyncio
from tests.utils import (
PollDelayCounter,
_async_wait_for_block_fixture_logic,
_async_wait_for_transaction_fixture_logic,
)
from web3._utils.threads import (
Timeout,
)
from web3.main import (
Web3,
)
from web3.providers.eth_tester import (
AsyncEthereumTesterProvider,
EthereumTesterProvider,
)
@pytest.fixture()
def sleep_interval():
    """Return a newly constructed ``PollDelayCounter`` for the requesting test."""
    delay_counter = PollDelayCounter()
    return delay_counter
def is_testrpc_provider(provider):
    """Return ``True`` when ``provider`` is an ``EthereumTesterProvider`` instance."""
    provider_is_tester = isinstance(provider, EthereumTesterProvider)
    return provider_is_tester
@pytest.fixture()
def skip_if_testrpc():
    """
    Provide a helper that skips the running test for tester-backed instances.

    The returned callable takes a ``w3`` object and calls ``pytest.skip()``
    when ``w3.provider`` satisfies ``is_testrpc_provider``; otherwise it does
    nothing.
    """

    def _skip_if_testrpc(w3):
        # Guard clause: any other provider leaves the test running.
        if not is_testrpc_provider(w3.provider):
            return
        pytest.skip()

    return _skip_if_testrpc
@pytest.fixture(scope="module")
def wait_for_block():
    """
    Provide a helper that mines blocks until a target block number is reached.

    The returned callable has the signature
    ``_wait_for_block(w3, block_number=1, timeout=None)``. While
    ``w3.eth.block_number`` is below ``block_number`` it issues an
    ``evm_mine`` request and then sleeps through the ``Timeout`` object for
    the delay produced by a ``PollDelayCounter``. It returns ``None``.
    """

    def _wait_for_block(w3, block_number=1, timeout=None):
        # A falsy timeout (``None`` or ``0``) is replaced by three units per
        # block still missing when the helper is called.
        time_budget = timeout or (block_number - w3.eth.block_number) * 3
        next_delay = PollDelayCounter()
        with Timeout(time_budget) as deadline:
            while w3.eth.block_number < block_number:
                w3.manager.request_blocking("evm_mine", [])
                # NOTE(review): presumably ``Timeout.sleep`` also enforces the
                # deadline after sleeping - confirm in web3._utils.threads.
                deadline.sleep(next_delay())

    return _wait_for_block
@pytest.fixture(scope="module")
def wait_for_transaction():
    """
    Provide a helper that polls for a transaction receipt until it exists.

    The returned callable has the signature
    ``_wait_for_transaction(w3, txn_hash, timeout=120)``:

    - ``w3``: object whose ``eth.get_transaction_receipt`` is polled.
    - ``txn_hash``: hash passed to ``get_transaction_receipt`` on every poll.
    - ``timeout``: value handed to ``Timeout``; defaults to ``120``.

    It returns the first receipt that is not ``None``. Between polls it
    sleeps for the delay produced by a ``PollDelayCounter`` and then calls
    ``Timeout.check()`` (presumably raising once the timeout has elapsed -
    confirm in web3._utils.threads).
    """
    # Imported here so the fixture does not rely on an edit to the
    # module-level import block.
    from web3.exceptions import (
        TransactionNotFound,
    )

    def _wait_for_transaction(w3, txn_hash, timeout=120):
        poll_delay_counter = PollDelayCounter()
        # Bind the context manager to its own name instead of shadowing the
        # ``timeout`` argument.
        with Timeout(timeout) as deadline:
            while True:
                try:
                    txn_receipt = w3.eth.get_transaction_receipt(txn_hash)
                except TransactionNotFound:
                    # web3 signals a transaction that has no receipt yet by
                    # raising rather than returning ``None``; treat that as
                    # "not ready, keep polling" so the timeout governs.
                    txn_receipt = None
                if txn_receipt is not None:
                    return txn_receipt
                time.sleep(poll_delay_counter())
                deadline.check()

    return _wait_for_transaction
@pytest.fixture
def w3():
    """
    Return a ``Web3`` instance built on a new ``EthereumTesterProvider``.

    ``eth.default_account`` is set to the first entry of ``eth.accounts``.
    """
    web3_instance = Web3(EthereumTesterProvider())
    first_account = web3_instance.eth.accounts[0]
    web3_instance.eth.default_account = first_account
    return web3_instance
@pytest.fixture(scope="module")
def w3_non_strict_abi():
    """
    Return a module-scoped ``Web3`` instance with strict bytes checking off.

    The instance is built on a new ``EthereumTesterProvider``, uses the first
    entry of ``eth.accounts`` as ``eth.default_account``, and has
    ``strict_bytes_type_checking`` set to ``False``.
    """
    web3_instance = Web3(EthereumTesterProvider())
    first_account = web3_instance.eth.accounts[0]
    web3_instance.eth.default_account = first_account
    web3_instance.strict_bytes_type_checking = False
    return web3_instance
@pytest.fixture(autouse=True)
def print_warnings():
    """Autouse fixture that installs the ``"always"`` warnings filter."""
    warnings.simplefilter(action="always")
# --- async --- #
def is_async_testrpc_provider(provider):
    """Return ``True`` when ``provider`` is an ``AsyncEthereumTesterProvider``."""
    provider_is_async_tester = isinstance(provider, AsyncEthereumTesterProvider)
    return provider_is_async_tester
@pytest.fixture()
def async_skip_if_testrpc():
    """
    Provide a helper that skips the running test for async tester instances.

    The returned callable takes an ``async_w3`` object and calls
    ``pytest.skip()`` when ``async_w3.provider`` satisfies
    ``is_async_testrpc_provider``; otherwise it does nothing.
    """

    def _skip_if_testrpc(async_w3):
        # Guard clause: any other provider leaves the test running.
        if not is_async_testrpc_provider(async_w3.provider):
            return
        pytest.skip()

    return _skip_if_testrpc
@pytest_asyncio.fixture()
async def async_wait_for_block():
    """Expose ``_async_wait_for_block_fixture_logic`` from ``tests.utils`` as a fixture."""
    block_waiter = _async_wait_for_block_fixture_logic
    return block_waiter
@pytest_asyncio.fixture()
async def async_wait_for_transaction():
    """Expose ``_async_wait_for_transaction_fixture_logic`` from ``tests.utils`` as a fixture."""
    transaction_waiter = _async_wait_for_transaction_fixture_logic
    return transaction_waiter