diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index 73da1f2adee..2cfe7e33291 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -19,6 +19,7 @@ import ( "io" "net" "strconv" + "strings" "sync" "syscall" @@ -293,7 +294,12 @@ func (a *saramaAdminClient) CreateTopic( ReplicationFactor: detail.ReplicationFactor, } query := func() error { - return a.admin.CreateTopic(detail.Name, request, validateOnly) + err := a.admin.CreateTopic(detail.Name, request, validateOnly) + // Ignore the already exists error because it's not harmful. + if err != nil && !strings.Contains(err.Error(), sarama.ErrTopicAlreadyExists.Error()) { + return err + } + return nil } return a.queryClusterWithRetry(ctx, query) }