diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 65840e3aa..c237f1e12 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -27,6 +27,15 @@ jobs: - 9091:9091 - 9092:9092 + kafka_confluent: + image: confluentinc/confluent-local:7.6.0 + ports: + - "9192:9192" + env: + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29192,PLAINTEXT_HOST://localhost:9192' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:29193' + KAFKA_LISTENERS: 'PLAINTEXT://localhost:29192,CONTROLLER://localhost:29193,PLAINTEXT_HOST://0.0.0.0:9192' + natss: image: nats-streaming:0.22.1 ports: diff --git a/docs/index.md b/docs/index.md index 23f9b275d..060ad3e91 100644 --- a/docs/index.md +++ b/docs/index.md @@ -124,7 +124,8 @@ err := json.Unmarshal(bytes, &event) | AVRO Event Format | :x: | :x: | | [HTTP Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/http) | :heavy_check_mark: | :heavy_check_mark: | | [JSON Event Format](event_data_structure.md#marshalunmarshal-event-to-json) | :heavy_check_mark: | :heavy_check_mark: | -| [Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: | +| [Sarama Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: | +| [Confluent Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka_confluent) | :heavy_check_mark: | :heavy_check_mark: | | MQTT Protocol Binding | :x: | :x: | | [NATS Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/nats) | :heavy_check_mark: | :heavy_check_mark: | | [STAN Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/stan) | :heavy_check_mark: | :heavy_check_mark: | diff --git a/protocol/kafka_confluent/v2/go.mod b/protocol/kafka_confluent/v2/go.mod new file mode 100644 index 000000000..88690072e --- /dev/null +++ b/protocol/kafka_confluent/v2/go.mod @@ -0,0 +1,25 @@ +module github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 + +go 1.18 + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +require ( + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 + github.com/stretchr/testify v1.8.4 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/go-cmp v0.5.9 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/json-iterator/go v1.1.11 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.10.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/protocol/kafka_confluent/v2/go.sum b/protocol/kafka_confluent/v2/go.sum new file mode 100644 index 000000000..389b199e2 --- /dev/null +++ b/protocol/kafka_confluent/v2/go.sum @@ -0,0 +1,68 @@ +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= +github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= +github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= +github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA= +github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= +github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= +github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= +github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= +github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= +github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec= +github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= +google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08= +google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/protocol/kafka_confluent/v2/message.go b/protocol/kafka_confluent/v2/message.go new file mode 100644 index 000000000..164879a11 --- /dev/null +++ b/protocol/kafka_confluent/v2/message.go @@ -0,0 +1,156 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka_confluent + +import ( + "bytes" + "context" + "strconv" + "strings" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +const ( + prefix = "ce-" + contentTypeKey = "content-type" +) + +const ( + KafkaOffsetKey = "kafkaoffset" + KafkaPartitionKey = "kafkapartition" + KafkaTopicKey = "kafkatopic" + KafkaMessageKey = "kafkamessagekey" +) + +var specs = spec.WithPrefix(prefix) + +// Message represents a Kafka message. +// This message *can* be read several times safely +type Message struct { + internal *kafka.Message + properties map[string][]byte + format format.Format + version spec.Version +} + +// Check if Message implements binding.Message +var ( + _ binding.Message = (*Message)(nil) + _ binding.MessageMetadataReader = (*Message)(nil) +) + +// NewMessage returns a binding.Message that holds the provided kafka.Message. +// The returned binding.Message *can* be read several times safely +// This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance +func NewMessage(msg *kafka.Message) *Message { + if msg == nil { + panic("the kafka.Message shouldn't be nil") + } + if msg.TopicPartition.Topic == nil { + panic("the topic of kafka.Message shouldn't be nil") + } + if msg.TopicPartition.Partition < 0 || msg.TopicPartition.Offset < 0 { + panic("the partition or offset of the kafka.Message must be non-negative") + } + + var contentType, contentVersion string + properties := make(map[string][]byte, len(msg.Headers)+3) + for _, header := range msg.Headers { + k := strings.ToLower(string(header.Key)) + if k == strings.ToLower(contentTypeKey) { + contentType = string(header.Value) + } + if k == specs.PrefixedSpecVersionName() { + contentVersion = string(header.Value) + } + properties[k] = header.Value + } + + // add the kafka message key, topic, partition and partition key to the properties + properties[prefix+KafkaOffsetKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Offset), 10)) + properties[prefix+KafkaPartitionKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Partition), 10)) + properties[prefix+KafkaTopicKey] = []byte(*msg.TopicPartition.Topic) + if msg.Key != nil { + properties[prefix+KafkaMessageKey] = msg.Key + } + + message := &Message{ + internal: msg, + properties: properties, + } + if ft := format.Lookup(contentType); ft != nil { + message.format = ft + } else if v := specs.Version(contentVersion); v != nil { + message.version = v + } + + return message +} + +func (m *Message) ReadEncoding() binding.Encoding { + if m.version != nil { + return binding.EncodingBinary + } + if m.format != nil { + return binding.EncodingStructured + } + return binding.EncodingUnknown +} + +func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { + if m.format != nil { + return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.internal.Value)) + } + return binding.ErrNotStructured +} + +func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { + if m.version == nil { + return binding.ErrNotBinary + } + + var err error + for k, v := range m.properties { + if strings.HasPrefix(k, prefix) { + attr := m.version.Attribute(k) + if attr != nil { + err = encoder.SetAttribute(attr, string(v)) + } else { + err = encoder.SetExtension(strings.TrimPrefix(k, prefix), string(v)) + } + } else if k == strings.ToLower(contentTypeKey) { + err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(v)) + } + if err != nil { + return err + } + } + + if m.internal.Value != nil { + err = encoder.SetData(bytes.NewBuffer(m.internal.Value)) + } + return err +} + +func (m *Message) Finish(error) error { + return nil +} + +func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) { + attr := m.version.AttributeFromKind(k) + if attr == nil { + return nil, nil + } + return attr, m.properties[attr.PrefixedName()] +} + +func (m *Message) GetExtension(name string) interface{} { + return m.properties[prefix+name] +} diff --git a/protocol/kafka_confluent/v2/message_test.go b/protocol/kafka_confluent/v2/message_test.go new file mode 100644 index 000000000..e7f599b63 --- /dev/null +++ b/protocol/kafka_confluent/v2/message_test.go @@ -0,0 +1,131 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka_confluent + +import ( + "context" + "testing" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/stretchr/testify/require" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/test" +) + +var ( + ctx = context.Background() + testEvent = test.FullEvent() + testTopic = "test-topic" + topicPartition = kafka.TopicPartition{ + Topic: &testTopic, + Partition: int32(0), + Offset: kafka.Offset(10), + } + structuredConsumerMessage = &kafka.Message{ + TopicPartition: topicPartition, + Value: func() []byte { + b, _ := format.JSON.Marshal(&testEvent) + return b + }(), + Headers: []kafka.Header{{ + Key: "content-type", + Value: []byte(cloudevents.ApplicationCloudEventsJSON), + }}, + } + binaryConsumerMessage = &kafka.Message{ + TopicPartition: topicPartition, + Value: []byte("hello world!"), + Headers: mapToKafkaHeaders(map[string]string{ + "ce-type": testEvent.Type(), + "ce-source": testEvent.Source(), + "ce-id": testEvent.ID(), + "ce-time": test.Timestamp.String(), + "ce-specversion": "1.0", + "ce-dataschema": test.Schema.String(), + "ce-datacontenttype": "text/json", + "ce-subject": "receiverTopic", + "exta": "someext", + }), + } +) + +func TestNewMessage(t *testing.T) { + tests := []struct { + name string + consumerMessage *kafka.Message + expectedEncoding binding.Encoding + }{ + { + name: "Structured encoding", + consumerMessage: structuredConsumerMessage, + expectedEncoding: binding.EncodingStructured, + }, + { + name: "Binary encoding", + consumerMessage: binaryConsumerMessage, + expectedEncoding: binding.EncodingBinary, + }, + { + name: "Unknown encoding", + consumerMessage: &kafka.Message{ + TopicPartition: topicPartition, + Value: []byte("{}"), + Headers: []kafka.Header{{ + Key: "content-type", + Value: []byte("application/json"), + }}, + }, + expectedEncoding: binding.EncodingUnknown, + }, + { + name: "Binary encoding with empty value", + consumerMessage: &kafka.Message{ + TopicPartition: topicPartition, + Value: nil, + Headers: mapToKafkaHeaders(map[string]string{ + "ce-type": testEvent.Type(), + "ce-source": testEvent.Source(), + "ce-id": testEvent.ID(), + "ce-time": test.Timestamp.String(), + "ce-specversion": "1.0", + "ce-dataschema": test.Schema.String(), + "ce-datacontenttype": "text/json", + "ce-subject": "receiverTopic", + }), + }, + expectedEncoding: binding.EncodingBinary, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msg := NewMessage(tt.consumerMessage) + require.Equal(t, tt.expectedEncoding, msg.ReadEncoding()) + + var err error + if tt.expectedEncoding == binding.EncodingStructured { + err = msg.ReadStructured(ctx, (*kafkaMessageWriter)(tt.consumerMessage)) + } + + if tt.expectedEncoding == binding.EncodingBinary { + err = msg.ReadBinary(ctx, (*kafkaMessageWriter)(tt.consumerMessage)) + } + require.Nil(t, err) + }) + } +} + +func mapToKafkaHeaders(m map[string]string) []kafka.Header { + res := make([]kafka.Header, len(m)) + i := 0 + for k, v := range m { + res[i] = kafka.Header{Key: k, Value: []byte(v)} + i++ + } + return res +} diff --git a/protocol/kafka_confluent/v2/option.go b/protocol/kafka_confluent/v2/option.go new file mode 100644 index 000000000..e3b0b566f --- /dev/null +++ b/protocol/kafka_confluent/v2/option.go @@ -0,0 +1,151 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka_confluent + +import ( + "context" + "errors" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +// Option is the function signature required to be considered an kafka_confluent.Option. +type Option func(*Protocol) error + +// WithConfigMap sets the configMap to init the kafka client. This option is not required. +func WithConfigMap(config *kafka.ConfigMap) Option { + return func(p *Protocol) error { + if config == nil { + return errors.New("the kafka.ConfigMap option must not be nil") + } + p.kafkaConfigMap = config + return nil + } +} + +// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required. +func WithSenderTopic(defaultTopic string) Option { + return func(p *Protocol) error { + if defaultTopic == "" { + return errors.New("the producer topic option must not be nil") + } + p.producerDefaultTopic = defaultTopic + return nil + } +} + +// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required. +func WithReceiverTopics(topics []string) Option { + return func(p *Protocol) error { + if topics == nil { + return errors.New("the consumer topics option must not be nil") + } + p.consumerTopics = topics + return nil + } +} + +// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required. +func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option { + return func(p *Protocol) error { + if rebalanceCb == nil { + return errors.New("the consumer group rebalance callback must not be nil") + } + p.consumerRebalanceCb = rebalanceCb + return nil + } +} + +// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required. +func WithPollTimeout(timeoutMs int) Option { + return func(p *Protocol) error { + p.consumerPollTimeout = timeoutMs + return nil + } +} + +// WithSender set a kafka.Producer instance to init the client directly. This option is not required. +func WithSender(producer *kafka.Producer) Option { + return func(p *Protocol) error { + if producer == nil { + return errors.New("the producer option must not be nil") + } + p.producer = producer + return nil + } +} + +// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required. +func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option { + return func(p *Protocol) error { + p.consumerErrorHandler = handler + return nil + } +} + +// WithSender set a kafka.Consumer instance to init the client directly. This option is not required. +func WithReceiver(consumer *kafka.Consumer) Option { + return func(p *Protocol) error { + if consumer == nil { + return errors.New("the consumer option must not be nil") + } + p.consumer = consumer + return nil + } +} + +// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required. +type topicPartitionOffsetsType struct{} + +var offsetKey = topicPartitionOffsetsType{} + +// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required. +func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context { + if len(topicPartitionOffsets) == 0 { + panic("the topicPartitionOffsets cannot be empty") + } + for _, offset := range topicPartitionOffsets { + if offset.Topic == nil || *(offset.Topic) == "" { + panic("the kafka topic cannot be nil or empty") + } + if offset.Partition < 0 || offset.Offset < 0 { + panic("the kafka partition/offset must be non-negative") + } + } + return context.WithValue(ctx, offsetKey, topicPartitionOffsets) +} + +// TopicPartitionOffsetsFrom looks in the given context and returns []kafka.TopicPartition or nil if not set +func TopicPartitionOffsetsFrom(ctx context.Context) []kafka.TopicPartition { + c := ctx.Value(offsetKey) + if c != nil { + if s, ok := c.([]kafka.TopicPartition); ok { + return s + } + } + return nil +} + +// Opaque key type used to store message key +type messageKeyType struct{} + +var keyForMessageKey = messageKeyType{} + +// WithMessageKey returns back a new context with the given messageKey. +func WithMessageKey(ctx context.Context, messageKey string) context.Context { + return context.WithValue(ctx, keyForMessageKey, messageKey) +} + +// MessageKeyFrom looks in the given context and returns `messageKey` as a string if found and valid, otherwise "". +func MessageKeyFrom(ctx context.Context) string { + c := ctx.Value(keyForMessageKey) + if c != nil { + if s, ok := c.(string); ok { + return s + } + } + return "" +} diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go new file mode 100644 index 000000000..8aa906853 --- /dev/null +++ b/protocol/kafka_confluent/v2/protocol.go @@ -0,0 +1,245 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka_confluent + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + + cecontext "github.com/cloudevents/sdk-go/v2/context" +) + +var ( + _ protocol.Sender = (*Protocol)(nil) + _ protocol.Opener = (*Protocol)(nil) + _ protocol.Receiver = (*Protocol)(nil) + _ protocol.Closer = (*Protocol)(nil) +) + +type Protocol struct { + kafkaConfigMap *kafka.ConfigMap + + consumer *kafka.Consumer + consumerTopics []string + consumerRebalanceCb kafka.RebalanceCb // optional + consumerPollTimeout int // optional + consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional + consumerMux sync.Mutex + consumerIncoming chan *kafka.Message + consumerCtx context.Context + consumerCancel context.CancelFunc + + producer *kafka.Producer + producerDeliveryChan chan kafka.Event // optional + producerDefaultTopic string // optional + + closerMux sync.Mutex +} + +func New(opts ...Option) (*Protocol, error) { + p := &Protocol{ + consumerPollTimeout: 100, + consumerIncoming: make(chan *kafka.Message), + } + if err := p.applyOptions(opts...); err != nil { + return nil, err + } + + if p.kafkaConfigMap != nil { + if p.consumerTopics != nil && p.consumer == nil { + consumer, err := kafka.NewConsumer(p.kafkaConfigMap) + if err != nil { + return nil, err + } + p.consumer = consumer + } + if p.producerDefaultTopic != "" && p.producer == nil { + producer, err := kafka.NewProducer(p.kafkaConfigMap) + if err != nil { + return nil, err + } + p.producer = producer + } + if p.producer == nil && p.consumer == nil { + return nil, errors.New("at least receiver or sender topic must be set") + } + } + if p.producerDefaultTopic != "" && p.producer == nil { + return nil, fmt.Errorf("at least configmap or producer must be set for the sender topic: %s", p.producerDefaultTopic) + } + + if len(p.consumerTopics) > 0 && p.consumer == nil { + return nil, fmt.Errorf("at least configmap or consumer must be set for the receiver topics: %s", p.consumerTopics) + } + + if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil { + return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer") + } + if p.producer != nil { + p.producerDeliveryChan = make(chan kafka.Event) + } + return p, nil +} + +func (p *Protocol) applyOptions(opts ...Option) error { + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) { + if p.producer == nil { + return errors.New("producer client must be set") + } + + p.closerMux.Lock() + defer p.closerMux.Unlock() + if p.producer.IsClosed() { + return errors.New("producer is closed") + } + + defer in.Finish(err) + + kafkaMsg := &kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &p.producerDefaultTopic, + Partition: kafka.PartitionAny, + }, + } + + if topic := cecontext.TopicFrom(ctx); topic != "" { + kafkaMsg.TopicPartition.Topic = &topic + } + + if messageKey := MessageKeyFrom(ctx); messageKey != "" { + kafkaMsg.Key = []byte(messageKey) + } + + err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...) + if err != nil { + return err + } + + err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan) + if err != nil { + return err + } + e := <-p.producerDeliveryChan + m := e.(*kafka.Message) + if m.TopicPartition.Error != nil { + return m.TopicPartition.Error + } + return nil +} + +func (p *Protocol) OpenInbound(ctx context.Context) error { + if p.consumer == nil { + return errors.New("the consumer client must be set") + } + if p.consumerTopics == nil { + return errors.New("the consumer topics must be set") + } + + p.consumerMux.Lock() + defer p.consumerMux.Unlock() + logger := cecontext.LoggerFrom(ctx) + + // Query committed offsets for each partition + if positions := TopicPartitionOffsetsFrom(ctx); positions != nil { + if err := p.consumer.Assign(positions); err != nil { + return err + } + } + + logger.Infof("Subscribing to topics: %v", p.consumerTopics) + err := p.consumer.SubscribeTopics(p.consumerTopics, p.consumerRebalanceCb) + if err != nil { + return err + } + + p.closerMux.Lock() + p.consumerCtx, p.consumerCancel = context.WithCancel(ctx) + defer p.consumerCancel() + p.closerMux.Unlock() + + defer func() { + if !p.consumer.IsClosed() { + logger.Infof("Closing consumer %v", p.consumerTopics) + if err = p.consumer.Close(); err != nil { + logger.Errorf("failed to close the consumer: %v", err) + } + } + close(p.consumerIncoming) + }() + + for { + select { + case <-p.consumerCtx.Done(): + return p.consumerCtx.Err() + default: + ev := p.consumer.Poll(p.consumerPollTimeout) + if ev == nil { + continue + } + switch e := ev.(type) { + case *kafka.Message: + p.consumerIncoming <- e + case kafka.Error: + // Errors should generally be considered informational, the client will try to automatically recover. + // But in here, we choose to terminate the application if all brokers are down. + logger.Infof("Error %v: %v", e.Code(), e) + if p.consumerErrorHandler != nil { + p.consumerErrorHandler(ctx, e) + } + if e.Code() == kafka.ErrAllBrokersDown { + logger.Error("All broker connections are down") + return e + } + } + } + } +} + +// Receive implements Receiver.Receive +func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { + select { + case m, ok := <-p.consumerIncoming: + if !ok { + return nil, io.EOF + } + msg := NewMessage(m) + return msg, nil + case <-ctx.Done(): + return nil, io.EOF + } +} + +// Close cleans up resources after use. Must be called to properly close underlying Kafka resources and avoid resource leaks +func (p *Protocol) Close(ctx context.Context) error { + p.closerMux.Lock() + defer p.closerMux.Unlock() + + if p.consumerCancel != nil { + p.consumerCancel() + } + + if p.producer != nil && !p.producer.IsClosed() { + p.producer.Close() + close(p.producerDeliveryChan) + } + + return nil +} diff --git a/protocol/kafka_confluent/v2/protocol_test.go b/protocol/kafka_confluent/v2/protocol_test.go new file mode 100644 index 000000000..0cc3e769f --- /dev/null +++ b/protocol/kafka_confluent/v2/protocol_test.go @@ -0,0 +1,69 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka_confluent + +import ( + "context" + "testing" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/stretchr/testify/assert" +) + +func TestNewProtocol(t *testing.T) { + tests := []struct { + name string + ctx context.Context + options []Option + errorMessage string + }{ + { + name: "invalidated parameters", + options: nil, + errorMessage: "at least one of the following to initialize the protocol must be set: config, producer, or consumer", + }, + { + name: "Insufficient parameters", + options: []Option{ + WithConfigMap(&kafka.ConfigMap{ + "bootstrap.servers": "127.0.0.1:9092", + })}, + errorMessage: "at least receiver or sender topic must be set", + }, + { + name: "Insufficient consumer parameters - group.id", + options: []Option{ + WithConfigMap(&kafka.ConfigMap{ + "bootstrap.servers": "127.0.0.1:9092", + }), + WithReceiverTopics([]string{"topic1", "topic2"}), + }, + errorMessage: "Required property group.id not set", + }, + { + name: "Insufficient consumer parameters - configmap or consumer", + options: []Option{ + WithReceiverTopics([]string{"topic1", "topic2"}), + }, + errorMessage: "at least configmap or consumer must be set for the receiver topics: [topic1 topic2]", + }, + { + name: "Insufficient producer parameters", + options: []Option{ + WithSenderTopic("topic3"), + }, + errorMessage: "at least configmap or producer must be set for the sender topic: topic3", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := New(tt.options...) + if err != nil { + assert.Equal(t, tt.errorMessage, err.Error()) + } + }) + } +} diff --git a/protocol/kafka_confluent/v2/write_producer_message.go b/protocol/kafka_confluent/v2/write_producer_message.go new file mode 100644 index 000000000..e640cbded --- /dev/null +++ b/protocol/kafka_confluent/v2/write_producer_message.go @@ -0,0 +1,125 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka_confluent + +import ( + "bytes" + "context" + "io" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/types" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +// extends the kafka.Message to support the interfaces for the converting it to binding.Message +type kafkaMessageWriter kafka.Message + +var ( + _ binding.StructuredWriter = (*kafkaMessageWriter)(nil) + _ binding.BinaryWriter = (*kafkaMessageWriter)(nil) +) + +// WriteProducerMessage fills the provided pubMessage with the message m. +// Using context you can tweak the encoding processing (more details on binding.Write documentation). +func WriteProducerMessage(ctx context.Context, in binding.Message, kafkaMsg *kafka.Message, + transformers ...binding.Transformer, +) error { + structuredWriter := (*kafkaMessageWriter)(kafkaMsg) + binaryWriter := (*kafkaMessageWriter)(kafkaMsg) + + _, err := binding.Write( + ctx, + in, + structuredWriter, + binaryWriter, + transformers..., + ) + return err +} + +func (b *kafkaMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error { + b.Headers = []kafka.Header{{ + Key: contentTypeKey, + Value: []byte(f.MediaType()), + }} + + var buf bytes.Buffer + _, err := io.Copy(&buf, event) + if err != nil { + return err + } + + b.Value = buf.Bytes() + return nil +} + +func (b *kafkaMessageWriter) Start(ctx context.Context) error { + b.Headers = []kafka.Header{} + return nil +} + +func (b *kafkaMessageWriter) End(ctx context.Context) error { + return nil +} + +func (b *kafkaMessageWriter) SetData(reader io.Reader) error { + buf, ok := reader.(*bytes.Buffer) + if !ok { + buf = new(bytes.Buffer) + _, err := io.Copy(buf, reader) + if err != nil { + return err + } + } + b.Value = buf.Bytes() + return nil +} + +func (b *kafkaMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { + if attribute.Kind() == spec.DataContentType { + if value == nil { + b.removeProperty(contentTypeKey) + return nil + } + b.addProperty(contentTypeKey, value) + } else { + key := prefix + attribute.Name() + if value == nil { + b.removeProperty(key) + return nil + } + b.addProperty(key, value) + } + return nil +} + +func (b *kafkaMessageWriter) SetExtension(name string, value interface{}) error { + if value == nil { + b.removeProperty(prefix + name) + } + return b.addProperty(prefix+name, value) +} + +func (b *kafkaMessageWriter) removeProperty(key string) { + for i, v := range b.Headers { + if v.Key == key { + b.Headers = append(b.Headers[:i], b.Headers[i+1:]...) + break + } + } +} + +func (b *kafkaMessageWriter) addProperty(key string, value interface{}) error { + s, err := types.Format(value) + if err != nil { + return err + } + b.Headers = append(b.Headers, kafka.Header{Key: key, Value: []byte(s)}) + return nil +} diff --git a/protocol/kafka_confluent/v2/write_producer_message_test.go b/protocol/kafka_confluent/v2/write_producer_message_test.go new file mode 100644 index 000000000..eb4cd25bf --- /dev/null +++ b/protocol/kafka_confluent/v2/write_producer_message_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka_confluent + +import ( + "context" + "strconv" + "testing" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/stretchr/testify/require" + + "github.com/cloudevents/sdk-go/v2/binding" + . "github.com/cloudevents/sdk-go/v2/binding/test" + "github.com/cloudevents/sdk-go/v2/event" + . "github.com/cloudevents/sdk-go/v2/test" +) + +func TestWriteProducerMessage(t *testing.T) { + tests := []struct { + name string + context context.Context + messageFactory func(e event.Event) binding.Message + expectedEncoding binding.Encoding + }{ + { + name: "Structured to Structured", + context: ctx, + messageFactory: func(e event.Event) binding.Message { + return MustCreateMockStructuredMessage(t, e) + }, + expectedEncoding: binding.EncodingStructured, + }, + { + name: "Binary to Binary", + context: ctx, + messageFactory: MustCreateMockBinaryMessage, + expectedEncoding: binding.EncodingBinary, + }, + } + EachEvent(t, Events(), func(t *testing.T, e event.Event) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := tt.context + topic := "test-topic" + kafkaMessage := &kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &topic, + Partition: int32(0), + Offset: kafka.Offset(10), + }, + } + + eventIn := ConvertEventExtensionsToString(t, e.Clone()) + messageIn := tt.messageFactory(eventIn) + + err := WriteProducerMessage(ctx, messageIn, kafkaMessage) + require.NoError(t, err) + + messageOut := NewMessage(kafkaMessage) + require.Equal(t, tt.expectedEncoding, messageOut.ReadEncoding()) + + if tt.expectedEncoding == binding.EncodingBinary { + err = messageOut.ReadBinary(ctx, (*kafkaMessageWriter)(kafkaMessage)) + } + require.NoError(t, err) + + eventOut, err := binding.ToEvent(ctx, messageOut) + require.NoError(t, err) + if tt.expectedEncoding == binding.EncodingBinary { + eventIn.SetExtension(KafkaPartitionKey, strconv.FormatInt(int64(kafkaMessage.TopicPartition.Partition), 10)) + eventIn.SetExtension(KafkaOffsetKey, strconv.FormatInt(int64(kafkaMessage.TopicPartition.Offset), 10)) + eventIn.SetExtension(KafkaTopicKey, kafkaMessage.TopicPartition.Topic) + } + AssertEventEquals(t, eventIn, *eventOut) + }) + } + }) +} diff --git a/samples/kafka_confluent/README.md b/samples/kafka_confluent/README.md new file mode 100644 index 000000000..969cafb97 --- /dev/null +++ b/samples/kafka_confluent/README.md @@ -0,0 +1,9 @@ +# Confluent kafka samples + +To run the samples, you need a running Kafka cluster. + +To run a sample Kafka cluster using docker: + +``` +docker run --rm --net=host confluentinc/confluent-local +``` \ No newline at end of file diff --git a/samples/kafka_confluent/go.mod b/samples/kafka_confluent/go.mod new file mode 100644 index 000000000..17b2b4ea1 --- /dev/null +++ b/samples/kafka_confluent/go.mod @@ -0,0 +1,23 @@ +module github.com/cloudevents/sdk-go/samples/kafka_confluent + +go 1.18 + +replace github.com/cloudevents/sdk-go/v2 => ../../v2 + +replace github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 => ../../protocol/kafka_confluent/v2 + +require ( + github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 v2.0.0-00010101000000-000000000000 + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 +) + +require ( + github.com/google/uuid v1.3.0 // indirect + github.com/json-iterator/go v1.1.11 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.10.0 // indirect +) diff --git a/samples/kafka_confluent/go.sum b/samples/kafka_confluent/go.sum new file mode 100644 index 000000000..b0200c98f --- /dev/null +++ b/samples/kafka_confluent/go.sum @@ -0,0 +1,61 @@ +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= +github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= +github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= +github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA= +github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= +github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= +github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= +github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= +github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= +github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec= +github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= +google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08= +google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/samples/kafka_confluent/receiver-assign/main.go b/samples/kafka_confluent/receiver-assign/main.go new file mode 100644 index 000000000..c1ce92610 --- /dev/null +++ b/samples/kafka_confluent/receiver-assign/main.go @@ -0,0 +1,60 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + + confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/client" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +var topic = "test-confluent-topic" + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + receiver, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ + "bootstrap.servers": "127.0.0.1:9092", + "group.id": "test-confluent-offset-id", + // "auto.offset.reset": "earliest", + "enable.auto.commit": "true", + }), confluent.WithReceiverTopics([]string{topic})) + if err != nil { + log.Fatalf("failed to create kafka protocol, %v", err) + } + defer receiver.Close(ctx) + + // Setting the 'client.WithPollGoroutines(1)' to make sure the events from kafka partition are processed in order + c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1)) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + offsetToStart := []kafka.TopicPartition{ + {Topic: &topic, Partition: 0, Offset: 3}, + } + + log.Printf("will listen consuming topic %s\n", topic) + err = c.StartReceiver(confluent.WithTopicPartitionOffsets(ctx, offsetToStart), receive) + if err != nil { + log.Fatalf("failed to start receiver: %s", err) + } else { + log.Printf("receiver stopped\n") + } +} + +func receive(ctx context.Context, event cloudevents.Event) { + ext := event.Extensions() + + fmt.Printf("%s[%s:%s] \n", ext[confluent.KafkaTopicKey], + ext[confluent.KafkaPartitionKey], ext[confluent.KafkaOffsetKey]) +} diff --git a/samples/kafka_confluent/receiver/main.go b/samples/kafka_confluent/receiver/main.go new file mode 100644 index 000000000..1817dd2f1 --- /dev/null +++ b/samples/kafka_confluent/receiver/main.go @@ -0,0 +1,53 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + + confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/client" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +const topic = "test-confluent-topic" + +func main() { + ctx := context.Background() + + receiver, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ + "bootstrap.servers": "127.0.0.1:9092", + "group.id": "test-confluent-group-id", + "auto.offset.reset": "earliest", // only validated when the consumer group offset has saved before + "enable.auto.commit": "true", + }), confluent.WithReceiverTopics([]string{topic})) + + if err != nil { + log.Fatalf("failed to create receiver, %v", err) + } + defer receiver.Close(ctx) + + // Setting the 'client.WithPollGoroutines(1)' to make sure the events from kafka partition are processed in order + c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1)) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + log.Printf("will listen consuming topic %s\n", topic) + err = c.StartReceiver(ctx, receive) + if err != nil { + log.Fatalf("failed to start receiver: %s", err) + } else { + log.Printf("receiver stopped\n") + } +} + +func receive(ctx context.Context, event cloudevents.Event) { + fmt.Printf("%s", event) +} diff --git a/samples/kafka_confluent/sender/main.go b/samples/kafka_confluent/sender/main.go new file mode 100644 index 000000000..bfebf816c --- /dev/null +++ b/samples/kafka_confluent/sender/main.go @@ -0,0 +1,56 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "log" + + confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +const ( + count = 10 + topic = "test-confluent-topic" +) + +func main() { + ctx := context.Background() + + sender, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ + "bootstrap.servers": "127.0.0.1:9092", + }), confluent.WithSenderTopic(topic)) + + defer sender.Close(ctx) + + c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + for i := 0; i < count; i++ { + e := cloudevents.NewEvent() + e.SetType("com.cloudevents.sample.sent") + e.SetSource("https://github.com/cloudevents/sdk-go/samples/kafka_confluent/sender") + _ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "id": i, + "message": "Hello, World!", + }) + + if result := c.Send( + // Set the producer message key + confluent.WithMessageKey(ctx, e.ID()), + e, + ); cloudevents.IsUndelivered(result) { + log.Printf("failed to send: %v", result) + } else { + log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result)) + } + } +} diff --git a/test/integration/go.mod b/test/integration/go.mod index 1fe9e1be4..492b65553 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -18,19 +18,23 @@ replace github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 => ../../protocol replace github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 => ../../protocol/mqtt_paho/v2 +replace github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 => ../../protocol/kafka_confluent/v2 + require ( github.com/Azure/go-amqp v0.17.0 github.com/IBM/sarama v1.40.1 github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.5.0 + github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 v2.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.5.0 github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/nats/v2 v2.5.0 github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/stan/v2 v2.5.0 - github.com/cloudevents/sdk-go/v2 v2.14.0 + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 github.com/eclipse/paho.golang v0.12.0 github.com/google/go-cmp v0.6.0 - github.com/google/uuid v1.1.1 + github.com/google/uuid v1.3.0 github.com/nats-io/nats.go v1.31.0 github.com/nats-io/stan.go v0.10.4 github.com/stretchr/testify v1.8.4 diff --git a/test/integration/go.sum b/test/integration/go.sum index 084a0ea60..0976f57fb 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,17 +1,29 @@ github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4= github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/IBM/sarama v1.40.1 h1:lL01NNg/iBeigUbT+wpPysuTYW6roHo6kc1QrffRf0k= github.com/IBM/sarama v1.40.1/go.mod h1:+5OFwA5Du9I6QrznhaMHsuwWdWZNMjaBSIxEWEgKOYE= +github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= +github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I= github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= +github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA= +github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= +github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM= @@ -26,15 +38,17 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= @@ -80,6 +94,7 @@ github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= @@ -90,11 +105,15 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= +github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= +github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU= @@ -109,6 +128,9 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec= +github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -125,6 +147,7 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -134,6 +157,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -141,6 +165,7 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= +go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -216,6 +241,9 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08= +google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/test/integration/kafka_confluent/kafka_test.go b/test/integration/kafka_confluent/kafka_test.go new file mode 100644 index 000000000..ee3d77130 --- /dev/null +++ b/test/integration/kafka_confluent/kafka_test.go @@ -0,0 +1,138 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka_confluent + +import ( + "context" + "testing" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/test" +) + +const ( + TEST_GROUP_ID = "test_confluent_group_id" + BOOTSTRAP_SERVER = "localhost:9192" +) + +type receiveEvent struct { + event cloudevents.Event + err error +} + +func TestSendEvent(t *testing.T) { + test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + topicName := "test-ce-confluent-" + uuid.New().String() + // create the topic with kafka.AdminClient manually + admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": BOOTSTRAP_SERVER}) + require.NoError(t, err) + + _, err = admin.CreateTopics(ctx, []kafka.TopicSpecification{{ + Topic: topicName, + NumPartitions: 1, + ReplicationFactor: 1}}) + require.NoError(t, err) + + eventIn = test.ConvertEventExtensionsToString(t, eventIn) + + // start a cloudevents receiver client go to receive the event + eventChan := make(chan receiveEvent) + + receiverReady := make(chan bool) + go func() { + p, err := protocolFactory("", []string{topicName}) + if err != nil { + eventChan <- receiveEvent{err: err} + return + } + defer p.Close(ctx) + + client, err := cloudevents.NewClient(p) + if err != nil { + eventChan <- receiveEvent{err: err} + } + + receiverReady <- true + err = client.StartReceiver(ctx, func(event cloudevents.Event) { + eventChan <- receiveEvent{event: event} + }) + if err != nil { + eventChan <- receiveEvent{err: err} + } + }() + + <-receiverReady + + // start a cloudevents sender client go to send the event + p, err := protocolFactory(topicName, nil) + require.NoError(t, err) + defer p.Close(ctx) + + client, err := cloudevents.NewClient(p) + require.NoError(t, err) + res := client.Send(ctx, eventIn) + require.NoError(t, res) + + // check the received event + receivedEvent := <-eventChan + require.NoError(t, receivedEvent.err) + eventOut := test.ConvertEventExtensionsToString(t, receivedEvent.event) + + // test.AssertEventEquals(t, eventIn, receivedEvent.event) + err = test.AllOf( + test.HasExactlyAttributesEqualTo(eventIn.Context), + test.HasData(eventIn.Data()), + test.HasExtensionKeys([]string{confluent.KafkaPartitionKey, confluent.KafkaOffsetKey}), + test.HasExtension(confluent.KafkaTopicKey, topicName), + )(eventOut) + require.NoError(t, err) + }) +} + +// To start a local environment for testing: +// Option 1: Start it on port 9092 +// +// docker run --rm --net=host -p 9092:9092 confluentinc/confluent-local +// +// Option 2: Start it on port 9192 +// docker run --rm \ +// --name broker \ +// --hostname broker \ +// -p 9192:9192 \ +// -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://broker:29192,PLAINTEXT_HOST://localhost:9192' \ +// -e KAFKA_CONTROLLER_QUORUM_VOTERS='1@broker:29193' \ +// -e KAFKA_LISTENERS='PLAINTEXT://broker:29192,CONTROLLER://broker:29193,PLAINTEXT_HOST://0.0.0.0:9192' \ +// confluentinc/confluent-local:latest +func protocolFactory(sendTopic string, receiveTopic []string, +) (*confluent.Protocol, error) { + + var p *confluent.Protocol + var err error + if receiveTopic != nil { + p, err = confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ + "bootstrap.servers": BOOTSTRAP_SERVER, + "group.id": TEST_GROUP_ID, + "auto.offset.reset": "earliest", + "enable.auto.commit": "true", + }), confluent.WithReceiverTopics(receiveTopic)) + } + if sendTopic != "" { + p, err = confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ + "bootstrap.servers": BOOTSTRAP_SERVER, + }), confluent.WithSenderTopic(sendTopic)) + } + return p, err +}