Skip to content

Netcracker/qubership-core-lib-go-maas-segmentio

Go build Coverage duplicated_lines_density vulnerabilities bugs code_smells

segmentio

This lib provides methods to provision segmentio kafka-go structs by topic information received from MaaS. The provided Writer and Reader will receive all required configuration regarding communication with Kafka server

1. Writer. To create kafka-go Write struct based on response from MaaS, the following code can be used:

import (
  "context"
  "fmt"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
  maas "github.com/netcracker/qubership-core-lib-go-maas-core"
  "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  kafkago "github.com/segmentio/kafka-go"
)

func producer() {
  ctxmanager.Register(baseproviders.Get())
  ctx := context.Background()
  kafkaClient := maas.NewKafkaClient()
  topicAddress, _ := kafkaClient.GetOrCreateTopic(ctx, classifier.New("test"))
  writer, _ := segmentio.NewWriter(*topicAddress)
  ctxData, _ := ctxmanager.GetResponsePropagatableContextData(ctx)
  message := kafkago.Message{
  	Key:     []byte("price"),
  	Value:   []byte("10USD"),
  	Headers: segmentioHelper.BuildHeaders(ctxData),
  }
  writer.WriteMessages(ctx, message)
}

2. Reader. to create kafka-go Reader struct based on response from MaaS, the following code can be used:

GetTopic example:

import (
  "context"
  "fmt"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
  maas "github.com/netcracker/qubership-core-lib-go-maas-core"
  segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  "github.com/segmentio/kafka-go"
  "time"
)

func consumer() {
  ctxmanager.Register(baseproviders.Get())
  ctx := context.Background()
  kafkaClient := maas.NewKafkaClient()
  topicAddress, _ := kafkaClient.GetTopic(ctx, classifier.New("test"))
  readerConfig, _ := segmentioHelper.NewReaderConfig(*topicAddress, "prices-group-id")
  reader := kafka.NewReader(*readerConfig)
  defer reader.Close()
  for {
  	msg, _ := reader.ReadMessage(ctx)
  	localCtx := ctxmanager.InitContext(ctx, segmentioHelper.ExtractHeaders(msg.Headers))
  	fmt.Printf("message=%v, ctx=%v", msg, localCtx)
  }
}

WatchTopicCreate example:

import (
  "context"
  "fmt"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
  maas "github.com/netcracker/qubership-core-lib-go-maas-core"
  segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  "github.com/segmentio/kafka-go"
  "time"
)

func watchingConsumer() {
  ctxmanager.Register(baseproviders.Get())
  ctx := context.Background()
  kafkaClient := maas.NewKafkaClient()
  kafkaClient.WatchTopicCreate(ctx, classifier.New("test"), func(topicAddress model.TopicAddress) {
  	readerConfig, _ := segmentioHelper.NewReaderConfig(topicAddress, "prices-group-id")
  	reader := kafka.NewReader(*readerConfig)
  	defer reader.Close()
  	for {
  		msg, _ := reader.ReadMessage(ctx)
  		localCtx := ctxmanager.InitContext(ctx, segmentioHelper.ExtractHeaders(msg.Headers))
  		fmt.Printf("message=%v, ctx=%v", msg, localCtx)
  	}
  })
}

WatchTenantTopics example:

import (
	"context"
	"github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
	"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
	kafkaCl "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka"
	segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
	"github.com/segmentio/kafka-go"
)

func watchingTenantTopics(ctx context.Context, readers []*kafka.Reader, kafkaClient kafkaCl.MaasClient) {
	kafkaClient.WatchTenantTopics(ctx, classifier.New("test"), func(topicAddresses []model.TopicAddress) {
		// close current readers
		for _, reader := range readers  {
			reader.Close()
		}
		// create new readers
		for _, topicAddress := range topicAddresses {
			tenantId := topicAddress.Classifier[classifier.TenantId]
			groupId := "tenant-prices-group-id-" + tenantId
			readerConfig, err := segmentioHelper.NewReaderConfig(topicAddress, groupId)
			if err != nil {
				panic(err.Error())
			}
			readers = append(readers, kafka.NewReader(*readerConfig))
		}
		// start consuming messages from readers
		// ...
	})
}

3. Customize underlying segmentio structs:

  import (
	"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
	segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  )

func writerWithOptions(topicAddress model.TopicAddress) {
    writer, err := segmentioHelper.NewWriter(topicAddress,
        segmentioHelper.WriterOptions{AlterTransport: func(transport *kafka.Transport) (*kafka.Transport, error) {
            transport.MetadataTTL = time.Minute
            transport.IdleTimeout = 2 * time.Minute
            transport.DialTimeout = 3 * time.Minute
            return transport, nil
        }})
}

func readerConfigWithOptions(topicAddress model.TopicAddress) {
	readerConfig, err := segmentioHelper.NewReaderConfig(topicAddress, "group-id",
		segmentioHelper.ReaderOptions{AlterDialer: func(dialer *kafka.Dialer) (*kafka.Dialer, error) {
			dialer.Timeout = time.Minute
			dialer.KeepAlive = 2 * time.Minute
			dialer.FallbackDelay = 3 * time.Minute
			return dialer, nil
		}})
}

func clientWithOptions(topicAddress model.TopicAddress) {
	client, err := segmentioHelper.NewClient(topicAddress,
		segmentioHelper.ClientOptions{AlterTransport: func(transport *kafka.Transport) (*kafka.Transport, error) {
			transport.MetadataTTL = time.Minute
			transport.IdleTimeout = 2 * time.Minute
			transport.DialTimeout = 3 * time.Minute
			return transport, nil
		}})
}

About

No description or website provided.

Topics

Resources

License

Code of conduct

Contributing

Security policy

Stars

Watchers

Forks

Packages

No packages published

Contributors 8

Languages