Add async config block & concurrent readers to UDP input operator #27647

27 changes: 27 additions & 0 deletions .chloggen/add-async-block-concurrent-udp-receiver.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add an option to run the udp logs receiver (and the stanza udp input operator) with concurrent readers to reduce data loss in high-scale scenarios

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27613]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
13 changes: 12 additions & 1 deletion pkg/stanza/docs/operators/udp_input.md
@@ -12,11 +12,12 @@ The `udp_input` operator listens for logs from UDP packets.
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention](https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes). |
| `multiline` | | A `multiline` configuration block. See below for details. |
| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. |
| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
| `async` | nil | An `async` configuration block. See below for details. |

#### `multiline` configuration

@@ -45,6 +46,16 @@ Other less common encodings are supported on a best-effort basis.
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
for other encodings available.

#### `async` configuration

If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronously and concurrently.

**Note:** If `async` is not set, a single goroutine reads and processes packets synchronously.

| Field | Default | Description |
| --- | --- | --- |
| `readers` | 1 | Concurrency level. Determines how many goroutines read from the UDP port and process logs before sending them downstream. |
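
To illustrate what `readers` controls, here is a minimal, self-contained Go sketch (not the operator's code) in which several goroutines call `ReadFrom` on one shared UDP socket. The function name `receiveAll`, the loopback address, the buffer size, and the two-second timeout are assumptions made for the example.

```go
package main

import (
	"fmt"
	"net"
	"sync"
	"time"
)

// receiveAll listens on an ephemeral loopback UDP port, starts `readers`
// goroutines that share the socket, sends `packets` datagrams to it, and
// returns how many were received before the timeout.
func receiveAll(readers, packets int) (int, error) {
	conn, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}

	// Buffered so that readers never block while handing off a packet.
	received := make(chan struct{}, packets)
	var wg sync.WaitGroup
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buf := make([]byte, 64*1024) // one buffer per goroutine, never shared
			for {
				if _, _, err := conn.ReadFrom(buf); err != nil {
					return // socket closed: stop this reader
				}
				received <- struct{}{}
			}
		}()
	}
	// Closing the socket unblocks every pending ReadFrom, then we wait.
	defer func() {
		conn.Close()
		wg.Wait()
	}()

	sender, err := net.Dial("udp", conn.LocalAddr().String())
	if err != nil {
		return 0, err
	}
	defer sender.Close()
	for i := 0; i < packets; i++ {
		if _, err := sender.Write([]byte(fmt.Sprintf("message%d", i))); err != nil {
			return 0, err
		}
	}

	count := 0
	timeout := time.After(2 * time.Second)
	for count < packets {
		select {
		case <-received:
			count++
		case <-timeout:
			return count, nil // UDP is lossy, so report what arrived
		}
	}
	return count, nil
}

func main() {
	n, err := receiveAll(4, 20)
	if err != nil {
		panic(err)
	}
	fmt.Printf("received %d of 20 packets\n", n)
}
```

Each datagram is delivered to exactly one reader, so adding readers spreads the per-packet processing cost without duplicating logs. Ordering across readers is not guaranteed.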

### Example Configurations

#### Simple
15 changes: 15 additions & 0 deletions pkg/stanza/operator/input/udp/config_test.go
@@ -33,6 +33,21 @@ func TestUnmarshal(t *testing.T) {
return cfg
}(),
},
{
Name: "all_with_async",
ExpectErr: false,
Expect: func() *Config {
cfg := NewConfig()
cfg.ListenAddress = "10.0.0.1:9000"
cfg.AddAttributes = true
cfg.Encoding = "utf-8"
cfg.SplitConfig.LineStartPattern = "ABC"
cfg.SplitConfig.LineEndPattern = ""
cfg.AsyncConfig = NewAsyncConfig()
cfg.AsyncConfig.Readers = 2
return cfg
}(),
},
},
}.Run(t)
}
10 changes: 10 additions & 0 deletions pkg/stanza/operator/input/udp/testdata/config.yaml
@@ -8,3 +8,13 @@ all:
multiline:
line_start_pattern: ABC
line_end_pattern: ""
all_with_async:
type: udp_input
listen_address: 10.0.0.1:9000
add_attributes: true
encoding: utf-8
multiline:
line_start_pattern: ABC
line_end_pattern: ""
async:
readers: 2
89 changes: 57 additions & 32 deletions pkg/stanza/operator/input/udp/udp.go
@@ -58,6 +58,17 @@ type Config struct {
BaseConfig `mapstructure:",squash"`
}

// AsyncConfig is the configuration of the udp input operator's concurrent readers.
type AsyncConfig struct {
Readers int `mapstructure:"readers,omitempty"`
}

// NewAsyncConfig creates a new AsyncConfig with default values.
func NewAsyncConfig() *AsyncConfig {
return &AsyncConfig{
Readers: 1,
}
}

// BaseConfig is the details configuration of a udp input operator.
type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
@@ -66,6 +77,7 @@ type BaseConfig struct {
Encoding string `mapstructure:"encoding,omitempty"`
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
AsyncConfig *AsyncConfig `mapstructure:"async,omitempty"`
}

// Build will build a udp input operator.
@@ -101,6 +113,14 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
resolver = helper.NewIPResolver()
}

if c.AsyncConfig == nil {
c.AsyncConfig = NewAsyncConfig()
}

if c.AsyncConfig.Readers <= 0 {
return nil, fmt.Errorf("async readers must be greater than 0")
}

udpInput := &Input{
InputOperator: inputOperator,
address: address,
@@ -110,6 +130,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
splitFunc: splitFunc,
resolver: resolver,
OneLogPerPacket: c.OneLogPerPacket,
AsyncConfig: c.AsyncConfig,
}
return udpInput, nil
}
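
The defaulting and validation order added to `Build` can be sketched in isolation as follows. This is an illustration under assumptions, not the PR's code: `asyncSettings` and `resolveReaders` are hypothetical names standing in for `AsyncConfig` and the checks inside `Build`.

```go
package main

import (
	"errors"
	"fmt"
)

// asyncSettings is a hypothetical stand-in for the PR's AsyncConfig.
type asyncSettings struct {
	Readers int
}

// resolveReaders mirrors the order of the checks in Build: a missing block
// falls back to one reader, and an explicit value must be positive.
func resolveReaders(cfg *asyncSettings) (int, error) {
	if cfg == nil {
		return 1, nil // same default as NewAsyncConfig
	}
	if cfg.Readers <= 0 {
		return 0, errors.New("async readers must be greater than 0")
	}
	return cfg.Readers, nil
}

func main() {
	for _, cfg := range []*asyncSettings{nil, {Readers: 2}, {Readers: 0}} {
		n, err := resolveReaders(cfg)
		fmt.Println(n, err)
	}
}
```

One consequence worth noting in this sketch: the default of 1 applies only when the block is absent, so a zero-valued `&asyncSettings{}` is rejected rather than defaulted.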
@@ -121,6 +142,7 @@ type Input struct {
address *net.UDPAddr
addAttributes bool
OneLogPerPacket bool
AsyncConfig *AsyncConfig

connection net.PacketConn
cancel context.CancelFunc
@@ -148,44 +170,47 @@ func (u *Input) Start(_ operator.Persister) error {

// goHandleMessages will handle messages from a udp connection.
func (u *Input) goHandleMessages(ctx context.Context) {
u.wg.Add(1)

go func() {
defer u.wg.Done()

dec := decode.New(u.encoding)
buf := make([]byte, 0, MaxUDPSize)
for {
message, remoteAddr, err := u.readMessage()
if err != nil {
select {
case <-ctx.Done():
return
default:
u.Errorw("Failed reading messages", zap.Error(err))
}
break
}
for i := 0; i < u.AsyncConfig.Readers; i++ {
u.wg.Add(1)
go u.readAndProcessMessages(ctx)
}
}

if u.OneLogPerPacket {
log := truncateMaxLog(message)
u.handleMessage(ctx, remoteAddr, dec, log)
continue
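// readAndProcessMessages reads packets from the udp connection and processes
// them until a read fails or the context is cancelled.
func (u *Input) readAndProcessMessages(ctx context.Context) {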
defer u.wg.Done()

dec := decode.New(u.encoding)
buf := make([]byte, 0, MaxUDPSize)
for {
message, remoteAddr, err := u.readMessage()
if err != nil {
select {
case <-ctx.Done():
return
default:
u.Errorw("Failed reading messages", zap.Error(err))
}
break
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)
if u.OneLogPerPacket {
log := truncateMaxLog(message)
u.handleMessage(ctx, remoteAddr, dec, log)
continue
}

scanner.Split(u.splitFunc)
scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

for scanner.Scan() {
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
}
scanner.Split(u.splitFunc)

for scanner.Scan() {
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
}
}()
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
}
}
}
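
The `select` on `ctx.Done()` after a failed read is what lets each reader tell a shutdown apart from a real error. The sketch below isolates that pattern. It assumes that shutdown cancels the context before closing the connection; the operator's `Stop` method is not part of this diff, so that ordering, along with the names `shutdownCleanly` and `unexpected`, is an assumption of the example.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
)

// shutdownCleanly starts `readers` goroutines on one UDP socket, shuts them
// down, and returns how many read errors were seen while the context was
// still live (that is, errors that would have been logged).
func shutdownCleanly(readers int) (int32, error) {
	conn, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	ctx, cancel := context.WithCancel(context.Background())

	var unexpected int32
	var wg sync.WaitGroup
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buf := make([]byte, 2048)
			for {
				if _, _, err := conn.ReadFrom(buf); err != nil {
					select {
					case <-ctx.Done():
						// Shutdown was requested: exit quietly.
					default:
						// A genuine read failure: count it.
						atomic.AddInt32(&unexpected, 1)
					}
					return
				}
			}
		}()
	}

	cancel()           // signal shutdown first...
	err = conn.Close() // ...then unblock every pending ReadFrom
	wg.Wait()
	return atomic.LoadInt32(&unexpected), err
}

func main() {
	n, err := shutdownCleanly(3)
	if err != nil {
		panic(err)
	}
	fmt.Println("unexpected read errors:", n)
}
```

If the connection were closed before the context was cancelled, a reader could observe the close error while the context was still live and report it as a failure, which is why the order matters in this sketch.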

func truncateMaxLog(data []byte) (token []byte) {
20 changes: 12 additions & 8 deletions pkg/stanza/operator/input/udp/udp_test.go
@@ -18,11 +18,8 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func udpInputTest(input []byte, expected []string) func(t *testing.T) {
func udpInputTest(input []byte, expected []string, cfg *Config) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewConfigWithID("test_input")
cfg.ListenAddress = ":0"

op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)

@@ -138,10 +135,17 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T)
}

func TestInput(t *testing.T) {
t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}))
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}))
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}))
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}))
cfg := NewConfigWithID("test_input")
cfg.ListenAddress = ":0"

t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}, cfg))
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}, cfg))
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}, cfg))

cfg.AsyncConfig = NewAsyncConfig()
cfg.AsyncConfig.Readers = 2
t.Run("SimpleAsync", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
}

func TestInputAttributes(t *testing.T) {
13 changes: 12 additions & 1 deletion receiver/udplogreceiver/README.md
@@ -24,10 +24,11 @@ Receives logs over UDP.
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention](https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes) |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details |
| `async` | nil | An `async` configuration block. See below for details. |

### Operators

@@ -69,6 +70,16 @@ Other less common encodings are supported on a best-effort basis.
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
for other encodings available.

#### `async` configuration

If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronously and concurrently.

**Note:** If `async` is not set, a single goroutine reads and processes packets synchronously.

| Field | Default | Description |
| --- | --- | --- |
| `readers` | 1 | Concurrency level. Determines how many goroutines read from the UDP port and process logs before sending them downstream. |

## Example Configurations

### Simple
21 changes: 15 additions & 6 deletions receiver/udplogreceiver/udp_test.go
@@ -25,10 +25,19 @@ import (
)

func TestUdp(t *testing.T) {
testUDP(t, testdataConfigYaml())
listenAddress := "127.0.0.1:29018"
testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
}

func testUDP(t *testing.T, cfg *UDPLogConfig) {
func TestUdpAsync(t *testing.T) {
listenAddress := "127.0.0.1:29019"
cfg := testdataConfigYaml(listenAddress)
cfg.InputConfig.AsyncConfig = udp.NewAsyncConfig()
cfg.InputConfig.AsyncConfig.Readers = 2
testUDP(t, cfg, listenAddress)
}

func testUDP(t *testing.T, cfg *UDPLogConfig, listenAddress string) {
numLogs := 5

f := NewFactory()
@@ -38,7 +47,7 @@ func testUDP(t *testing.T, cfg *UDPLogConfig) {
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))

var conn net.Conn
conn, err = net.Dial("udp", "127.0.0.1:29018")
conn, err = net.Dial("udp", listenAddress)
require.NoError(t, err)

for i := 0; i < numLogs; i++ {
@@ -78,17 +87,17 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, testdataConfigYaml(), cfg)
assert.Equal(t, testdataConfigYaml("127.0.0.1:29018"), cfg)
}

func testdataConfigYaml() *UDPLogConfig {
func testdataConfigYaml(listenAddress string) *UDPLogConfig {
return &UDPLogConfig{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
},
InputConfig: func() udp.Config {
c := udp.NewConfig()
c.ListenAddress = "127.0.0.1:29018"
c.ListenAddress = listenAddress
return *c
}(),
}