Add async/concurrency to udp receiver (stanza udp input operator) #27620
Conversation
Can we take these changes incrementally?
I suggest first we define an `async` config block and within this define the number of concurrent readers.
As a separate PR, we can look at decoupling reading packets from processing.
Finally, we can look at the shutdown time.
| Field | Default | Description |
| --- | --- | --- |
| `async_concurrent_mode` | `false` | Determines whether the UDP receiver processes messages synchronously, or asynchronously and concurrently. |
| `fixed_async_reader_routine_count` | `1` | Concurrency level: how many goroutines read from the UDP port and write to the channel (relevant only if `async_concurrent_mode` is `true`). |
| `fixed_async_processor_routine_count` | `1` | Concurrency level: how many goroutines read from the channel, process (split, add attributes, etc.), and write downstream (relevant only if `async_concurrent_mode` is `true`). |
| `max_async_queue_length` | `1000` | Maximum length of the channel used by the async reader routines. When the channel is full, a reader routine waits until it has room (relevant only if `async_concurrent_mode` is `true`). |
| `max_graceful_shutdown_time_in_ms` | `1000` | During shutdown, how long the processor goroutines continue reading from the channel and pushing downstream; the reader routines stop reading from the UDP port immediately when shutdown is requested (relevant only if `async_concurrent_mode` is `true`). |
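For illustration, the proposed flags might appear on the stanza `udp_input` operator roughly as follows. This is a sketch only: the listen address and values are placeholders, and it assumes the keys land exactly as named in the table above.

```yaml
operators:
  - type: udp_input
    listen_address: "0.0.0.0:54525"
    async_concurrent_mode: true
    fixed_async_reader_routine_count: 2
    fixed_async_processor_routine_count: 2
    max_async_queue_length: 10000
    max_graceful_shutdown_time_in_ms: 2000
```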
Given the close relationship between these settings, can we group them into one struct?
I think `async_concurrent_mode` can be inferred from whether or not the struct is specified:
receivers:
  udplog:
    ...
  udplog/async_default:
    ...
    async: # with default async settings
  udplog/async_custom:
    ...
    async:
      ... # custom settings
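A rough Go sketch of how such a grouped block could map to the operator config, where the presence of `async` implies async mode instead of a separate boolean; the struct and field names here are illustrative assumptions, not the PR's actual identifiers:

```go
package udp

// AsyncConfig groups the proposed async/concurrency settings into one
// optional block. Its presence would imply async mode, making a separate
// async_concurrent_mode boolean unnecessary. Names are illustrative only.
type AsyncConfig struct {
	ReaderRoutineCount    int `mapstructure:"reader_routine_count"`
	ProcessorRoutineCount int `mapstructure:"processor_routine_count"`
	MaxQueueLength        int `mapstructure:"max_queue_length"`
	MaxShutdownTimeMS     int `mapstructure:"max_graceful_shutdown_time_in_ms"`
}

// Config is a hypothetical udp_input operator config with the optional block.
type Config struct {
	ListenAddress string       `mapstructure:"listen_address"`
	Async         *AsyncConfig `mapstructure:"async"` // nil means synchronous mode
}
```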
Thanks. Actually, I think decoupling the reader from the processor (in other words, making it asynchronous) is more important than making it concurrent (multiple readers/processors), because the channel can be configured to be quite large. Stress tests indicated that as well. In other words, if there's short-term latency downstream (in the exporter, for example), having multiple readers won't help much.
I'm sold on the first two steps. I think the order I mentioned will help me review the changes. The shutdown delay is likely reasonable, but it's not something the component can guarantee. It's probably reasonable to add a best-effort attempt up to a certain threshold, but I'd like to consider this separately.
Description: Adding a feature - an asynchronous and concurrent mode for the UDP receiver (stanza udp input operator). The goal is to reduce UDP packet loss in high-scale scenarios.
Added 5 new flags to the config: AsyncConcurrentMode, FixedAsyncReaderRoutineCount, FixedAsyncProcessorRoutineCount, MaxAsyncQueueLength, MaxGracefulShutdownTimeInMS.
Goal:
Since the underlying computer's network buffer is limited and UDP has no handshake (which in TCP causes the sender to reduce its rate), even short-term latency on the otel-collector's side (for example, when the endpoint receiving logs from the otel exporter has increased latency) causes UDP packets to be dropped in high-scale scenarios, and as a result, data loss.
This feature allows the consumer to buffer data in memory (instead of in the limited network buffer), so short-term latency or issues downstream do not quickly result in data loss.
Also, since the receiver can be configured to perform split and add-attributes operations on each packet, separating the threads that read from the UDP port from the threads that process packets (before sending them downstream) improves performance. It allows raising the concurrency level of the reader routines and the processor routines independently, depending on the scenario.
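To make the decoupling concrete, here is a minimal, self-contained Go sketch of the pattern described above: reader goroutines copy packets from the UDP socket into a buffered channel, and processor goroutines consume them. This is illustrative only and not the PR's code; the address, buffer sizes, routine counts, and the `process` stub are placeholder assumptions.

```go
package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"sync"
)

func main() {
	conn, err := net.ListenPacket("udp", "0.0.0.0:54525")
	if err != nil {
		panic(err)
	}

	// Buffered channel acts as the in-memory queue (cf. max_async_queue_length).
	// When it is full, readers block until a processor frees a slot.
	messages := make(chan []byte, 1000)

	var readers, processors sync.WaitGroup

	// Reader routines: drain the OS network buffer as fast as possible.
	for i := 0; i < 2; i++ {
		readers.Add(1)
		go func() {
			defer readers.Done()
			buf := make([]byte, 64*1024)
			for {
				n, _, err := conn.ReadFrom(buf)
				if err != nil {
					return // connection closed during shutdown
				}
				msg := make([]byte, n)
				copy(msg, buf[:n])
				messages <- msg
			}
		}()
	}

	// Processor routines: split, add attributes, push downstream.
	for i := 0; i < 2; i++ {
		processors.Add(1)
		go func() {
			defer processors.Done()
			for msg := range messages {
				process(msg)
			}
		}()
	}

	// Shutdown on Ctrl+C: closing the connection stops the readers
	// immediately; closing the channel lets the processors drain it.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)
	<-sigs
	conn.Close()

	readers.Wait()
	close(messages)
	processors.Wait()
}

// process stands in for splitting, adding attributes, and sending downstream.
func process(msg []byte) {
	fmt.Printf("processed %d bytes\n", len(msg))
}
```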
Link to tracking Issue: #27613
Testing: Local stress tests were run and indicated that the data-loss issue was resolved in high-scale scenarios when MaxAsyncQueueLength was set sufficiently high (and AsyncConcurrentMode was true).
In the repo, added a single test each to udp_test and config_test (in the stanza udp operator) and to udp_test (in the udplogreceiver).
Documentation: Updated the md files for both the udplogreceiver and the stanza udp_input operator with the new flags.