
[fbreceiver] - Fix batcher's configuration #42797

Merged 8 commits into elastic:main on Feb 21, 2025

Conversation

@VihasMakwana (Contributor) commented Feb 20, 2025

Proposed commit message

As per the batcher's documentation, there are two configuration options related to flushing:

  • min_size_items
    • Minimum number of log records/metric points in a request that immediately triggers a batcher flush.
    • For example, if a plog.Logs request has more records than min_size_items, it is immediately forwarded to the exporter.
  • max_size_items
    • Maximum number of log records/metric points per request.
    • For example, if a plog.Logs request has more records than max_size_items, it is split into multiple requests.
    • This doesn't necessarily mean it will be sent to the exporter; that decision is based on the flush timeout or min_size_items.

This PR sets min_size_items to 0, which ensures both of the following conditions are satisfied (see the sketch below):

  1. No plog.Logs request contains more than bulk_max_size items.
  2. The batcher flushes data as soon as it receives it.
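
To make the flush/split interaction concrete, here is a minimal Go sketch of the semantics described above. It is illustrative only, not the collector's actual batcher implementation; `splitAndFlush` and its parameters are hypothetical names.

```go
package main

import "fmt"

// splitAndFlush models how a batcher configured with minSize and maxSize
// handles an incoming request of n items. Hypothetical model, not the
// collector's code.
func splitAndFlush(n, minSize, maxSize int) {
	// max_size_items: anything larger than maxSize is split first.
	for n > maxSize {
		fmt.Printf("split off a full request of %d items\n", maxSize)
		n -= maxSize
	}
	// min_size_items: the remaining request is flushed immediately only if
	// it reaches minSize; otherwise it waits for flush_timeout.
	if n >= minSize {
		fmt.Printf("flush %d items immediately\n", n)
	} else {
		fmt.Printf("hold %d items until flush_timeout\n", n)
	}
}

func main() {
	// With min_size_items: 0, every request flushes as soon as it arrives,
	// and no request ever exceeds bulk_max_size (1600).
	splitAndFlush(4000, 0, 1600)
}
```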

@VihasMakwana VihasMakwana added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Feb 20, 2025
@VihasMakwana VihasMakwana self-assigned this Feb 20, 2025
@VihasMakwana VihasMakwana requested a review from a team as a code owner February 20, 2025 09:23
@elasticmachine (Collaborator) commented:

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Feb 20, 2025
mergify bot (Contributor) commented Feb 20, 2025

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @VihasMakwana? 🙏
You'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, add the backport labels for the needed branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch, where /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.
@VihasMakwana VihasMakwana added backport-8.x Automated backport to the 8.x branch with mergify backport-9.0 Automated backport to the 9.0 branch labels Feb 20, 2025
@VihasMakwana VihasMakwana added backport-active-all Automated backport with mergify to all the active branches and removed backport-8.x Automated backport to the 8.x branch with mergify backport-9.0 Automated backport to the 9.0 branch labels Feb 20, 2025
@khushijain21 (Contributor) commented Feb 20, 2025

I think the correct parameter to map min_size_items to on the exporter is flush.min_events.

It immediately triggers a flush once the configured minimum number of events/records is reached.

For more info: https://www.elastic.co/guide/en/beats/filebeat/current/configuring-internal-queue.html#queue-mem-flush-min-events-option

@VihasMakwana (Contributor, Author) commented Feb 20, 2025

> flush.min_events

I initially did that, but as per the same document you linked:

> flush.min_events is a legacy parameter, and new configurations should prefer to control batch size with bulk_max_size.
> As of 8.13, there is never a performance advantage to limiting batch size with flush.min_events instead of bulk_max_size.

Also, flush.min_events is used when we set presets. To avoid all this, I would rather set this on the batcher.

There is one more problem solved by this PR (I'll update the description):

  • The batcher's default min_size_items is 5000, and we set max_size_items to 1600 (same as filebeat's bulk_max_size).
  • This essentially means that no batch will ever have more than 1600 records, so no batch ever reaches the minimum required size of 5000; as a result, nothing is flushed until the flush timeout (30s for the batcher) is reached, as the sketch below shows.

I hope that makes sense.
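
A minimal sketch of the stall described above, using the 1600/5000 figures from this thread. Illustrative only, not the collector's actual code:

```go
package main

import "fmt"

func main() {
	const maxSizeItems = 1600 // mirrors filebeat's bulk_max_size
	for _, minSizeItems := range []int{5000, 0} {
		// After splitting, no batch holds more than maxSizeItems records, so a
		// size-based flush can only happen if maxSizeItems >= minSizeItems.
		if maxSizeItems >= minSizeItems {
			fmt.Printf("min_size_items=%d: batches flush as soon as they arrive\n", minSizeItems)
		} else {
			fmt.Printf("min_size_items=%d: every batch waits for the 30s flush_timeout\n", minSizeItems)
		}
	}
}
```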

@mauri870 (Member) commented:

@carsonip Would be great to get your opinion on this change.

@khushijain21 (Contributor) commented:

> This essentially means that no batch will ever have more than 1600 records, so no batch ever reaches the minimum required size of 5000; as a result, nothing is flushed until the flush timeout (30s for the batcher) is reached.

The batcher validates that max_size_items is greater than or equal to min_size_items, so we never run into this situation (see the paraphrased sketch below):
https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterbatcher/config.go#L53-L55

> Also, flush.min_events is used when we set presets. To avoid all this, I would rather set this on the batcher.

I agree with you, since every event has to go through two queues before reaching ES. But setting min_size_items equal to max_size_items does not seem like the correct thing to do. I'll wait to see what others have to say.
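
For reference, the linked check is roughly the following. This is a paraphrased sketch; the exact upstream struct fields and error wording may differ:

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a stand-in for exporterbatcher.Config; the real struct has
// more fields.
type Config struct {
	MinSizeItems int
	MaxSizeItems int
}

// Validate paraphrases the linked check: a non-zero max_size_items must not
// be smaller than min_size_items.
func (c Config) Validate() error {
	if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
		return errors.New("max_size_items must be greater than or equal to min_size_items")
	}
	return nil
}

func main() {
	// With the values from this thread, validation should fail.
	fmt.Println(Config{MinSizeItems: 5000, MaxSizeItems: 1600}.Validate())
}
```

Run with the thread's values (min 5000, max 1600), this prints the validation error, which, per the comment further down, the exporter never actually triggers.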

@carsonip (Member) left a comment

Thanks for the ping; the PR LGTM.
Two things to note, both unrelated to this PR:

  • One should NOT use the es exporter's batcher::max_size_items or the batch processor's send_batch_max_size in metrics pipelines, as it may break up metrics requests, which would cause issues with TSDB. This PR touches a logs pipeline, so it is fine.
  • Check out the es exporter's flush::bytes config. It is respected regardless of batcher settings: if one batch is larger than flush::bytes (default ~5MB), it will be broken down into two bulk requests.

@VihasMakwana (Contributor, Author) commented:

> The batcher validates that max_size_items is greater than or equal to min_size_items, so we never run into this situation. https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterbatcher/config.go#L53-L55

This is a bug in the upstream exporter: it never runs that validation on the batcher configuration, so we never see any error. min_size_items stays at its default of 5000 while we set max_size_items to 1600; we should hit this validation error, but we don't.

I'm moving this PR to draft for now until I fix upstream.

@carsonip (Member) commented Feb 20, 2025

> But setting min_size_items equal to max_size_items does not seem like the correct thing to do. I'll wait to see what others have to say.

If min_size_items == max_size_items, the (first) batch will always be exactly that size unless the flush is triggered by batcher::flush_timeout.

@carsonip (Member) commented Feb 20, 2025

On a side note, setting min_size_items == max_size_items may be inefficient, as splitting a request and exporting the pieces separately incurs some overhead (think 2 bulk requests vs 1). E.g. if the current batched request has 1599 records and the next request has 2 records, I assume it will trigger a flush with one request of 1600 records and another of 1 record, which would be very inefficient. It would be good to have someone confirm this.

@VihasMakwana VihasMakwana marked this pull request as draft February 20, 2025 12:22
@VihasMakwana (Contributor, Author) commented:

> On a side note, setting min_size_items == max_size_items may be inefficient, as splitting a request and exporting the pieces separately incurs some overhead. [...] It would be good to have someone confirm this.

You're right. Can you please take a look at open-telemetry/opentelemetry-collector-contrib#38072? I'd like to get that merged before continuing with this one.

@VihasMakwana VihasMakwana marked this pull request as ready for review February 20, 2025 13:31
@@ -149,6 +149,7 @@ func ToOTelConfig(output *config.C) (map[string]any, error) {
 	"batcher": map[string]any{
 		"enabled":        true,
 		"max_size_items": escfg.BulkMaxSize, // bulk_max_size
+		"min_size_items": 0, // 0 means immediately trigger a flush
@carsonip (Member) commented:

Can you test this out? I've always avoided setting otel configs to 0, because a 0 will not override any non-0 default. In this case, I'm not sure whether setting it explicitly to 0 will override the default of 5000. With your PR in open-telemetry/opentelemetry-collector-contrib#38072 you should be able to tell right away.
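
For context, the worry is the usual Go zero-value problem: merge logic that cannot distinguish an explicit 0 from an unset field will keep the non-zero default. A minimal illustration follows; it uses encoding/json purely for demonstration, not the collector's confmap machinery, and batcherCfg is a hypothetical type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batcherCfg is illustrative only: it shows why an explicit 0 can be
// indistinguishable from "not set" during config merging.
type batcherCfg struct {
	MinSizeItems int `json:"min_size_items"`
}

func main() {
	cfg := batcherCfg{MinSizeItems: 5000} // pretend 5000 is the pre-populated default
	_ = json.Unmarshal([]byte(`{"min_size_items": 0}`), &cfg)
	// encoding/json overwrites keys that are present, so this prints 0.
	// Merge logic that treats the zero value as "unset" would keep 5000,
	// which is exactly the behavior the reviewer is asking to verify.
	fmt.Println(cfg.MinSizeItems)
}
```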

@VihasMakwana (Contributor, Author) commented:

The performance testing is currently in progress. It looks good, and EPS is similar to native filebeat. I'll manually test it as well.

@VihasMakwana (Contributor, Author) commented:

@carsonip attaching a few screenshots for better transparency:

  1. Collector run with min_size_items: 0
     [Screenshot 2025-02-21 at 2 20 57 PM]
  2. Collector run while skipping min_size_items (it defaults to 5000 in this case)
     [Screenshot 2025-02-21 at 2 21 11 PM]

@pierrehilbert (Collaborator) commented:

=== FAIL: libbeat/otelbeat/oteltranslate/outputs/elasticsearch TestToOtelConfig/check_preset_config_translation/config_translation_w/latency (0.00s)
config_otel_test.go:198:
    Error Trace: /opt/buildkite-agent/builds/bk-agent-prod-gcp-1740064094203004939/elastic/beats-libbeat/libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel_test.go:223
                 /opt/buildkite-agent/builds/bk-agent-prod-gcp-1740064094203004939/elastic/beats-libbeat/libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel_test.go:198
    Error:       Not equal:
    expected: "api_key: \"\"\nbatcher:\n  enabled: true\n  max_size_items: 50\n  min_size_items: 50\nendpoints:\n- http://localhost:9200\nidle_conn_timeout: 1m0s\nlogs_index: some-index\nnum_workers: 1\npassword: changeme\nretry:\n  enabled: true\n  initial_interval: 1s\n  max_interval: 1m0s\n  max_retries: 3\ntimeout: 1m30s\nuser: elastic\n"
    actual  : "api_key: \"\"\nbatcher:\n  enabled: true\n  max_size_items: 50\n  min_size_items: 0\nendpoints:\n- http://localhost:9200\nidle_conn_timeout: 1m0s\nlogs_index: some-index\nnum_workers: 1\npassword: changeme\nretry:\n  enabled: true\n  initial_interval: 1s\n  max_interval: 1m0s\n  max_retries: 3\ntimeout: 1m30s\nuser: elastic\n"

@VihasMakwana
Copy link
Contributor Author

@khushijain21 @mauri870 can you take a look now? I've update PR description with new set of changes. We now set min_size_items: 0. This would immediately flush the data as soon as it gets it.

Let me know if you have any concerns. Happy to help you out.

@VihasMakwana (Contributor, Author) commented:

/test

@VihasMakwana VihasMakwana merged commit 2f3df16 into elastic:main Feb 21, 2025
143 checks passed
mergify bot pushed commits that referenced this pull request Feb 21, 2025 (one per active backport branch, all cherry-picked from commit 2f3df16):

* fix: batcher min_size_items
* update tests
* set min_size_items to 0
* fix tests
* fix tests
* fix test

Several of the backports carried conflicts in:
  #	libbeat/otelbeat/beatconverter/beatconverter_test.go
  #	libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel.go
  #	libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel_test.go
Labels
backport-active-all Automated backport with mergify to all the active branches · Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

6 participants