Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async config block & concurrent readers to UDP input operator #27647

27 changes: 27 additions & 0 deletions .chloggen/add-async-block-concurrent-udp-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add option to run udp logs receiver (and stanza udp input operator) concurrently to reduce data-loss during high-scale scenarios

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27613]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
11 changes: 11 additions & 0 deletions pkg/stanza/docs/operators/udp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The `udp_input` operator listens for logs from UDP packets.
| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. |
| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
| `async` | {} | An `async` configuration block. See below for details. |

#### `multiline` configuration

Expand Down Expand Up @@ -45,6 +46,16 @@ Other less common encodings are supported on a best-effort basis.
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
for other encodings available.

#### `async` configuration

If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently.

**note** If `async` is not set at all, a single thread will receive lines synchronously.

| Field | Default | Description |
| --- | --- | --- |
| `fixed_reader_routine_count` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). |

### Example Configurations

#### Simple
Expand Down
14 changes: 14 additions & 0 deletions pkg/stanza/operator/input/udp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ func TestUnmarshal(t *testing.T) {
return cfg
}(),
},
{
Name: "all_with_async",
ExpectErr: false,
Expect: func() *Config {
cfg := NewConfig()
cfg.ListenAddress = "10.0.0.1:9000"
cfg.AddAttributes = true
cfg.Encoding = "utf-8"
cfg.SplitConfig.LineStartPattern = "ABC"
cfg.SplitConfig.LineEndPattern = ""
cfg.AsyncConfig.FixedReaderRoutineCount = 2
return cfg
}(),
},
},
}.Run(t)
}
10 changes: 10 additions & 0 deletions pkg/stanza/operator/input/udp/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,13 @@ all:
multiline:
line_start_pattern: ABC
line_end_pattern: ""
all_with_async:
type: udp_input
listen_address: 10.0.0.1:9000
add_attributes: true
encoding: utf-8
multiline:
line_start_pattern: ABC
line_end_pattern: ""
async:
fixed_reader_routine_count: 2
104 changes: 68 additions & 36 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,26 @@ type Config struct {
BaseConfig `mapstructure:",squash"`
}

type UdpAsyncConfig struct {
FixedReaderRoutineCount int `mapstructure:"fixed_reader_routine_count,omitempty"`
}

// NewUdpAsyncConfig creates a new UdpAsyncConfig with default values.
func NewUdpAsyncConfig() UdpAsyncConfig {
return UdpAsyncConfig{
FixedReaderRoutineCount: 1,
}
}

// BaseConfig is the details configuration of a udp input operator.
type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
ListenAddress string `mapstructure:"listen_address,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
AsyncConfig UdpAsyncConfig `mapstructure:"async,omitempty"`
}

// Build will build a udp input operator.
Expand Down Expand Up @@ -101,6 +113,14 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
resolver = helper.NewIPResolver()
}

if c.AsyncConfig == (UdpAsyncConfig{}) {
hovavza marked this conversation as resolved.
Show resolved Hide resolved
c.AsyncConfig = NewUdpAsyncConfig()
}

if c.AsyncConfig.FixedReaderRoutineCount <= 0 {
return nil, fmt.Errorf("AsyncConfig.FixedReaderRoutineCount must be bigger than 0")
hovavza marked this conversation as resolved.
Show resolved Hide resolved
}

udpInput := &Input{
InputOperator: inputOperator,
address: address,
Expand All @@ -110,6 +130,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
splitFunc: splitFunc,
resolver: resolver,
OneLogPerPacket: c.OneLogPerPacket,
AsyncConfig: c.AsyncConfig,
}
return udpInput, nil
}
Expand All @@ -121,6 +142,7 @@ type Input struct {
address *net.UDPAddr
addAttributes bool
OneLogPerPacket bool
AsyncConfig UdpAsyncConfig

connection net.PacketConn
cancel context.CancelFunc
Expand Down Expand Up @@ -150,42 +172,52 @@ func (u *Input) Start(_ operator.Persister) error {
func (u *Input) goHandleMessages(ctx context.Context) {
u.wg.Add(1)

go func() {
defer u.wg.Done()

dec := decode.New(u.encoding)
buf := make([]byte, 0, MaxUDPSize)
for {
message, remoteAddr, err := u.readMessage()
if err != nil {
select {
case <-ctx.Done():
return
default:
u.Errorw("Failed reading messages", zap.Error(err))
}
break
}
numConcurrentReaders := 1
if u.AsyncConfig != (UdpAsyncConfig{}) {
numConcurrentReaders = u.AsyncConfig.FixedReaderRoutineCount
}
hovavza marked this conversation as resolved.
Show resolved Hide resolved

if u.OneLogPerPacket {
log := truncateMaxLog(message)
u.handleMessage(ctx, remoteAddr, dec, log)
continue
for i := 0; i < numConcurrentReaders; i++ {
u.wg.Add(1)
go u.readAndProcessMessages(ctx)
}
}

func (u *Input) readAndProcessMessages(ctx context.Context) {
defer u.wg.Done()

dec := decode.New(u.encoding)
buf := make([]byte, 0, MaxUDPSize)
for {
message, remoteAddr, err := u.readMessage()
if err != nil {
select {
case <-ctx.Done():
return
default:
u.Errorw("Failed reading messages", zap.Error(err))
}
break
}

if u.OneLogPerPacket {
log := truncateMaxLog(message)
u.handleMessage(ctx, remoteAddr, dec, log)
continue
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)
scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

scanner.Split(u.splitFunc)
scanner.Split(u.splitFunc)

for scanner.Scan() {
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
}
for scanner.Scan() {
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
}
}()
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
}
}
}

func truncateMaxLog(data []byte) (token []byte) {
Expand Down
19 changes: 11 additions & 8 deletions pkg/stanza/operator/input/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func udpInputTest(input []byte, expected []string) func(t *testing.T) {
func udpInputTest(input []byte, expected []string, cfg *Config) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewConfigWithID("test_input")
cfg.ListenAddress = ":0"

op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)

Expand Down Expand Up @@ -138,10 +135,16 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T)
}

func TestInput(t *testing.T) {
t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}))
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}))
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}))
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}))
cfg := NewConfigWithID("test_input")
cfg.ListenAddress = ":0"

t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}, cfg))
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}, cfg))
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}, cfg))

cfg.AsyncConfig.FixedReaderRoutineCount = 2
t.Run("SimpleAsync", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
}

func TestInputAttributes(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions receiver/udplogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Receives logs over UDP.
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details |
| `async` | {} | An `async` configuration block. See below for details. |

### Operators

Expand Down Expand Up @@ -69,6 +70,16 @@ Other less common encodings are supported on a best-effort basis.
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
for other encodings available.

#### `async` configuration

If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently.

**note** If `async` is not set at all, a single thread will receive lines synchronously.

| Field | Default | Description |
| --- | --- | --- |
| `fixed_reader_routine_count` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). |
hovavza marked this conversation as resolved.
Show resolved Hide resolved

## Example Configurations

### Simple
Expand Down
20 changes: 14 additions & 6 deletions receiver/udplogreceiver/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ import (
)

func TestUdp(t *testing.T) {
testUDP(t, testdataConfigYaml())
listenAddress := "127.0.0.1:29018"
testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
}

func testUDP(t *testing.T, cfg *UDPLogConfig) {
func TestUdpAsync(t *testing.T) {
listenAddress := "127.0.0.1:29019"
cfg := testdataConfigYaml(listenAddress)
cfg.InputConfig.AsyncConfig.FixedReaderRoutineCount = 2
testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
}

func testUDP(t *testing.T, cfg *UDPLogConfig, listenAddress string) {
numLogs := 5

f := NewFactory()
Expand All @@ -38,7 +46,7 @@ func testUDP(t *testing.T, cfg *UDPLogConfig) {
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))

var conn net.Conn
conn, err = net.Dial("udp", "127.0.0.1:29018")
conn, err = net.Dial("udp", listenAddress)
require.NoError(t, err)

for i := 0; i < numLogs; i++ {
Expand Down Expand Up @@ -78,17 +86,17 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, testdataConfigYaml(), cfg)
assert.Equal(t, testdataConfigYaml("127.0.0.1:29018"), cfg)
}

func testdataConfigYaml() *UDPLogConfig {
func testdataConfigYaml(listenAddress string) *UDPLogConfig {
return &UDPLogConfig{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
},
InputConfig: func() udp.Config {
c := udp.NewConfig()
c.ListenAddress = "127.0.0.1:29018"
c.ListenAddress = listenAddress
return *c
}(),
}
Expand Down
Loading