diff --git a/faust/sensors/prometheus.py b/faust/sensors/prometheus.py
index 1a805c534..5284b9d8e 100644
--- a/faust/sensors/prometheus.py
+++ b/faust/sensors/prometheus.py
@@ -45,12 +45,41 @@ def setup_prometheus_sensors(
     registry: CollectorRegistry = REGISTRY,
     name_prefix: str = None,
 ) -> None:
+    """
+    A utility function which sets up prometheus and attaches the config to the app.
+
+    @param app: the faust app instance
+    @param pattern: the url pattern for prometheus
+    @param registry: the prometheus registry
+    @param name_prefix: the name prefix. Defaults to the app name, with any
+        dashes replaced by underscores
+    @return: None
+    @raise ImproperlyConfigured: if prometheus_client is not installed
+    """
     if prometheus_client is None:
         raise ImproperlyConfigured(
             "prometheus_client requires `pip install prometheus_client`."
         )
+    if name_prefix is None:
+        app_conf_name = app.conf.name
+        app.logger.info(
+            "Name prefix is not supplied. Using the name %s from App config.",
+            app_conf_name,
+        )
+        # Always fall back to the app name; otherwise a dash-free app name
+        # would leave name_prefix as None.
+        name_prefix = app_conf_name
+        if "-" in app_conf_name:
+            name_prefix = app_conf_name.replace("-", "_")
+            app.logger.warning(
+                "App config name %s does not conform to"
+                " Prometheus naming conventions."
+                " Using %s as a name_prefix.",
+                app_conf_name,
+                name_prefix,
+            )
 
-    faust_metrics = FaustMetrics.create(registry, name_prefix or app.conf.name)
+    faust_metrics = FaustMetrics.create(registry, name_prefix)
     app.monitor = PrometheusMonitor(metrics=faust_metrics)
 
     @app.page(pattern)
diff --git a/faust/streams.py b/faust/streams.py
index 62dc6da5c..9ecf2b77f 100644
--- a/faust/streams.py
+++ b/faust/streams.py
@@ -400,6 +400,111 @@ def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
         """
         return aenumerate(self, start)
 
+    async def noack_take(
+        self, max_: int, within: Seconds
+    ) -> AsyncIterable[Sequence[T_co]]:
+        """
+        Buffer n events at a time and yield a list of buffered events.
+
+        Unlike a plain value buffer, the yielded items are the stream
+        events themselves and they are NOT acked by this method: acks are
+        disabled while the generator runs, so the consumer must ack each
+        event manually (e.g. ``await stream.ack(event)``).
+
+        :param max_: Max number of messages to receive. When more than this
+            number of messages are received within the specified number of
+            seconds then we flush the buffer immediately.
+        :param within: Timeout for when we give up waiting for another value,
+            and process the values we have.
+            Warning: If there's no timeout (i.e. `within=None`),
+            the agent is likely to stall and block buffered events for an
+            unreasonable length of time(!).
+        """
+        buffer: List[T_co] = []
+        buffer_add = buffer.append
+        buffer_size = buffer.__len__
+        # No explicit ``loop=`` argument: it is deprecated since Python 3.8
+        # and rejected by Python 3.10+.
+        buffer_full = asyncio.Event()
+        buffer_consumed = asyncio.Event()
+        timeout = want_seconds(within) if within else None
+        stream_enable_acks: bool = self.enable_acks
+
+        buffer_consuming: Optional[asyncio.Future] = None
+
+        channel_it = aiter(self.channel)
+
+        # We add this processor to populate the buffer, and the stream
+        # is passively consumed in the background (enable_passive below).
+        async def add_to_buffer(value: T) -> T:
+            try:
+                # buffer_consuming is set when consuming buffer
+                # after timeout.
+                nonlocal buffer_consuming
+                if buffer_consuming is not None:
+                    try:
+                        await buffer_consuming
+                    finally:
+                        buffer_consuming = None
+
+                # We want to save events instead of values to allow for manual ack
+                event = self.current_event
+                # Validate before buffering so a missing event never ends
+                # up in the batch handed to the consumer.
+                if event is None:
+                    raise RuntimeError("Take buffer found current_event is None")
+                buffer_add(cast(T_co, event))
+
+                if buffer_size() >= max_:
+                    # signal that the buffer is full and should be emptied.
+                    buffer_full.set()
+                    # strict wait for buffer to be consumed after buffer
+                    # full.
+                    # If max is 1000, we are not allowed to return 1001
+                    # values.
+                    buffer_consumed.clear()
+                    await self.wait(buffer_consumed)
+            except CancelledError:  # pragma: no cover
+                raise
+            except Exception as exc:
+                self.log.exception("Error adding to take buffer: %r", exc)
+                await self.crash(exc)
+            return value
+
+        # Disable automatic acks: events are acked manually by the
+        # user, only after they have been consumed.
+        self.enable_acks = False
+
+        self.add_processor(add_to_buffer)
+        self._enable_passive(cast(ChannelT, channel_it))
+        try:
+            while not self.should_stop:
+                # wait until buffer full, or timeout
+                await self.wait_for_stopped(buffer_full, timeout=timeout)
+                if buffer:
+                    # make sure background thread does not add new items to
+                    # buffer while we read.
+                    buffer_consuming = self.loop.create_future()
+                    try:
+                        yield list(buffer)
+                    finally:
+                        # Deliberately no ack here: the consumer acks the
+                        # yielded events manually.
+                        buffer.clear()
+                        # allow writing to buffer again
+                        notify(buffer_consuming)
+                        buffer_full.clear()
+                        buffer_consumed.set()
+                else:  # pragma: no cover
+                    pass
+            else:  # pragma: no cover
+                pass
+
+        finally:
+            # Restore last behaviour of "enable_acks"
+            self.enable_acks = stream_enable_acks
+            self._processors.remove(add_to_buffer)
+
     def through(self, channel: Union[str, ChannelT]) -> StreamT:
         """Forward values to in this stream to channel.
 
diff --git a/tests/functional/test_streams.py b/tests/functional/test_streams.py
index e189e70bd..cea34e777 100644
--- a/tests/functional/test_streams.py
+++ b/tests/functional/test_streams.py
@@ -549,6 +549,131 @@ async def test_stop_stops_related_streams(app):
     assert s3.should_stop
 
 
+@pytest.mark.asyncio
+async def test_noack_take(app):
+    """Check that noack_take yields events and leaves acks disabled."""
+    async with new_stream(app).noack() as s:
+        assert s.enable_acks is False
+        await s.channel.send(value=1)
+        event = None
+        # noack_take yields a list of events rather than plain values
+        async for noack_value in s.noack_take(1, within=1):
+            assert noack_value[0].value == 1
+            assert s.enable_acks is False
+            event = mock_stream_event_ack(s)
+            break
+
+        assert event
+        # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
+        # need two sleeps on Python 3.6.7 + 3.7.1 :-/
+        await asyncio.sleep(0)  # needed for some reason
+        await asyncio.sleep(0)  # needed for some reason
+
+        if not event.ack.called:
+            assert not event.message.acked
+            assert not event.message.refcount
+        assert s.enable_acks is False
+
+
+@pytest.mark.asyncio
+async def test_noack_take__10(app, loop):
+    """Check that noack_take(10) yields two consecutive batches of ten events."""
+    async with new_stream(app).noack() as s:
+        assert s.enable_acks is False
+        for i in range(9):
+            await s.channel.send(value=i)
+
+        async def in_one_second_finalize():
+            await s.sleep(1.0)
+            await s.channel.send(value=9)
+            for i in range(10):
+                await s.channel.send(value=i + 10)
+
+        asyncio.ensure_future(in_one_second_finalize())
+
+        event = None
+        buffer_processor = s.noack_take(10, within=10.0)
+        async for noack_value in buffer_processor:
+            assert [nv.value for nv in noack_value] == list(range(10))
+            assert s.enable_acks is False
+            event = mock_stream_event_ack(s)
+            break
+        async for noack_value in buffer_processor:
+            assert [nv.value for nv in noack_value] == list(range(10, 20))
+            assert s.enable_acks is False
+            break
+
+        try:
+            await buffer_processor.athrow(asyncio.CancelledError())
+        except asyncio.CancelledError:
+            pass
+
+        assert event
+        # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
+        # need two sleeps on Python 3.6.7 + 3.7.1 :-/
+        await asyncio.sleep(0)  # needed for some reason
+        await asyncio.sleep(0)  # needed for some reason
+        await asyncio.sleep(0)  # needed for some reason
+
+        if not event.ack.called:
+            assert not event.message.acked
+            assert not event.message.refcount
+        assert s.enable_acks is False
+
+
+@pytest.mark.asyncio
+async def test_noack_take__10_and_ack(app, loop):
+    """Check that batches from noack_take can be acked manually via s.ack()."""
+    async with new_stream(app).noack() as s:
+        assert s.enable_acks is False
+        for i in range(9):
+            await s.channel.send(value=i)
+
+        async def in_one_second_finalize():
+            await s.sleep(1.0)
+            await s.channel.send(value=9)
+            for i in range(10):
+                await s.channel.send(value=i + 10)
+
+        asyncio.ensure_future(in_one_second_finalize())
+
+        event = None
+        buffer_processor = s.noack_take(10, within=10.0)
+        async for noack_values in buffer_processor:
+            assert [nv.value for nv in noack_values] == list(range(10))
+            assert s.enable_acks is False
+            event = mock_stream_event_ack(s)
+            # manually ack every event in the batch
+            for noack_event in noack_values:
+                await s.ack(noack_event)
+            break
+        async for noack_values in buffer_processor:
+            assert [nv.value for nv in noack_values] == list(range(10, 20))
+            assert s.enable_acks is False
+            for noack_event in noack_values:
+                await s.ack(noack_event)
+            break
+
+        try:
+            await buffer_processor.athrow(asyncio.CancelledError())
+        except asyncio.CancelledError:
+            pass
+
+        assert event
+        # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
+        # need two sleeps on Python 3.6.7 + 3.7.1 :-/
+        await asyncio.sleep(0)  # needed for some reason
+        await asyncio.sleep(0)  # needed for some reason
+        await asyncio.sleep(0)  # needed for some reason
+
+        # NOTE(review): if s.ack() invokes the mocked event.ack, this guard
+        # skips both asserts below -- confirm the test still checks acking.
+        if not event.ack.called:
+            assert event.message.acked
+            assert not event.message.refcount
+        assert s.enable_acks is False
+
+
 @pytest.mark.asyncio
 async def test_take(app):
     async with new_stream(app) as s:
@@ -564,8 +689,8 @@ async def test_take(app):
         assert event
         # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
         # need two sleeps on Python 3.6.7 + 3.7.1 :-/
-        await asyncio.sleep(0)  # needed for some reason
-        await asyncio.sleep(0)  # needed for some reason
+        await asyncio.sleep(0)
+        await asyncio.sleep(0)
 
         if not event.ack.called:
             assert event.message.acked