return an error in constructor functions
reugn committed on Nov 24, 2021
Parent: 25080b9 · Commit: 3846c02
Showing 2 changed files with 45 additions and 29 deletions.
examples/kafka/main.go: 4 changes (2 additions, 2 deletions)
@@ -24,10 +24,10 @@ func main() {
    config.Version, _ = sarama.ParseKafkaVersion("2.2.0")
    groupID := "testConsumer"

-    source := ext.NewKafkaSource(ctx, hosts, groupID, config, "test")
+    source, _ := ext.NewKafkaSource(ctx, hosts, groupID, config, "test")
    flow1 := flow.NewMap(toUpper, 1)
    flow2 := flow.NewFlatMap(appendAsterix, 1)
-    sink := ext.NewKafkaSink(hosts, config, "test2")
+    sink, _ := ext.NewKafkaSink(hosts, config, "test2")
    throttler := flow.NewThrottler(1, time.Second*1, 50, flow.Discard)
    // slidingWindow := flow.NewSlidingWindow(time.Second*30, time.Second*5)
    tumblingWindow := flow.NewTumblingWindow(time.Second * 5)
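Note that the updated example discards both constructor errors with the blank identifier. Below is a minimal sketch of how a caller could check them instead; the ext package alias, import path, and broker address are assumptions carried over from the example, not part of this commit.

```go
package main

import (
    "context"
    "log"

    "github.com/Shopify/sarama"
    ext "github.com/reugn/go-streams/kafka" // assumed import path of the Kafka connector package
)

func main() {
    hosts := []string{"127.0.0.1:9092"} // placeholder broker address
    config := sarama.NewConfig()
    config.Version, _ = sarama.ParseKafkaVersion("2.2.0")
    ctx := context.Background()

    // NewKafkaSource now reports failures to the caller instead of checking internally.
    source, err := ext.NewKafkaSource(ctx, hosts, "testConsumer", config, "test")
    if err != nil {
        log.Fatalf("failed to create Kafka source: %v", err)
    }

    // The same applies to NewKafkaSink.
    sink, err := ext.NewKafkaSink(hosts, config, "test2")
    if err != nil {
        log.Fatalf("failed to create Kafka sink: %v", err)
    }

    // Wire source -> flows -> sink as in the example above.
    _ = source
    _ = sink
}
```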
kafka/kafka_sarama.go: 70 changes (43 additions, 27 deletions)
@@ -11,10 +11,9 @@ import (
    "github.com/Shopify/sarama"
    "github.com/reugn/go-streams"
    "github.com/reugn/go-streams/flow"
-    "github.com/reugn/go-streams/util"
)

-// KafkaSource connector
+// KafkaSource represents an Apache Kafka source connector.
type KafkaSource struct {
    consumer sarama.ConsumerGroup
    handler sarama.ConsumerGroupHandler
@@ -25,33 +24,36 @@ type KafkaSource struct {
    wg *sync.WaitGroup
}

-// NewKafkaSource returns a new KafkaSource instance
+// NewKafkaSource returns a new KafkaSource instance.
func NewKafkaSource(ctx context.Context, addrs []string, groupID string,
-    config *sarama.Config, topics ...string) *KafkaSource {
+    config *sarama.Config, topics ...string) (*KafkaSource, error) {
    consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
-    util.Check(err)
+    if err != nil {
+        return nil, err
+    }

    out := make(chan interface{})
    cctx, cancel := context.WithCancel(ctx)

    sink := &KafkaSource{
-        consumerGroup,
-        &GroupHandler{make(chan struct{}), out},
-        topics,
-        out,
-        cctx,
-        cancel,
-        &sync.WaitGroup{},
+        consumer: consumerGroup,
+        handler: &GroupHandler{make(chan struct{}), out},
+        topics: topics,
+        out: out,
+        ctx: cctx,
+        cancelCtx: cancel,
+        wg: &sync.WaitGroup{},
    }

    go sink.init()
-    return sink
+    return sink, nil
}

func (ks *KafkaSource) claimLoop() {
    ks.wg.Add(1)
    defer func() {
        ks.wg.Done()
-        log.Printf("Exiting the Kafka claimLoop")
+        log.Printf("Exiting Kafka claimLoop")
    }()
    for {
        handler := ks.handler.(*GroupHandler)
@@ -84,7 +86,7 @@ func (ks *KafkaSource) init() {
    case <-ks.ctx.Done():
    }

-    log.Printf("Closing the Kafka consumer")
+    log.Printf("Closing Kafka consumer")
    ks.wg.Wait()
    close(ks.out)
    ks.consumer.Close()
@@ -130,56 +132,70 @@ func (handler *GroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
                session.MarkMessage(message, "")
                handler.out <- message
            }

        case <-session.Context().Done():
            return session.Context().Err()
        }
    }
}

-// KafkaSink connector
+// KafkaSink represents an Apache Kafka sink connector.
type KafkaSink struct {
    producer sarama.SyncProducer
    topic string
    in chan interface{}
}

-// NewKafkaSink returns a new KafkaSink instance
-func NewKafkaSink(addrs []string, config *sarama.Config, topic string) *KafkaSink {
+// NewKafkaSink returns a new KafkaSink instance.
+func NewKafkaSink(addrs []string, config *sarama.Config, topic string) (*KafkaSink, error) {
    producer, err := sarama.NewSyncProducer(addrs, config)
-    util.Check(err)
+    if err != nil {
+        return nil, err
+    }

    sink := &KafkaSink{
-        producer,
-        topic,
-        make(chan interface{}),
+        producer: producer,
+        topic: topic,
+        in: make(chan interface{}),
    }

    go sink.init()
-    return sink
+    return sink, nil
}

// init starts the main loop
func (ks *KafkaSink) init() {
    for msg := range ks.in {
+        var err error
        switch m := msg.(type) {
        case *sarama.ProducerMessage:
-            ks.producer.SendMessage(m)
+            _, _, err = ks.producer.SendMessage(m)

        case *sarama.ConsumerMessage:
            sMsg := &sarama.ProducerMessage{
                Topic: ks.topic,
                Key: sarama.StringEncoder(m.Key),
                Value: sarama.StringEncoder(m.Value),
            }
-            ks.producer.SendMessage(sMsg)
+            _, _, err = ks.producer.SendMessage(sMsg)

        case string:
            sMsg := &sarama.ProducerMessage{
                Topic: ks.topic,
                Value: sarama.StringEncoder(m),
            }
-            ks.producer.SendMessage(sMsg)
+            _, _, err = ks.producer.SendMessage(sMsg)

        default:
            log.Printf("Unsupported message type %v", m)
        }

+        if err != nil {
+            log.Printf("Error processing Kafka message: %s", err)
+        }
    }
-    log.Printf("Closing the Kafka producer")
+
+    log.Printf("Closing Kafka producer")
    ks.producer.Close()
}
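Besides the constructor change, the sink loop above now captures the (partition, offset, error) results of SendMessage instead of dropping them. A standalone sketch of the same pattern against sarama directly, with a placeholder broker address and topic:

```go
package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    // SyncProducer requires successes to be returned on the channel it drains internally.
    config.Producer.Return.Successes = true

    // Constructor-style error handling: surface the error instead of panicking on it.
    producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) // placeholder broker
    if err != nil {
        log.Fatalf("failed to create sync producer: %v", err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "test2", // placeholder topic
        Value: sarama.StringEncoder("hello"),
    }

    // Keep all three return values instead of discarding them.
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Printf("Error processing Kafka message: %s", err)
        return
    }
    log.Printf("message delivered to partition %d at offset %d", partition, offset)
}
```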
