introduce an event queue and a processing thread into transports #411
Conversation
Using this queue, we can ensure that no thread other than the processing thread accesses the compressed StringIO buffer, making coarse-grained locking unnecessary. This should avoid deadlocks.
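A minimal sketch of the architecture this description implies, assuming nothing about the PR's actual class layout (names like `QueueingTransport` and `_send_buffered_data` are illustrative): application threads only put events on a bounded queue, and a single processing thread owns the gzip buffer.

```python
import gzip
import io
import json
import queue
import threading


class QueueingTransport(object):
    def __init__(self, maxsize=100):
        self._event_queue = queue.Queue(maxsize=maxsize)
        self._buffer = gzip.GzipFile(fileobj=io.BytesIO(), mode="w")
        self._thread = threading.Thread(target=self._process_queue)
        self._thread.daemon = True
        self._thread.start()

    def queue(self, event_type, data, flush=False):
        # called from application threads; never touches self._buffer
        try:
            self._event_queue.put_nowait((event_type, data, flush))
        except queue.Full:
            pass  # drop the event rather than block the caller

    def _process_queue(self):
        # only this thread writes to the compressed buffer, so no lock is needed
        while True:
            event_type, data, flush = self._event_queue.get()
            if event_type == "close":
                self._send_buffered_data()
                break
            self._buffer.write((json.dumps({event_type: data}) + "\n").encode("utf-8"))
            if flush:
                self._send_buffered_data()

    def _send_buffered_data(self):
        pass  # send the gzip buffer to the APM Server and reset it (elided)
```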
elasticapm/transport/base.py
Outdated
@@ -54,74 +53,93 @@ def __init__(
        self._max_flush_time = max_flush_time
        self._max_buffer_size = max_buffer_size
        self._queued_data = None
        self._queue_lock = threading.Lock()
        self._event_queue = compat.queue.Queue(maxsize=100)
wondering if this should be configurable.
@axw is it in the Go agent? Also, how did you determine the (default) size of the queue (or ring buffer in your case IIRC)?
I have separate queues for transactions, spans, and errors; mostly for type-safety. Each queue has a capacity of 1000. I don't think I was very scientific about the choice, just eyeballed a high-throughput test app to ensure it didn't drop a significant amount.
Originally I made them configurable, but the number of queues and buffers increased to the point that I thought it would be unhelpful to expose it initially. The Go agent has this initial queue, then a ring buffer, and then finally the gzip request buffer. The ring buffer and request buffer have configurable sizes, specified in bytes.
transport.queue("error", "x", flush=True)
record = caplog.records[0]
assert "forced" in record.message
Unfortunately, it seems the caplog fixture has some difficulties in multithreaded environments (or I have difficulties using it correctly), so I had to remove some of these asserts.
Looking good. I think this will be easier to reason about, and moving the HTTP request sending to the background should reduce impact on application threads.
I found a few issues. I didn't look at the guts of the Transport.flush method in detail; I assume that hasn't changed other than removing the lock and timer, and just recording the last-flushed time. I also haven't looked at the test code, but can do later if you'd like.
elasticapm/transport/base.py
Outdated
queued_data.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
self._counts[event_type] += 1
if not self._event_process_thread.is_alive() and not self._closed:
    self._event_process_thread.start()
Multiple callers of `queue` could race and end up here, and the second one will raise an exception. Maybe just start the thread in the constructor?
Yes, that might be an issue. I had it in the constructor at first, but that made testing extremely cumbersome.
Maybe a flag in the constructor, `start_processing_thread`, that defaults to `True`, could help with that.
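A rough sketch of that flag idea (the parameter name comes from the comment above; the rest of the class is assumed): tests can pass `start_processing_thread=False` and start the thread themselves, while normal use starts it in the constructor and avoids the race.

```python
import threading


class Transport(object):
    def __init__(self, start_processing_thread=True):
        self._event_process_thread = threading.Thread(target=self._process_queue)
        self._event_process_thread.daemon = True
        if start_processing_thread:
            # starting here avoids the race where two queue() callers both see
            # a dead thread and call start() twice
            self._event_process_thread.start()

    def _process_queue(self):
        pass  # consume events from the queue (elided)
```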
try:
    self._event_queue.put_nowait((event_type, data, flush))
except compat.queue.Full:
    logger.warning("Event of type %s dropped due to full event queue", event_type)
IMO, dropping events is normal operation (e.g. for high-throughput applications) and shouldn't log a warning. Maybe debug level.
FYI, this is where I would record that an event was dropped, and what was contributing to the contentiously-named `agent.events.dropped` metric (which I... dropped).
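A hedged variation of the snippet above showing what that could look like (the `_dropped_events` counter is hypothetical, not an existing field):

```python
try:
    self._event_queue.put_nowait((event_type, data, flush))
except compat.queue.Full:
    # dropping events is expected under high load, so count it and log at debug level
    self._dropped_events += 1  # hypothetical counter for an agent.events.dropped-style metric
    logger.debug("Event of type %s dropped due to full event queue", event_type)
```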
@@ -135,7 +152,10 @@ def close(self):
        Cleans up resources and closes connection
        :return:
        """
        self.flush(sync=True, start_flush_timer=False)
        self._closed = True
        self.queue("close", None)
It feels like it might be a bad idea for the "close" event to be silently dropped. Or at least, it should be made optional. Suggestion:
- add "block" and "timeout" options to both `close` and `queue`
- for `queue`, make it non-blocking by default
- for `close`, probably blocking with a smallish timeout (<10s?)
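A sketch of how those options could look, under the assumption that the signatures below are illustrative rather than the final API:

```python
def queue(self, event_type, data, flush=False, block=False, timeout=None):
    try:
        if block:
            self._event_queue.put((event_type, data, flush), block=True, timeout=timeout)
        else:
            self._event_queue.put_nowait((event_type, data, flush))
    except compat.queue.Full:
        logger.debug("Event of type %s dropped due to full event queue", event_type)

def close(self, block=True, timeout=5.0):
    self._closed = True
    # block so the "close" event isn't silently dropped when the queue is full
    self.queue("close", None, block=block, timeout=timeout)
```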
@@ -45,7 +45,7 @@ def emit(self, record):
        self.format(record)

        # Avoid typical config issues by overriding loggers behavior
        if record.name.startswith("elasticapm.errors"):
        if record.name.startswith(("elasticapm.errors",)):
what's this for?
elasticapm/transport/base.py
Outdated
self._closed = True
self.queue("close", None)
if self._event_process_thread.is_alive():
    self._event_process_thread.join()
Unless you ensure the queue above didn't drop, this could block forever. If you do add a timeout to `close`, I think you would need to measure how long the `queue` call takes, and subtract that from the timeout that you then pass to `join`.
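For instance (a sketch building on the blocking `queue` idea above; the 5-second budget and timer choice are assumptions):

```python
import timeit  # timeit.default_timer works on both Python 2 and 3

def close(self, timeout=5.0):
    self._closed = True
    start = timeit.default_timer()
    # this may block for up to `timeout` seconds if the queue is full
    self.queue("close", None, block=True, timeout=timeout)
    elapsed = timeit.default_timer() - start
    if self._event_process_thread.is_alive():
        # pass only the remaining budget to join() so close() has a bounded total runtime
        self._event_process_thread.join(max(0.0, timeout - elapsed))
```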
elasticapm/transport/base.py
Outdated
    timed_out = True

if event_type == "close":
    self.flush(sync=True)
Use the `flush` value sent through the queue, as for other events?
When closing, we always want to flush the queue. Or at least, I don't see a scenario where you wouldn't want to do that.
elasticapm/transport/base.py
Outdated
def flush(self):
    """
    Trigger a flush of the queue.
    Note: the flush happens asynchronously in a background thread, which means that the queue won't be immediately
In the Go agent I make the Flush method block until the transport has sent the queued events to the server, successfully or not. I did this primarily to make testing simpler and more robust. The way I did it is roughly this:
- when `flush` is called, check if the transport already has a `flushed` event attribute. If not, set a new one, then send the flush request to the queue and wait on the event to be signalled
- when the sender has finished processing the flush request, signal the event and clear the `flushed` event field

You might want to do this eventually, otherwise you'll probably run into intermittent failures on slow CI machines due to the sleeps.
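Translated to Python, that pattern might look roughly like this (a sketch using `threading.Event`; the attribute and helper names are assumptions):

```python
import threading

def flush(self, timeout=None):
    # reuse an in-flight flush event if one exists, otherwise create one and enqueue a flush request
    if self._flushed_event is None:
        self._flushed_event = threading.Event()
        self.queue("flush", None)
    # block until the processing thread signals that the flush finished (or we time out)
    self._flushed_event.wait(timeout)

# called from the processing thread once the buffered data has been sent:
def _signal_flushed(self):
    event, self._flushed_event = self._flushed_event, None
    if event is not None:
        event.set()
```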
Also, don't send the data in a separate background thread, as we are already in a thread (the event processor thread) when sending. This reduces the number of threads that we spawn, but it requires a bigger event queue.
Instead of waking up the waiting thread for every single element, the queue waits some time before triggering the event. This can reduce the number of context switches considerably.
    self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
if (
I think this logic could lead to the consumer being blocked forever.
e.g. (with the constructor's default args) send 99 events in <1s; a blocked `get()` call with no timeout will never be notified/unblocked.
I'm not sure how expensive timers are in Python - would it be viable to start a timer in the case that the event is not notified, for the remaining chill time? And if an event comes in in the meantime, restart the timer?
I agree that this is not ideal, but I think the trade-off justifies it (a lot less CPU usage). This worst-case scenario would only happen if the user set `api_request_time` to `None`, and in that case we already have a scenario where collected data can stay in limbo indefinitely. As soon as `api_request_time` is set, the processing thread will be woken up when the timeout expires.
Unfortunately, Python doesn't have a "native" way to set timers. There's `threading.Timer`, but that spawns a new OS thread, which comes with all the baggage of threads: increased memory use, increased context switches, locking issues etc.
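In other words, the wakeup comes from the consumer side: the processing thread's `get` uses a timeout derived from `api_request_time`, so even events that never triggered a notification are picked up once that timeout expires. A simplified sketch of that loop (names are assumptions):

```python
import queue

def _process_queue(self):
    while not self._closed:
        try:
            # wake up at the latest after api_request_time, even if the producer
            # side delayed its notification for the queued events
            event = self._event_queue.get(block=True, timeout=self._api_request_time)
        except queue.Empty:
            self._flush_buffered_data()  # hypothetical helper: nothing new arrived, flush what we have
            continue
        self._handle_event(event)  # hypothetical helper
```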
def flush(self):
    """
    Trigger a flush of the queue.
    Note: this method will only return once the queue is empty. This means it can block indefinitely if more events
I know this is only meant for tests, but FWIW: in the Go agent, flush just flushes the currently enqueued events. When a flush command is received by the background thread, it gets the current size of the queue and drains it by that many events. Even that's not necessary here because you have a single queue (whereas in Go we have one per event type, and a separate event/condition variable for flushing.)
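A sketch of that drain-by-current-size approach in Python (`qsize()` is only approximate, hence the guard; the helper names are assumptions):

```python
import queue

def _handle_flush_command(self):
    # drain only the events that were already enqueued when the flush was requested;
    # anything that arrives afterwards is left for the next cycle
    pending = self._event_queue.qsize()
    for _ in range(pending):
        try:
            event_type, data, _ = self._event_queue.get_nowait()
        except queue.Empty:
            break  # qsize() can overestimate
        self._write_event(event_type, data)  # hypothetical helper
    self._send_buffered_data()  # hypothetical helper
```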
We're observing some flakiness with this test; hopefully this will resolve that issue.