-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[exporterhelper] Awkwardness due to API between queue sender and batch sender #10368
Comments
We still need to limit concurrency in the queue or batch configuration. Otherwise, the queue becomes useless. From the user configuration perspective, I think we should provide: batcher:
# The maximum number of batches that can be exported concurrently.
# This can only be used with enabled sending_queue.
max_concurrency: If this option is defined and both sending_queue and batcher are enabled, the As you mentioned, this will complicate the contract between the queue and batcher senders. I think we can move the queue consumers into some sort of another component which will be replaced by the batcher if |
@moh-osman3 and I studied the same problem -- we want to avoid use of a queue_sender but we want concurrency in the batcher. I reason that another user will come along asking not to use the queue sender, not to use the batch sender, and still want concurrency. Therefore, I propose we try to (and @moh-osman3 has been investigating how to) add a new sender stage purely for concurrency control. In such a future, the existing queue_sender num_consumers I think should dictate how many workers perform the cpu and I/O intensive work of gathering items from the queue, not dictate how many export threads are used. |
@dmitryax @carsonip This PR #10478 shows an implementation of the concurrency_sender which will limit the number of goroutines used for export. This removes the concurrenyLimit from the batch_sender so that batch exports are only triggered by size/timeout and the concurrency is limited by the new sender independent of queue_sender.num_consumers. Any thoughts on this approach? |
@jmacd, I just wanted to clarify that the existing implementation of the batcher sender is concurrent. There is no limit if queue sender is not enabled. This issue is about limiting the concurrency of the batcher sender. |
I believe the solution should be to change API interface between queue and batch from pushing queue->batcher to pulling queue<-batcher. So if queue and batcher are enabled, we treat batcher as queue consumers. The number of concurrent batchers should be configurable, similar to queue consumers it'll represent the maximum amount of outgoing requests. The workflow would be the following:
The maximum number of cuncurrent batchers can be configured by the same If the queue isn't enabled, the batch workers would listen for incoming requests and, in a similar way, control the concurrency of outgoing requests. |
@dmitryax that sounds good to me. I wrote down related thoughts at #10478 (comment). Re |
|
I think we can start with the same default value for the |
Ah ok, I understand now. Sounds good. FWIW we took that approach in an earlier version of Elastic APM and then switched to having multiple concurrently polling queue consumers to make better use of CPUs. It may not be relevant here, since in that case the queue consumer was doing more CPU-heavy work (JSON document encoding) than the batcher would be doing. (And the reason we did it there was so the bytes-based threshold could be based on the compressed total encoded documents size.) |
Hmm why can we only have one active batcher? Why shouldn't all batchers read from the queue and try to form batches? |
Because we can't get consistent batching in that case. Let's say we have 200 items in the queue, and the minimum batch size is configured to be 100. The expected behavior is to get 2 batches of 100 items each sent out right away. However, if we have, let's say, 4 batchers reading from the queue concurrently, they can get smaller batches like 50,50,50,50 and be blocked until the timeout is reached. Once the timeout is passed, they will send 4 batches of 50 items. |
@sfc-gh-sili is currently helping with this issue. She prepared a design doc based on our discussions: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit#heading=h.ypo1293baglx Feel free to take a look and comment |
@dmitryax @sfc-gh-sili Thanks for the design doc, it looks good at a high level. Moving batching into queue sender when sending queue is enabled makes sense to me, and it should fix the awkwardness described in this issue, despite a little bit of extra complexity to handle both sending_queue={false,true}. I am happy to review the PRs when they are ready. |
…11042) #### Description This PR moves `exporter/exporterhelper/request.go` to `exporter/internal/request.go` to avoid circular dependency. Context: As part of the effort to move from a queue->batch pushing model to a queue->batch pulling model in exporter, we will be depending on `Request` from `exporter/exporterbatcher`. However, `exporter/exporterhelper` already depends on `exporter/exporterbatcher`, so we need to move `Request` out of `exporter/exporterhelper` #### Link to tracking issue #10368 #### Testing Ran `opentelemetry-collector$ make` to make sure all tests still pass.
…guration (#11041) #### Description This PR changes initialization of `batchSender` and `queueSender` to AFTER configuration. That way we get to access `queueConfig` and `batcherConfig` in the same place. Context: This is some pre-work for changing queue->batch from a pushing model to a pulling model. We will be initialization a `queueBatchSender(queueConfig, batcherConfig)` if both queue and batcher are enabled and initialize `batchSender(batchConfig)` if only batcher is enabled. This change enables us to achieve the goal without changing config API. #### Link to tracking issue #10368 #### Testing Ran `opentelemetry-collector$ make` to make sure all tests still pass. Co-authored-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
…guration - #2 (#11240) This PR follows #11041. The previous PR changed the initialization of `batchSender` and `queueSender` to AFTER configuration, because that enables us to access `queueConfig` and `batcherConfig` in the same place. I noticed since then that there is another API for queue configuration, and this PR takes care of that other API #### Link to tracking issue #10368 #### Testing Ran `opentelemetry-collector$ make` to make sure all tests still pass.
… to a class function instead of a callback (#11338) #### Description Why this change? Each request from the queue contains multiple items, and those items could be merge-split into multiple batches when they are sent out (see #8122 for more about exporter batcher). We would like to book-keep those cases, and only call `onProcessingFinished` when all such batches has gone out. In this PR, `onProcessingFinished` is changed from a callback to a method function because it is easier to book keep index instead of functions. #### Link to tracking issue #8122 #10368 #### Testing `exporter/internal/queue/persistent_queue_test.go` #### Documentation This is an internal change invisible to the users. --------- Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
…guration - #2 (open-telemetry#11240) This PR follows open-telemetry#11041. The previous PR changed the initialization of `batchSender` and `queueSender` to AFTER configuration, because that enables us to access `queueConfig` and `batcherConfig` in the same place. I noticed since then that there is another API for queue configuration, and this PR takes care of that other API #### Link to tracking issue open-telemetry#10368 #### Testing Ran `opentelemetry-collector$ make` to make sure all tests still pass.
…tdown (open-telemetry#11666) This PR changes exporter queue batcher to flush the current batch on shutdown. open-telemetry#10368 open-telemetry#8122
…tdown (open-telemetry#11666) This PR changes exporter queue batcher to flush the current batch on shutdown. open-telemetry#10368 open-telemetry#8122
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR adds a public function `GetNextItem` to queue (both persistent queue and bounded memory queue) Why this change? Instead of blocking until consumption of the item is done, we would like to separate the API for reading and committing consumption. Before: `Consume(consumeFunc)` After: `idx, item = Read()` `OnProcessingFinished(idx)` <!-- Issue number if applicable --> #### Link to tracking issue open-telemetry#8122 open-telemetry#10368 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
… that operates on batch sender (open-telemetry#11448) #### Description As part of the effort to solve open-telemetry#10368, we no longer guarantee to initialize a `batchSender` when `batcher` is enabled. Therefore, we would like to remove the interface to set `mergeFunc` and `mergeSplitFunc` as a callback that operates on `batchSender`. Instead, users should use the alternative `WithBatchFuncs` that is a callback that operates `baseExporter`. Context: open-telemetry#11414 #### Link to tracking issue open-telemetry#8122 open-telemetry#10368 --------- Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
…n-telemetry#11532) #### Description This PR is a bare minimum implementation of a component called queue batcher. On completion, this component will replace `consumers` in `queue_sender`, and thus moving queue-batch from a pulling model instead of pushing model. Limitations of the current code * This implements only the case where batching is disabled, which means no merge of splitting of requests + no timeout flushing. * This implementation does not enforce an upper bound on concurrency All these code paths are marked as panic currently, and they will be replaced with actual implementation in coming PRs. This PR is split from open-telemetry#11507 for easier review. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…elemetry#11540) #### Description This PR follows open-telemetry#11532 and implements support for limited worker pool for queue batcher. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…d exports (open-telemetry#11546) #### Description This PR follows open-telemetry#11540 and implements support for item-count based batching for queue batcher. Limitation: This PR supports merging request but not splitting request. In other words, it support specifying a minimum request size but not a maximum request size. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…batcher (open-telemetry#11580) #### Description This PR follows open-telemetry#11546 and add support for splitting (i.e. support setting a maximum request size) Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…equest.export() (open-telemetry#11636) #### Description This PR changes queue batcher to use `exportFunc` instead of `request.export()`. This makes testing easier and avoid passing unnecessary detail to the exporter batcher. #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…tdown (open-telemetry#11666) #### Description This PR changes exporter queue batcher to flush the current batch on shutdown. #### Link to tracking issue open-telemetry#10368 open-telemetry#8122
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR proceeds #11637. It * Introduces a noop feature gate that will be used for queue batcher. * Updates exporter tests to run with both the feature gate on and off. <!-- Issue number if applicable --> #### Link to tracking issue #10368 #8122 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR solves #10368. Previously we use a pushing model between the queue and the batch, resulting the batch size to be constrained by the `sending_queue.num_consumers`, because the batch cannot accumulate more than `sending_queue.num_consumers` blocked goroutines provide. This PR changes it to a pulling model. We read from the queue until threshold is met or timeout, then allocate a worker to asynchronously send out the request. <!-- Issue number if applicable --> #### Link to tracking issue Fixes #10368 #8122 --------- Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
…guration - open-telemetry#2 (open-telemetry#11240) This PR follows open-telemetry#11041. The previous PR changed the initialization of `batchSender` and `queueSender` to AFTER configuration, because that enables us to access `queueConfig` and `batcherConfig` in the same place. I noticed since then that there is another API for queue configuration, and this PR takes care of that other API #### Link to tracking issue open-telemetry#10368 #### Testing Ran `opentelemetry-collector$ make` to make sure all tests still pass.
… to a class function instead of a callback (open-telemetry#11338) #### Description Why this change? Each request from the queue contains multiple items, and those items could be merge-split into multiple batches when they are sent out (see open-telemetry#8122 for more about exporter batcher). We would like to book-keep those cases, and only call `onProcessingFinished` when all such batches has gone out. In this PR, `onProcessingFinished` is changed from a callback to a method function because it is easier to book keep index instead of functions. #### Link to tracking issue open-telemetry#8122 open-telemetry#10368 #### Testing `exporter/internal/queue/persistent_queue_test.go` #### Documentation This is an internal change invisible to the users. --------- Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR adds a public function `GetNextItem` to queue (both persistent queue and bounded memory queue) Why this change? Instead of blocking until consumption of the item is done, we would like to separate the API for reading and committing consumption. Before: `Consume(consumeFunc)` After: `idx, item = Read()` `OnProcessingFinished(idx)` <!-- Issue number if applicable --> #### Link to tracking issue open-telemetry#8122 open-telemetry#10368 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
… that operates on batch sender (open-telemetry#11448) #### Description As part of the effort to solve open-telemetry#10368, we no longer guarantee to initialize a `batchSender` when `batcher` is enabled. Therefore, we would like to remove the interface to set `mergeFunc` and `mergeSplitFunc` as a callback that operates on `batchSender`. Instead, users should use the alternative `WithBatchFuncs` that is a callback that operates `baseExporter`. Context: open-telemetry#11414 #### Link to tracking issue open-telemetry#8122 open-telemetry#10368 --------- Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
…n-telemetry#11532) #### Description This PR is a bare minimum implementation of a component called queue batcher. On completion, this component will replace `consumers` in `queue_sender`, and thus moving queue-batch from a pulling model instead of pushing model. Limitations of the current code * This implements only the case where batching is disabled, which means no merge of splitting of requests + no timeout flushing. * This implementation does not enforce an upper bound on concurrency All these code paths are marked as panic currently, and they will be replaced with actual implementation in coming PRs. This PR is split from open-telemetry#11507 for easier review. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…elemetry#11540) #### Description This PR follows open-telemetry#11532 and implements support for limited worker pool for queue batcher. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…d exports (open-telemetry#11546) #### Description This PR follows open-telemetry#11540 and implements support for item-count based batching for queue batcher. Limitation: This PR supports merging request but not splitting request. In other words, it support specifying a minimum request size but not a maximum request size. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…batcher (open-telemetry#11580) #### Description This PR follows open-telemetry#11546 and add support for splitting (i.e. support setting a maximum request size) Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…equest.export() (open-telemetry#11636) #### Description This PR changes queue batcher to use `exportFunc` instead of `request.export()`. This makes testing easier and avoid passing unnecessary detail to the exporter batcher. #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
…tdown (open-telemetry#11666) #### Description This PR changes exporter queue batcher to flush the current batch on shutdown. #### Link to tracking issue open-telemetry#10368 open-telemetry#8122
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR proceeds open-telemetry#11637. It * Introduces a noop feature gate that will be used for queue batcher. * Updates exporter tests to run with both the feature gate on and off. <!-- Issue number if applicable --> #### Link to tracking issue open-telemetry#10368 open-telemetry#8122 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR solves open-telemetry#10368. Previously we use a pushing model between the queue and the batch, resulting the batch size to be constrained by the `sending_queue.num_consumers`, because the batch cannot accumulate more than `sending_queue.num_consumers` blocked goroutines provide. This PR changes it to a pulling model. We read from the queue until threshold is met or timeout, then allocate a worker to asynchronously send out the request. <!-- Issue number if applicable --> #### Link to tracking issue Fixes open-telemetry#10368 open-telemetry#8122 --------- Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
Is your feature request related to a problem? Please describe.
Related to batch sender in #8122
Currently, queue sender and batch sender work together fine, but the config
sending_queue.num_consumers
and batch senderconcurrencyLimit
make it a little awkward to use. The awkwardness comes from the fact that it is impossible for batch sender to get more requests than queue sender's num_consumers, and may be forced to export a smaller batch due to it.I am also not sure if this presents a performance problem at scale, which means a lot of goroutines will be involved.
The batch sender concurrency check has also been prone to unfavorable goroutine scheduling, as reported in #9952 .
Describe the solution you'd like
Ideally, queue sender can keep sending to batch sender without an artificial limit, such that batching is only triggered by min_size_items and flush_timeout.
I don't have an idea how to implement this yet. Also, this may require a change in contract between exporterhelper parts.
The text was updated successfully, but these errors were encountered: