Skip to content

Commit

Permalink
kafka(ticdc): ignore sarama admin create topic exist (#8392)
Browse files Browse the repository at this point in the history
close #8391
  • Loading branch information
3AceShowHand authored Feb 28, 2023
1 parent 71e2c49 commit ece5df4
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion pkg/sink/kafka/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"net"
"strconv"
"strings"
"sync"
"syscall"

Expand Down Expand Up @@ -293,7 +294,12 @@ func (a *saramaAdminClient) CreateTopic(
ReplicationFactor: detail.ReplicationFactor,
}
query := func() error {
return a.admin.CreateTopic(detail.Name, request, validateOnly)
err := a.admin.CreateTopic(detail.Name, request, validateOnly)
// Ignore the already exists error because it's not harmful.
if err != nil && !strings.Contains(err.Error(), sarama.ErrTopicAlreadyExists.Error()) {
return err
}
return nil
}
return a.queryClusterWithRetry(ctx, query)
}
Expand Down

0 comments on commit ece5df4

Please sign in to comment.