Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 7 additions & 59 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ package zkafka

import (
"context"
"fmt"
"sync"

"github.com/zillow/zfmt"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -36,15 +34,12 @@ type Client struct {
tp trace.TracerProvider
p propagation.TextMapPropagator

srf *schemaRegistryFactory

producerProvider confluentProducerProvider
consumerProvider confluentConsumerProvider
}

// NewClient instantiates a kafka client to get readers and writers
func NewClient(conf Config, opts ...Option) *Client {
srf := newSchemaRegistryFactory()
c := &Client{
conf: conf,
readers: make(map[string]*KReader),
Expand All @@ -53,7 +48,6 @@ func NewClient(conf Config, opts ...Option) *Client {

producerProvider: defaultConfluentProducerProvider{}.NewProducer,
consumerProvider: defaultConfluentConsumerProvider{}.NewConsumer,
srf: srf,
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -81,20 +75,15 @@ func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts
if exist && !r.isClosed {
return r, nil
}

formatter, err := c.getFormatter(formatterArgs{
formatter: topicConfig.Formatter,
schemaID: topicConfig.SchemaID,
srCfg: topicConfig.SchemaRegistry,
})
marshaler, err := topicConfig.MarshalerFactory.GetMarshaler()
if err != nil {
return nil, err
}
reader, err := newReader(readerArgs{
cfg: c.conf,
cCfg: topicConfig,
consumerProvider: c.consumerProvider,
f: formatter,
marshaler: marshaler,
l: c.logger,
prefix: c.groupPrefix,
hooks: c.lifecycle,
Expand Down Expand Up @@ -128,20 +117,15 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts
if exist && !w.isClosed {
return w, nil
}
formatter, err := c.getFormatter(formatterArgs{
formatter: topicConfig.Formatter,
schemaID: topicConfig.SchemaID,
srCfg: topicConfig.SchemaRegistry,
})

marshaler, err := topicConfig.MarshalerFactory.GetMarshaler()
if err != nil {
return nil, err
}
writer, err := newWriter(writerArgs{
cfg: c.conf,
pCfg: topicConfig,
producerProvider: c.producerProvider,
f: formatter,
marshaler: marshaler,
l: c.logger,
t: getTracer(c.tp),
p: c.p,
Expand Down Expand Up @@ -176,42 +160,6 @@ func (c *Client) Close() error {
return err
}

func (c *Client) getFormatter(args formatterArgs) (kFormatter, error) {
formatter := args.formatter
schemaID := args.schemaID

switch formatter {
case AvroSchemaRegistry:
scl, err := c.srf.createAvro(args.srCfg)
if err != nil {
return nil, err
}
return newAvroSchemaRegistryFormatter(scl), nil
case ProtoSchemaRegistry:
scl, err := c.srf.createProto(args.srCfg)
if err != nil {
return nil, err
}
cf := newProtoSchemaRegistryFormatter(scl)
return cf, nil
case JSONSchemaRegistry:
scl, err := c.srf.createJson(args.srCfg)
if err != nil {
return nil, err
}
cf := newJsonSchemaRegistryFormatter(scl)
return cf, nil
case CustomFmt:
return &errFormatter{}, nil
default:
f, err := zfmt.GetFormatter(formatter, schemaID)
if err != nil {
return nil, fmt.Errorf("unsupported formatter %s", formatter)
}
return zfmtShim{F: f}, nil
}
}

func getTracer(tp trace.TracerProvider) trace.Tracer {
if tp == nil {
return nil
Expand All @@ -223,8 +171,8 @@ func getWriterKey(cfg ProducerTopicConfig) string {
return cfg.ClientID + "-" + cfg.Topic
}

type formatterArgs struct {
formatter zfmt.FormatterType
type marshalerArgs struct {
marshaler KMarshaler
schemaID int
srCfg SchemaRegistryConfig
rCfg SchemaRegistryConfig
}
83 changes: 11 additions & 72 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/zillow/zfmt"
zfmt_json "github.com/zillow/zfmt/json"
)

const (
Expand Down Expand Up @@ -75,16 +75,8 @@ type ConsumerTopicConfig struct {
// The values here will be overwritten by the values of TopicConfig fields if specified there as well.
AdditionalProps map[string]interface{}

// Formatter is json if not defined
Formatter zfmt.FormatterType

SchemaRegistry SchemaRegistryConfig

// SchemaID defines the schema registered with Confluent schema Registry
// Default value is 0, and it implies that both Writer and Reader do not care about schema validation
// and should encode/decode the message based on data type provided.
// Currently, this only works with SchematizedAvroFormatter
SchemaID int
// Marshaler defaults to json if not defined
MarshalerFactory KMarshalerFactory

// Enable kafka transaction, default to false
Transaction bool
Expand Down Expand Up @@ -124,14 +116,6 @@ type ConsumerTopicConfig struct {
DeadLetterTopicConfig *ProducerTopicConfig
}

func (p ConsumerTopicConfig) GetFormatter() zfmt.FormatterType {
return p.Formatter
}

func (p ConsumerTopicConfig) GetSchemaID() int {
return p.SchemaID
}

// topics returns a logical slice of the topics specified in the configuration,
// a combination of the singular Topic and enumerable Topics. It removes any empty topicNames
func (p ConsumerTopicConfig) topics() []string {
Expand Down Expand Up @@ -171,18 +155,8 @@ type ProducerTopicConfig struct {
// The values here will be overwritten by the values of TopicConfig fields if specified there as well.
AdditionalProps map[string]interface{}

// Formatter is json if not defined
Formatter zfmt.FormatterType

// SchemaRegistry provides details about connecting to a schema registry including URL
// as well as others.
SchemaRegistry SchemaRegistryConfig

// SchemaID defines the schema registered with Confluent schema Registry
// Default value is 0, and it implies that both Writer and Reader do not care about schema validation
// and should encode/decode the message based on data type provided.
// Currently, this only works with SchematizedAvroFormatter
SchemaID int
// Marshaler defaults to json if not defined
MarshalerFactory KMarshalerFactory

// Enable kafka transaction, default to false
Transaction bool
Expand All @@ -205,41 +179,6 @@ type ProducerTopicConfig struct {
SaslPassword *string
}

func (p ProducerTopicConfig) GetFormatter() zfmt.FormatterType {
return p.Formatter
}

func (p ProducerTopicConfig) GetSchemaID() int {
return p.SchemaID
}

type SchemaRegistryConfig struct {
// URL is the schema registry URL. During serialization and deserialization
// schema registry is checked against to confirm schema compatability.
URL string
// Serialization provides additional information used by schema registry formatters during serialization (data write)
Serialization SerializationConfig
// Deserialization provides additional information used by schema registry formatters during deserialization (data read)
Deserialization DeserializationConfig
// SubjectName allows the specification of the SubjectName. If not specified defaults to [topic name strategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy)
SubjectName string
}

type SerializationConfig struct {
// AutoRegisterSchemas indicates whether new schemas (those that evolve existing schemas or are brand new) should be registered
// with schema registry dynamically. This feature is typically not used for production workloads
AutoRegisterSchemas bool
// Schema is used exclusively by the avro schema registry formatter today. Its necessary to provide proper schema evolution properties
// expected by typical use cases.
Schema string
}

type DeserializationConfig struct {
// Schema is used exclusively by the avro schema registry formatter today. It's necessary to provide proper schema evolution properties
// expected by typical use cases.
Schema string
}

func getDefaultConsumerTopicConfig(topicConfig *ConsumerTopicConfig) error {
if topicConfig.ClientID == "" {
return errors.New("invalid config, ClientID must not be empty")
Expand All @@ -251,9 +190,9 @@ func getDefaultConsumerTopicConfig(topicConfig *ConsumerTopicConfig) error {
return &permError{e: errors.New("invalid config, no topics specified")}
}

if string(topicConfig.Formatter) == "" {
// default to json formatter
topicConfig.Formatter = zfmt.JSONFmt
if topicConfig.MarshalerFactory == nil {
// default to json marshaler
topicConfig.MarshalerFactory = KMarshalerFactoryShim{F: &zfmt_json.Formatter{}}
}

const defaultProcessTimeoutMillis = 60 * 1000
Expand Down Expand Up @@ -288,9 +227,9 @@ func getDefaultProducerTopicConfig(topicConfig *ProducerTopicConfig) error {
topicConfig.NagleDisable = ptr(true)
}

if string(topicConfig.Formatter) == "" {
// default to json formatter
topicConfig.Formatter = zfmt.JSONFmt
if topicConfig.MarshalerFactory == nil {
// default to json marshaler
topicConfig.MarshalerFactory = KMarshalerFactoryShim{F: &zfmt_json.Formatter{}}
}

return nil
Expand Down
8 changes: 4 additions & 4 deletions example/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
"math/rand"
"time"

"github.com/zillow/zfmt"
zfmt_json "github.com/zillow/zfmt/json"
"github.com/zillow/zkafka/v2"

Check failure on line 10 in example/producer/main.go

View workflow job for this annotation

GitHub Actions / Lint

could not import github.com/zillow/zkafka/v2 (-: # github.com/zillow/zkafka/v2
)

func main() {
ctx := context.Background()
writer, err := zkafka.NewClient(zkafka.Config{

Check failure on line 15 in example/producer/main.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: zkafka (typecheck)
BootstrapServers: []string{"localhost:29092"},
}).Writer(ctx, zkafka.ProducerTopicConfig{

Check failure on line 17 in example/producer/main.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: zkafka (typecheck)
ClientID: "example",
Topic: "zkafka-example-topic",
Formatter: zfmt.JSONFmt,
ClientID: "example",
Topic: "zkafka-example-topic",
MarshalerFactory: zkafka.KMarshalerFactoryShim{F: &zfmt_json.Formatter{}},

Check failure on line 20 in example/producer/main.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: zkafka (typecheck)
})
randomNames := []string{"stewy", "lydia", "asif", "mike", "justin"}
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions example/v3_example/event.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"type": "record",
"name": "DummyEvent",
"fields": [
{"name": "IntField", "type": "int"},
{"name": "DoubleField", "type": "double"},
{"name": "StringField", "type": "string"},
{"name": "BoolField", "type": "boolean"},
{"name": "BytesField", "type": "bytes"}
]
}
11 changes: 11 additions & 0 deletions example/v3_example/event_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading