Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink/kafka: fix send on closed channel panic #912

Merged
merged 8 commits into from
Sep 3, 2020
44 changes: 35 additions & 9 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -56,6 +57,10 @@ func NewKafkaConfig() Config {
}

type kafkaSaramaProducer struct {
// clientLock is used to protect concurrent access of asyncClient and syncClient.
// Since we don't close these two clients (which have an input chan) from the
// sender routine, a data race or a send on a closed chan could happen.
clientLock sync.RWMutex
asyncClient sarama.AsyncProducer
syncClient sarama.SyncProducer
topic string
Expand All @@ -71,9 +76,12 @@ type kafkaSaramaProducer struct {
failpointCh chan error

closeCh chan struct{}
closed int32
}

func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, key []byte, value []byte, partition int32) error {
k.clientLock.RLock()
defer k.clientLock.RUnlock()
msg := &sarama.ProducerMessage{
Topic: k.topic,
Key: sarama.ByteEncoder(key),
Expand All @@ -95,12 +103,15 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, key []byte, value
return ctx.Err()
case <-k.closeCh:
return nil
case k.asyncClient.Input() <- msg:
default:
k.asyncClient.Input() <- msg
}
return nil
}

func (k *kafkaSaramaProducer) SyncBroadcastMessage(ctx context.Context, key []byte, value []byte) error {
k.clientLock.RLock()
defer k.clientLock.RUnlock()
msgs := make([]*sarama.ProducerMessage, k.partitionNum)
for i := 0; i < int(k.partitionNum); i++ {
msgs[i] = &sarama.ProducerMessage{
Expand Down Expand Up @@ -169,31 +180,46 @@ func (k *kafkaSaramaProducer) GetPartitionNum() int32 {
return k.partitionNum
}

func (k *kafkaSaramaProducer) Close() error {
// stop closes the closeCh to signal other routines to exit
func (k *kafkaSaramaProducer) stop() {
k.clientLock.Lock()
defer k.clientLock.Unlock()
select {
case <-k.closeCh:
return nil
return
default:
close(k.closeCh)
}
}

// Close implements the Producer interface
func (k *kafkaSaramaProducer) Close() error {
k.stop()
k.clientLock.Lock()
defer k.clientLock.Unlock()
// closing a sarama client multiple times will cause a panic
if atomic.LoadInt32(&k.closed) == 1 {
return nil
}
// In fact, closing the sarama sync client doesn't return any error.
// But closing the async client returns an error if its error channel is not
// empty; we don't propagate this error to the upper caller, just log it here.
err1 := k.syncClient.Close()
err2 := k.asyncClient.Close()
if err1 != nil {
return err1
log.Error("close sync client with error", zap.Error(err1))
}
if err2 != nil {
return err2
log.Error("close async client with error", zap.Error(err2))
}
atomic.StoreInt32(&k.closed, 1)
return nil
}

func (k *kafkaSaramaProducer) run(ctx context.Context) error {
defer func() {
k.flushedReceiver.Stop()
err := k.Close()
if err != nil {
log.Error("close kafkaSaramaProducer with error", zap.Error(err))
}
k.stop()
}()
for {
select {
Expand Down