Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar][receiver] add apache pulsar receiver #9792

Merged
merged 45 commits into from
Aug 17, 2022
Merged
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8d40e58
add apache pulsar receiver
dao-jun May 8, 2022
cee2dab
add apache pulsar receiver
dao-jun May 8, 2022
184e2f2
fix imports
dao-jun May 9, 2022
53311d1
fix imports
dao-jun May 9, 2022
e66a7f4
fix imports
dao-jun May 9, 2022
016cb56
fix components exporter_tests versions.yaml and go mod.
dao-jun May 9, 2022
34e5404
review fix
dao-jun May 13, 2022
93424d9
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun May 16, 2022
40dc4e4
rename serviceUrl -> endpoint
dao-jun May 30, 2022
176b7d0
rename serviceUrl -> endpoint
dao-jun May 30, 2022
b6a90a4
rename serviceUrl -> endpoint
dao-jun May 30, 2022
5420dc4
Merge branch 'main' into dev/pulsar_recv
dao-jun May 30, 2022
b3b8895
fix cancel consume loop
dao-jun Jun 28, 2022
64fa981
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Jun 29, 2022
a1f4e15
fix go.mod
dao-jun Jun 29, 2022
7f4a5c9
Merge branch 'main' into dev/pulsar_recv
dao-jun Jul 21, 2022
373f81b
review & CI fix
dao-jun Jul 21, 2022
342a225
Merge branch 'main' into dev/pulsar_recv
dao-jun Jul 26, 2022
aa13f05
merge master into current
dao-jun Jul 26, 2022
2ff5328
add CODEOWNERS & remove -race opt
dao-jun Jul 26, 2022
da4c9ad
fix CI checks
dao-jun Jul 26, 2022
604b306
fix lint
dao-jun Jul 26, 2022
f924d8f
fix lint
dao-jun Jul 26, 2022
fbcbda2
review fix
dao-jun Jul 29, 2022
14eac12
review fix
dao-jun Jul 29, 2022
012a399
fix lint
dao-jun Jul 31, 2022
ccf563a
review fix
dao-jun Aug 2, 2022
d7f830d
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 2, 2022
a435e99
merge master into current & update dep
dao-jun Aug 2, 2022
7898838
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 3, 2022
dbfd404
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 3, 2022
70fbb68
merge master into current & update dep
dao-jun Aug 3, 2022
9f1a1f2
change WithxxxReceiverAndStabilityLevel -> WithxxxReceiver
dao-jun Aug 3, 2022
b94c375
review fix
dao-jun Aug 3, 2022
8727b16
review fix
dao-jun Aug 4, 2022
df08aaf
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 4, 2022
453743b
update deps
dao-jun Aug 4, 2022
6162a26
review fix
dao-jun Aug 5, 2022
9973d59
Merge branch 'main' into dev/pulsar_recv
dao-jun Aug 5, 2022
17807e6
update deps
dao-jun Aug 5, 2022
191396f
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 14, 2022
69cf68b
update deps & merge master
dao-jun Aug 14, 2022
c5e4168
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 17, 2022
77af47e
update deps & merge master
dao-jun Aug 17, 2022
3b2729a
update deps & merge master
dao-jun Aug 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix cancel consume loop
  • Loading branch information
dao-jun committed Jun 28, 2022
commit b3b88957c8b21cdfd7084c702f1aeb77854ef268
28 changes: 19 additions & 9 deletions receiver/pulsarreceiver/pulsar_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,13 @@ func consumerTracesLoop(ctx context.Context, c *PulsarTracesConsumer) error {
if err != nil {
if value, ok := err.(*pulsar.Error); ok && value.Result() == pulsar.AlreadyClosedError {
return err
} else {
time.Sleep(time.Second)
continue
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume traces loop")
return err
}
time.Sleep(time.Second)
continue
}

traces, err := unmarshaler.Unmarshal(message.Payload())
Expand Down Expand Up @@ -208,10 +211,14 @@ func consumeMetricsLoop(ctx context.Context, c *PulsarMetricsConsumer) error {
if err != nil {
if value, ok := err.(*pulsar.Error); ok && value.Result() == pulsar.AlreadyClosedError {
return err
} else {
time.Sleep(time.Second)
continue
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume metrics loop")
return err
}

time.Sleep(time.Second)
continue
}

metrics, err := unmarshaler.Unmarshal(message.Payload())
Expand Down Expand Up @@ -314,10 +321,13 @@ func consumeLogsLoop(ctx context.Context, c *PulsarLogsConsumer) error {
if err != nil {
if value, ok := err.(*pulsar.Error); ok && value.Result() == pulsar.AlreadyClosedError {
return err
} else {
time.Sleep(time.Second)
continue
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume traces loop canceled")
return err
}
time.Sleep(time.Second)
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
continue
}

logs, err := unmarshaler.Unmarshal(message.Payload())
Expand Down