
feature - parallelization factor kinesis functionbeat config #20727

Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -668,6 +668,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Functionbeat*
- Add basic ECS categorization and `cloud` fields. {pull}19174[19174]
- Add support for parallelization factor for kinesis. {pull}20727[20727]

*Winlogbeat*

12 changes: 10 additions & 2 deletions x-pack/functionbeat/_meta/config/beat.reference.yml.tmpl
@@ -196,7 +196,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
@@ -206,6 +206,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Set to true to publish fields with null values in events.
#keep_null: false

@@ -263,7 +267,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
@@ -273,6 +277,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Set to true to publish fields with null values in events.
#keep_null: false

12 changes: 10 additions & 2 deletions x-pack/functionbeat/_meta/config/beat.yml.tmpl
@@ -170,7 +170,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
@@ -180,6 +180,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Create a function that accepts Cloudwatch logs from Kinesis streams.
- name: cloudwatch-logs-kinesis
enabled: false
@@ -233,7 +237,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
@@ -243,6 +247,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Configure functions to run on Google Cloud Platform, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#
9 changes: 8 additions & 1 deletion x-pack/functionbeat/docs/config-options-aws.asciidoc
@@ -176,7 +176,7 @@ Set this option to an ARN that points to an SQS queue.
[id="{beatname_lc}-batch-size"]
==== `batch_size`

The number of events to read from a Kinesis stream, the minimal values is 100 and the maximun is
The number of events to read from a Kinesis stream; the minimum value is 100 and the maximum is
10000. The default is 100.

[float]
@@ -186,6 +186,13 @@ The number of events to read from a Kinesis stream, the minimal values is 100 an
The starting position to read from a Kinesis stream; valid values are `trim_horizon` and `latest`.
The default is trim_horizon.

[float]
[id="{beatname_lc}-parallelization-factor"]
==== `parallelization_factor`

The number of batches to process from each shard concurrently; the minimum value is 1 and the maximum is 10.
The default is 1.
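For illustration, a Kinesis trigger that combines these options might look like the following sketch, based on the reference configuration (the ARN and function name are placeholders):

```yaml
functionbeat.provider.aws.functions:
  - name: kinesis
    enabled: true
    type: kinesis
    triggers:
      # Arn for the Kinesis stream.
      - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents
        batch_size: 100
        starting_position: "trim_horizon"
        # Process up to 4 batches from each shard concurrently.
        parallelization_factor: 4
```

Raising `parallelization_factor` increases per-shard throughput at the cost of more concurrent Lambda invocations, while per-key ordering is still preserved within each batch processor.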

[float]
[id="{beatname_lc}-keep-null"]
==== `keep_null`
12 changes: 10 additions & 2 deletions x-pack/functionbeat/functionbeat.reference.yml
@@ -196,7 +196,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
@@ -206,6 +206,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Set to true to publish fields with null values in events.
#keep_null: false

@@ -263,7 +267,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
@@ -273,6 +277,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Set to true to publish fields with null values in events.
#keep_null: false

12 changes: 10 additions & 2 deletions x-pack/functionbeat/functionbeat.yml
@@ -170,7 +170,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
@@ -180,6 +180,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Create a function that accepts Cloudwatch logs from Kinesis streams.
- name: cloudwatch-logs-kinesis
enabled: false
@@ -233,7 +237,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
@@ -243,6 +247,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Configure functions to run on Google Cloud Platform, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#
21 changes: 12 additions & 9 deletions x-pack/functionbeat/provider/aws/aws/kinesis.go
@@ -93,17 +93,19 @@ func (cfg *KinesisConfig) Validate() error {

// KinesisTriggerConfig configuration for the current trigger.
type KinesisTriggerConfig struct {
EventSourceArn string `config:"event_source_arn" validate:"required"`
BatchSize int `config:"batch_size" validate:"min=100,max=10000"`
StartingPosition startingPosition `config:"starting_position"`
EventSourceArn string `config:"event_source_arn" validate:"required"`
BatchSize int `config:"batch_size" validate:"min=100,max=10000"`
StartingPosition startingPosition `config:"starting_position"`
ParallelizationFactor int `config:"parallelization_factor" validate:"min=1,max=10"`
}

// Unpack unpacks the trigger and makes sure the default settings are correctly set.
func (c *KinesisTriggerConfig) Unpack(cfg *common.Config) error {
type tmpConfig KinesisTriggerConfig
config := tmpConfig{
BatchSize: 100,
StartingPosition: trimHorizonPos,
BatchSize: 100,
StartingPosition: trimHorizonPos,
ParallelizationFactor: 1,
}
if err := cfg.Unpack(&config); err != nil {
return err
@@ -176,10 +178,11 @@ func (k *Kinesis) Template() *cloudformation.Template {
for _, trigger := range k.config.Triggers {
resourceName := prefix(k.Name() + trigger.EventSourceArn)
template.Resources[resourceName] = &lambda.EventSourceMapping{
BatchSize: trigger.BatchSize,
EventSourceArn: trigger.EventSourceArn,
FunctionName: cloudformation.GetAtt(prefix(""), "Arn"),
StartingPosition: trigger.StartingPosition.String(),
BatchSize: trigger.BatchSize,
ParallelizationFactor: trigger.ParallelizationFactor,
EventSourceArn: trigger.EventSourceArn,
FunctionName: cloudformation.GetAtt(prefix(""), "Arn"),
StartingPosition: trigger.StartingPosition.String(),
}
}

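Each trigger above renders to an `AWS::Lambda::EventSourceMapping` resource. As a sketch, the CloudFormation that `Template()` emits for a trigger with `parallelization_factor: 4` would resemble the following (resource and function names are illustrative; the generated logical IDs are derived from the function name and ARN):

```yaml
FbKinesisTrigger:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    BatchSize: 100
    # New in this PR: fan out up to 4 concurrent batch processors per shard.
    ParallelizationFactor: 4
    EventSourceArn: arn:aws:kinesis:us-east-1:xxxxx:myevents
    FunctionName: !GetAtt FbKinesisFunction.Arn
    StartingPosition: TRIM_HORIZON
```

CloudFormation accepts `ParallelizationFactor` values from 1 to 10 for Kinesis event sources, which is why the config struct enforces the same `min=1,max=10` bounds up front rather than failing at deploy time.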
35 changes: 31 additions & 4 deletions x-pack/functionbeat/provider/aws/aws/kinesis_test.go
@@ -39,7 +39,7 @@ func TestKinesis(t *testing.T) {
assert.NoError(t, err)
})

t.Run("when publish is not succesful", func(t *testing.T) {
t.Run("when publish is not successful", func(t *testing.T) {
e := errors.New("something bad")
client := &arrayBackedClient{err: e}

@@ -141,6 +141,32 @@ func testKinesisConfig(t *testing.T) {
},
},
},
"test upper bound parallelization factor limit": {
valid: false,
rawConfig: map[string]interface{}{
"name": "mysuperfunctionname",
"description": "mylong description",
"triggers": []map[string]interface{}{
map[string]interface{}{
"event_source_arn": "abc123",
"parallelization_factor": 13,
},
},
},
},
"test lower bound parallelization factor limit": {
valid: false,
rawConfig: map[string]interface{}{
"name": "mysuperfunctionname",
"description": "mylong description",
"triggers": []map[string]interface{}{
map[string]interface{}{
"event_source_arn": "abc123",
"parallelization_factor": 0,
},
},
},
},
"test default values": {
valid: true,
rawConfig: map[string]interface{}{
@@ -158,9 +184,10 @@ func testKinesisConfig(t *testing.T) {
LambdaConfig: DefaultLambdaConfig,
Triggers: []*KinesisTriggerConfig{
&KinesisTriggerConfig{
EventSourceArn: "abc123",
BatchSize: 100,
StartingPosition: trimHorizonPos,
EventSourceArn: "abc123",
BatchSize: 100,
StartingPosition: trimHorizonPos,
ParallelizationFactor: 1,
},
},
},