Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink/kafka: fix send on closed channel panic #912

Merged
merged 8 commits into from
Sep 3, 2020
31 changes: 22 additions & 9 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type kafkaSaramaProducer struct {
flushedReceiver *notify.Receiver

closeCh chan struct{}
closed int32
}

func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, key []byte, value []byte, partition int32) error {
Expand All @@ -83,7 +84,8 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, key []byte, value
return ctx.Err()
case <-k.closeCh:
return nil
case k.asyncClient.Input() <- msg:
default:
k.asyncClient.Input() <- msg
}
return nil
}
Expand Down Expand Up @@ -146,31 +148,42 @@ func (k *kafkaSaramaProducer) GetPartitionNum() int32 {
return k.partitionNum
}

func (k *kafkaSaramaProducer) Close() error {
// stop closes the closeCh to signal other routines to exit
func (k *kafkaSaramaProducer) stop() {
select {
case <-k.closeCh:
return nil
return
default:
close(k.closeCh)
}
}

// Close implements the Producer interface
func (k *kafkaSaramaProducer) Close() error {
// close sarama client multiple times will cause panic
if atomic.LoadInt32(&k.closed) == 1 {
return nil
}
k.stop()
// In fact close sarama sync client doesn't return any error.
// But close async client returns error if error channel is not empty, we
// don't populate this error to the upper caller, just add a log here.
err1 := k.syncClient.Close()
err2 := k.asyncClient.Close()
if err1 != nil {
return err1
log.Error("close sync client with error", zap.Error(err1))
}
if err2 != nil {
return err2
log.Error("close async client with error", zap.Error(err2))
}
atomic.StoreInt32(&k.closed, 1)
return nil
}

func (k *kafkaSaramaProducer) run(ctx context.Context) error {
defer func() {
k.flushedReceiver.Stop()
err := k.Close()
if err != nil {
log.Error("close kafkaSaramaProducer with error", zap.Error(err))
}
k.stop()
}()
for {
select {
Expand Down