Skip to content

Commit

Permalink
Restore tests (updated) (faust-streaming#141)
Browse files Browse the repository at this point in the history
* Restore testing

A lot of test classes are named test_*, which makes pytest *not* find them by default

* Fix failing unit test after fix for faust-streaming#42

* Restore tests by adding timeout arg

* Restore original behaviour of _wait, don't always wait for signal_recovery_start

* Restore a few of the aiokafka tests

* Restore a few app tests

* Run event path verification in aiokafka driver

* lower minimum code coverage to 59%

* reformat examples/livecheck.py with black

* rename test class to match Test naming pattern

* fix test__resume_streams, as assignment() returns a set

Co-authored-by: Erik Forsberg <erik.forsberg@ferroamp.se>
Co-authored-by: Erik Forsberg <forsberg@ferroamp.com>
Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
  • Loading branch information
4 people authored May 6, 2021
1 parent 48fa76b commit 7a45b2b
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 60 deletions.
89 changes: 43 additions & 46 deletions examples/livecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

class Order(faust.Record):

SIDE_SELL = 'sell'
SIDE_BUY = 'buy'
SIDE_SELL = "sell"
SIDE_BUY = "buy"
VALID_SIDES = {SIDE_SELL, SIDE_BUY}

id: str
Expand All @@ -50,21 +50,21 @@ class Order(faust.Record):


app = faust.App(
'orders',
cache='redis://localhost:6379',
origin='examples.livecheck',
"orders",
cache="redis://localhost:6379",
origin="examples.livecheck",
autodiscover=True,
)
livecheck = app.LiveCheck()

orders_topic = app.topic('orders', value_type=Order)
execution_topic = app.topic('order-execution', value_type=Order)
orders_topic = app.topic("orders", value_type=Order)
execution_topic = app.topic("order-execution", value_type=Order)


orders = web.Blueprint('orders')
orders = web.Blueprint("orders")


@orders.route('/init/{side}/', name='init')
@orders.route("/init/{side}/", name="init")
class OrderView(web.View):

# First clients do a GET on /order/init/sell/
Expand All @@ -86,31 +86,29 @@ async def get(self, request: web.Request, side: str) -> web.Response:
# the test will ensure that no system is changing the side
# of this order from buy to sell.
async with test_order.maybe_trigger(order_id, side=side) as test:
fake = bool(int(request.query.get('fake', 0)))
next_url = self.url_for('orders:create', 'http://localhost:6066')
fake = bool(int(request.query.get("fake", 0)))
next_url = self.url_for("orders:create", "http://localhost:6066")
data = {
'order_id': order_id,
'user_id': user_id,
'side': side,
'did_execute_test': bool(test),
'fake': fake,
"order_id": order_id,
"user_id": user_id,
"side": side,
"did_execute_test": bool(test),
"fake": fake,
}
async with self.app.http_client.post(next_url, json=data) as resp:
assert resp.status == 200
return self.bytes(await resp.read(),
content_type='application/json')
return self.bytes(await resp.read(), content_type="application/json")


@orders.route('/create/', name='create')
@orders.route("/create/", name="create")
class CreateOrderView(web.View):

async def post(self, request: web.Request) -> web.Response:
payload = await request.json()
order_id = payload['order_id']
user_id = payload['user_id']
side = payload['side']
fake = payload['fake']
did_execute_test = payload['did_execute_test']
order_id = payload["order_id"]
user_id = payload["user_id"]
side = payload["side"]
fake = payload["fake"]
did_execute_test = payload["did_execute_test"]

if did_execute_test:
# LiveCheck read the HTTP headers passed in this request
Expand All @@ -121,10 +119,10 @@ async def post(self, request: web.Request) -> web.Response:

order = Order(order_id, user_id, side, 1.0, 3.33, fake=fake)
await orders_topic.send(key=order_id, value=order)
return self.json({'status': 'success'})
return self.json({"status": "success"})


app.web.blueprints.add('/order/', orders)
app.web.blueprints.add("/order/", orders)


@app.agent(orders_topic)
Expand All @@ -133,20 +131,18 @@ async def create_order(orders: StreamT[Order]) -> None:
test = livecheck.current_test
if test is not None:
assert test.id == order.id
print('1. ORDER SENT TO DB')
print("1. ORDER SENT TO DB")
await test_order.order_sent_to_db.send(order)

def on_order_sent(fut: FutureMessage) -> None:
print('2. ORDER SENT TO KAFKA')
asyncio.ensure_future(
test_order.order_sent_to_kafka.send())

await execution_topic.send(key=order.id, value=order,
callback=on_order_sent)
print('3. ORDER SENT TO EXECUTION AGENT')
await app.cache.client.sadd(f'order.{order.user_id}.orders', order.id)
print("2. ORDER SENT TO KAFKA")
asyncio.ensure_future(test_order.order_sent_to_kafka.send())

await execution_topic.send(key=order.id, value=order, callback=on_order_sent)
print("3. ORDER SENT TO EXECUTION AGENT")
await app.cache.client.sadd(f"order.{order.user_id}.orders", order.id)
await test_order.order_cache_in_redis.send()
print('4. ORDER CACHED IN REDIS')
print("4. ORDER CACHED IN REDIS")


@app.agent(execution_topic)
Expand All @@ -157,11 +153,11 @@ async def execute_order(orders: StreamT[Order]) -> None:
# bla bla bla
pass
await test_order.order_executed.send(execution_id)
print('5. ORDER EXECUTED BY EXECUTION AGENT')
print("5. ORDER EXECUTED BY EXECUTION AGENT")


@livecheck.case(warn_stalled_after=5.0, frequency=0.5, probability=0.5)
class test_order(Case):
class Test_order(Case):

order_sent_to_db: Signal[Order]
order_sent_to_kafka: Signal[None]
Expand All @@ -185,26 +181,27 @@ async def run(self, side: str) -> None:
await self.order_cache_in_redis.wait(timeout=30.0)
# make sure it's now actually in redis
assert await livecheck.cache.client.sismember(
f'order.{order.user_id}.orders', order.id)
f"order.{order.user_id}.orders", order.id
)

# 4) wait for execution agent to execute the order.
await self.order_executed.wait(timeout=30.0)

async def make_fake_request(self) -> None:
await self.get_url('http://localhost:6066/order/init/sell/?fake=1')
await self.get_url("http://localhost:6066/order/init/sell/?fake=1")


@app.command(
cli.option('--side', default='sell', help='Order side: buy, sell'),
cli.option('--base-url', default='http://localhost:6066'),
cli.option("--side", default="sell", help="Order side: buy, sell"),
cli.option("--base-url", default="http://localhost:6066"),
)
async def post_order(self: cli.AppCommand, side: str, base_url: str) -> None:
path = self.app.web.url_for('orders:init', side=side)
url = ''.join([base_url.rstrip('/'), path])
path = self.app.web.url_for("orders:init", side=side)
url = "".join([base_url.rstrip("/"), path])
async with self.app.http_client.get(url) as response:
response.raise_for_status()
print(await response.read())


if __name__ == '__main__':
if __name__ == "__main__":
app.main()
3 changes: 3 additions & 0 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ async def on_stop(self) -> None:
transport = cast(Transport, self.transport)
transport._topic_waiters.clear()

def verify_event_path(self, now: float, tp: TP) -> None:
return self._thread.verify_event_path(now, tp)


class ThreadedProducer(ServiceThread):
_producer: Optional[aiokafka.AIOKafkaProducer] = None
Expand Down
2 changes: 1 addition & 1 deletion scripts/coverage
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ fi

set -x

${PREFIX}coverage report --show-missing --skip-covered --fail-under=60
${PREFIX}coverage report --show-missing --skip-covered --fail-under=59
codecov --token=$CODECOV_TOKEN
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ addopts =
--strict-config
--strict-markers
xfail_strict=True

19 changes: 17 additions & 2 deletions tests/unit/agents/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,26 @@ async def test_execute_actor__cancelled_stopped(self, *, agent):
@pytest.mark.skip(reason="Fix is TBD")
@pytest.mark.asyncio
async def test_execute_actor__cancelled_running(self, *, agent):
agent._on_error = AsyncMock(name="on_error")
agent.log = Mock(name="log", autospec=CompositeLogger)
aref = Mock(
name="aref",
autospec=Actor,
crash=AsyncMock(),
)
agent.supervisor = Mock(name="supervisor")
coro = FutureMock()
coro.side_effect = asyncio.CancelledError()
await agent._execute_actor(coro, Mock(name="aref", autospec=Actor))
exc = coro.side_effect = asyncio.CancelledError()
await agent._execute_actor(coro, aref)
coro.assert_awaited()

aref.crash.assert_called_once_with(exc)
agent.supervisor.wakeup.assert_called_once_with()
agent._on_error.assert_not_called()

agent._on_error = None
await agent._execute_actor(coro, aref)

@pytest.mark.asyncio
async def test_execute_actor__raising(self, *, agent):
agent._on_error = AsyncMock(name="on_error")
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/tables/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ async def test__resume_streams(self, *, recovery, tables, app):
consumer = app.consumer = Mock()
recovery._wait = AsyncMock()
recovery._is_changelog_tp = MagicMock(return_value=False)
consumer.assignment = MagicMock(return_value=[("tp", 1)])
consumer.assignment = MagicMock(return_value={("tp", 1)})
await recovery._resume_streams()
app.on_rebalance_complete.send.assert_called_once_with()
consumer.resume_flow.assert_called_once_with()
app.flow_control.resume.assert_called_once_with()
recovery._wait.assert_called_once_with(consumer.perform_seek(), timeout=90.0)
consumer.resume_partitions.assert_called_once
recovery._wait.assert_called_once_with(
consumer.perform_seek(), timeout=app.conf.broker_request_timeout
)
consumer.resume_partitions.assert_called_once_with(consumer.assignment())

assert recovery.completed.is_set()
app._fetcher.maybe_start.assert_called_once_with()
Expand Down
23 changes: 15 additions & 8 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ def _consumer(self):
commit=AsyncMock(),
position=AsyncMock(),
end_offsets=AsyncMock(),
_client=Mock(name="Client", close=AsyncMock()),
_coordinator=Mock(name="Coordinator", close=AsyncMock()),
)

@pytest.fixture()
Expand Down Expand Up @@ -755,7 +757,7 @@ def assert_create_worker_consumer(
api_version=app.conf.consumer_api_version,
client_id=conf.broker_client_id,
group_id=conf.id,
group_instance_id=conf.consumer_group_instance_id,
# group_instance_id=conf.consumer_group_instance_id,
bootstrap_servers=server_list(transport.url, transport.default_port),
partition_assignment_strategy=[cthread._assignor],
enable_auto_commit=False,
Expand All @@ -770,11 +772,11 @@ def assert_create_worker_consumer(
session_timeout_ms=int(conf.broker_session_timeout * 1000.0),
heartbeat_interval_ms=int(conf.broker_heartbeat_interval * 1000.0),
isolation_level=isolation_level,
traced_from_parent_span=cthread.traced_from_parent_span,
start_rebalancing_span=cthread.start_rebalancing_span,
start_coordinator_span=cthread.start_coordinator_span,
on_generation_id_known=cthread.on_generation_id_known,
flush_spans=cthread.flush_spans,
# traced_from_parent_span=cthread.traced_from_parent_span,
# start_rebalancing_span=cthread.start_rebalancing_span,
# start_coordinator_span=cthread.start_coordinator_span,
# on_generation_id_known=cthread.on_generation_id_known,
# flush_spans=cthread.flush_spans,
**auth_settings,
)

Expand Down Expand Up @@ -980,17 +982,21 @@ async def test__commit__CommitFailedError(self, *, cthread, _consumer):
cthread._consumer = _consumer
exc = _consumer.commit.side_effect = CommitFailedError("xx")
cthread.crash = AsyncMock()
cthread.supervisor = Mock(name="supervisor")
assert not (await cthread._commit({TP1: 1001}))
cthread.crash.assert_called_once_with(exc)
cthread.supervisor.wakeup.assert_called_once()

@pytest.mark.asyncio
async def test__commit__IllegalStateError(self, *, cthread, _consumer):
cthread._consumer = _consumer
cthread.assignment = Mock()
exc = _consumer.commit.side_effect = IllegalStateError("xx")
cthread.crash = AsyncMock()
cthread.supervisor = Mock(name="supervisor")
assert not (await cthread._commit({TP1: 1001}))
cthread.crash.assert_called_once_with(exc)
cthread.supervisor.wakeup.assert_called_once()

@pytest.mark.asyncio
async def test_position(self, *, cthread, _consumer):
Expand Down Expand Up @@ -1319,7 +1325,7 @@ async def test_commit_transactions(self, *, producer, _producer):

def test__settings_extra(self, *, producer, app):
app.in_transaction = True
assert producer._settings_extra() == {"acks": "all"}
assert producer._settings_extra() == {"acks": "all", "enable_idempotence": True}
app.in_transaction = False
assert producer._settings_extra() == {}

Expand Down Expand Up @@ -1410,7 +1416,8 @@ def assert_new_producer(
security_protocol=security_protocol,
loop=producer.loop,
partitioner=producer.partitioner,
on_irrecoverable_error=producer._on_irrecoverable_error,
transactional_id=None,
# on_irrecoverable_error=producer._on_irrecoverable_error,
**kwargs,
)

Expand Down

0 comments on commit 7a45b2b

Please sign in to comment.