From ff0e2f106876d1f509cabd742c246b6c989a4b1f Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Tue, 24 May 2022 10:01:09 -0600 Subject: [PATCH] feat: add field key option to set event partition key (#11076) --- plugins/outputs/event_hubs/README.md | 8 ++++++++ plugins/outputs/event_hubs/event_hubs.go | 16 ++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/event_hubs/README.md b/plugins/outputs/event_hubs/README.md index 24ed32f49b5bd..7f9915cf94811 100644 --- a/plugins/outputs/event_hubs/README.md +++ b/plugins/outputs/event_hubs/README.md @@ -21,8 +21,16 @@ JSON is probably the easiest to integrate with downstream components. ## The full connection string to the Event Hub (required) ## The shared access key must have "Send" permissions on the target Event Hub. connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" + ## Client timeout (defaults to 30s) # timeout = "30s" + + ## Partition key + ## Metric tag or field name to use for the event partition key. The value of + ## this tag or field is set as the key for events if it exists. If both, tag + ## and field, exist the tag is preferred. + # partition_key = "" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go index e6a4c1da53cd1..5a0c8d3d613cf 100644 --- a/plugins/outputs/event_hubs/event_hubs.go +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -50,7 +50,8 @@ func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterat type EventHubs struct { Log telegraf.Logger `toml:"-"` ConnectionString string `toml:"connection_string"` - Timeout config.Duration + Timeout config.Duration `toml:"timeout"` + PartitionKey string `toml:"partition_key"` Hub EventHubInterface serializer serializers.Serializer @@ -102,7 +103,18 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { continue } - events = append(events, eventhub.NewEvent(payload)) + event := eventhub.NewEvent(payload) + if e.PartitionKey != "" { + if key, ok := metric.GetTag(e.PartitionKey); ok { + event.PartitionKey = &key + } else if key, ok := metric.GetField(e.PartitionKey); ok { + if strKey, ok := key.(string); ok { + event.PartitionKey = &strKey + } + } + } + + events = append(events, event) } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))