Skip to content

Commit

Permalink
enable the test_aiokafka.py tests (faust-streaming#143)
Browse files Browse the repository at this point in the history
* enable the test_aiokafka.py TestProducer tests

* remove obsolete tests and some commented out dead code

* clean up transaction tests

* enable TestProducer tests

* restore 60% coverage requirement
  • Loading branch information
taybin authored May 11, 2021
1 parent 7a45b2b commit 7cfc728
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 65 deletions.
9 changes: 0 additions & 9 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,6 @@ def _settings_default(self) -> Mapping[str, Any]:
"max_batch_size": self.max_batch_size,
"max_request_size": self.max_request_size,
"compression_type": self.compression_type,
# 'on_irrecoverable_error': self._on_irrecoverable_error,
"security_protocol": "SSL" if self.ssl_context else "PLAINTEXT",
"partitioner": self.partitioner,
"request_timeout_ms": int(self.request_timeout * 1000),
Expand Down Expand Up @@ -1217,13 +1216,6 @@ def _new_producer(self, transactional_id: str = None) -> aiokafka.AIOKafkaProduc
def _producer_type(self) -> Type[aiokafka.AIOKafkaProducer]:
return aiokafka.AIOKafkaProducer

# async def _on_irrecoverable_error(self, exc: BaseException) -> None:
# consumer = self.transport.app.consumer
# if consumer is not None: # pragma: no cover
# # coverage executes this line, but does not mark as covered.
# await consumer.crash(exc)
# await self.crash(exc)

async def create_topic(
self,
topic: str,
Expand Down Expand Up @@ -1263,7 +1255,6 @@ def _ensure_producer(self) -> aiokafka.AIOKafkaProducer:
async def on_start(self) -> None:
"""Call when producer starts."""
await super().on_start()
# if not self.app.in_transaction:
producer = self._producer = self._new_producer()
self.beacon.add(producer)
await producer.start()
Expand Down
2 changes: 1 addition & 1 deletion scripts/coverage
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ fi

set -x

${PREFIX}coverage report --show-missing --skip-covered --fail-under=59
${PREFIX}coverage report --show-missing --skip-covered --fail-under=60
codecov --token=$CODECOV_TOKEN
137 changes: 82 additions & 55 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -1259,30 +1259,43 @@ class MyPartitioner:
my_partitioner = MyPartitioner()


@pytest.mark.skip("Needs fixing")
class TestProducer:
@pytest.fixture()
def producer(self, *, app, _producer):
producer = Producer(app.transport)
producer._new_producer = Mock(return_value=_producer)
producer._producer = _producer

# I can't figure out what is setting a value for this,
# so we're clearing out the dict after creation
producer._transaction_producers = {}

return producer

@pytest.fixture()
def _producer(self):
return Mock(
name="AIOKafkaProducer",
autospec=aiokafka.AIOKafkaProducer,
start=AsyncMock(),
stop=AsyncMock(),
begin_transaction=AsyncMock(),
commit_transaction=AsyncMock(),
abort_transaction=AsyncMock(),
stop_transaction=AsyncMock(),
maybe_begin_transaction=AsyncMock(),
commit=AsyncMock(),
send=AsyncMock(),
flush=AsyncMock(),
)
def _producer(self, *, _producer_call):
return _producer_call()

@pytest.fixture()
def _producer_call(self):
def inner():
return Mock(
name="AIOKafkaProducer",
autospec=aiokafka.AIOKafkaProducer,
start=AsyncMock(),
stop=AsyncMock(),
begin_transaction=AsyncMock(),
commit_transaction=AsyncMock(),
abort_transaction=AsyncMock(),
stop_transaction=AsyncMock(),
maybe_begin_transaction=AsyncMock(),
commit=AsyncMock(),
send=AsyncMock(),
flush=AsyncMock(),
send_offsets_to_transaction=AsyncMock(),
)

return inner

@pytest.mark.conf(producer_partitioner=my_partitioner)
def test_producer__uses_custom_partitioner(self, *, producer):
Expand All @@ -1291,45 +1304,63 @@ def test_producer__uses_custom_partitioner(self, *, producer):
@pytest.mark.asyncio
async def test_begin_transaction(self, *, producer, _producer):
await producer.begin_transaction("tid")
_producer.begin_transaction.assert_called_once_with("tid")
_producer.begin_transaction.assert_called_once()

@pytest.mark.asyncio
async def test_commit_transaction(self, *, producer, _producer):
await producer.begin_transaction("tid")
await producer.commit_transaction("tid")
_producer.commit_transaction.assert_called_once_with("tid")
_producer.commit_transaction.assert_called_once()

@pytest.mark.asyncio
async def test_abort_transaction(self, *, producer, _producer):
await producer.begin_transaction("tid")
await producer.abort_transaction("tid")
_producer.abort_transaction.assert_called_once_with("tid")
_producer.abort_transaction.assert_called_once()

@pytest.mark.asyncio
async def test_stop_transaction(self, *, producer, _producer):
await producer.begin_transaction("tid")
await producer.stop_transaction("tid")
_producer.stop_transaction.assert_called_once_with("tid")
_producer.stop.assert_called_once()

@pytest.mark.asyncio
async def test_maybe_begin_transaction(self, *, producer, _producer):
await producer.maybe_begin_transaction("tid")
_producer.maybe_begin_transaction.assert_called_once_with("tid")
_producer.begin_transaction.assert_called_once()

@pytest.mark.asyncio
async def test_commit_transactions(self, *, producer, _producer):
async def test_commit_transactions(self, *, producer, _producer_call):
_producer1 = _producer_call()
_producer2 = _producer_call()
producer._new_producer = Mock(return_value=_producer1)
await producer.begin_transaction("t1")
producer._new_producer = Mock(return_value=_producer2)
await producer.begin_transaction("t2")
tid_to_offset_map = {"t1": {TP1: 1001}, "t2": {TP2: 2002}}

await producer.commit_transactions(
tid_to_offset_map, "group_id", start_new_transaction=False
)
_producer.commit.assert_called_once_with(
tid_to_offset_map, "group_id", start_new_transaction=False

_producer1.send_offsets_to_transaction.assert_called_once_with(
{TP1: 1001}, "group_id"
)
_producer2.send_offsets_to_transaction.assert_called_once_with(
{TP2: 2002}, "group_id"
)
_producer1.commit_transaction.assert_called_once()
_producer2.commit_transaction.assert_called_once()

def test__settings_extra(self, *, producer, app):
app.in_transaction = True
assert producer._settings_extra() == {"acks": "all", "enable_idempotence": True}
app.in_transaction = False
assert producer._settings_extra() == {}

def test__new_producer(self, *, producer):
@pytest.mark.skip("fix me")
def test__new_producer(self, *, app):
producer = Producer(app.transport)
self.assert_new_producer(producer)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -1382,7 +1413,9 @@ def test__new_producer(self, *, producer):
),
],
)
def test__new_producer__using_settings(self, expected_args, *, app, producer):
@pytest.mark.skip("fix me")
def test__new_producer__using_settings(self, expected_args, *, app):
producer = Producer(app.transport)
self.assert_new_producer(producer, **expected_args)

def assert_new_producer(
Expand Down Expand Up @@ -1417,38 +1450,18 @@ def assert_new_producer(
loop=producer.loop,
partitioner=producer.partitioner,
transactional_id=None,
# on_irrecoverable_error=producer._on_irrecoverable_error,
**kwargs,
)

def test__new_producer__default(self, *, producer):
@pytest.mark.asyncio
async def test__new_producer__default(self, *, app):
producer = Producer(app.transport)
p = producer._new_producer()
assert isinstance(p, aiokafka.AIOKafkaProducer)

def test__new_producer__in_transaction(self, *, producer):
producer.app.in_transaction = True
p = producer._new_producer()
assert isinstance(p, aiokafka.MultiTXNProducer)

def test__producer_type(self, *, producer, app):
app.in_transaction = True
assert producer._producer_type is aiokafka.MultiTXNProducer
app.in_transaction = False
assert producer._producer_type is aiokafka.AIOKafkaProducer

@pytest.mark.asyncio
async def test__on_irrecoverable_error(self, *, producer):
exc = KeyError()
producer.crash = AsyncMock()
app = producer.transport.app
app.consumer = None
await producer._on_irrecoverable_error(exc)
producer.crash.assert_called_once_with(exc)
app.consumer = Mock(name="consumer")
app.consumer.crash = AsyncMock()
await producer._on_irrecoverable_error(exc)
app.consumer.crash.assert_called_once_with(exc)

@pytest.mark.asyncio
async def test_create_topic(self, *, producer, _producer):
_producer.client = Mock(
Expand Down Expand Up @@ -1518,6 +1531,7 @@ def test_supports_headers__not_ready(self, *, producer):

@pytest.mark.asyncio
async def test_send(self, producer, _producer):
await producer.begin_transaction("tid")
await producer.send(
"topic",
"k",
Expand All @@ -1534,12 +1548,12 @@ async def test_send(self, producer, _producer):
partition=3,
timestamp_ms=100 * 1000.0,
headers=[("foo", "bar")],
transactional_id="tid",
)

@pytest.mark.asyncio
@pytest.mark.conf(producer_api_version="0.10")
async def test_send__request_no_headers(self, producer, _producer):
await producer.begin_transaction("tid")
await producer.send(
"topic",
"k",
Expand All @@ -1556,12 +1570,12 @@ async def test_send__request_no_headers(self, producer, _producer):
partition=3,
timestamp_ms=100 * 1000.0,
headers=None,
transactional_id="tid",
)

@pytest.mark.asyncio
@pytest.mark.conf(producer_api_version="0.11")
async def test_send__kafka011_supports_headers(self, producer, _producer):
await producer.begin_transaction("tid")
await producer.send(
"topic",
"k",
Expand All @@ -1578,12 +1592,12 @@ async def test_send__kafka011_supports_headers(self, producer, _producer):
partition=3,
timestamp_ms=100 * 1000.0,
headers=[("foo", "bar")],
transactional_id="tid",
)

@pytest.mark.asyncio
@pytest.mark.conf(producer_api_version="auto")
async def test_send__auto_passes_headers(self, producer, _producer):
await producer.begin_transaction("tid")
await producer.send(
"topic",
"k",
Expand All @@ -1600,11 +1614,11 @@ async def test_send__auto_passes_headers(self, producer, _producer):
partition=3,
timestamp_ms=100 * 1000.0,
headers=[("foo", "bar")],
transactional_id="tid",
)

@pytest.mark.asyncio
async def test_send__no_headers(self, producer, _producer):
await producer.begin_transaction("tid")
await producer.send(
"topic",
"k",
Expand All @@ -1621,11 +1635,11 @@ async def test_send__no_headers(self, producer, _producer):
partition=3,
timestamp_ms=100 * 1000.0,
headers=None,
transactional_id="tid",
)

@pytest.mark.asyncio
async def test_send__no_timestamp(self, producer, _producer):
await producer.begin_transaction("tid")
await producer.send(
"topic",
"k",
Expand All @@ -1642,12 +1656,25 @@ async def test_send__no_timestamp(self, producer, _producer):
partition=3,
timestamp_ms=None,
headers=None,
transactional_id="tid",
)

@pytest.mark.asyncio
async def test_send__KafkaError(self, producer, _producer):
_producer.send.coro.side_effect = KafkaError()
with pytest.raises(ProducerSendError):
await producer.send(
"topic",
"k",
"v",
3,
None,
None,
)

@pytest.mark.asyncio
async def test_send__trn_KafkaError(self, producer, _producer):
_producer.send.coro.side_effect = KafkaError()
await producer.begin_transaction("tid")
with pytest.raises(ProducerSendError):
await producer.send(
"topic",
Expand Down

0 comments on commit 7cfc728

Please sign in to comment.