Commit 1ed1dea

[FAB-11574] Use AdminClient to create topics
The Kafka AdminClient API is now used to create the topic required for a channel. Prior to this, the orderer relied on the Kafka cluster having automatic topic creation enabled. The API is only supported by Kafka v0.10.1.0 and higher; if the API call fails, the code still falls back to the auto-create path.

Change-Id: Id23d5b5f5826b24cb0cc3202e3ac6088e5ae3a49
Signed-off-by: Gari Singh <gari.r.singh@gmail.com>
1 parent 8cfb9ed commit 1ed1dea
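
Below is a minimal, standalone sketch (not part of the commit) of the CreateTopics call that this change introduces, using the sarama broker API. The broker address, topic name, and topic settings are placeholder values; the actual implementation in orderer/consensus/kafka/chain.go first discovers the controller broker from cluster metadata and wraps the whole operation in the orderer's retry process.

    // Package example contains a hypothetical illustration of AdminClient-style
    // topic creation; it is not the orderer's code.
    package example

    import (
        "fmt"
        "time"

        "github.com/Shopify/sarama"
    )

    // createTopic assumes brokerAddr points at the cluster controller; the real
    // code resolves the controller from a MetadataRequest before issuing
    // CreateTopics.
    func createTopic(brokerAddr, topic string) error {
        cfg := sarama.NewConfig()
        cfg.Version = sarama.V0_10_1_0 // CreateTopics requires Kafka v0.10.1.0 or higher

        broker := sarama.NewBroker(brokerAddr)
        if err := broker.Open(cfg); err != nil {
            return err
        }
        defer broker.Close()
        if ok, err := broker.Connected(); !ok {
            return err
        }

        resp, err := broker.CreateTopics(&sarama.CreateTopicsRequest{
            TopicDetails: map[string]*sarama.TopicDetail{
                topic: {NumPartitions: 1, ReplicationFactor: 1},
            },
            Timeout: 3 * time.Second,
        })
        if err != nil {
            return err
        }
        // Treat "no error" and "topic already exists" as success, as the orderer does.
        if tErr, ok := resp.TopicErrors[topic]; ok &&
            tErr.Err != sarama.ErrNoError && tErr.Err != sarama.ErrTopicAlreadyExists {
            return fmt.Errorf("error creating topic: %s", tErr.Err)
        }
        return nil
    }

Sending the request to the controller matters because non-controller brokers reject CreateTopics, which is why the implementation below walks the broker list and resolves metadata.ControllerID before creating the topic.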

9 files changed: +329 −1 lines changed


integration/nwo/fabricconfig/orderer.go

Lines changed: 5 additions & 0 deletions
@@ -67,6 +67,10 @@ type OrdererAuthentication struct {
 	TimeWindow time.Duration `yaml:"TimeWindow,omitempty"`
 }
 
+type OrdererTopic struct {
+	ReplicationFactor int16
+}
+
 type FileLedger struct {
 	Location string `yaml:"Location,omitempty"`
 	Prefix   string `yaml:"Prefix,omitempty"`
@@ -81,6 +85,7 @@ type Kafka struct {
 	Verbose   bool              `yaml:"Verbose"`
 	TLS       *OrdererTLS       `yaml:"TLS,omitempty"`
 	SASLPlain *OrdererSASLPlain `yaml:"SASLPlain,omitempty"`
+	Topic     *OrdererTopic     `yaml:"Topic,omitempty"`
 }
 
 type Retry struct {

integration/nwo/orderer_template.go

Lines changed: 2 additions & 0 deletions
@@ -68,6 +68,8 @@ Kafka:
       RetryMax: 3
     Consumer:
       RetryBackoff: 2s
+  Topic:
+    ReplicationFactor: 1
   Verbose: false
   TLS:
     Enabled: false

integration/runner/kafka.go

Lines changed: 1 addition & 0 deletions
@@ -211,6 +211,7 @@ func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
 func (k *Kafka) buildEnv() []string {
 	env := []string{
 		"KAFKA_LOG_RETENTION_MS=-1",
+		//"KAFKA_AUTO_CREATE_TOPICS_ENABLE=false",
 		fmt.Sprintf("KAFKA_MESSAGE_MAX_BYTES=%d", k.MessageMaxBytes),
 		fmt.Sprintf("KAFKA_REPLICA_FETCH_MAX_BYTES=%d", k.ReplicaFetchMaxBytes),
 		fmt.Sprintf("KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=%s", strconv.FormatBool(k.UncleanLeaderElectionEnable)),

orderer/common/localconfig/config.go

Lines changed: 9 additions & 0 deletions
@@ -110,6 +110,7 @@ type Kafka struct {
 	Version   sarama.KafkaVersion // TODO Move this to global config
 	TLS       TLS
 	SASLPlain SASLPlain
+	Topic     Topic
 }
 
 // Retry contains configuration related to retries and timeouts when the
@@ -155,6 +156,11 @@ type Consumer struct {
 	RetryBackoff time.Duration
 }
 
+// Topic contains the settings to use when creating Kafka topics
+type Topic struct {
+	ReplicationFactor int16
+}
+
 // Debug contains configuration for the orderer's debug parameters.
 type Debug struct {
 	BroadcastTraceDir string
@@ -219,6 +225,9 @@ var Defaults = TopLevel{
 		TLS: TLS{
 			Enabled: false,
 		},
+		Topic: Topic{
+			ReplicationFactor: 3,
+		},
 	},
 	Debug: Debug{
 		BroadcastTraceDir: "",
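
The chain.go change in the next file passes chain.consenter.topicDetail() into setupTopicForChannel, but that accessor is not among the hunks shown here. As a hedged illustration only, a helper along these lines could translate the new Topic config into the *sarama.TopicDetail the setup function expects; the name topicDetailFromConfig and its exact shape are assumptions, not part of the commit.

    // topicDetailFromConfig is a hypothetical helper (not from this diff) mapping
    // the orderer's Kafka.Topic config onto sarama's TopicDetail.
    //
    // import (
    //     "github.com/Shopify/sarama"
    //     localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
    // )
    func topicDetailFromConfig(topicCfg localconfig.Topic) *sarama.TopicDetail {
        return &sarama.TopicDetail{
            // Fabric channel topics use a single partition; only the
            // replication factor comes from configuration.
            NumPartitions:     1,
            ReplicationFactor: topicCfg.ReplicationFactor,
        }
    }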

orderer/consensus/kafka/chain.go

Lines changed: 141 additions & 0 deletions
@@ -255,6 +255,13 @@ func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
 func startThread(chain *chainImpl) {
 	var err error
 
+	// Create topic if it does not exist (requires Kafka v0.10.1.0)
+	err = setupTopicForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.consenter.topicDetail(), chain.channel)
+	if err != nil {
+		// log for now and fallback to auto create topics setting for broker
+		logger.Infof("[channel: %s]: failed to create Kafka topic = %s", chain.channel.topic(), err)
+	}
+
 	// Set up the producer
 	chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
 	if err != nil {
@@ -950,3 +957,137 @@ func setupProducerForChannel(retryOptions localconfig.Retry, haltChan chan struc
 
 	return producer, setupProducer.retry()
 }
+
+// Creates the Kafka topic for the channel if it does not already exist
+func setupTopicForChannel(retryOptions localconfig.Retry, haltChan chan struct{}, brokers []string, brokerConfig *sarama.Config, topicDetail *sarama.TopicDetail, channel channel) error {
+
+	// requires Kafka v0.10.1.0 or higher
+	if !brokerConfig.Version.IsAtLeast(sarama.V0_10_1_0) {
+		return nil
+	}
+
+	logger.Infof("[channel: %s] Setting up the topic for this channel...",
+		channel.topic())
+
+	retryMsg := fmt.Sprintf("Creating Kafka topic [%s] for channel [%s]",
+		channel.topic(), channel.String())
+
+	setupTopic := newRetryProcess(
+		retryOptions,
+		haltChan,
+		channel,
+		retryMsg,
+		func() error {
+
+			var err error
+			clusterMembers := map[int32]*sarama.Broker{}
+			var controllerId int32
+
+			// loop through brokers to access metadata
+			for _, address := range brokers {
+				broker := sarama.NewBroker(address)
+				err = broker.Open(brokerConfig)
+
+				if err != nil {
+					continue
+				}
+
+				var ok bool
+				ok, err = broker.Connected()
+				if !ok {
+					continue
+				}
+				defer broker.Close()
+
+				// metadata request which includes the topic
+				var apiVersion int16
+				if brokerConfig.Version.IsAtLeast(sarama.V0_11_0_0) {
+					// use API version 4 to disable auto topic creation for
+					// metadata requests
+					apiVersion = 4
+				} else {
+					apiVersion = 1
+				}
+				metadata, err := broker.GetMetadata(&sarama.MetadataRequest{
+					Version:                apiVersion,
+					Topics:                 []string{channel.topic()},
+					AllowAutoTopicCreation: false})
+
+				if err != nil {
+					continue
+				}
+
+				controllerId = metadata.ControllerID
+				for _, broker := range metadata.Brokers {
+					clusterMembers[broker.ID()] = broker
+				}
+
+				for _, topic := range metadata.Topics {
+					if topic.Name == channel.topic() {
+						if topic.Err != sarama.ErrUnknownTopicOrPartition {
+							// auto create topics must be enabled so return
+							return nil
+						}
+					}
+				}
+				break
+			}
+
+			// check to see if we got any metadata from any of the brokers in the list
+			if len(clusterMembers) == 0 {
+				return fmt.Errorf(
+					"error creating topic [%s]; failed to retrieve metadata for the cluster",
+					channel.topic())
+			}
+
+			// get the controller
+			controller := clusterMembers[controllerId]
+			err = controller.Open(brokerConfig)
+
+			if err != nil {
+				return err
+			}
+
+			var ok bool
+			ok, err = controller.Connected()
+			if !ok {
+				return err
+			}
+			defer controller.Close()
+
+			// create the topic
+			req := &sarama.CreateTopicsRequest{
+				Version: 0,
+				TopicDetails: map[string]*sarama.TopicDetail{
+					channel.topic(): topicDetail},
+				Timeout: 3 * time.Second}
+			resp := &sarama.CreateTopicsResponse{}
+			resp, err = controller.CreateTopics(req)
+			if err != nil {
+				return err
+			}
+
+			// check the response
+			if topicErr, ok := resp.TopicErrors[channel.topic()]; ok {
+				// treat no error and topic exists error as success
+				if topicErr.Err == sarama.ErrNoError ||
+					topicErr.Err == sarama.ErrTopicAlreadyExists {
+					return nil
+				}
+				if topicErr.Err == sarama.ErrInvalidTopic {
+					// topic is invalid so abort
+					logger.Warningf("[channel: %s] Failed to set up topic = %s",
+						channel.topic(), topicErr.Err.Error())
+					go func() {
+						haltChan <- struct{}{}
+					}()
+				}
+				return fmt.Errorf("error creating topic: [%s]",
+					topicErr.Err.Error())
+			}
+
+			return nil
+		})
+
+	return setupTopic.retry()
+}

orderer/consensus/kafka/chain_test.go

Lines changed: 155 additions & 0 deletions
@@ -447,6 +447,161 @@ func TestChain(t *testing.T) {
 	})
 }
 
+func TestSetupTopicForChannel(t *testing.T) {
+
+	mockChannel := newChannel(channelNameForTest(t), defaultPartition)
+	haltChan := make(chan struct{})
+
+	mockBrokerNoError := sarama.NewMockBroker(t, 0)
+	defer mockBrokerNoError.Close()
+	metadataResponse := sarama.NewMockMetadataResponse(t)
+	metadataResponse.SetBroker(mockBrokerNoError.Addr(),
+		mockBrokerNoError.BrokerID())
+	metadataResponse.SetController(mockBrokerNoError.BrokerID())
+
+	mdrUnknownTopicOrPartition := &sarama.MetadataResponse{
+		Version:      1,
+		Brokers:      []*sarama.Broker{sarama.NewBroker(mockBrokerNoError.Addr())},
+		ControllerID: -1,
+		Topics: []*sarama.TopicMetadata{
+			{
+				Err:  sarama.ErrUnknownTopicOrPartition,
+				Name: mockChannel.topic(),
+			},
+		},
+	}
+
+	mockBrokerNoError.SetHandlerByMap(map[string]sarama.MockResponse{
+		"CreateTopicsRequest": sarama.NewMockWrapper(
+			&sarama.CreateTopicsResponse{
+				TopicErrors: map[string]*sarama.TopicError{
+					mockChannel.topic(): {
+						Err: sarama.ErrNoError}}}),
+		"MetadataRequest": sarama.NewMockWrapper(mdrUnknownTopicOrPartition)})
+
+	mockBrokerTopicExists := sarama.NewMockBroker(t, 1)
+	defer mockBrokerTopicExists.Close()
+	mockBrokerTopicExists.SetHandlerByMap(map[string]sarama.MockResponse{
+		"CreateTopicsRequest": sarama.NewMockWrapper(
+			&sarama.CreateTopicsResponse{
+				TopicErrors: map[string]*sarama.TopicError{
+					mockChannel.topic(): {
+						Err: sarama.ErrTopicAlreadyExists}}}),
+		"MetadataRequest": sarama.NewMockWrapper(&sarama.MetadataResponse{
+			Version: 1,
+			Topics: []*sarama.TopicMetadata{
+				{
+					Name: channelNameForTest(t),
+					Err:  sarama.ErrNoError}}})})
+
+	mockBrokerInvalidTopic := sarama.NewMockBroker(t, 2)
+	defer mockBrokerInvalidTopic.Close()
+	metadataResponse = sarama.NewMockMetadataResponse(t)
+	metadataResponse.SetBroker(mockBrokerInvalidTopic.Addr(),
+		mockBrokerInvalidTopic.BrokerID())
+	metadataResponse.SetController(mockBrokerInvalidTopic.BrokerID())
+	mockBrokerInvalidTopic.SetHandlerByMap(map[string]sarama.MockResponse{
+		"CreateTopicsRequest": sarama.NewMockWrapper(
+			&sarama.CreateTopicsResponse{
+				TopicErrors: map[string]*sarama.TopicError{
+					mockChannel.topic(): {
+						Err: sarama.ErrInvalidTopic}}}),
+		"MetadataRequest": metadataResponse})
+
+	mockBrokerInvalidTopic2 := sarama.NewMockBroker(t, 3)
+	defer mockBrokerInvalidTopic2.Close()
+	mockBrokerInvalidTopic2.SetHandlerByMap(map[string]sarama.MockResponse{
+		"CreateTopicsRequest": sarama.NewMockWrapper(
+			&sarama.CreateTopicsResponse{
+				TopicErrors: map[string]*sarama.TopicError{
+					mockChannel.topic(): {
+						Err: sarama.ErrInvalidTopic}}}),
+		"MetadataRequest": sarama.NewMockWrapper(&sarama.MetadataResponse{
+			Version:      1,
+			Brokers:      []*sarama.Broker{sarama.NewBroker(mockBrokerInvalidTopic2.Addr())},
+			ControllerID: mockBrokerInvalidTopic2.BrokerID()})})
+
+	closedBroker := sarama.NewMockBroker(t, 99)
+	badAddress := closedBroker.Addr()
+	closedBroker.Close()
+
+	var tests = []struct {
+		name         string
+		brokers      []string
+		brokerConfig *sarama.Config
+		version      sarama.KafkaVersion
+		expectErr    bool
+		errorMsg     string
+	}{
+		{
+			name:         "Unsupported Version",
+			brokers:      []string{mockBrokerNoError.Addr()},
+			brokerConfig: sarama.NewConfig(),
+			version:      sarama.V0_9_0_0,
+			expectErr:    false,
+		},
+		{
+			name:         "No Error",
+			brokers:      []string{mockBrokerNoError.Addr()},
+			brokerConfig: sarama.NewConfig(),
+			version:      sarama.V0_10_2_0,
+			expectErr:    false,
+		},
+		{
+			name:         "Topic Exists",
+			brokers:      []string{mockBrokerTopicExists.Addr()},
+			brokerConfig: sarama.NewConfig(),
+			version:      sarama.V0_10_2_0,
+			expectErr:    false,
+		},
+		{
+			name:         "Invalid Topic",
+			brokers:      []string{mockBrokerInvalidTopic.Addr()},
+			brokerConfig: sarama.NewConfig(),
+			version:      sarama.V0_10_2_0,
+			expectErr:    true,
+			errorMsg:     "process asked to exit",
+		},
+		{
+			name:         "Multiple Brokers - One No Error",
+			brokers:      []string{badAddress, mockBrokerNoError.Addr()},
+			brokerConfig: sarama.NewConfig(),
+			version:      sarama.V0_10_2_0,
+			expectErr:    false,
+		},
+		{
+			name:         "Multiple Brokers - All Errors",
+			brokers:      []string{badAddress, badAddress},
+			brokerConfig: sarama.NewConfig(),
+			version:      sarama.V0_10_2_0,
+			expectErr:    true,
+			errorMsg:     "failed to retrieve metadata",
+		},
+	}
+
+	for _, test := range tests {
+		test := test
+		t.Run(test.name, func(t *testing.T) {
+			test.brokerConfig.Version = test.version
+			err := setupTopicForChannel(
+				mockRetryOptions,
+				haltChan,
+				test.brokers,
+				test.brokerConfig,
+				&sarama.TopicDetail{
+					NumPartitions:     1,
+					ReplicationFactor: 2},
+				mockChannel)
+			if test.expectErr {
+				assert.Contains(t, err.Error(), test.errorMsg)
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+
+}
+
 func TestSetupProducerForChannel(t *testing.T) {
 	if testing.Short() {
 		t.Skip("Skipping test in short mode")