Skip to content

Commit

Permalink
Add paralelization_factor to Functionbeat Kinesis (elastic#20727)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR adds the ability to add the parallelization factor configuration to functionbeat when reading from Kinesis streams. 
https://aws.amazon.com/about-aws/whats-new/2019/11/aws-lambda-supports-parallelization-factor-for-kinesis-and-dynamodb-event-sources/


## Why is it important?

This configuration allows you to process one shard of a Kinesis or DynamoDB data stream with more than one Lambda invocation simultaneously.


- Closes elastic#16901
  • Loading branch information
ravinaik1312 authored and melchiormoulin committed Oct 14, 2020
1 parent 0c9e00a commit ed33c7b
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Functionbeat*
- Add basic ECS categorization and `cloud` fields. {pull}19174[19174]
- Add support for parallelization factor for kinesis. {pull}20727[20727]

*Winlogbeat*

Expand Down
12 changes: 10 additions & 2 deletions x-pack/functionbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
Expand All @@ -206,6 +206,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down Expand Up @@ -263,7 +267,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
Expand All @@ -273,6 +277,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down
12 changes: 10 additions & 2 deletions x-pack/functionbeat/_meta/config/beat.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
Expand All @@ -180,6 +180,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Create a function that accepts Cloudwatch logs from Kinesis streams.
- name: cloudwatch-logs-kinesis
enabled: false
Expand Down Expand Up @@ -233,7 +237,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
Expand All @@ -243,6 +247,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Configure functions to run on Google Cloud Platform, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#
Expand Down
9 changes: 8 additions & 1 deletion x-pack/functionbeat/docs/config-options-aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ Set this option to an ARN that points to an SQS queue.
[id="{beatname_lc}-batch-size"]
==== `batch_size`

The number of events to read from a Kinesis stream, the minimal values is 100 and the maximun is
The number of events to read from a Kinesis stream, the minimum value is 100 and the maximum is
10000. The default is 100.

[float]
Expand All @@ -186,6 +186,13 @@ The number of events to read from a Kinesis stream, the minimal values is 100 an
The starting position to read from a Kinesis stream, valids values are `trim_horizon` and `latest`.
The default is trim_horizon.

[float]
[id="{beatname_lc}-parallelization-factor"]
==== `parallelization_factor`

The number of batches to process from each shard concurrently, the minimum value is 1 and the maximum is 10
The default is 1.

[float]
[id="{beatname_lc}-keep-null"]
==== `keep_null`
Expand Down
12 changes: 10 additions & 2 deletions x-pack/functionbeat/functionbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
Expand All @@ -206,6 +206,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down Expand Up @@ -263,7 +267,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
Expand All @@ -273,6 +277,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down
12 changes: 10 additions & 2 deletions x-pack/functionbeat/functionbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
Expand All @@ -180,6 +180,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Create a function that accepts Cloudwatch logs from Kinesis streams.
- name: cloudwatch-logs-kinesis
enabled: false
Expand Down Expand Up @@ -233,7 +237,7 @@ functionbeat.provider.aws.functions:
# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
Expand All @@ -243,6 +247,10 @@ functionbeat.provider.aws.functions:
# Default is trim_horizon.
#starting_position: "trim_horizon"

# parallelization_factor is the number of batches to process from each shard concurrently.
# Default is 1.
#parallelization_factor: 1

# Configure functions to run on Google Cloud Platform, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#
Expand Down
21 changes: 12 additions & 9 deletions x-pack/functionbeat/provider/aws/aws/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,19 @@ func (cfg *KinesisConfig) Validate() error {

// KinesisTriggerConfig configuration for the current trigger.
type KinesisTriggerConfig struct {
EventSourceArn string `config:"event_source_arn" validate:"required"`
BatchSize int `config:"batch_size" validate:"min=100,max=10000"`
StartingPosition startingPosition `config:"starting_position"`
EventSourceArn string `config:"event_source_arn" validate:"required"`
BatchSize int `config:"batch_size" validate:"min=100,max=10000"`
StartingPosition startingPosition `config:"starting_position"`
ParallelizationFactor int `config:"parallelization_factor" validate:"min=1,max=10"`
}

// Unpack unpacks the trigger and make sure the defaults settings are correctly sets.
func (c *KinesisTriggerConfig) Unpack(cfg *common.Config) error {
type tmpConfig KinesisTriggerConfig
config := tmpConfig{
BatchSize: 100,
StartingPosition: trimHorizonPos,
BatchSize: 100,
StartingPosition: trimHorizonPos,
ParallelizationFactor: 1,
}
if err := cfg.Unpack(&config); err != nil {
return err
Expand Down Expand Up @@ -176,10 +178,11 @@ func (k *Kinesis) Template() *cloudformation.Template {
for _, trigger := range k.config.Triggers {
resourceName := prefix(k.Name() + trigger.EventSourceArn)
template.Resources[resourceName] = &lambda.EventSourceMapping{
BatchSize: trigger.BatchSize,
EventSourceArn: trigger.EventSourceArn,
FunctionName: cloudformation.GetAtt(prefix(""), "Arn"),
StartingPosition: trigger.StartingPosition.String(),
BatchSize: trigger.BatchSize,
ParallelizationFactor: trigger.ParallelizationFactor,
EventSourceArn: trigger.EventSourceArn,
FunctionName: cloudformation.GetAtt(prefix(""), "Arn"),
StartingPosition: trigger.StartingPosition.String(),
}
}

Expand Down
35 changes: 31 additions & 4 deletions x-pack/functionbeat/provider/aws/aws/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestKinesis(t *testing.T) {
assert.NoError(t, err)
})

t.Run("when publish is not succesful", func(t *testing.T) {
t.Run("when publish is not successful", func(t *testing.T) {
e := errors.New("something bad")
client := &arrayBackedClient{err: e}

Expand Down Expand Up @@ -141,6 +141,32 @@ func testKinesisConfig(t *testing.T) {
},
},
},
"test upper bound parallelization factor limit": {
valid: false,
rawConfig: map[string]interface{}{
"name": "mysuperfunctionname",
"description": "mylong description",
"triggers": []map[string]interface{}{
map[string]interface{}{
"event_source_arn": "abc123",
"parallelization_factor": 13,
},
},
},
},
"test lower bound parallelization factor limit": {
valid: false,
rawConfig: map[string]interface{}{
"name": "mysuperfunctionname",
"description": "mylong description",
"triggers": []map[string]interface{}{
map[string]interface{}{
"event_source_arn": "abc123",
"parallelization_factor": 0,
},
},
},
},
"test default values": {
valid: true,
rawConfig: map[string]interface{}{
Expand All @@ -158,9 +184,10 @@ func testKinesisConfig(t *testing.T) {
LambdaConfig: DefaultLambdaConfig,
Triggers: []*KinesisTriggerConfig{
&KinesisTriggerConfig{
EventSourceArn: "abc123",
BatchSize: 100,
StartingPosition: trimHorizonPos,
EventSourceArn: "abc123",
BatchSize: 100,
StartingPosition: trimHorizonPos,
ParallelizationFactor: 1,
},
},
},
Expand Down

0 comments on commit ed33c7b

Please sign in to comment.