Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manually acknowledge take #192

Merged
merged 7 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion faust/sensors/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,36 @@ def setup_prometheus_sensors(
registry: CollectorRegistry = REGISTRY,
name_prefix: str = None,
) -> None:
"""
A utility function which sets up prometheus and attaches the config to the app.

@param app: the faust app instance
@param pattern: the url pattern for prometheus
@param registry: the prometheus registry
@param name_prefix: the name prefix. Defaults to the app name
@return: None
"""
if prometheus_client is None:
raise ImproperlyConfigured(
"prometheus_client requires `pip install prometheus_client`."
)
if name_prefix is None:
app_conf_name = app.conf.name
app.logger.info(
"Name prefix is not supplied. Using the name %s from App config.",
app_conf_name,
)
if "-" in app_conf_name:
name_prefix = app_conf_name.replace("-", "_")
app.logger.warning(
"App config name %s does not conform to"
" Prometheus naming conventions."
" Using %s as a name_prefix.",
app_conf_name,
name_prefix,
)

faust_metrics = FaustMetrics.create(registry, name_prefix or app.conf.name)
faust_metrics = FaustMetrics.create(registry, name_prefix)
app.monitor = PrometheusMonitor(metrics=faust_metrics)

@app.page(pattern)
Expand Down
100 changes: 100 additions & 0 deletions faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,106 @@ def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
"""
return aenumerate(self, start)

async def noack_take(
self, max_: int, within: Seconds
) -> AsyncIterable[Sequence[T_co]]:
"""
Buffer n values at a time and yield a list of buffered values.
:param max_: Max number of messages to receive. When more than this
number of messages are received within the specified number of
seconds then we flush the buffer immediately.
:param within: Timeout for when we give up waiting for another value,
and process the values we have.
Warning: If there's no timeout (i.e. `timeout=None`),
the agent is likely to stall and block buffered events for an
unreasonable length of time(!).
"""
buffer: List[T_co] = []
events: List[EventT] = []
buffer_add = buffer.append
event_add = events.append
buffer_size = buffer.__len__
buffer_full = asyncio.Event(loop=self.loop)
buffer_consumed = asyncio.Event(loop=self.loop)
timeout = want_seconds(within) if within else None
stream_enable_acks: bool = self.enable_acks

buffer_consuming: Optional[asyncio.Future] = None

channel_it = aiter(self.channel)

# We add this processor to populate the buffer, and the stream
# is passively consumed in the background (enable_passive below).
async def add_to_buffer(value: T) -> T:
try:
# buffer_consuming is set when consuming buffer
# after timeout.
nonlocal buffer_consuming
if buffer_consuming is not None:
try:
await buffer_consuming
finally:
buffer_consuming = None

# We want to save events instead of values to allow for manual ack
event = self.current_event
buffer_add(cast(T_co, event))
if event is None:
raise RuntimeError("Take buffer found current_event is None")

event_add(event)
if buffer_size() >= max_:
# signal that the buffer is full and should be emptied.
buffer_full.set()
# strict wait for buffer to be consumed after buffer
# full.
# If max is 1000, we are not allowed to return 1001
# values.
buffer_consumed.clear()
await self.wait(buffer_consumed)
except CancelledError: # pragma: no cover
raise
except Exception as exc:
self.log.exception("Error adding to take buffer: %r", exc)
await self.crash(exc)
return value

# Disable acks to ensure this method acks manually
# events only after they are consumed by the user
self.enable_acks = False

self.add_processor(add_to_buffer)
self._enable_passive(cast(ChannelT, channel_it))
try:
while not self.should_stop:
# wait until buffer full, or timeout
await self.wait_for_stopped(buffer_full, timeout=timeout)
if buffer:
# make sure background thread does not add new items to
# buffer while we read.
buffer_consuming = self.loop.create_future()
try:
yield list(buffer)
finally:
buffer.clear()
# code change: We want to manually ack
# for event in events:
# await self.ack(event)
events.clear()
# allow writing to buffer again
notify(buffer_consuming)
buffer_full.clear()
buffer_consumed.set()
else: # pragma: no cover
pass
else: # pragma: no cover
pass

finally:
# Restore last behaviour of "enable_acks"
self.enable_acks = stream_enable_acks
self._processors.remove(add_to_buffer)
patkivikram marked this conversation as resolved.
Show resolved Hide resolved

def through(self, channel: Union[str, ChannelT]) -> StreamT:
"""Forward values to in this stream to channel.

Expand Down