Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar][receiver] add apache pulsar receiver #9792

Merged
merged 45 commits into from
Aug 17, 2022
Merged
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8d40e58
add apache pulsar receiver
dao-jun May 8, 2022
cee2dab
add apache pulsar receiver
dao-jun May 8, 2022
184e2f2
fix imports
dao-jun May 9, 2022
53311d1
fix imports
dao-jun May 9, 2022
e66a7f4
fix imports
dao-jun May 9, 2022
016cb56
fix components exporter_tests versions.yaml and go mod.
dao-jun May 9, 2022
34e5404
review fix
dao-jun May 13, 2022
93424d9
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun May 16, 2022
40dc4e4
rename serviceUrl -> endpoint
dao-jun May 30, 2022
176b7d0
rename serviceUrl -> endpoint
dao-jun May 30, 2022
b6a90a4
rename serviceUrl -> endpoint
dao-jun May 30, 2022
5420dc4
Merge branch 'main' into dev/pulsar_recv
dao-jun May 30, 2022
b3b8895
fix cancel consume loop
dao-jun Jun 28, 2022
64fa981
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Jun 29, 2022
a1f4e15
fix go.mod
dao-jun Jun 29, 2022
7f4a5c9
Merge branch 'main' into dev/pulsar_recv
dao-jun Jul 21, 2022
373f81b
review & CI fix
dao-jun Jul 21, 2022
342a225
Merge branch 'main' into dev/pulsar_recv
dao-jun Jul 26, 2022
aa13f05
merge master into current
dao-jun Jul 26, 2022
2ff5328
add CODEOWNERS & remove -race opt
dao-jun Jul 26, 2022
da4c9ad
fix CI checks
dao-jun Jul 26, 2022
604b306
fix lint
dao-jun Jul 26, 2022
f924d8f
fix lint
dao-jun Jul 26, 2022
fbcbda2
review fix
dao-jun Jul 29, 2022
14eac12
review fix
dao-jun Jul 29, 2022
012a399
fix lint
dao-jun Jul 31, 2022
ccf563a
review fix
dao-jun Aug 2, 2022
d7f830d
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 2, 2022
a435e99
merge master into current & update dep
dao-jun Aug 2, 2022
7898838
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 3, 2022
dbfd404
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 3, 2022
70fbb68
merge master into current & update dep
dao-jun Aug 3, 2022
9f1a1f2
change WithxxxReceiverAndStabilityLevel -> WithxxxReceiver
dao-jun Aug 3, 2022
b94c375
review fix
dao-jun Aug 3, 2022
8727b16
review fix
dao-jun Aug 4, 2022
df08aaf
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 4, 2022
453743b
update deps
dao-jun Aug 4, 2022
6162a26
review fix
dao-jun Aug 5, 2022
9973d59
Merge branch 'main' into dev/pulsar_recv
dao-jun Aug 5, 2022
17807e6
update deps
dao-jun Aug 5, 2022
191396f
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 14, 2022
69cf68b
update deps & merge master
dao-jun Aug 14, 2022
c5e4168
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 17, 2022
77af47e
update deps & merge master
dao-jun Aug 17, 2022
3b2729a
update deps & merge master
dao-jun Aug 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
review fix
  • Loading branch information
dao-jun committed Aug 3, 2022
commit b94c375e4449657311d56bf95aeb88c8560453a0
97 changes: 38 additions & 59 deletions receiver/pulsarreceiver/pulsar_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type pulsarTracesConsumer struct {
client pulsar.Client
cancel context.CancelFunc
consumer pulsar.Consumer
ready chan error
unmarshaler TracesUnmarshaler
settings component.ReceiverCreateSettings
consumerOptions pulsar.ConsumerOptions
Expand Down Expand Up @@ -68,7 +67,6 @@ func newTracesReceiver(config Config, set component.ReceiverCreateSettings, unma
unmarshaler: unmarshaler,
settings: set,
client: client,
ready: make(chan error),
consumerOptions: consumerOptions,
}, nil
}
Expand All @@ -77,32 +75,26 @@ func (c *pulsarTracesConsumer) Start(context.Context, component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel

go func() {
err := consumerTracesLoop(ctx, c)
if err != nil {
c.settings.Logger.Error("Consume tracesConsumer failed", zap.Error(err))
}
}()
_consumer, err := c.client.Subscribe(c.consumerOptions)
if err == nil {
c.consumer = _consumer
go func() {
err := consumerTracesLoop(ctx, c)
if err != nil {
c.settings.Logger.Error("Consume tracesConsumer failed", zap.Error(err))
}
}()
}

err := <-c.ready
close(c.ready)
return err
}

func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error {
unmarshaler := c.unmarshaler
traceConsumer := c.tracesConsumer

_consumer, err := c.client.Subscribe(c.consumerOptions)
c.ready <- err
if nil != err {
return err
}

c.consumer = _consumer

for {
message, err := _consumer.Receive(ctx)
message, err := c.consumer.Receive(ctx)
if err != nil {
if strings.Contains(err.Error(), alreadyClosedError) {
return err
Expand Down Expand Up @@ -141,7 +133,6 @@ type pulsarMetricsConsumer struct {
topic string
client pulsar.Client
consumer pulsar.Consumer
ready chan error
cancel context.CancelFunc
settings component.ReceiverCreateSettings
consumerOptions pulsar.ConsumerOptions
Expand Down Expand Up @@ -171,7 +162,6 @@ func newMetricsReceiver(config Config, set component.ReceiverCreateSettings, unm
unmarshaler: unmarshaler,
settings: set,
client: client,
ready: make(chan error),
consumerOptions: consumerOptions,
}, nil
}
Expand All @@ -180,31 +170,27 @@ func (c *pulsarMetricsConsumer) Start(context.Context, component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel

go func() {
err := consumeMetricsLoop(ctx, c)
if err != nil {
c.settings.Logger.Error("consume metrics loop occurs an error", zap.Error(err))
}
}()
_consumer, err := c.client.Subscribe(c.consumerOptions)
if err == nil {
c.consumer = _consumer

go func() {
err := consumeMetricsLoop(ctx, c)
if err != nil {
c.settings.Logger.Error("consume metrics loop occurs an error", zap.Error(err))
}
}()
}

err := <-c.ready
close(c.ready)
return err
}

func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error {
unmarshaler := c.unmarshaler

_consumer, err := c.client.Subscribe(c.consumerOptions)
c.ready <- err
if nil != err {
return err
}

c.consumer = _consumer
metricsConsumer := c.metricsConsumer

for {
message, err := _consumer.Receive(ctx)
message, err := c.consumer.Receive(ctx)
if err != nil {
if strings.Contains(err.Error(), alreadyClosedError) {
return err
Expand All @@ -223,7 +209,7 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error {
c.settings.Logger.Error("unmarshaler message failed", zap.Error(err))
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
}

if err := c.metricsConsumer.ConsumeMetrics(context.Background(), metrics); err != nil {
if err := metricsConsumer.ConsumeMetrics(context.Background(), metrics); err != nil {
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}

Expand All @@ -245,7 +231,6 @@ type pulsarLogsConsumer struct {
topic string
client pulsar.Client
consumer pulsar.Consumer
ready chan error
cancel context.CancelFunc
settings component.ReceiverCreateSettings
consumerOptions pulsar.ConsumerOptions
Expand Down Expand Up @@ -276,7 +261,6 @@ func newLogsReceiver(config Config, set component.ReceiverCreateSettings, unmars
unmarshaler: unmarshaler,
settings: set,
client: client,
ready: make(chan error),
consumerOptions: consumerOptions,
}, nil
}
Expand All @@ -285,31 +269,26 @@ func (c *pulsarLogsConsumer) Start(context.Context, component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel

go func() {
err := consumeLogsLoop(ctx, c)
if err != nil {
c.settings.Logger.Error("consume logs loop occurs an error")
}
}()
_consumer, err := c.client.Subscribe(c.consumerOptions)
if err == nil {
c.consumer = _consumer
go func() {
err := consumeLogsLoop(ctx, c)
if err != nil {
c.settings.Logger.Error("consume logs loop occurs an error")
}
}()
}

err := <-c.ready
close(c.ready)
return err
}

func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error {
unmarshaler := c.unmarshaler

_consumer, err := c.client.Subscribe(c.consumerOptions)
c.ready <- err
if nil != err {
return err
}

c.consumer = _consumer
logsConsumer := c.logsConsumer

for {
message, err := _consumer.Receive(ctx)
message, err := c.consumer.Receive(ctx)
if err != nil {
if strings.Contains(err.Error(), alreadyClosedError) {
return err
Expand All @@ -327,7 +306,7 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error {
c.settings.Logger.Error("unmarshaler message failed", zap.Error(err))
}

if err := c.logsConsumer.ConsumeLogs(context.Background(), logs); err != nil {
if err := logsConsumer.ConsumeLogs(context.Background(), logs); err != nil {
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}

Expand Down