Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar][receiver] add apache pulsar receiver #9792

Merged
merged 45 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8d40e58
add apache pulsar receiver
dao-jun May 8, 2022
cee2dab
add apache pulsar receiver
dao-jun May 8, 2022
184e2f2
fix imports
dao-jun May 9, 2022
53311d1
fix imports
dao-jun May 9, 2022
e66a7f4
fix imports
dao-jun May 9, 2022
016cb56
fix components exporter_tests versions.yaml and go mod.
dao-jun May 9, 2022
34e5404
review fix
dao-jun May 13, 2022
93424d9
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun May 16, 2022
40dc4e4
rename serviceUrl -> endpoint
dao-jun May 30, 2022
176b7d0
rename serviceUrl -> endpoint
dao-jun May 30, 2022
b6a90a4
rename serviceUrl -> endpoint
dao-jun May 30, 2022
5420dc4
Merge branch 'main' into dev/pulsar_recv
dao-jun May 30, 2022
b3b8895
fix cancel consume loop
dao-jun Jun 28, 2022
64fa981
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Jun 29, 2022
a1f4e15
fix go.mod
dao-jun Jun 29, 2022
7f4a5c9
Merge branch 'main' into dev/pulsar_recv
dao-jun Jul 21, 2022
373f81b
review & CI fix
dao-jun Jul 21, 2022
342a225
Merge branch 'main' into dev/pulsar_recv
dao-jun Jul 26, 2022
aa13f05
merge master into current
dao-jun Jul 26, 2022
2ff5328
add CODEOWNERS & remove -race opt
dao-jun Jul 26, 2022
da4c9ad
fix CI checks
dao-jun Jul 26, 2022
604b306
fix lint
dao-jun Jul 26, 2022
f924d8f
fix lint
dao-jun Jul 26, 2022
fbcbda2
review fix
dao-jun Jul 29, 2022
14eac12
review fix
dao-jun Jul 29, 2022
012a399
fix lint
dao-jun Jul 31, 2022
ccf563a
review fix
dao-jun Aug 2, 2022
d7f830d
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 2, 2022
a435e99
merge master into current & update dep
dao-jun Aug 2, 2022
7898838
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 3, 2022
dbfd404
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 3, 2022
70fbb68
merge master into current & update dep
dao-jun Aug 3, 2022
9f1a1f2
change WithxxxReceiverAndStabilityLevel -> WithxxxReceiver
dao-jun Aug 3, 2022
b94c375
review fix
dao-jun Aug 3, 2022
8727b16
review fix
dao-jun Aug 4, 2022
df08aaf
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 4, 2022
453743b
update deps
dao-jun Aug 4, 2022
6162a26
review fix
dao-jun Aug 5, 2022
9973d59
Merge branch 'main' into dev/pulsar_recv
dao-jun Aug 5, 2022
17807e6
update deps
dao-jun Aug 5, 2022
191396f
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 14, 2022
69cf68b
update deps & merge master
dao-jun Aug 14, 2022
c5e4168
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 17, 2022
77af47e
update deps & merge master
dao-jun Aug 17, 2022
3b2729a
update deps & merge master
dao-jun Aug 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
review fix
  • Loading branch information
dao-jun committed May 13, 2022
commit 34e54043ad8f97498cd4c23ee801de786e6f9cde
40 changes: 22 additions & 18 deletions receiver/pulsarreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,32 @@ package pulsarreceiver

import (
"errors"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"go.opentelemetry.io/collector/config"
)

type Config struct {
config.ReceiverSettings `mapstructure:",squash"`
ServiceUrl string `mapstructure:"service_url"`
Topic string `mapstructure:"topic"`
Subscription string `mapstructure:"subscription"`
Encoding string `mapstructure:"encoding"`
ConsumerName string `mapstructure:"consumer_name"`
TLSTrustCertsFilePath string `mapstructure:"tls_trust_certs_file_path"`
Insecure bool `mapstructure:"insecure"`
AuthName string `mapstructure:"auth_name"`
AuthParam string `mapstructure:"auth_param"`
// Configure the service URL for the Pulsar service.
ServiceUrl string `mapstructure:"service_url"`
// The topic of pulsar to consume logs,metrics,traces. (default = "otlp_traces" for traces,
//"otlp_metrics" for metrics, "otlp_logs" for logs)
Topic string `mapstructure:"topic"`
// The Subscription that receiver will be consuming messages from (default "otlp_subscription")
Subscription string `mapstructure:"subscription"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`
// Name specifies the consumer name.
ConsumerName string `mapstructure:"consumer_name"`
// Set the path to the trusted TLS certificate file
TLSTrustCertsFilePath string `mapstructure:"tls_trust_certs_file_path"`
// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
Insecure bool `mapstructure:"insecure"`
//AuthName to create an authentication
AuthName string `mapstructure:"auth_name"`
//AuthParam to create an authentication
AuthParam string `mapstructure:"auth_param"`
}

var _ config.Receiver = (*Config)(nil)
Expand All @@ -42,18 +51,13 @@ func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) ClientOptions() (pulsar.ClientOptions, error) {
duration := 20 * time.Second

func (cfg *Config) clientOptions() (pulsar.ClientOptions, error) {
url := cfg.ServiceUrl
if len(url) <= 0 {
url = defaultServiceUrl
}
options := pulsar.ClientOptions{
URL: url,
MaxConnectionsPerBroker: 2,
ConnectionTimeout: duration,
OperationTimeout: duration,
URL: url,
}

options.TLSAllowInsecureConnection = cfg.Insecure
Expand All @@ -72,7 +76,7 @@ func (cfg *Config) ClientOptions() (pulsar.ClientOptions, error) {
return options, nil
}

func (cfg *Config) ConsumerOptions() (pulsar.ConsumerOptions, error) {
func (cfg *Config) consumerOptions() (pulsar.ConsumerOptions, error) {
options := pulsar.ConsumerOptions{
Type: pulsar.Failover,
Topic: cfg.Topic,
Expand Down
15 changes: 13 additions & 2 deletions receiver/pulsarreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
const (
typeStr = "pulsar"
defaultEncoding = "otlp_proto"
defaultTopic = "otlp_spans"
defaultTraceTopic = "otlp_spans"
defaultMeticsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
defaultConsumerName = ""
defaultSubscription = "otlp_subscription"
defaultServiceUrl = "pulsar://localhost:6650"
Expand Down Expand Up @@ -94,6 +96,9 @@ func (f *PulsarReceiverFactory) createTracesReceiver(
nextConsumer consumer.Traces,
) (component.TracesReceiver, error) {
c := cfg.(*Config)
if len(c.Topic) <= 0 {
c.Topic = defaultTraceTopic
}
r, err := newTracesReceiver(*c, set, f.tracesUnmarshalers, nextConsumer)
if err != nil {
return nil, err
Expand All @@ -108,6 +113,9 @@ func (f *PulsarReceiverFactory) createMetricsReceiver(
nextConsumer consumer.Metrics,
) (component.MetricsReceiver, error) {
c := cfg.(*Config)
if len(c.Topic) <= 0 {
c.Topic = defaultMeticsTopic
}
r, err := newMetricsReceiver(*c, set, f.metricsUnmarshalers, nextConsumer)
if err != nil {
return nil, err
Expand All @@ -122,6 +130,9 @@ func (f *PulsarReceiverFactory) createLogsReceiver(
nextConsumer consumer.Logs,
) (component.LogsReceiver, error) {
c := cfg.(*Config)
if len(c.Topic) <= 0 {
c.Topic = defaultLogsTopic
}
r, err := newLogsReceiver(*c, set, f.logsUnmarshalers, nextConsumer)
if err != nil {
return nil, err
Expand All @@ -132,7 +143,7 @@ func (f *PulsarReceiverFactory) createLogsReceiver(
func createDefaultConfig() config.Receiver {
return &Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Topic: defaultTopic,
Topic: "",
Encoding: defaultEncoding,
ConsumerName: defaultConsumerName,
Subscription: defaultSubscription,
Expand Down
2 changes: 1 addition & 1 deletion receiver/pulsarreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig()
assert.Equal(t, &Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Topic: defaultTopic,
Topic: "",
Encoding: defaultEncoding,
ConsumerName: defaultConsumerName,
Subscription: defaultSubscription,
Expand Down
66 changes: 42 additions & 24 deletions receiver/pulsarreceiver/pulsar_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pulsarreceiver
import (
"context"
"errors"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -46,7 +47,7 @@ func newTracesReceiver(config Config, set component.ReceiverCreateSettings, unma
return nil, errUnrecognizedEncoding
}

options, err := config.ClientOptions()
options, err := config.clientOptions()
if err != nil {
return nil, err
}
Expand All @@ -55,7 +56,7 @@ func newTracesReceiver(config Config, set component.ReceiverCreateSettings, unma
return nil, err
}

consumerOptions, err := config.ConsumerOptions()
consumerOptions, err := config.consumerOptions()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -92,26 +93,31 @@ func consumerTracesLoop(ctx context.Context, c *PulsarTracesConsumer) error {
traceConsumer := c.tracesConsumer
close(c.ready)

subscribe, err := c.client.Subscribe(c.consumerOptions)
_consumer, err := c.client.Subscribe(c.consumerOptions)
if nil != err {
return err
}

c.consumer = subscribe
c.consumer = _consumer

for true {
message, err := subscribe.Receive(ctx)
message, err := _consumer.Receive(ctx)
if err != nil {
return err
if value, ok := err.(*pulsar.Error); ok && value.Result() == pulsar.AlreadyClosedError {
return err
} else {
time.Sleep(time.Second)
continue
}
}

traces, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
return err
c.settings.Logger.Error("unmarshaler message failed", zap.Error(err))
}

if err := traceConsumer.ConsumeTraces(context.Background(), traces); err != nil {
return err
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}
c.consumer.Ack(message)
}
Expand Down Expand Up @@ -145,7 +151,7 @@ func newMetricsReceiver(config Config, set component.ReceiverCreateSettings, unm
return nil, errUnrecognizedEncoding
}

options, err := config.ClientOptions()
options, err := config.clientOptions()
if err != nil {
return nil, err
}
Expand All @@ -154,7 +160,7 @@ func newMetricsReceiver(config Config, set component.ReceiverCreateSettings, unm
return nil, err
}

consumerOptions, err := config.ConsumerOptions()
consumerOptions, err := config.consumerOptions()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -190,24 +196,31 @@ func consumeMetricsLoop(ctx context.Context, c *PulsarMetricsConsumer) error {
unmarshaler := c.unmarshaler
close(c.ready)

subscribe, err := c.client.Subscribe(c.consumerOptions)
_consumer, err := c.client.Subscribe(c.consumerOptions)
if nil != err {
return err
}

c.consumer = subscribe
c.consumer = _consumer

for true {
message, err := subscribe.Receive(ctx)
message, err := _consumer.Receive(ctx)
if err != nil {
return err
if value, ok := err.(*pulsar.Error); ok && value.Result() == pulsar.AlreadyClosedError {
return err
} else {
time.Sleep(time.Second)
continue
}
}

metrics, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
return err
c.settings.Logger.Error("unmarshaler message failed", zap.Error(err))
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
}

if err := c.metricsConsumer.ConsumeMetrics(context.Background(), metrics); err != nil {
return err
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}

c.consumer.Ack(message)
Expand Down Expand Up @@ -242,7 +255,7 @@ func newLogsReceiver(config Config, set component.ReceiverCreateSettings, unmars
return nil, errUnrecognizedEncoding
}

options, err := config.ClientOptions()
options, err := config.clientOptions()
if err != nil {
return nil, err
}
Expand All @@ -251,7 +264,7 @@ func newLogsReceiver(config Config, set component.ReceiverCreateSettings, unmars
return nil, err
}

consumerOptions, err := config.ConsumerOptions()
consumerOptions, err := config.consumerOptions()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -288,27 +301,32 @@ func consumeLogsLoop(ctx context.Context, c *PulsarLogsConsumer) error {
unmarshaler := c.unmarshaler
close(c.ready)

subscribe, err := c.client.Subscribe(c.consumerOptions)
_consumer, err := c.client.Subscribe(c.consumerOptions)

if nil != err {
return err
}

c.consumer = subscribe
c.consumer = _consumer

for true {
message, err := subscribe.Receive(ctx)
message, err := _consumer.Receive(ctx)
if err != nil {
return err
if value, ok := err.(*pulsar.Error); ok && value.Result() == pulsar.AlreadyClosedError {
return err
} else {
time.Sleep(time.Second)
continue
}
}

logs, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
return err
c.settings.Logger.Error("unmarshaler message failed", zap.Error(err))
}

if err := c.logsConsumer.ConsumeLogs(context.Background(), logs); err != nil {
return err
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}

c.consumer.Ack(message)
Expand Down