Skip to content

Commit

Permalink
Manually acknowledge take (faust-streaming#192)
Browse files Browse the repository at this point in the history
* Added checks to prometheus setup utilities

* No ack take

* Added functional tests for no ack take

Co-authored-by: Matthew Drago <matthew.drago@gig.com>
Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 26, 2021
1 parent 5ad8222 commit 510051b
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 3 deletions.
26 changes: 25 additions & 1 deletion faust/sensors/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,36 @@ def setup_prometheus_sensors(
registry: CollectorRegistry = REGISTRY,
name_prefix: str = None,
) -> None:
"""
A utility function which sets up prometheus and attaches the config to the app.
@param app: the faust app instance
@param pattern: the url pattern for prometheus
@param registry: the prometheus registry
@param name_prefix: the name prefix. Defaults to the app name
@return: None
"""
if prometheus_client is None:
raise ImproperlyConfigured(
"prometheus_client requires `pip install prometheus_client`."
)
if name_prefix is None:
app_conf_name = app.conf.name
app.logger.info(
"Name prefix is not supplied. Using the name %s from App config.",
app_conf_name,
)
if "-" in app_conf_name:
name_prefix = app_conf_name.replace("-", "_")
app.logger.warning(
"App config name %s does not conform to"
" Prometheus naming conventions."
" Using %s as a name_prefix.",
app_conf_name,
name_prefix,
)

faust_metrics = FaustMetrics.create(registry, name_prefix or app.conf.name)
faust_metrics = FaustMetrics.create(registry, name_prefix)
app.monitor = PrometheusMonitor(metrics=faust_metrics)

@app.page(pattern)
Expand Down
100 changes: 100 additions & 0 deletions faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,106 @@ def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
"""
return aenumerate(self, start)

async def noack_take(
self, max_: int, within: Seconds
) -> AsyncIterable[Sequence[T_co]]:
"""
Buffer n values at a time and yield a list of buffered values.
:param max_: Max number of messages to receive. When more than this
number of messages are received within the specified number of
seconds then we flush the buffer immediately.
:param within: Timeout for when we give up waiting for another value,
and process the values we have.
Warning: If there's no timeout (i.e. `timeout=None`),
the agent is likely to stall and block buffered events for an
unreasonable length of time(!).
"""
buffer: List[T_co] = []
events: List[EventT] = []
buffer_add = buffer.append
event_add = events.append
buffer_size = buffer.__len__
buffer_full = asyncio.Event(loop=self.loop)
buffer_consumed = asyncio.Event(loop=self.loop)
timeout = want_seconds(within) if within else None
stream_enable_acks: bool = self.enable_acks

buffer_consuming: Optional[asyncio.Future] = None

channel_it = aiter(self.channel)

# We add this processor to populate the buffer, and the stream
# is passively consumed in the background (enable_passive below).
async def add_to_buffer(value: T) -> T:
try:
# buffer_consuming is set when consuming buffer
# after timeout.
nonlocal buffer_consuming
if buffer_consuming is not None:
try:
await buffer_consuming
finally:
buffer_consuming = None

# We want to save events instead of values to allow for manual ack
event = self.current_event
buffer_add(cast(T_co, event))
if event is None:
raise RuntimeError("Take buffer found current_event is None")

event_add(event)
if buffer_size() >= max_:
# signal that the buffer is full and should be emptied.
buffer_full.set()
# strict wait for buffer to be consumed after buffer
# full.
# If max is 1000, we are not allowed to return 1001
# values.
buffer_consumed.clear()
await self.wait(buffer_consumed)
except CancelledError: # pragma: no cover
raise
except Exception as exc:
self.log.exception("Error adding to take buffer: %r", exc)
await self.crash(exc)
return value

# Disable acks to ensure this method acks manually
# events only after they are consumed by the user
self.enable_acks = False

self.add_processor(add_to_buffer)
self._enable_passive(cast(ChannelT, channel_it))
try:
while not self.should_stop:
# wait until buffer full, or timeout
await self.wait_for_stopped(buffer_full, timeout=timeout)
if buffer:
# make sure background thread does not add new items to
# buffer while we read.
buffer_consuming = self.loop.create_future()
try:
yield list(buffer)
finally:
buffer.clear()
# code change: We want to manually ack
# for event in events:
# await self.ack(event)
events.clear()
# allow writing to buffer again
notify(buffer_consuming)
buffer_full.clear()
buffer_consumed.set()
else: # pragma: no cover
pass
else: # pragma: no cover
pass

finally:
# Restore last behaviour of "enable_acks"
self.enable_acks = stream_enable_acks
self._processors.remove(add_to_buffer)

def through(self, channel: Union[str, ChannelT]) -> StreamT:
"""Forward values to in this stream to channel.
Expand Down
124 changes: 122 additions & 2 deletions tests/functional/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,126 @@ async def test_stop_stops_related_streams(app):
assert s3.should_stop


@pytest.mark.asyncio
async def test_noack_take(app):
async with new_stream(app).noack() as s:
assert s.enable_acks is False
await s.channel.send(value=1)
event = None
# noack_take returns an event instead of value
async for noack_value in s.noack_take(1, within=1):
assert noack_value[0].value == 1
assert s.enable_acks is False
event = mock_stream_event_ack(s)
break

assert event
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0
# need two sleeps on Python 3.6.7 + 3.7.1 :-/
await asyncio.sleep(0) # needed for some reason
await asyncio.sleep(0) # needed for some reason

if not event.ack.called:
assert not event.message.acked
assert not event.message.refcount
assert s.enable_acks is False


@pytest.mark.asyncio
async def test_noack_take__10(app, loop):
async with new_stream(app).noack() as s:
assert s.enable_acks is False
for i in range(9):
await s.channel.send(value=i)

async def in_one_second_finalize():
await s.sleep(1.0)
await s.channel.send(value=9)
for i in range(10):
await s.channel.send(value=i + 10)

asyncio.ensure_future(in_one_second_finalize())

event = None
buffer_processor = s.noack_take(10, within=10.0)
async for noack_value in buffer_processor:
assert [nv.value for nv in noack_value] == list(range(10))
assert s.enable_acks is False
event = mock_stream_event_ack(s)
break
async for noack_value in buffer_processor:
assert [nv.value for nv in noack_value] == list(range(10, 20))
assert s.enable_acks is False
break

try:
await buffer_processor.athrow(asyncio.CancelledError())
except asyncio.CancelledError:
pass

assert event
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0
# need two sleeps on Python 3.6.7 + 3.7.1 :-/
await asyncio.sleep(0) # needed for some reason
await asyncio.sleep(0) # needed for some reason
await asyncio.sleep(0) # needed for some reason

if not event.ack.called:
assert not event.message.acked
assert not event.message.refcount
assert s.enable_acks is False


@pytest.mark.asyncio
async def test_noack_take__10_and_ack(app, loop):
async with new_stream(app).noack() as s:
assert s.enable_acks is False
for i in range(9):
await s.channel.send(value=i)

async def in_one_second_finalize():
await s.sleep(1.0)
await s.channel.send(value=9)
for i in range(10):
await s.channel.send(value=i + 10)

asyncio.ensure_future(in_one_second_finalize())

event = None
buffer_processor = s.noack_take(10, within=10.0)
async for noack_values in buffer_processor:
assert [nv.value for nv in noack_values] == list(range(10))
assert s.enable_acks is False
event = mock_stream_event_ack(s)
# acknowledge
for noack_event in noack_values:
await s.ack(noack_event)
break
async for noack_values in buffer_processor:
assert [nv.value for nv in noack_values] == list(range(10, 20))
assert s.enable_acks is False
for noack_event in noack_values:
await s.ack(noack_event)
break

try:
await buffer_processor.athrow(asyncio.CancelledError())
except asyncio.CancelledError:
pass

assert event
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0
# need two sleeps on Python 3.6.7 + 3.7.1 :-/
await asyncio.sleep(0) # needed for some reason
await asyncio.sleep(0) # needed for some reason
await asyncio.sleep(0) # needed for some reason

if not event.ack.called:
assert event.message.acked
assert not event.message.refcount
assert s.enable_acks is False


@pytest.mark.asyncio
async def test_take(app):
async with new_stream(app) as s:
Expand All @@ -564,8 +684,8 @@ async def test_take(app):
assert event
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0
# need two sleeps on Python 3.6.7 + 3.7.1 :-/
await asyncio.sleep(0) # needed for some reason
await asyncio.sleep(0) # needed for some reason
await asyncio.sleep(0)
await asyncio.sleep(0)

if not event.ack.called:
assert event.message.acked
Expand Down

0 comments on commit 510051b

Please sign in to comment.