Add async config block & concurrent readers to UDP input operator (open-telemetry#27647)

**Description:** Adds an asynchronous, concurrent mode to the UDP
receiver and the stanza UDP input operator. The goal is to reduce UDP
packet loss in high-scale scenarios.
A new 'async' block holds the 'FixedAReaderRoutineCount' field (exposed
as `readers` in the docs and code of this commit), which determines how
many concurrent readers read from the UDP port, process logs, and send
them downstream.
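
To illustrate the idea, here is a minimal Go sketch of several goroutines reading from one UDP socket. It is not the operator's code: `startReaders` and `roundTrip` are hypothetical helpers, the loopback listener and the 64 KiB buffer size are assumptions, and the log processing step is reduced to pushing the payload onto a channel.

```go
package main

import (
	"fmt"
	"net"
	"sync"
	"time"
)

// startReaders launches n goroutines that all read from the same PacketConn
// and forward each datagram's payload to out. The returned WaitGroup is done
// once every reader has exited, which happens after conn is closed.
func startReaders(conn net.PacketConn, n int, out chan<- string) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buf := make([]byte, 64*1024) // one buffer per reader, never shared
			for {
				size, _, err := conn.ReadFrom(buf)
				if err != nil {
					return // conn closed or read failed: stop this reader
				}
				out <- string(buf[:size])
			}
		}()
	}
	return &wg
}

// roundTrip sends count datagrams to a loopback listener served by the given
// number of readers and reports how many distinct payloads were received.
func roundTrip(readers, count int) (int, error) {
	conn, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	out := make(chan string, count) // buffered so readers never block
	wg := startReaders(conn, readers, out)

	client, err := net.Dial("udp", conn.LocalAddr().String())
	if err != nil {
		conn.Close()
		wg.Wait()
		return 0, err
	}
	defer client.Close()
	for i := 0; i < count; i++ {
		// Each Write on a UDP connection sends exactly one datagram.
		if _, err := client.Write([]byte(fmt.Sprintf("message%d", i))); err != nil {
			conn.Close()
			wg.Wait()
			return 0, err
		}
	}

	seen := make(map[string]bool)
	timeout := time.After(2 * time.Second)
collect:
	for len(seen) < count {
		select {
		case msg := <-out:
			seen[msg] = true
		case <-timeout:
			break collect // UDP is lossy, so give up rather than hang
		}
	}
	conn.Close() // unblocks every reader's ReadFrom
	wg.Wait()
	return len(seen), nil
}

func main() {
	got, err := roundTrip(2, 5)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("received %d of 5 datagrams with 2 readers\n", got)
}
```

Each reader owns its buffer, so no locking is needed around reads. The trade-off is that datagrams handled by different readers may reach the downstream consumer in a different order than they arrived.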

**Link to tracking Issue:** 27613

**Testing:** Local stress tests covered every config variant (no 'async',
an empty 'async', and 'async' with FixedAReaderRoutineCount=2).
In the repo, added one test each to udp_test and config_test (in the
stanza udp operator) and to udp_test (in udplogreceiver).

**Documentation:** Updated md file for both udplogreceiver & stanza
udp_input operator with the new flags.

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
2 people authored and JaredTan95 committed Oct 18, 2023
1 parent 2a0e096 commit e609f9b
Showing 8 changed files with 160 additions and 48 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add-async-block-concurrent-udp-receiver.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add option to run udp logs receiver (and stanza udp input operator) concurrently to reduce data-loss during high-scale scenarios

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27613]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
13 changes: 12 additions & 1 deletion pkg/stanza/docs/operators/udp_input.md
@@ -12,11 +12,12 @@ The `udp_input` operator listens for logs from UDP packets.
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes]. |
| `multiline` | | A `multiline` configuration block. See below for details. |
| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. |
| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
| `async` | nil | An `async` configuration block. See below for details. |

#### `multiline` configuration

@@ -45,6 +46,16 @@ Other less common encodings are supported on a best-effort basis.
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
for other encodings available.

#### `async` configuration

If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronously and concurrently.

**Note:** If `async` is not set, a single goroutine reads and processes packets synchronously.

| Field | Default | Description |
| --- | --- | --- |
| `readers` | 1 | Concurrency level: the number of goroutines that read from the UDP port (and process logs before sending them downstream). |
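
The defaulting described above, together with the `readers` check that `Build` performs in this commit, can be sketched roughly as follows. This is an illustration only: `asyncConfig` and `resolveReaders` are hypothetical names, not part of the operator's API.

```go
package main

import (
	"errors"
	"fmt"
)

// asyncConfig is a hypothetical stand-in for the operator's `async` block.
type asyncConfig struct {
	Readers int
}

// resolveReaders returns the number of reader goroutines to start.
// A nil block means `async` was not configured, so one reader is used.
// An explicit value below 1 is rejected.
func resolveReaders(cfg *asyncConfig) (int, error) {
	if cfg == nil {
		return 1, nil
	}
	if cfg.Readers <= 0 {
		return 0, errors.New("async readers must be greater than 0")
	}
	return cfg.Readers, nil
}

func main() {
	// Unset block, two readers, and an invalid reader count.
	for _, cfg := range []*asyncConfig{nil, {Readers: 2}, {Readers: 0}} {
		n, err := resolveReaders(cfg)
		fmt.Println(n, err)
	}
}
```

Keeping the block as a pointer lets the code tell "not configured" apart from an explicit but invalid value.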

### Example Configurations

#### Simple
15 changes: 15 additions & 0 deletions pkg/stanza/operator/input/udp/config_test.go
@@ -33,6 +33,21 @@ func TestUnmarshal(t *testing.T) {
return cfg
}(),
},
{
Name: "all_with_async",
ExpectErr: false,
Expect: func() *Config {
cfg := NewConfig()
cfg.ListenAddress = "10.0.0.1:9000"
cfg.AddAttributes = true
cfg.Encoding = "utf-8"
cfg.SplitConfig.LineStartPattern = "ABC"
cfg.SplitConfig.LineEndPattern = ""
cfg.AsyncConfig = NewAsyncConfig()
cfg.AsyncConfig.Readers = 2
return cfg
}(),
},
},
}.Run(t)
}
10 changes: 10 additions & 0 deletions pkg/stanza/operator/input/udp/testdata/config.yaml
@@ -8,3 +8,13 @@ all:
multiline:
line_start_pattern: ABC
line_end_pattern: ""
all_with_async:
type: udp_input
listen_address: 10.0.0.1:9000
add_attributes: true
encoding: utf-8
multiline:
line_start_pattern: ABC
line_end_pattern: ""
async:
readers: 2
89 changes: 57 additions & 32 deletions pkg/stanza/operator/input/udp/udp.go
@@ -58,6 +58,17 @@ type Config struct {
BaseConfig `mapstructure:",squash"`
}

type AsyncConfig struct {
Readers int `mapstructure:"readers,omitempty"`
}

// NewAsyncConfig creates a new AsyncConfig with default values.
func NewAsyncConfig() *AsyncConfig {
return &AsyncConfig{
Readers: 1,
}
}

// BaseConfig is the details configuration of a udp input operator.
type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
@@ -66,6 +77,7 @@ type BaseConfig struct {
Encoding string `mapstructure:"encoding,omitempty"`
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
AsyncConfig *AsyncConfig `mapstructure:"async,omitempty"`
}

// Build will build a udp input operator.
@@ -101,6 +113,14 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
resolver = helper.NewIPResolver()
}

if c.AsyncConfig == nil {
c.AsyncConfig = NewAsyncConfig()
}

if c.AsyncConfig.Readers <= 0 {
return nil, fmt.Errorf("async readers must be greater than 0")
}

udpInput := &Input{
InputOperator: inputOperator,
address: address,
@@ -110,6 +130,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
splitFunc: splitFunc,
resolver: resolver,
OneLogPerPacket: c.OneLogPerPacket,
AsyncConfig: c.AsyncConfig,
}
return udpInput, nil
}
@@ -121,6 +142,7 @@ type Input struct {
address *net.UDPAddr
addAttributes bool
OneLogPerPacket bool
AsyncConfig *AsyncConfig

connection net.PacketConn
cancel context.CancelFunc
@@ -148,44 +170,47 @@ func (u *Input) Start(_ operator.Persister) error {

// goHandleMessages will handle messages from a udp connection.
func (u *Input) goHandleMessages(ctx context.Context) {
u.wg.Add(1)

go func() {
defer u.wg.Done()

dec := decode.New(u.encoding)
buf := make([]byte, 0, MaxUDPSize)
for {
message, remoteAddr, err := u.readMessage()
if err != nil {
select {
case <-ctx.Done():
return
default:
u.Errorw("Failed reading messages", zap.Error(err))
}
break
}
for i := 0; i < u.AsyncConfig.Readers; i++ {
u.wg.Add(1)
go u.readAndProcessMessages(ctx)
}
}

if u.OneLogPerPacket {
log := truncateMaxLog(message)
u.handleMessage(ctx, remoteAddr, dec, log)
continue
func (u *Input) readAndProcessMessages(ctx context.Context) {
defer u.wg.Done()

dec := decode.New(u.encoding)
buf := make([]byte, 0, MaxUDPSize)
for {
message, remoteAddr, err := u.readMessage()
if err != nil {
select {
case <-ctx.Done():
return
default:
u.Errorw("Failed reading messages", zap.Error(err))
}
break
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)
if u.OneLogPerPacket {
log := truncateMaxLog(message)
u.handleMessage(ctx, remoteAddr, dec, log)
continue
}

scanner.Split(u.splitFunc)
scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

for scanner.Scan() {
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
}
scanner.Split(u.splitFunc)

for scanner.Scan() {
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
}
}()
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
}
}
}

func truncateMaxLog(data []byte) (token []byte) {
20 changes: 12 additions & 8 deletions pkg/stanza/operator/input/udp/udp_test.go
@@ -18,11 +18,8 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func udpInputTest(input []byte, expected []string) func(t *testing.T) {
func udpInputTest(input []byte, expected []string, cfg *Config) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewConfigWithID("test_input")
cfg.ListenAddress = ":0"

op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)

@@ -138,10 +135,17 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T)
}

func TestInput(t *testing.T) {
t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}))
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}))
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}))
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}))
cfg := NewConfigWithID("test_input")
cfg.ListenAddress = ":0"

t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}, cfg))
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}, cfg))
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}, cfg))

cfg.AsyncConfig = NewAsyncConfig()
cfg.AsyncConfig.Readers = 2
t.Run("SimpleAsync", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
}

func TestInputAttributes(t *testing.T) {
13 changes: 12 additions & 1 deletion receiver/udplogreceiver/README.md
@@ -24,10 +24,11 @@ Receives logs over UDP.
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes] |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details |
| `async` | nil | An `async` configuration block. See below for details. |

### Operators

@@ -69,6 +70,16 @@ Other less common encodings are supported on a best-effort basis.
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
for other encodings available.

#### `async` configuration

If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronously and concurrently.

**Note:** If `async` is not set, a single goroutine reads and processes packets synchronously.

| Field | Default | Description |
| --- | --- | --- |
| `readers` | 1 | Concurrency level: the number of goroutines that read from the UDP port (and process logs before sending them downstream). |

## Example Configurations

### Simple
21 changes: 15 additions & 6 deletions receiver/udplogreceiver/udp_test.go
@@ -25,10 +25,19 @@ import (
)

func TestUdp(t *testing.T) {
testUDP(t, testdataConfigYaml())
listenAddress := "127.0.0.1:29018"
testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
}

func testUDP(t *testing.T, cfg *UDPLogConfig) {
func TestUdpAsync(t *testing.T) {
listenAddress := "127.0.0.1:29019"
cfg := testdataConfigYaml(listenAddress)
cfg.InputConfig.AsyncConfig = udp.NewAsyncConfig()
cfg.InputConfig.AsyncConfig.Readers = 2
// Pass the async-enabled cfg built above; a fresh testdataConfigYaml() would drop AsyncConfig.
testUDP(t, cfg, listenAddress)
}

func testUDP(t *testing.T, cfg *UDPLogConfig, listenAddress string) {
numLogs := 5

f := NewFactory()
@@ -38,7 +47,7 @@ func testUDP(t *testing.T, cfg *UDPLogConfig) {
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))

var conn net.Conn
conn, err = net.Dial("udp", "127.0.0.1:29018")
conn, err = net.Dial("udp", listenAddress)
require.NoError(t, err)

for i := 0; i < numLogs; i++ {
@@ -78,17 +87,17 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, testdataConfigYaml(), cfg)
assert.Equal(t, testdataConfigYaml("127.0.0.1:29018"), cfg)
}

func testdataConfigYaml() *UDPLogConfig {
func testdataConfigYaml(listenAddress string) *UDPLogConfig {
return &UDPLogConfig{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
},
InputConfig: func() udp.Config {
c := udp.NewConfig()
c.ListenAddress = "127.0.0.1:29018"
c.ListenAddress = listenAddress
return *c
}(),
}