
introduce an event queue and a processing thread into transports #411


Closed
wants to merge 16 commits into from

Conversation

beniwohli
Contributor

Using this queue, we can ensure that no thread other than the processing
thread accesses the compressed StringIO buffer, making coarse locking
unnecessary. This should avoid deadlocks.
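As a rough illustration of that design (a minimal sketch with made-up names, not the agent's actual classes): application threads only ever put events onto a thread-safe queue, and a single processing thread is the only code that touches the compressed buffer.

```python
import gzip
import io
import queue
import threading


class SketchTransport:
    """Minimal sketch: one queue, one processing thread, no shared buffer access."""

    def __init__(self):
        self._event_queue = queue.Queue(maxsize=100)
        self._raw = io.BytesIO()
        self._buffer = gzip.GzipFile(fileobj=self._raw, mode="w")
        self._thread = threading.Thread(target=self._process, daemon=True)
        self._thread.start()

    def queue_event(self, event_type, data):
        # called from application threads; never touches self._buffer
        self._event_queue.put_nowait((event_type, data))

    def _process(self):
        # the only thread that writes to / flushes the compressed buffer
        while True:
            event_type, data = self._event_queue.get()
            if event_type == "close":
                break
            self._buffer.write((repr(data) + "\n").encode("utf-8"))
```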
@@ -54,74 +53,93 @@ def __init__(
self._max_flush_time = max_flush_time
self._max_buffer_size = max_buffer_size
self._queued_data = None
self._queue_lock = threading.Lock()
self._event_queue = compat.queue.Queue(maxsize=100)
Contributor Author

wondering if this should be configurable.

@axw is it configurable in the Go agent? Also, how did you determine the (default) size of the queue (or ring buffer in your case, IIRC)?

Member
@axw axw Feb 13, 2019

I have separate queues for transactions, spans, and errors; mostly for type-safety. Each queue has a capacity of 1000. I don't think I was very scientific about the choice, just eyeballed a high-throughput test app to ensure it didn't drop a significant amount.

Originally I made them configurable, but the number of queues and buffers increased to the point that I thought it would be unhelpful to expose them initially. The Go agent has this initial queue, then a ring buffer, and then finally the gzip request buffer. The ring buffer and request buffer have configurable sizes, specified in bytes.

transport.queue("error", "x", flush=True)
record = caplog.records[0]
assert "forced" in record.message
Contributor Author

Unfortunately, it seems the caplog fixture has some difficulties in multithreaded environments (or I have difficulties using it correctly), so I had to remove some of these asserts.

Member
@axw axw left a comment

Looking good. I think this will be easier to reason about, and moving the HTTP request sending to the background should reduce impact on application threads.

I found a few issues. I didn't look at the guts of the Transport.flush method in detail; I assume that hasn't changed other than removing the lock and timer, and just recording the last-flushed time. I also haven't looked at the test code, but can do that later if you'd like.

queued_data.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
self._counts[event_type] += 1
if not self._event_process_thread.is_alive() and not self._closed:
self._event_process_thread.start()
Member

Multiple callers of queue could race and end up here, and the second one will raise an exception. Maybe just start the thread in the constructor?

Contributor Author

Yes, that might be an issue. I had it in the constructor at first, but that made testing extremely cumbersome.

Maybe a flag in the constructor, start_processing_thread, defaulting to True, could help with that.
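Something along these lines, where start_processing_thread is the hypothetical flag mentioned above (a sketch, not the code that was merged):

```python
import threading

from elasticapm.utils import compat  # py2/py3 queue module, as used in this PR's diff


class TransportSketch(object):
    def __init__(self, start_processing_thread=True):
        self._event_queue = compat.queue.Queue(maxsize=100)
        self._event_process_thread = threading.Thread(target=self._process_queue)
        self._event_process_thread.daemon = True
        if start_processing_thread:
            # starting exactly once here avoids the race where two queue()
            # callers both see the thread as not alive and both call start()
            self._event_process_thread.start()

    def _process_queue(self):
        while True:
            event_type, data, flush = self._event_queue.get()
            if event_type == "close":
                break
```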

try:
self._event_queue.put_nowait((event_type, data, flush))
except compat.queue.Full:
logger.warning("Event of type %s dropped due to full event queue", event_type)
Member

IMO, dropping events is normal operation (e.g. for high-throughput applications) and shouldn't log a warning. Maybe debug level.

FYI, this is where I would record that an event was dropped, and what was contributing to the contentiously-named agent.events.dropped metric (which I... dropped).
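For illustration, the drop branch with that suggestion applied (the counter attribute here is hypothetical, standing in for the dropped-events metric discussed above):

```python
try:
    self._event_queue.put_nowait((event_type, data, flush))
except compat.queue.Full:
    # dropping under sustained load is expected, so log at debug level only
    logger.debug("Event of type %s dropped due to full event queue", event_type)
    # hypothetical counter, not the agent's actual metric
    self._dropped_events += 1
```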


@@ -135,7 +152,10 @@ def close(self):
Cleans up resources and closes connection
:return:
"""
self.flush(sync=True, start_flush_timer=False)
self._closed = True
self.queue("close", None)
Member

It feels like it might be a bad idea for the "close" event to be silently dropped. Or at least, it should be made optional. Suggestion:

  • add "block" and "timeout" options to both close and queue
  • for queue, make it non-blocking by default
  • for close, probably blocking with a smallish timeout (<10s?)
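Sketched as signatures, with defaults following the suggestion above (hypothetical, not the merged API):

```python
def queue(self, event_type, data, flush=False, block=False, timeout=None):
    try:
        # non-blocking by default; callers that must not lose the event can block
        self._event_queue.put((event_type, data, flush), block=block, timeout=timeout)
    except compat.queue.Full:
        logger.debug("Event of type %s dropped due to full event queue", event_type)


def close(self, block=True, timeout=10):
    # blocking with a smallish timeout so the "close" event is not silently dropped
    self._closed = True
    self.queue("close", None, block=block, timeout=timeout)
```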

@@ -45,7 +45,7 @@ def emit(self, record):
self.format(record)

# Avoid typical config issues by overriding loggers behavior
if record.name.startswith("elasticapm.errors"):
if record.name.startswith(("elasticapm.errors",)):
Member

what's this for?

self._closed = True
self.queue("close", None)
if self._event_process_thread.is_alive():
self._event_process_thread.join()
Member

Unless you ensure the queue above didn't drop, this could block forever. If you do add a timeout to close, I think you would need to measure how long the queue call takes, and subtract that from the timeout that you then pass to join.
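Roughly, assuming queue() grows the blocking options suggested earlier (names and the 10s value are illustrative):

```python
import timeit


def close(self, timeout=10):
    self._closed = True
    start = timeit.default_timer()
    # hypothetical blocking enqueue, so "close" is not dropped on a full queue
    self.queue("close", None, block=True, timeout=timeout)
    if self._event_process_thread.is_alive():
        # subtract the time already spent enqueueing so close() honours one overall timeout
        remaining = max(0.0, timeout - (timeit.default_timer() - start))
        self._event_process_thread.join(timeout=remaining)
```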

timed_out = True

if event_type == "close":
self.flush(sync=True)
Member

Use the flush value sent through the queue, as for other events?

Contributor Author

When closing, we always want to flush the queue. Or at least, I don't see a scenario where you wouldn't want to do that.

def flush(self):
"""
Trigger a flush of the queue.
Note: the flush happens asynchronously in a background thread, which means that the queue won't be immediately
Member

In the Go agent I make the Flush method block until the queued events have been sent to the server, successfully or not. I did this primarily to make testing simpler and more robust. The way I did it is roughly this:

  • when flush is called, check if the transport has a flushed event attribute already. If not, set a new one, and then send the flush request to the queue and wait on the event to be signalled
  • when the sender has finished processing the flush request, signal the event and clear the flushed event field

You might want to do this eventually, otherwise you'll probably run into intermittent failures on slow CI machines due to the sleeps.
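Translated to Python, the same idea might look roughly like this, using a threading.Event (a sketch; the attribute and "flush" event type are made up):

```python
import threading


def flush(self):
    # block until the processing thread has handled this flush request
    flushed = self._flushed_event
    if flushed is None:
        flushed = self._flushed_event = threading.Event()
        self.queue("flush", None)
    flushed.wait()


def _handle_flush(self):
    # runs in the processing thread once the buffered events have been sent
    flushed, self._flushed_event = self._flushed_event, None
    if flushed is not None:
        flushed.set()
```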

also, don't send the data in a background thread, as we are already in a thread when sending
(in the event processor thread). This reduces the number of threads that we spawn, but it requires
a bigger event queue.

Instead of waking up the waiting thread for every single element, the queue
waits some time before triggering the event. This can reduce the number of
context switches considerably.
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
if (
Member

I think this logic could lead to the consumer being blocked forever.
For example (with the constructor's default args): send 99 events in <1s; a blocked get() call with no timeout will never be notified/unblocked.

I'm not sure how expensive timers are in Python - would it be viable to start a timer in the case that the event is not notified, for the remaining chill time? And if an event comes in in the meantime, restart the timer?

Contributor Author

I agree that this is not ideal, but I think the trade-off justifies it (a lot less CPU usage). This worst case scenario would only happen if the user set api_request_time to None, and in that case we already have a scenario where collected data can stay in limbo indefinitely. As soon as api_request_time is set, the processing thread will be woken up when the timeout expires.

Unfortunately, Python doesn't have a "native" way to set timers. There's threading.Timer, but that spawns a new OS thread, which comes with all the baggage of threads: increased memory use, increased context switches, locking issues, etc.
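For context, a rough sketch of the consumer side under those assumptions: when api_request_time (max_flush_time here) is set, the get() timeout is bounded by the next flush deadline, so the processing thread wakes up even if the queue never notifies it (simplified; not the exact loop from this PR):

```python
import timeit


def _process_queue(self):
    while not self._closed:
        timeout = None
        if self._max_flush_time:
            # wake up at the latest when the flush interval (api_request_time) expires
            timeout = max(0.0, self._flush_deadline - timeit.default_timer())
        try:
            event_type, data, flush = self._event_queue.get(block=True, timeout=timeout)
        except compat.queue.Empty:
            self.flush()
            self._flush_deadline = timeit.default_timer() + self._max_flush_time
            continue
        self._handle_event(event_type, data, flush)  # hypothetical helper
```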

def flush(self):
"""
Trigger a flush of the queue.
Note: this method will only return once the queue is empty. This means it can block indefinitely if more events
Member

I know this is only meant for tests, but FWIW: in the Go agent, flush just flushes the currently enqueued events. When a flush command is received by the background thread, it gets the current size of the queue and drains it by that many events. Even that's not necessary here because you have a single queue (whereas in Go we have one per event type, and a separate event/condition variable for flushing.)
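The drain-by-size variant, sketched in Python for comparison (simplified; qsize() is only approximate, and the helpers are hypothetical):

```python
def _handle_flush_request(self):
    # drain only the events that were already enqueued when flush was requested
    for _ in range(self._event_queue.qsize()):
        try:
            event_type, data, _ = self._event_queue.get_nowait()
        except compat.queue.Empty:
            break
        self._write_to_buffer(event_type, data)  # hypothetical helper
    self._send_buffer()  # hypothetical helper: gzip + HTTP POST
```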
