Skip to content

feat: Support AWS Kinesis destination #316

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/dev/deps/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ services:
aws:
image: localstack/localstack:latest
environment:
- SERVICES=sns,sts,sqs
- SERVICES=sns,sts,sqs,kinesis
ports:
- 4566:4566
- 4571:4571
Expand Down
2 changes: 1 addition & 1 deletion build/test/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ services:
aws:
image: localstack/localstack:latest
environment:
- SERVICES=sns,sts,sqs
- SERVICES=sns,sts,sqs,kinesis
ports:
- 34566:4566
- 34571:4571
Expand Down
241 changes: 241 additions & 0 deletions cmd/destinations/awskinesis/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package main

import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

const (
AWSRegion = "us-east-1"
AWSEndpoint = "http://localhost:4566"
AWSCredentials = "test:test:"
DestinationStreamName = "destination_kinesis_stream"
ShardCount = 1
)

func main() {
ctx := context.Background()

// Set up AWS configuration and client
kinesisClient, err := setupKinesisClient(ctx)
if err != nil {
log.Fatalf("Failed to set up Kinesis client: %v", err)
}

// Check for command line arguments
if len(os.Args) > 1 && os.Args[1] == "down" {
if err := deleteStream(ctx, kinesisClient); err != nil {
log.Fatalf("Error deleting stream: %v", err)
}
return
}

if err := consumeMessages(ctx, kinesisClient); err != nil {
log.Fatalf("Error consuming messages: %v", err)
}
}

// setupKinesisClient creates and configures a Kinesis client
func setupKinesisClient(ctx context.Context) (*kinesis.Client, error) {
// Configure AWS SDK
awsConfig, err := config.LoadDefaultConfig(ctx,
config.WithRegion(AWSRegion),
config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(
AWSCredentials[:len(AWSCredentials)-1], // remove the trailing colon
AWSCredentials[len(AWSCredentials)-1:], // just get the empty string after colon
""),
),
)
if err != nil {
return nil, err
}

// Create Kinesis client with custom endpoint
kinesisClient := kinesis.NewFromConfig(awsConfig, func(o *kinesis.Options) {
o.BaseEndpoint = aws.String(AWSEndpoint)
})

return kinesisClient, nil
}

// deleteStream deletes the Kinesis stream
func deleteStream(ctx context.Context, client *kinesis.Client) error {
// Check if stream exists before attempting to delete
_, err := client.DescribeStream(ctx, &kinesis.DescribeStreamInput{
StreamName: aws.String(DestinationStreamName),
})

if err != nil {
log.Printf("[*] Stream %s does not exist or cannot be accessed, nothing to clean up", DestinationStreamName)
return nil
}

// Delete the stream
log.Printf("[*] Deleting stream %s...", DestinationStreamName)
_, err = client.DeleteStream(ctx, &kinesis.DeleteStreamInput{
StreamName: aws.String(DestinationStreamName),
})
if err != nil {
return err
}

log.Printf("[*] Stream %s delete request sent successfully", DestinationStreamName)

// Wait for stream to be deleted
log.Printf("[*] Waiting for stream %s to be deleted...", DestinationStreamName)
waiter := kinesis.NewStreamNotExistsWaiter(client)
err = waiter.Wait(ctx, &kinesis.DescribeStreamInput{
StreamName: aws.String(DestinationStreamName),
}, 30*time.Second)

if err == nil {
log.Printf("[*] Stream %s has been deleted", DestinationStreamName)
}

return err
}

// consumeMessages runs the Kinesis consumer
func consumeMessages(ctx context.Context, client *kinesis.Client) error {
// Ensure stream exists
if err := ensureKinesisStream(ctx, client, DestinationStreamName); err != nil {
return err
}

// Get shard ID
describeOutput, err := client.DescribeStream(ctx, &kinesis.DescribeStreamInput{
StreamName: aws.String(DestinationStreamName),
})
if err != nil {
return err
}

if len(describeOutput.StreamDescription.Shards) == 0 {
log.Printf("[*] No shards found in stream %s", DestinationStreamName)
return nil
}

shardId := *describeOutput.StreamDescription.Shards[0].ShardId
log.Printf("[*] Using shard ID: %s", shardId)

// Set up termination signal handler
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

// Get shard iterator
iteratorOutput, err := client.GetShardIterator(ctx, &kinesis.GetShardIteratorInput{
StreamName: aws.String(DestinationStreamName),
ShardId: aws.String(shardId),
ShardIteratorType: types.ShardIteratorTypeLatest,
})
if err != nil {
return err
}

iterator := iteratorOutput.ShardIterator

// Start consuming in a goroutine
go func() {
for {
// Get records using the shard iterator
recordsOutput, err := client.GetRecords(ctx, &kinesis.GetRecordsInput{
ShardIterator: iterator,
Limit: aws.Int32(25),
})
if err != nil {
log.Printf("[*] Error getting records: %v", err)
time.Sleep(1 * time.Second)
continue
}

// Process each record
for _, record := range recordsOutput.Records {
// Try to unmarshal as JSON to pretty print
var payload map[string]interface{}
if err := json.Unmarshal(record.Data, &payload); err != nil {
// If not JSON, just print raw data
log.Printf("[x] Sequence Number: %s | Partition Key: %s | Payload: %s",
*record.SequenceNumber, *record.PartitionKey, string(record.Data))
} else {
// Pretty print the JSON payload
formattedData, _ := json.MarshalIndent(payload, "", " ")
log.Printf("[x] Sequence Number: %s | Partition Key: %s | Payload: %s",
*record.SequenceNumber, *record.PartitionKey, string(formattedData))
}
}

// Update the iterator for the next call
iterator = recordsOutput.NextShardIterator
if iterator == nil {
log.Printf("[*] End of shard reached")
break
}

// If no records, sleep a bit to avoid hitting API limits
if len(recordsOutput.Records) == 0 {
time.Sleep(1 * time.Second)
}
}
}()

log.Printf("[*] Ready to receive messages from Kinesis")
log.Printf("[*] Configuration:")
log.Printf("\tEndpoint: %s (use 'aws:4566' if running in Docker)", AWSEndpoint)
log.Printf("\tRegion: %s", AWSRegion)
log.Printf("\tStream: %s", DestinationStreamName)
log.Printf("[*] Consumer set to read only NEW messages that arrive after startup")
log.Printf("[*] Available commands:")
log.Printf("\tgo run ./cmd/destinations/awskinesis - Start consumer")
log.Printf("\tgo run ./cmd/destinations/awskinesis down - Delete stream")
log.Printf("[*] Waiting for logs. To exit press CTRL+C")
<-termChan

return nil
}

// Create or ensure Kinesis stream exists
func ensureKinesisStream(ctx context.Context, client *kinesis.Client, streamName string) error {
// Check if stream exists
_, err := client.DescribeStream(ctx, &kinesis.DescribeStreamInput{
StreamName: aws.String(streamName),
})
if err == nil {
log.Printf("[*] Stream %s already exists", streamName)
return nil
}

// Create the stream
log.Printf("[*] Creating stream %s with %d shard(s)...", streamName, ShardCount)
_, err = client.CreateStream(ctx, &kinesis.CreateStreamInput{
StreamName: aws.String(streamName),
ShardCount: aws.Int32(ShardCount),
})
if err != nil {
return err
}

// Wait for stream to become active
log.Printf("[*] Waiting for stream %s to become active...", streamName)
waiter := kinesis.NewStreamExistsWaiter(client)
err = waiter.Wait(ctx, &kinesis.DescribeStreamInput{
StreamName: aws.String(streamName),
}, 30*time.Second)

if err == nil {
log.Printf("[*] Stream %s is now active", streamName)
}

return err
}
File renamed without changes.
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ require (
github.com/Masterminds/sprig/v3 v3.3.0
github.com/alicebob/miniredis/v2 v2.33.0
github.com/aws/aws-sdk-go v1.55.5
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2 v1.36.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
github.com/aws/aws-sdk-go-v2/service/kinesis v1.33.2
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.3
github.com/aws/smithy-go v1.21.0
github.com/aws/smithy-go v1.22.2
github.com/caarlos0/env/v9 v9.0.0
github.com/getsentry/sentry-go v0.31.1
github.com/getsentry/sentry-go/gin v0.31.1
Expand Down Expand Up @@ -85,9 +86,10 @@ require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
Expand Down
20 changes: 12 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -670,24 +670,28 @@ github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+ye
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
github.com/aws/aws-sdk-go-v2 v1.30.3/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
github.com/aws/aws-sdk-go-v2/config v1.27.27 h1:HdqgGt1OAP0HkEDDShEl0oSYa9ZZBSOmKpdpsDMdO90=
github.com/aws/aws-sdk-go-v2/config v1.27.27/go.mod h1:MVYamCg76dFNINkZFu4n4RjDixhVr51HLj4ErWzrVwg=
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 h1:2raNba6gr2IfA0eqqiP2XiQ0UVOpGPgDSi0I9iAP+UI=
github.com/aws/aws-sdk-go-v2/credentials v1.17.27/go.mod h1:gniiwbGahQByxan6YjQUMcW4Aov6bLC3m+evgcoN4r4=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 h1:KreluoV8FZDEtI6Co2xuNk/UqI9iwMrOx/87PBNIKqw=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11/go.mod h1:SeSUYBLsMYFoRvHE0Tjvn7kbxaUhl75CJi1sbfhMxkU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 h1:SoNJ4RlFEQEbtDcCEt+QG56MY4fm4W8rYirAmq+/DdU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15/go.mod h1:U9ke74k1n2bf+RIgoX1SXFed1HLs51OgUSs+Ph0KJP8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 h1:C6WHdGnTDIYETAm5iErQUiVNsclNx9qbJVPIt03B6bI=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15/go.mod h1:ZQLZqhcu+JhSrA9/NXRm8SkDvsycE+JkV3WGY41e+IM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 h1:dT3MqvGhSoaIhRseqw2I0yH81l7wiR2vjs57O51EAm8=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3/go.mod h1:GlAeCkHwugxdHaueRr4nhPuY+WW+gR8UjlcqzPr1SPI=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 h1:HGErhhrxZlQ044RiM+WdoZxp0p+EGM62y3L6pwA4olE=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17/go.mod h1:RkZEx4l0EHYDJpWppMJ3nD9wZJAa8/0lq9aVC+r2UII=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.33.2 h1:t3Ukha929to7c4SZDeCP3aRQBgn01nhwKxggYOVRMR0=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.33.2/go.mod h1:dJngkoVMrq0K7QvRkdRZYM4NUp6cdWa2GBdpm8zoY8U=
github.com/aws/aws-sdk-go-v2/service/sns v1.31.3 h1:eSTEdxkfle2G98FE+Xl3db/XAXXVTJPNQo9K/Ar8oAI=
github.com/aws/aws-sdk-go-v2/service/sns v1.31.3/go.mod h1:1dn0delSO3J69THuty5iwP0US2Glt0mx2qBBlI13pvw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.3 h1:Vjqy5BZCOIsn4Pj8xzyqgGmsSqzz7y/WXbN3RgOoVrc=
Expand All @@ -698,8 +702,8 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 h1:yiwVzJW2ZxZTurVbYWA7QOrA
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4/go.mod h1:0oxfLkpz3rQ/CHlx5hB7H69YUpFiI1tql6Q6Ne+1bCw=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 h1:ZsDKRLXGWHk8WdtyYMoGNO7bTudrvuKpDKgMVRlepGE=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzxOl8SRqgf/IDw5aUt9UKFcQ=
github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA=
github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func (c *Config) InitDefaults() {
Webhook: DestinationWebhookConfig{
HeaderPrefix: "x-outpost-",
},
AWSKinesis: DestinationAWSKinesisConfig{
MetadataInPayload: true,
},
}

c.Alert = AlertConfig{
Expand Down
24 changes: 20 additions & 4 deletions internal/config/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ import (
destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers"
)

// DestinationsConfig is the main configuration for all destination types
type DestinationsConfig struct {
MetadataPath string `yaml:"metadata_path" env:"DESTINATIONS_METADATA_PATH"`
Webhook DestinationWebhookConfig `yaml:"webhook"`
MetadataPath string `yaml:"metadata_path" env:"DESTINATIONS_METADATA_PATH"`
Webhook DestinationWebhookConfig `yaml:"webhook"`
AWSKinesis DestinationAWSKinesisConfig `yaml:"aws_kinesis"`
}

func (c *DestinationsConfig) ToConfig() destregistrydefault.RegisterDefaultDestinationOptions {
return destregistrydefault.RegisterDefaultDestinationOptions{
Webhook: c.Webhook.toConfig(),
Webhook: c.Webhook.toConfig(),
AWSKinesis: c.AWSKinesis.toConfig(),
}
}

// Webhook configuration
type DestinationWebhookConfig struct {
HeaderPrefix string `yaml:"header_prefix" env:"DESTINATIONS_WEBHOOK_HEADER_PREFIX"`
DisableDefaultEventIDHeader bool `yaml:"disable_default_event_id_header" env:"DESTINATIONS_WEBHOOK_DISABLE_DEFAULT_EVENT_ID_HEADER"`
Expand All @@ -27,7 +31,7 @@ type DestinationWebhookConfig struct {
SignatureAlgorithm string `yaml:"signature_algorithm" env:"DESTINATIONS_WEBHOOK_SIGNATURE_ALGORITHM"`
}

// toConfig is now private since it's only used internally by DestinationsConfig.ToConfig
// toConfig converts WebhookConfig to the provider config - private since it's only used internally
func (c *DestinationWebhookConfig) toConfig() *destregistrydefault.DestWebhookConfig {
return &destregistrydefault.DestWebhookConfig{
HeaderPrefix: c.HeaderPrefix,
Expand All @@ -41,3 +45,15 @@ func (c *DestinationWebhookConfig) toConfig() *destregistrydefault.DestWebhookCo
SignatureAlgorithm: c.SignatureAlgorithm,
}
}

// AWS Kinesis configuration
type DestinationAWSKinesisConfig struct {
MetadataInPayload bool `yaml:"metadata_in_payload" env:"DESTINATIONS_AWS_KINESIS_METADATA_IN_PAYLOAD"`
}

// toConfig converts AWSKinesisConfig to the provider config
func (c *DestinationAWSKinesisConfig) toConfig() *destregistrydefault.DestAWSKinesisConfig {
return &destregistrydefault.DestAWSKinesisConfig{
MetadataInPayload: c.MetadataInPayload,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Placeholder

This file is a placeholder. Documentation will be added later.
Loading