Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar][receiver] add apache pulsar receiver #9792

Merged
merged 45 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8d40e58
add apache pulsar receiver
dao-jun May 8, 2022
cee2dab
add apache pulsar receiver
dao-jun May 8, 2022
184e2f2
fix imports
dao-jun May 9, 2022
53311d1
fix imports
dao-jun May 9, 2022
e66a7f4
fix imports
dao-jun May 9, 2022
016cb56
fix components exporter_tests versions.yaml and go mod.
dao-jun May 9, 2022
34e5404
review fix
dao-jun May 13, 2022
93424d9
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun May 16, 2022
40dc4e4
rename serviceUrl -> endpoint
dao-jun May 30, 2022
176b7d0
rename serviceUrl -> endpoint
dao-jun May 30, 2022
b6a90a4
rename serviceUrl -> endpoint
dao-jun May 30, 2022
5420dc4
Merge branch 'main' into dev/pulsar_recv
dao-jun May 30, 2022
b3b8895
fix cancel consume loop
dao-jun Jun 28, 2022
64fa981
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Jun 29, 2022
a1f4e15
fix go.mod
dao-jun Jun 29, 2022
7f4a5c9
Merge branch 'main' into dev/pulsar_recv
dao-jun Jul 21, 2022
373f81b
review & CI fix
dao-jun Jul 21, 2022
342a225
Merge branch 'main' into dev/pulsar_recv
dao-jun Jul 26, 2022
aa13f05
merge master into current
dao-jun Jul 26, 2022
2ff5328
add CODEOWNERS & remove -race opt
dao-jun Jul 26, 2022
da4c9ad
fix CI checks
dao-jun Jul 26, 2022
604b306
fix lint
dao-jun Jul 26, 2022
f924d8f
fix lint
dao-jun Jul 26, 2022
fbcbda2
review fix
dao-jun Jul 29, 2022
14eac12
review fix
dao-jun Jul 29, 2022
012a399
fix lint
dao-jun Jul 31, 2022
ccf563a
review fix
dao-jun Aug 2, 2022
d7f830d
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 2, 2022
a435e99
merge master into current & update dep
dao-jun Aug 2, 2022
7898838
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 3, 2022
dbfd404
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 3, 2022
70fbb68
merge master into current & update dep
dao-jun Aug 3, 2022
9f1a1f2
change WithxxxReceiverAndStabilityLevel -> WithxxxReceiver
dao-jun Aug 3, 2022
b94c375
review fix
dao-jun Aug 3, 2022
8727b16
review fix
dao-jun Aug 4, 2022
df08aaf
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 4, 2022
453743b
update deps
dao-jun Aug 4, 2022
6162a26
review fix
dao-jun Aug 5, 2022
9973d59
Merge branch 'main' into dev/pulsar_recv
dao-jun Aug 5, 2022
17807e6
update deps
dao-jun Aug 5, 2022
191396f
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 14, 2022
69cf68b
update deps & merge master
dao-jun Aug 14, 2022
c5e4168
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-coll…
dao-jun Aug 17, 2022
77af47e
update deps & merge master
dao-jun Aug 17, 2022
3b2729a
update deps & merge master
dao-jun Aug 17, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ receiver/postgresqlreceiver/ @open-telemetry/collector-c
receiver/prometheusexecreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax
receiver/prometheusreceiver/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole
receiver/rabbitmqreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @cpheps
receiver/pulsarreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax @tjiuming
receiver/receivercreator/ @open-telemetry/collector-contrib-approvers @jrcamp
receiver/redisreceiver/ @open-telemetry/collector-contrib-approvers @pmcollins @dmitryax
receiver/riakreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @armstrmi
Expand Down
3 changes: 3 additions & 0 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver v0.57.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusexecreceiver v0.57.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.57.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver v0.0.0-00010101000000-000000000000 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver v0.57.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator v0.57.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver v0.57.2 // indirect
Expand Down Expand Up @@ -895,6 +896,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prome

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => ../../receiver/prometheusreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver => ../../receiver/pulsarreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver => ../../receiver/rabbitmqreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator => ../../receiver/receivercreator
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver v0.57.2
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusexecreceiver v0.57.2
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.57.2
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver v0.57.2
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator v0.57.2
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver v0.57.2
Expand Down Expand Up @@ -899,6 +900,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prome

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => ./receiver/prometheusreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver => ./receiver/pulsarreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver => ./receiver/rabbitmqreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator => ./receiver/receivercreator
Expand Down
2 changes: 2 additions & 0 deletions internal/components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusexecreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver"
Expand Down Expand Up @@ -246,6 +247,7 @@ func Components() (component.Factories, error) {
postgresqlreceiver.NewFactory(),
prometheusexecreceiver.NewFactory(),
prometheusreceiver.NewFactory(),
pulsarreceiver.NewFactory(),
rabbitmqreceiver.NewFactory(),
receivercreator.NewFactory(),
redisreceiver.NewFactory(),
Expand Down
4 changes: 4 additions & 0 deletions internal/components/receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ func TestDefaultReceivers(t *testing.T) {
receiver: "prometheus_exec",
skipLifecyle: true, // Requires running a subproccess that can not be easily set across platforms
},
{
receiver: "pulsar",
skipLifecyle: true, // TODO It requires a running pulsar instance to start successfully.
},
{
receiver: "rabbitmq",
},
Expand Down
6 changes: 6 additions & 0 deletions receiver/pulsarreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
include ../../Makefile.Common

# Remove "-race" from the default set of test arguments.
# exporter/pulsarexporter tests are failing with the -race check.
# See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/12734
GOTEST_OPT = -v -timeout 300s --tags=$(GO_BUILD_TAGS)
68 changes: 68 additions & 0 deletions receiver/pulsarreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Pulsar Receiver

| Status | |
| ------------------------ |-----------------------|
| Stability | [alpha] |
| Supported pipeline types | traces, logs, metrics |
| Distributions | [contrib] |

Pulsar receiver receives logs, metrics, and traces from Pulsar.

Supported pipeline types: logs, metrics, traces

## Getting Started

The following settings can be optionally configured:
- `endpoint` (default = pulsar://localhost:6650): The url of pulsar cluster.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the pulsar topic to consume from.
- `encoding` (default = otlp_proto): The encoding of the payload sent to pulsar. Available encodings:
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
- `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
- `consumer_name`: specifies the consumer name.
- `auth`
- `tls`
- `cert_file`:
- `key_file`:
- `token`
- `token`
- `oauth2`
- `issuer_url`:
- `client_id`:
- `audience`:
- `athenz`
- `provider_domain`:
- `tenant_domain`:
- `tenant_service`:
- `private_key`:
- `key_id`:
- `principal_header`:
- `zts_url`:
- `subscription` (default = otlp_subscription): the subscription name of consumer.
- `tls_trust_certs_file_path`: path to the CA cert. For a client this verifies the server certificate. Should
only be used if `insecure` is set to true.
- `tls_allow_insecure_connection`: configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)


Example configuration:
```yaml
receivers:
pulsar:
endpoint: pulsar://localhost:6650
topic: otlp-spans
subscription: otlp_spans_sub
consumer_name: otlp_spans_sub_1
encoding: otlp_proto
auth:
tls:
cert_file: cert.pem
key_file: key.pem
tls_allow_insecure_connection: false
tls_trust_certs_file_path: ca.pem
```

[alpha]:https://github.com/open-telemetry/opentelemetry-collector#alpha
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
148 changes: 148 additions & 0 deletions receiver/pulsarreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver"

import (
"errors"

"github.com/apache/pulsar-client-go/pulsar"
"go.opentelemetry.io/collector/config"
)

type Config struct {
config.ReceiverSettings `mapstructure:",squash"`
// Configure the service URL for the Pulsar service.
Endpoint string `mapstructure:"endpoint"`
// The topic of pulsar to consume logs,metrics,traces. (default = "otlp_traces" for traces,
// "otlp_metrics" for metrics, "otlp_logs" for logs)
Topic string `mapstructure:"topic"`
// The Subscription that receiver will be consuming messages from (default "otlp_subscription")
Subscription string `mapstructure:"subscription"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`
// Name specifies the consumer name.
ConsumerName string `mapstructure:"consumer_name"`
// Set the path to the trusted TLS certificate file
TLSTrustCertsFilePath string `mapstructure:"tls_trust_certs_file_path"`
// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
TLSAllowInsecureConnection bool `mapstructure:"tls_allow_insecure_connection"`
Authentication Authentication `mapstructure:"auth"`
}

type Authentication struct {
TLS *TLS `mapstructure:"tls"`
Token *Token `mapstructure:"Token"`
Athenz *Athenz `mapstructure:"athenz"`
OAuth2 *OAuth2 `mapstructure:"oauth2"`
}

type TLS struct {
CertFile string `mapstructure:"cert_file"`
KeyFile string `mapstructure:"key_file"`
}

type Token struct {
Token string `mapstructure:"Token"`
}

type Athenz struct {
ProviderDomain string `mapstructure:"provider_domain"`
TenantDomain string `mapstructure:"tenant_domain"`
TenantService string `mapstructure:"tenant_service"`
PrivateKey string `mapstructure:"private_key"`
KeyID string `mapstructure:"key_id"`
PrincipalHeader string `mapstructure:"principal_header"`
ZtsURL string `mapstructure:"zts_url"`
}

type OAuth2 struct {
IssuerURL string `mapstructure:"issuer_url"`
ClientID string `mapstructure:"client_id"`
Audience string `mapstructure:"audience"`
}

var _ config.Receiver = (*Config)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) auth() pulsar.Authentication {
authentication := cfg.Authentication
if authentication.TLS != nil {
return pulsar.NewAuthenticationTLS(authentication.TLS.CertFile, authentication.TLS.KeyFile)
}
if authentication.Token != nil {
return pulsar.NewAuthenticationToken(authentication.Token.Token)
}
if authentication.OAuth2 != nil {
return pulsar.NewAuthenticationOAuth2(map[string]string{
"issuerUrl": authentication.OAuth2.IssuerURL,
"clientId": authentication.OAuth2.ClientID,
"audience": authentication.OAuth2.Audience,
})
}
if authentication.Athenz != nil {
return pulsar.NewAuthenticationAthenz(map[string]string{
"providerDomain": authentication.Athenz.ProviderDomain,
"tenantDomain": authentication.Athenz.TenantDomain,
"tenantService": authentication.Athenz.TenantService,
"privateKey": authentication.Athenz.PrivateKey,
"keyId": authentication.Athenz.KeyID,
"principalHeader": authentication.Athenz.PrincipalHeader,
"ztsUrl": authentication.Athenz.ZtsURL,
})
}

return nil
}
dmitryax marked this conversation as resolved.
Show resolved Hide resolved

func (cfg *Config) clientOptions() pulsar.ClientOptions {
url := cfg.Endpoint
if len(url) == 0 {
url = defaultServiceURL
}
options := pulsar.ClientOptions{
URL: url,
}

options.TLSAllowInsecureConnection = cfg.TLSAllowInsecureConnection
if len(cfg.TLSTrustCertsFilePath) > 0 {
options.TLSTrustCertsFilePath = cfg.TLSTrustCertsFilePath
}

auth := cfg.auth()
options.Authentication = auth
return options
}

func (cfg *Config) consumerOptions() (pulsar.ConsumerOptions, error) {
options := pulsar.ConsumerOptions{
Type: pulsar.Failover,
Topic: cfg.Topic,
SubscriptionName: cfg.Subscription,
}

if len(cfg.ConsumerName) > 0 {
options.Name = cfg.ConsumerName
}

if options.SubscriptionName == "" || options.Topic == "" {
return options, errors.New("topic and subscription is required")
}

return options, nil
}
50 changes: 50 additions & 0 deletions receiver/pulsarreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver"

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config.yml"), factories)
require.NoError(t, err)
require.Equal(t, 1, len(cfg.Receivers))

r := cfg.Receivers[config.NewComponentID(typeStr)].(*Config)

assert.Equal(t, &Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Topic: "otel-pulsar",
Endpoint: "pulsar://localhost:6500",
ConsumerName: "otel-collector",
Subscription: "otel-collector",
Encoding: defaultEncoding,
TLSTrustCertsFilePath: "ca.pem",
Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}},
}, r)
}
Loading