Skip to content

Commit

Permalink
fix: cross compile fixed + switched to sarama kafka consumer (#68)
Browse files Browse the repository at this point in the history
* fix: cross compile fixed + switched to sarama kafka consumer

* ci: add goreleaser to PR build

* chore: cleanup kafka consumer
  • Loading branch information
trietsch authored Nov 15, 2021
1 parent 2b77074 commit 0e728e3
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 239 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.17
- name: Build CLI
run: |
make
Expand All @@ -33,3 +33,9 @@ jobs:
STRM_TEST_S3_SECRET_ACCESS_KEY: ${{ secrets.STRM_TEST_S3_SECRET_ACCESS_KEY }}
run: |
go test ./test -v
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
with:
distribution: goreleaser
version: latest
args: --snapshot --skip-publish --rm-dist
5 changes: 1 addition & 4 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@ builds:
- id: strm
binary: strm
main: ./cmd/strm
env:
- CGO_ENABLED=1
ldflags:
- -s -w -X streammachine.io/strm/pkg/cmd.Version={{.Version}} -X streammachine.io/strm/pkg/cmd.GitSha={{.Commit}} -X streammachine.io/strm/pkg/cmd.BuiltOn={{.Date}}
goos:
- linux
- windows
- darwin
goarch:
- amd64

archives:
- name_template: "{{ .ProjectName }}_{{ .Os }}_{{ .Arch }}"
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module streammachine.io/strm
go 1.16

require (
github.com/Shopify/sarama v1.30.0
github.com/actgardner/gogen-avro/v7 v7.3.1
github.com/bykof/gostradamus v1.0.4
github.com/confluentinc/confluent-kafka-go v1.7.0
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-rod/rod v0.101.8
github.com/golang-jwt/jwt/v4 v4.0.0
Expand All @@ -26,8 +26,6 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/streammachineio/api-definitions-go v1.30.0 //v1.16.0
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b // indirect
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
Expand Down
221 changes: 65 additions & 156 deletions go.sum

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions pkg/auth/kafka_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package auth

import (
"context"

"github.com/Shopify/sarama"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)

// Based on https://github.com/damiannolan/sasl/blob/master/oauthbearer/token_provider.go

// TokenProvider adapts an oauth2.TokenSource to sarama's
// AccessTokenProvider interface so the Kafka client can authenticate
// with SASL/OAUTHBEARER.
type TokenProvider struct {
	// tokenSource supplies OAuth2 access tokens obtained via the
	// client-credentials flow configured in NewTokenProvider.
	tokenSource oauth2.TokenSource
}

// NewTokenProvider returns a sarama.AccessTokenProvider that obtains
// OAuth2 access tokens from tokenURL using the client-credentials grant
// with the given clientID and clientSecret.
func NewTokenProvider(clientID, clientSecret, tokenURL string) sarama.AccessTokenProvider {
	conf := clientcredentials.Config{
		ClientID:     clientID,
		ClientSecret: clientSecret,
		TokenURL:     tokenURL,
	}
	source := conf.TokenSource(context.Background())
	return &TokenProvider{tokenSource: source}
}

// Token implements sarama.AccessTokenProvider. It fetches the current
// OAuth2 access token from the underlying token source and wraps it in
// a sarama.AccessToken; any retrieval error is returned unchanged.
func (t *TokenProvider) Token() (*sarama.AccessToken, error) {
	tok, err := t.tokenSource.Token()
	if err != nil {
		return nil, err
	}
	return &sarama.AccessToken{Token: tok.AccessToken}, nil
}
1 change: 0 additions & 1 deletion pkg/kafkaconsumer/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,4 @@ func init() {
flags.String(common.ClientIdFlag, "", "Client id to be used for receiving data")
flags.String(common.ClientSecretFlag, "", "Client secret to be used for receiving data")
flags.String(GroupIdFlag, "", "Kafka consumer group id. Uses a random value when not set")
flags.String(SslCaLocationFlag, "", "The location of the SSL CA pem file. Only needed in specific cases. Sets the OpenSSL 'ssl.ca.location' flag.")
}
154 changes: 80 additions & 74 deletions pkg/kafkaconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,25 @@ package kafkaconsumer
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/Shopify/sarama"
"github.com/spf13/cobra"
"golang.org/x/oauth2/clientcredentials"
"math/rand"
"log"
"os"
"os/signal"
"streammachine.io/strm/pkg/auth"
"streammachine.io/strm/pkg/common"
"streammachine.io/strm/pkg/entity/kafka_exporter"
"streammachine.io/strm/pkg/util"
"strings"
"sync"
"syscall"
"time"
)

const (
KafkaBrokerFlag = "kafka-broker"
GroupIdFlag = "group-id"
SslCaLocationFlag = "ssl-ca-location"
KafkaBrokerFlag = "kafka-broker"
GroupIdFlag = "group-id"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

func Run(cmd *cobra.Command, kafkaExporterName *string) {
flags := cmd.Flags()
clientId := util.GetStringAndErr(flags, common.ClientIdFlag)
Expand All @@ -40,80 +36,90 @@ func Run(cmd *cobra.Command, kafkaExporterName *string) {
topic := kafkaExporter.Target.Topic
groupId := util.GetStringAndErr(flags, GroupIdFlag)
if len(groupId) == 0 {
groupId = fmt.Sprintf("random-%d", rand.Int())
common.CliExit(fmt.Sprintf("Please set a Kafka Consumer group id with --%v", GroupIdFlag))
}

sslCaLocation := util.GetStringAndErr(flags, SslCaLocationFlag)
//sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

configMap := kafka.ConfigMap{
"bootstrap.servers": brokers,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "OAUTHBEARER",
"group.id": groupId,
"socket.keepalive.enable": "true",
"log.connection.close": "false",
}
if len(sslCaLocation) > 0 {
_ = configMap.SetKey("ssl.ca.location", sslCaLocation)
}
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.TLS.Enable = true
config.Consumer.Return.Errors = true
config.Version = sarama.MaxVersion
config.Net.SASL.TokenProvider = auth.NewTokenProvider(clientId, clientSecret, common.EventAuthHost+"/token")

consumer, err := kafka.NewConsumer(&configMap)
common.CliExit(err)
consumer := Consumer{
ready: make(chan bool),
}

clientConfig := &clientcredentials.Config{
ClientID: clientId,
ClientSecret: clientSecret,
TokenURL: common.EventAuthHost + "/token",
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupId, config)
if err != nil {
common.CliExit(fmt.Sprintf("Error creating consumer group client: %v", err))
}

refreshToken(clientConfig, consumer)
err = consumer.SubscribeTopics([]string{topic}, nil)
common.CliExit(err)

sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

// Process messages
// librdkafka shows `AllBrokersDown` messages for a simple tcp disconnect.
// we're only acting on it if we have 2 in a row in the poll loop
hadError := false
run := true
for run == true {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := consumer.Poll(100)
if ev == nil {
continue
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, []string{topic}, &consumer); err != nil {
common.CliExit(fmt.Sprintf("Error from consumer: %v", err))
}
switch e := ev.(type) {
case *kafka.Message:
fmt.Println(string(e.Value))
hadError = false
case kafka.OffsetsCommitted:
case kafka.OAuthBearerTokenRefresh:
refreshToken(clientConfig, consumer)
case kafka.Error:
if hadError {
common.CliExit(e)
}
//fmt.Println("Error", e)
hadError = true
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()

<-consumer.ready // Await till the consumer has been set up

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("Shutting down Kafka Consumer")
case <-sigterm:
log.Println("Shutting down Kafka Consumer")
}
_ = consumer.Close()
cancel()
wg.Wait()
if err = client.Close(); err != nil {
common.CliExit(fmt.Sprintf("Error closing client: %v", err))
}

}

func refreshToken(config *clientcredentials.Config, consumer *kafka.Consumer) {
token, err := config.Token(context.Background())
common.CliExit(err)
err = consumer.SetOAuthBearerToken(kafka.OAuthBearerToken{
TokenValue: token.AccessToken,
Expiration: token.Expiry,
})
common.CliExit(err)
//_, _ = fmt.Fprintf(os.Stderr, "Token refreshed until %s", gostradamus.DateTimeFromTime(token.Expiry).IsoFormat())
// Consumer represents a Sarama consumer group consumer.
type Consumer struct {
	// ready is closed by Setup once the group session is established;
	// the caller waits on it before proceeding (see the `<-consumer.ready`
	// in Run) and re-creates it after each Consume iteration.
	ready chan bool
}

// Setup runs at the start of a new consumer-group session, before any
// ConsumeClaim goroutine is started. It marks the consumer as ready by
// closing the ready channel, unblocking anyone waiting on it.
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}

// Cleanup runs at the end of a session, once every ConsumeClaim
// goroutine has exited. This consumer holds no per-session resources,
// so there is nothing to tear down.
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Each record's value is printed to stdout and its offset is marked as
// consumed in the session.
//
// The loop must return promptly when the session context is cancelled;
// draining Messages() alone can otherwise stall a rebalance and surface as
// ErrRebalanceInProgress or "i/o timeout" errors
// (see https://github.com/Shopify/sarama/issues/1192).
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// Channel closed: the session has ended (shutdown or rebalance).
				return nil
			}
			fmt.Println(string(message.Value))
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			return nil
		}
	}
}

0 comments on commit 0e728e3

Please sign in to comment.