From 1792ca473d8cb731cf3b9d16da27a205a62fb1f8 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Tue, 22 Mar 2022 20:09:44 +0000 Subject: [PATCH] Remove pulsar components (for now) --- CHANGELOG.md | 1 + internal/bloblang/query/methods_strings.go | 6 +- internal/cli/list.go | 4 +- internal/impl/pulsar/auth/config.go | 98 --------- internal/impl/pulsar/auth/docs.go | 19 -- internal/impl/pulsar/auth_field.go | 135 +++++++++++++ internal/impl/pulsar/input.go | 219 ++++++++++----------- internal/impl/pulsar/integration_test.go | 3 +- internal/impl/pulsar/logger.go | 6 +- internal/impl/pulsar/output.go | 205 +++++++++---------- internal/old/input/constructor.go | 3 - internal/old/input/pulsar.go | 25 --- internal/old/output/constructor.go | 3 - internal/old/output/pulsar.go | 25 --- public/components/all/package.go | 1 - public/components/pulsar/package.go | 3 + website/docs/components/inputs/pulsar.md | 218 -------------------- website/docs/components/outputs/pulsar.md | 208 ------------------- website/docs/guides/migration/v4.md | 22 +++ 19 files changed, 360 insertions(+), 844 deletions(-) delete mode 100644 internal/impl/pulsar/auth/config.go delete mode 100644 internal/impl/pulsar/auth/docs.go create mode 100644 internal/impl/pulsar/auth_field.go delete mode 100644 internal/old/input/pulsar.go delete mode 100644 internal/old/output/pulsar.go create mode 100644 public/components/pulsar/package.go delete mode 100644 website/docs/components/inputs/pulsar.md delete mode 100644 website/docs/components/outputs/pulsar.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 376c0440a1..c6c398cee6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ This is a major version release, for more information and guidance on how to mig ### Changed - All components, features and configuration fields that were marked as deprecated have been removed. +- The `pulsar` input and output are no longer included in the default Benthos builds. - The field `pipeline.threads` field now defaults to `-1`, which automatically matches the host machine CPU count. - Old style interpolation functions (`${!json:foo,1}`) are removed in favour of the newer Bloblang syntax (`${! json("foo") }`). - The Bloblang functions `meta`, `root_meta`, `error` and `env` now return `null` when the target value does not exist. diff --git a/internal/bloblang/query/methods_strings.go b/internal/bloblang/query/methods_strings.go index b145691f4c..871917c12c 100644 --- a/internal/bloblang/query/methods_strings.go +++ b/internal/bloblang/query/methods_strings.go @@ -30,6 +30,8 @@ import ( "github.com/microcosm-cc/bluemonday" "github.com/rickb777/date/period" "github.com/tilinna/z85" + "golang.org/x/text/cases" + "golang.org/x/text/language" "gopkg.in/yaml.v3" "github.com/benthosdev/benthos/v4/internal/impl/xml" @@ -72,9 +74,9 @@ var _ = registerSimpleMethod( return func(v interface{}, ctx FunctionContext) (interface{}, error) { switch t := v.(type) { case string: - return strings.Title(t), nil + return cases.Title(language.English).String(t), nil case []byte: - return bytes.Title(t), nil + return cases.Title(language.English).Bytes(t), nil } return nil, NewTypeError(v, ValueString) }, nil diff --git a/internal/cli/list.go b/internal/cli/list.go index e6c99f1f83..d3868dfcf7 100644 --- a/internal/cli/list.go +++ b/internal/cli/list.go @@ -7,6 +7,8 @@ import ( "strings" "github.com/urfave/cli/v2" + "golang.org/x/text/cases" + "golang.org/x/text/language" "github.com/benthosdev/benthos/v4/internal/config/schema" ) @@ -69,7 +71,7 @@ func listComponents(c *cli.Context) { fmt.Println("") } i++ - title := strings.Title(strings.ReplaceAll(k, "-", " ")) + title := cases.Title(language.English).String(strings.ReplaceAll(k, "-", " ")) fmt.Printf("%v:\n", title) for _, t := range flat[k] { fmt.Printf(" - %v\n", t) diff --git a/internal/impl/pulsar/auth/config.go b/internal/impl/pulsar/auth/config.go deleted file mode 100644 index 86c77e1084..0000000000 --- a/internal/impl/pulsar/auth/config.go +++ /dev/null @@ -1,98 +0,0 @@ -package auth - -import ( - "errors" -) - -// Config contains configuration params for Pulsar authentication. -type Config struct { - OAuth2 OAuth2Config `json:"oauth2" yaml:"oauth2"` - Token TokenConfig `json:"token" yaml:"token"` -} - -// OAuth2Config contains configuration params for Pulsar OAuth2 authentication. -type OAuth2Config struct { - Enabled bool `json:"enabled" yaml:"enabled"` - Audience string `json:"audience" yaml:"audience"` - IssuerURL string `json:"issuer_url" yaml:"issuer_url"` - PrivateKeyFile string `json:"private_key_file" yaml:"private_key_file"` -} - -// TokenConfig contains configuration params for Pulsar Token authentication. -type TokenConfig struct { - Enabled bool `json:"enabled" yaml:"enabled"` - Token string `json:"token" yaml:"token"` -} - -// New creates a new Config instance. -func New() Config { - return Config{ - OAuth2: NewOAuth(), - Token: NewToken(), - } -} - -// NewOAuth creates a new OAuth2Config instance. -func NewOAuth() OAuth2Config { - return OAuth2Config{ - Enabled: false, - PrivateKeyFile: "", - Audience: "", - IssuerURL: "", - } -} - -// NewToken creates a new TokenConfig instance. -func NewToken() TokenConfig { - return TokenConfig{ - Enabled: false, - Token: "", - } -} - -// Validate checks whether Config is valid. -func (c *Config) Validate() error { - if c.OAuth2.Enabled && c.Token.Enabled { - return errors.New("only one auth method can be enabled at once") - } - if c.OAuth2.Enabled { - return c.OAuth2.Validate() - } - if c.Token.Enabled { - return c.Token.Validate() - } - return nil -} - -// Validate checks whether OAuth2Config is valid. -func (c *OAuth2Config) Validate() error { - if c.Audience == "" { - return errors.New("oauth2 audience is empty") - } - if c.IssuerURL == "" { - return errors.New("oauth2 issuer URL is empty") - } - if c.PrivateKeyFile == "" { - return errors.New("oauth2 private key file is empty") - } - return nil -} - -// ToMap returns OAuth2Config as a map representing OAuth2 client credentails. -func (c *OAuth2Config) ToMap() map[string]string { - // Pulsar docs: https://pulsar.apache.org/docs/en/2.8.0/security-oauth2/#go-client - return map[string]string{ - "type": "client_credentials", - "issuerUrl": c.IssuerURL, - "audience": c.Audience, - "privateKey": c.PrivateKeyFile, - } -} - -// Validate checks whether TokenConfig is valid. -func (c *TokenConfig) Validate() error { - if c.Token == "" { - return errors.New("token is empty") - } - return nil -} diff --git a/internal/impl/pulsar/auth/docs.go b/internal/impl/pulsar/auth/docs.go deleted file mode 100644 index bc9f1f73d6..0000000000 --- a/internal/impl/pulsar/auth/docs.go +++ /dev/null @@ -1,19 +0,0 @@ -package auth - -import "github.com/benthosdev/benthos/v4/internal/docs" - -// FieldSpec returns documentation authentication specs for Pulsar components -func FieldSpec() docs.FieldSpec { - return docs.FieldAdvanced("auth", "Optional configuration of Pulsar authentication methods.").WithChildren( - docs.FieldAdvanced("oauth2", "Parameters for Pulsar OAuth2 authentication.").WithChildren( - docs.FieldBool("enabled", "Whether OAuth2 is enabled.", true), - docs.FieldString("audience", "OAuth2 audience."), - docs.FieldString("issuer_url", "OAuth2 issuer URL."), - docs.FieldString("private_key_file", "File containing the private key."), - ), - docs.FieldAdvanced("token", "Parameters for Pulsar Token authentication.").WithChildren( - docs.FieldBool("enabled", "Whether Token Auth is enabled.", true), - docs.FieldString("token", "Actual base64 encoded token."), - ), - ).AtVersion("3.60.0") -} diff --git a/internal/impl/pulsar/auth_field.go b/internal/impl/pulsar/auth_field.go new file mode 100644 index 0000000000..1710946970 --- /dev/null +++ b/internal/impl/pulsar/auth_field.go @@ -0,0 +1,135 @@ +package pulsar + +import ( + "errors" + + "github.com/benthosdev/benthos/v4/public/service" +) + +func authField() *service.ConfigField { + return service.NewObjectField("auth", + service.NewObjectField("oauth2", + service.NewBoolField("enabled"). + Description("Whether OAuth2 is enabled."). + Default(false), + service.NewStringField("audience"). + Description("OAuth2 audience."). + Default(""), + service.NewStringField("issuer_url"). + Description("OAuth2 issuer URL."). + Default(""), + service.NewStringField("private_key_file"). + Description("The path to a file containing a private key."). + Default(""), + ).Description("Parameters for Pulsar OAuth2 authentication."). + Optional(), + service.NewObjectField("token", + service.NewBoolField("enabled"). + Description("Whether Token Auth is enabled."). + Default(false), + service.NewStringField("token"). + Description("Actual base64 encoded token."). + Default(""), + ).Description("Parameters for Pulsar Token authentication."). + Optional(), + ).Description("Optional configuration of Pulsar authentication methods."). + Version("3.60.0"). + Advanced(). + Optional() +} + +type authConfig struct { + OAuth2 oAuth2Config + Token tokenConfig +} + +type oAuth2Config struct { + Enabled bool + Audience string + IssuerURL string + PrivateKeyFile string +} + +type tokenConfig struct { + Enabled bool + Token string +} + +func authFromParsed(p *service.ParsedConfig) (c authConfig, err error) { + if !p.Contains("auth") { + return + } + p = p.Namespace("auth") + + if p.Contains("oauth") { + if c.OAuth2.Enabled, err = p.FieldBool("oauth", "enabled"); err != nil { + return + } + if c.OAuth2.Audience, err = p.FieldString("oauth", "audience"); err != nil { + return + } + if c.OAuth2.IssuerURL, err = p.FieldString("oauth", "issuer_url"); err != nil { + return + } + if c.OAuth2.PrivateKeyFile, err = p.FieldString("oauth", "private_key_file"); err != nil { + return + } + } + + if p.Contains("token") { + if c.Token.Enabled, err = p.FieldBool("token", "enabled"); err != nil { + return + } + if c.Token.Token, err = p.FieldString("token", "token"); err != nil { + return + } + } + return +} + +// Validate checks whether Config is valid. +func (c *authConfig) Validate() error { + if c.OAuth2.Enabled && c.Token.Enabled { + return errors.New("only one auth method can be enabled at once") + } + if c.OAuth2.Enabled { + return c.OAuth2.Validate() + } + if c.Token.Enabled { + return c.Token.Validate() + } + return nil +} + +// Validate checks whether OAuth2Config is valid. +func (c *oAuth2Config) Validate() error { + if c.Audience == "" { + return errors.New("oauth2 audience is empty") + } + if c.IssuerURL == "" { + return errors.New("oauth2 issuer URL is empty") + } + if c.PrivateKeyFile == "" { + return errors.New("oauth2 private key file is empty") + } + return nil +} + +// ToMap returns OAuth2Config as a map representing OAuth2 client credentails. +func (c *oAuth2Config) ToMap() map[string]string { + // Pulsar docs: https://pulsar.apache.org/docs/en/2.8.0/security-oauth2/#go-client + return map[string]string{ + "type": "client_credentials", + "issuerUrl": c.IssuerURL, + "audience": c.Audience, + "privateKey": c.PrivateKeyFile, + } +} + +// Validate checks whether TokenConfig is valid. +func (c *tokenConfig) Validate() error { + if c.Token == "" { + return errors.New("token is empty") + } + return nil +} diff --git a/internal/impl/pulsar/input.go b/internal/impl/pulsar/input.go index abf3175004..fc97c1f8c8 100644 --- a/internal/impl/pulsar/input.go +++ b/internal/impl/pulsar/input.go @@ -10,17 +10,8 @@ import ( "github.com/apache/pulsar-client-go/pulsar" - "github.com/benthosdev/benthos/v4/internal/bundle" "github.com/benthosdev/benthos/v4/internal/component" - iinput "github.com/benthosdev/benthos/v4/internal/component/input" - "github.com/benthosdev/benthos/v4/internal/component/metrics" - "github.com/benthosdev/benthos/v4/internal/docs" - "github.com/benthosdev/benthos/v4/internal/impl/pulsar/auth" - "github.com/benthosdev/benthos/v4/internal/log" - "github.com/benthosdev/benthos/v4/internal/message" - "github.com/benthosdev/benthos/v4/internal/old/input" - "github.com/benthosdev/benthos/v4/internal/old/input/reader" - "github.com/benthosdev/benthos/v4/internal/shutdown" + "github.com/benthosdev/benthos/v4/public/service" ) const ( @@ -28,25 +19,18 @@ const ( ) func init() { - bundle.AllInputs.Add(bundle.InputConstructorFromSimple(func(c input.Config, nm bundle.NewManagement) (iinput.Streamed, error) { - var a reader.Async - var err error - if a, err = newPulsarReader(c.Pulsar, nm.Logger(), nm.Metrics()); err != nil { - return nil, err - } - return input.NewAsyncReader(input.TypePulsar, false, a, nm.Logger(), nm.Metrics()) - }), docs.ComponentSpec{ - Name: input.TypePulsar, - Type: docs.TypeInput, - Status: docs.StatusExperimental, - Version: "3.43.0", - Summary: `Reads messages from an Apache Pulsar server.`, - Description: ` + err := service.RegisterInput( + "pulsar", + service.NewConfigSpec(). + Version("3.43.0"). + Categories("Services"). + Summary("Reads messages from an Apache Pulsar server."). + Description(` ### Metadata This input adds the following metadata fields to each message: -` + "```text" + ` +`+"```text"+` - pulsar_message_id - pulsar_key - pulsar_ordering_key @@ -56,28 +40,30 @@ This input adds the following metadata fields to each message: - pulsar_producer_name - pulsar_redelivery_count - All properties of the message -` + "```" + ` +`+"```"+` You can access these metadata fields using -[function interpolation](/docs/configuration/interpolation#metadata).`, - Categories: []string{ - string(input.CategoryServices), - }, - Config: docs.FieldComponent().WithChildren( - docs.FieldCommon("url", - "A URL to connect to.", - "pulsar://localhost:6650", - "pulsar://pulsar.us-west.example.com:6650", - "pulsar+ssl://pulsar.us-west.example.com:6651", - ), - docs.FieldString("topics", "A list of topics to subscribe to.").Array(), - docs.FieldCommon("subscription_name", "Specify the subscription name for this consumer."), - docs.FieldCommon("subscription_type", "Specify the subscription type for this consumer.\n\n> NOTE: Using a `key_shared` subscription type will __allow out-of-order delivery__ since nack-ing messages sets non-zero nack delivery delay - this can potentially cause consumers to stall. See [Pulsar documentation](https://pulsar.apache.org/docs/en/2.8.1/concepts-messaging/#negative-acknowledgement) and [this Github issue](https://github.com/apache/pulsar/issues/12208) for more details."). - HasOptions("shared", "key_shared", "failover", "exclusive"). - HasDefault(defaultSubscriptionType), - auth.FieldSpec(), - ).ChildDefaultAndTypesFromStruct(input.NewPulsarConfig()), - }) +[function interpolation](/docs/configuration/interpolation#metadata). +`). + Field(service.NewStringField("url"). + Description("A URL to connect to."). + Example("pulsar://localhost:6650"). + Example("pulsar://pulsar.us-west.example.com:6650"). + Example("pulsar+ssl://pulsar.us-west.example.com:6651")). + Field(service.NewStringListField("topics"). + Description("A list of topics to subscribe to.")). + Field(service.NewStringField("subscription_name"). + Description("Specify the subscription name for this consumer.")). + Field(service.NewStringEnumField("subscription_type", "shared", "key_shared", "failover", "exclusive"). + Description("Specify the subscription type for this consumer.\n\n> NOTE: Using a `key_shared` subscription type will __allow out-of-order delivery__ since nack-ing messages sets non-zero nack delivery delay - this can potentially cause consumers to stall. See [Pulsar documentation](https://pulsar.apache.org/docs/en/2.8.1/concepts-messaging/#negative-acknowledgement) and [this Github issue](https://github.com/apache/pulsar/issues/12208) for more details."). + Default(defaultSubscriptionType)). + Field(authField()), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return newPulsarReaderFromParsed(conf, mgr.Logger()) + }) + if err != nil { + panic(err) + } } //------------------------------------------------------------------------------ @@ -85,42 +71,62 @@ You can access these metadata fields using type pulsarReader struct { client pulsar.Client consumer pulsar.Consumer + m sync.RWMutex - conf input.PulsarConfig - stats metrics.Type - log log.Modular + log *service.Logger - m sync.RWMutex - shutSig *shutdown.Signaller + authConf authConfig + url string + topics []string + subName string + subType string } -func newPulsarReader(conf input.PulsarConfig, log log.Modular, stats metrics.Type) (*pulsarReader, error) { - if conf.URL == "" { - return nil, errors.New("field url must not be empty") +func newPulsarReaderFromParsed(conf *service.ParsedConfig, log *service.Logger) (p *pulsarReader, err error) { + p = &pulsarReader{ + log: log, } - if len(conf.Topics) == 0 { - return nil, errors.New("field topics must not be empty") + + if p.authConf, err = authFromParsed(conf); err != nil { + return } - if conf.SubscriptionName == "" { - return nil, errors.New("field subscription_name must not be empty") + + if p.url, err = conf.FieldString("url"); err != nil { + return } - if conf.SubscriptionType == "" { - conf.SubscriptionType = defaultSubscriptionType // set default subscription type if empty + if p.topics, err = conf.FieldStringList("topics"); err != nil { + return } - if _, err := parseSubscriptionType(conf.SubscriptionType); err != nil { - return nil, fmt.Errorf("field subscription_type is invalid: %v", err) + if p.subName, err = conf.FieldString("subscription_name"); err != nil { + return } - if err := conf.Auth.Validate(); err != nil { - return nil, fmt.Errorf("field auth is invalid: %v", err) + if p.subType, err = conf.FieldString("subscription_type"); err != nil { + return } - p := pulsarReader{ - conf: conf, - stats: stats, - log: log, - shutSig: shutdown.NewSignaller(), + if p.url == "" { + err = errors.New("field url must not be empty") + return } - return &p, nil + if len(p.topics) == 0 { + err = errors.New("field topics must not be empty") + return + } + if p.subName == "" { + err = errors.New("field subscription_name must not be empty") + return + } + if p.subType == "" { + p.subType = defaultSubscriptionType // set default subscription type if empty + } + if _, err = parseSubscriptionType(p.subType); err != nil { + err = fmt.Errorf("field subscription_type is invalid: %v", err) + return + } + if err = p.authConf.Validate(); err != nil { + err = fmt.Errorf("field auth is invalid: %v", err) + } + return } func parseSubscriptionType(subType string) (pulsar.SubscriptionType, error) { @@ -140,8 +146,7 @@ func parseSubscriptionType(subType string) (pulsar.SubscriptionType, error) { //------------------------------------------------------------------------------ -// ConnectWithContext establishes a connection to an Pulsar server. -func (p *pulsarReader) ConnectWithContext(ctx context.Context) error { +func (p *pulsarReader) Connect(ctx context.Context) error { p.m.Lock() defer p.m.Unlock() @@ -157,28 +162,28 @@ func (p *pulsarReader) ConnectWithContext(ctx context.Context) error { ) opts := pulsar.ClientOptions{ - Logger: DefaultLogger(p.log), + Logger: createDefaultLogger(p.log), ConnectionTimeout: time.Second * 3, - URL: p.conf.URL, + URL: p.url, } - if p.conf.Auth.OAuth2.Enabled { - opts.Authentication = pulsar.NewAuthenticationOAuth2(p.conf.Auth.OAuth2.ToMap()) - } else if p.conf.Auth.Token.Enabled { - opts.Authentication = pulsar.NewAuthenticationToken(p.conf.Auth.Token.Token) + if p.authConf.OAuth2.Enabled { + opts.Authentication = pulsar.NewAuthenticationOAuth2(p.authConf.OAuth2.ToMap()) + } else if p.authConf.Token.Enabled { + opts.Authentication = pulsar.NewAuthenticationToken(p.authConf.Token.Token) } if client, err = pulsar.NewClient(opts); err != nil { return err } - if subType, err = parseSubscriptionType(p.conf.SubscriptionType); err != nil { + if subType, err = parseSubscriptionType(p.subType); err != nil { return err } if consumer, err = client.Subscribe(pulsar.ConsumerOptions{ - Topics: p.conf.Topics, - SubscriptionName: p.conf.SubscriptionName, + Topics: p.topics, + SubscriptionName: p.subName, Type: subType, KeySharedPolicy: &pulsar.KeySharedPolicy{ AllowOutOfOrderDelivery: true, @@ -191,11 +196,10 @@ func (p *pulsarReader) ConnectWithContext(ctx context.Context) error { p.client = client p.consumer = consumer - p.log.Infof("Receiving Pulsar messages to URL: %v\n", p.conf.URL) + p.log.Infof("Receiving Pulsar messages to URL: %v\n", p.url) return nil } -// disconnect safely closes a connection to an Pulsar server. func (p *pulsarReader) disconnect(ctx context.Context) error { p.m.Lock() defer p.m.Unlock() @@ -209,17 +213,10 @@ func (p *pulsarReader) disconnect(ctx context.Context) error { p.consumer = nil p.client = nil - - if p.shutSig.ShouldCloseAtLeisure() { - p.shutSig.ShutdownComplete() - } return nil } -//------------------------------------------------------------------------------ - -// ReadWithContext a new Pulsar message. -func (p *pulsarReader) ReadWithContext(ctx context.Context) (*message.Batch, reader.AsyncAckFn, error) { +func (p *pulsarReader) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { var r pulsar.Consumer p.m.RLock() if p.consumer != nil { @@ -244,32 +241,28 @@ func (p *pulsarReader) ReadWithContext(ctx context.Context) (*message.Batch, rea return nil, nil, err } - msg := message.QuickBatch(nil) + msg := service.NewMessage(pulMsg.Payload()) - part := message.NewPart(pulMsg.Payload()) - - part.MetaSet("pulsar_message_id", string(pulMsg.ID().Serialize())) - part.MetaSet("pulsar_topic", pulMsg.Topic()) - part.MetaSet("pulsar_publish_time_unix", strconv.FormatInt(pulMsg.PublishTime().Unix(), 10)) - part.MetaSet("pulsar_redelivery_count", strconv.FormatInt(int64(pulMsg.RedeliveryCount()), 10)) + msg.MetaSet("pulsar_message_id", string(pulMsg.ID().Serialize())) + msg.MetaSet("pulsar_topic", pulMsg.Topic()) + msg.MetaSet("pulsar_publish_time_unix", strconv.FormatInt(pulMsg.PublishTime().Unix(), 10)) + msg.MetaSet("pulsar_redelivery_count", strconv.FormatInt(int64(pulMsg.RedeliveryCount()), 10)) if key := pulMsg.Key(); len(key) > 0 { - part.MetaSet("pulsar_key", key) + msg.MetaSet("pulsar_key", key) } if orderingKey := pulMsg.OrderingKey(); len(orderingKey) > 0 { - part.MetaSet("pulsar_ordering_key", orderingKey) + msg.MetaSet("pulsar_ordering_key", orderingKey) } if !pulMsg.EventTime().IsZero() { - part.MetaSet("pulsar_event_time_unix", strconv.FormatInt(pulMsg.EventTime().Unix(), 10)) + msg.MetaSet("pulsar_event_time_unix", strconv.FormatInt(pulMsg.EventTime().Unix(), 10)) } if producerName := pulMsg.ProducerName(); producerName != "" { - part.MetaSet("pulsar_producer_name", producerName) + msg.MetaSet("pulsar_producer_name", producerName) } for k, v := range pulMsg.Properties() { - part.MetaSet(k, v) + msg.MetaSet(k, v) } - msg.Append(part) - return msg, func(ctx context.Context, res error) error { var r pulsar.Consumer p.m.RLock() @@ -288,20 +281,6 @@ func (p *pulsarReader) ReadWithContext(ctx context.Context) (*message.Batch, rea }, nil } -// CloseAsync shuts down the Pulsar input and stops processing requests. -func (p *pulsarReader) CloseAsync() { - p.shutSig.CloseAtLeisure() - go p.disconnect(context.Background()) +func (p *pulsarReader) Close(ctx context.Context) error { + return p.disconnect(ctx) } - -// WaitForClose blocks until the Pulsar input has closed down. -func (p *pulsarReader) WaitForClose(timeout time.Duration) error { - select { - case <-p.shutSig.HasClosedChan(): - case <-time.After(timeout): - return component.ErrTimeout - } - return nil -} - -//------------------------------------------------------------------------------ diff --git a/internal/impl/pulsar/integration_test.go b/internal/impl/pulsar/integration_test.go index 7c06cf808c..48a01d8fca 100644 --- a/internal/impl/pulsar/integration_test.go +++ b/internal/impl/pulsar/integration_test.go @@ -25,7 +25,7 @@ func TestIntegrationPulsar(t *testing.T) { pool.MaxWait = time.Until(dline) } - resource, err := pool.Run("apachepulsar/pulsar-standalone", "latest", nil) + resource, err := pool.Run("apachepulsar/pulsar-standalone", "2.8.3", nil) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, pool.Purge(resource)) @@ -77,6 +77,7 @@ input: integration.StreamTestOptSleepAfterInput(500*time.Millisecond), integration.StreamTestOptSleepAfterOutput(500*time.Millisecond), integration.StreamTestOptPort(resource.GetPort("6650/tcp")), + integration.StreamTestOptLogging("TRACE"), ) t.Run("with max in flight", func(t *testing.T) { t.Parallel() diff --git a/internal/impl/pulsar/logger.go b/internal/impl/pulsar/logger.go index c3ddbcc2ab..abb8c6dcc7 100644 --- a/internal/impl/pulsar/logger.go +++ b/internal/impl/pulsar/logger.go @@ -3,18 +3,18 @@ package pulsar import ( plog "github.com/apache/pulsar-client-go/pulsar/log" - "github.com/benthosdev/benthos/v4/internal/log" + "github.com/benthosdev/benthos/v4/public/service" ) // DefaultLogger returns a logger that wraps Benthos Modular logger. -func DefaultLogger(l log.Modular) plog.Logger { +func createDefaultLogger(l *service.Logger) plog.Logger { return defaultLogger{ backend: l, } } type defaultLogger struct { - backend log.Modular + backend *service.Logger } func (l defaultLogger) SubLogger(fields plog.Fields) plog.Logger { diff --git a/internal/impl/pulsar/output.go b/internal/impl/pulsar/output.go index 1ae8a82a7a..4bba3e7bdc 100644 --- a/internal/impl/pulsar/output.go +++ b/internal/impl/pulsar/output.go @@ -2,62 +2,53 @@ package pulsar import ( "context" - "errors" - "fmt" "sync" "time" "github.com/apache/pulsar-client-go/pulsar" - "github.com/benthosdev/benthos/v4/internal/bloblang/field" - "github.com/benthosdev/benthos/v4/internal/bundle" "github.com/benthosdev/benthos/v4/internal/component" - "github.com/benthosdev/benthos/v4/internal/component/metrics" - ioutput "github.com/benthosdev/benthos/v4/internal/component/output" - "github.com/benthosdev/benthos/v4/internal/docs" - "github.com/benthosdev/benthos/v4/internal/impl/pulsar/auth" - "github.com/benthosdev/benthos/v4/internal/interop" - "github.com/benthosdev/benthos/v4/internal/log" - "github.com/benthosdev/benthos/v4/internal/message" - "github.com/benthosdev/benthos/v4/internal/old/output" - "github.com/benthosdev/benthos/v4/internal/old/output/writer" - "github.com/benthosdev/benthos/v4/internal/shutdown" + "github.com/benthosdev/benthos/v4/public/service" ) func init() { - bundle.AllOutputs.Add(bundle.OutputConstructorFromSimple(func(c output.Config, nm bundle.NewManagement) (ioutput.Streamed, error) { - w, err := newPulsarWriter(c.Pulsar, nm, nm.Logger(), nm.Metrics()) - if err != nil { - return nil, err - } - o, err := output.NewAsyncWriter(output.TypePulsar, c.Pulsar.MaxInFlight, w, nm.Logger(), nm.Metrics()) - if err != nil { - return nil, err - } - return output.OnlySinglePayloads(o), nil - }), docs.ComponentSpec{ - Name: output.TypePulsar, - Type: docs.TypeOutput, - Status: docs.StatusExperimental, - Version: "3.43.0", - Summary: `Write messages to an Apache Pulsar server.`, - Categories: []string{ - string(output.CategoryServices), - }, - Config: docs.FieldComponent().WithChildren( - docs.FieldCommon("url", - "A URL to connect to.", - "pulsar://localhost:6650", - "pulsar://pulsar.us-west.example.com:6650", - "pulsar+ssl://pulsar.us-west.example.com:6651", - ), - docs.FieldCommon("topic", "A topic to publish to."), - docs.FieldCommon("key", "The key to publish messages with.").IsInterpolated(), - docs.FieldCommon("ordering_key", "The ordering key to publish messages with.").IsInterpolated(), - docs.FieldCommon("max_in_flight", "The maximum number of messages to have in flight at a given time. Increase this to improve throughput."), - auth.FieldSpec(), - ).ChildDefaultAndTypesFromStruct(output.NewPulsarConfig()), - }) + err := service.RegisterOutput( + "pulsar", + service.NewConfigSpec(). + Version("3.43.0"). + Categories("Services"). + Summary("Write messages to an Apache Pulsar server."). + Field(service.NewStringField("url"). + Description("A URL to connect to."). + Example("pulsar://localhost:6650"). + Example("pulsar://pulsar.us-west.example.com:6650"). + Example("pulsar+ssl://pulsar.us-west.example.com:6651")). + Field(service.NewStringField("topic"). + Description("The topic to publish to.")). + Field(service.NewInterpolatedStringField("key"). + Description("The key to publish messages with."). + Default("")). + Field(service.NewInterpolatedStringField("ordering_key"). + Description("The ordering key to publish messages with."). + Default("")). + Field(service.NewIntField("max_in_flight"). + Description("The maximum number of messages to have in flight at a given time. Increase this to improve throughput."). + Default(64)). + Field(authField()), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Output, int, error) { + w, err := newPulsarWriterFromParsed(conf, mgr.Logger()) + if err != nil { + return nil, 0, err + } + n, err := conf.FieldInt("max_in_flight") + if err != nil { + return nil, 0, err + } + return w, n, err + }) + if err != nil { + panic(err) + } } //------------------------------------------------------------------------------ @@ -65,50 +56,44 @@ func init() { type pulsarWriter struct { client pulsar.Client producer pulsar.Producer + m sync.RWMutex - conf output.PulsarConfig - stats metrics.Type - log log.Modular - - key *field.Expression - orderingKey *field.Expression + log *service.Logger - m sync.RWMutex - shutSig *shutdown.Signaller + authConf authConfig + url string + topic string + key *service.InterpolatedString + orderingKey *service.InterpolatedString } -func newPulsarWriter(conf output.PulsarConfig, mgr interop.Manager, log log.Modular, stats metrics.Type) (*pulsarWriter, error) { - var err error - var key, orderingKey *field.Expression +func newPulsarWriterFromParsed(conf *service.ParsedConfig, log *service.Logger) (p *pulsarWriter, err error) { + p = &pulsarWriter{ + log: log, + } - if conf.URL == "" { - return nil, errors.New("field url must not be empty") + if p.authConf, err = authFromParsed(conf); err != nil { + return } - if conf.Topic == "" { - return nil, errors.New("field topic must not be empty") + + if p.url, err = conf.FieldString("url"); err != nil { + return } - if key, err = mgr.BloblEnvironment().NewField(conf.Key); err != nil { - return nil, fmt.Errorf("failed to parse key expression: %v", err) + if p.topic, err = conf.FieldString("topic"); err != nil { + return } - if orderingKey, err = mgr.BloblEnvironment().NewField(conf.OrderingKey); err != nil { - return nil, fmt.Errorf("failed to parse ordering_key expression: %v", err) + if p.key, err = conf.FieldInterpolatedString("key"); err != nil { + return } - - p := pulsarWriter{ - conf: conf, - stats: stats, - log: log, - key: key, - orderingKey: orderingKey, - shutSig: shutdown.NewSignaller(), + if p.orderingKey, err = conf.FieldInterpolatedString("ordering_key"); err != nil { + return } - return &p, nil + return } //------------------------------------------------------------------------------ -// ConnectWithContext establishes a connection to an Pulsar server. -func (p *pulsarWriter) ConnectWithContext(ctx context.Context) error { +func (p *pulsarWriter) Connect(ctx context.Context) error { p.m.Lock() defer p.m.Unlock() @@ -123,15 +108,15 @@ func (p *pulsarWriter) ConnectWithContext(ctx context.Context) error { ) opts := pulsar.ClientOptions{ - Logger: DefaultLogger(p.log), + Logger: createDefaultLogger(p.log), ConnectionTimeout: time.Second * 3, - URL: p.conf.URL, + URL: p.url, } - if p.conf.Auth.OAuth2.Enabled { - opts.Authentication = pulsar.NewAuthenticationOAuth2(p.conf.Auth.OAuth2.ToMap()) - } else if p.conf.Auth.Token.Enabled { - opts.Authentication = pulsar.NewAuthenticationToken(p.conf.Auth.Token.Token) + if p.authConf.OAuth2.Enabled { + opts.Authentication = pulsar.NewAuthenticationOAuth2(p.authConf.OAuth2.ToMap()) + } else if p.authConf.Token.Enabled { + opts.Authentication = pulsar.NewAuthenticationToken(p.authConf.Token.Token) } if client, err = pulsar.NewClient(opts); err != nil { @@ -139,7 +124,7 @@ func (p *pulsarWriter) ConnectWithContext(ctx context.Context) error { } if producer, err = client.CreateProducer(pulsar.ProducerOptions{ - Topic: p.conf.Topic, + Topic: p.topic, }); err != nil { client.Close() return err @@ -148,7 +133,7 @@ func (p *pulsarWriter) ConnectWithContext(ctx context.Context) error { p.client = client p.producer = producer - p.log.Infof("Writing Pulsar messages to URL: %v\n", p.conf.URL) + p.log.Infof("Writing Pulsar messages to URL: %v\n", p.url) return nil } @@ -166,18 +151,12 @@ func (p *pulsarWriter) disconnect(ctx context.Context) error { p.producer = nil p.client = nil - - if p.shutSig.ShouldCloseAtLeisure() { - p.shutSig.ShutdownComplete() - } return nil } //------------------------------------------------------------------------------ -// WriteWithContext will attempt to write a message over Pulsar, wait for -// acknowledgement, and returns an error if applicable. -func (p *pulsarWriter) WriteWithContext(ctx context.Context, msg *message.Batch) error { +func (p *pulsarWriter) Write(ctx context.Context, msg *service.Message) error { var r pulsar.Producer p.m.RLock() if p.producer != nil { @@ -189,33 +168,25 @@ func (p *pulsarWriter) WriteWithContext(ctx context.Context, msg *message.Batch) return component.ErrNotConnected } - return writer.IterateBatchedSend(msg, func(i int, part *message.Part) error { - m := &pulsar.ProducerMessage{ - Payload: part.Get(), - } - if key := p.key.Bytes(i, msg); len(key) > 0 { - m.Key = string(key) - } - if orderingKey := p.orderingKey.Bytes(i, msg); len(orderingKey) > 0 { - m.OrderingKey = string(orderingKey) - } - _, err := r.Send(context.Background(), m) + b, err := msg.AsBytes() + if err != nil { return err - }) -} + } -// CloseAsync shuts down the Pulsar input and stops processing requests. -func (p *pulsarWriter) CloseAsync() { - p.shutSig.CloseAtLeisure() - go p.disconnect(context.Background()) + m := &pulsar.ProducerMessage{ + Payload: b, + } + if key := p.key.Bytes(msg); len(key) > 0 { + m.Key = string(key) + } + if orderingKey := p.orderingKey.Bytes(msg); len(orderingKey) > 0 { + m.OrderingKey = string(orderingKey) + } + + _, err = r.Send(context.Background(), m) + return err } -// WaitForClose blocks until the Pulsar input has closed down. -func (p *pulsarWriter) WaitForClose(timeout time.Duration) error { - select { - case <-p.shutSig.HasClosedChan(): - case <-time.After(timeout): - return component.ErrTimeout - } - return nil +func (p *pulsarWriter) Close(ctx context.Context) error { + return p.disconnect(ctx) } diff --git a/internal/old/input/constructor.go b/internal/old/input/constructor.go index a56c4b01e6..01c4674574 100644 --- a/internal/old/input/constructor.go +++ b/internal/old/input/constructor.go @@ -155,7 +155,6 @@ const ( TypeNATSJetStream = "nats_jetstream" TypeNATSStream = "nats_stream" TypeNSQ = "nsq" - TypePulsar = "pulsar" TypeReadUntil = "read_until" TypeRedisList = "redis_list" TypeRedisPubSub = "redis_pubsub" @@ -203,7 +202,6 @@ type Config struct { NATSStream reader.NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"` NSQ reader.NSQConfig `json:"nsq" yaml:"nsq"` Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"` - Pulsar PulsarConfig `json:"pulsar" yaml:"pulsar"` ReadUntil ReadUntilConfig `json:"read_until" yaml:"read_until"` RedisList reader.RedisListConfig `json:"redis_list" yaml:"redis_list"` RedisPubSub reader.RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"` @@ -251,7 +249,6 @@ func NewConfig() Config { NATSStream: reader.NewNATSStreamConfig(), NSQ: reader.NewNSQConfig(), Plugin: nil, - Pulsar: NewPulsarConfig(), ReadUntil: NewReadUntilConfig(), RedisList: reader.NewRedisListConfig(), RedisPubSub: reader.NewRedisPubSubConfig(), diff --git a/internal/old/input/pulsar.go b/internal/old/input/pulsar.go deleted file mode 100644 index 480abef2a2..0000000000 --- a/internal/old/input/pulsar.go +++ /dev/null @@ -1,25 +0,0 @@ -package input - -import ( - "github.com/benthosdev/benthos/v4/internal/impl/pulsar/auth" -) - -// PulsarConfig contains configuration for the Pulsar input type. -type PulsarConfig struct { - URL string `json:"url" yaml:"url"` - Topics []string `json:"topics" yaml:"topics"` - SubscriptionName string `json:"subscription_name" yaml:"subscription_name"` - SubscriptionType string `json:"subscription_type" yaml:"subscription_type"` - Auth auth.Config `json:"auth" yaml:"auth"` -} - -// NewPulsarConfig creates a new PulsarConfig with default values. -func NewPulsarConfig() PulsarConfig { - return PulsarConfig{ - URL: "", - Topics: []string{}, - SubscriptionName: "", - SubscriptionType: "", - Auth: auth.New(), - } -} diff --git a/internal/old/output/constructor.go b/internal/old/output/constructor.go index 1c96e24a23..938a4d3915 100644 --- a/internal/old/output/constructor.go +++ b/internal/old/output/constructor.go @@ -171,7 +171,6 @@ const ( TypeNATSJetStream = "nats_jetstream" TypeNATSStream = "nats_stream" TypeNSQ = "nsq" - TypePulsar = "pulsar" TypeRedisHash = "redis_hash" TypeRedisList = "redis_list" TypeRedisPubSub = "redis_pubsub" @@ -230,7 +229,6 @@ type Config struct { NATSStream writer.NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"` NSQ writer.NSQConfig `json:"nsq" yaml:"nsq"` Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"` - Pulsar PulsarConfig `json:"pulsar" yaml:"pulsar"` RedisHash writer.RedisHashConfig `json:"redis_hash" yaml:"redis_hash"` RedisList writer.RedisListConfig `json:"redis_list" yaml:"redis_list"` RedisPubSub writer.RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"` @@ -289,7 +287,6 @@ func NewConfig() Config { NATSStream: writer.NewNATSStreamConfig(), NSQ: writer.NewNSQConfig(), Plugin: nil, - Pulsar: NewPulsarConfig(), RedisHash: writer.NewRedisHashConfig(), RedisList: writer.NewRedisListConfig(), RedisPubSub: writer.NewRedisPubSubConfig(), diff --git a/internal/old/output/pulsar.go b/internal/old/output/pulsar.go deleted file mode 100644 index fd4b7e08ad..0000000000 --- a/internal/old/output/pulsar.go +++ /dev/null @@ -1,25 +0,0 @@ -package output - -import "github.com/benthosdev/benthos/v4/internal/impl/pulsar/auth" - -// PulsarConfig contains configuration for the Pulsar input type. -type PulsarConfig struct { - URL string `json:"url" yaml:"url"` - Topic string `json:"topic" yaml:"topic"` - MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` - Key string `json:"key" yaml:"key"` - OrderingKey string `json:"ordering_key" yaml:"ordering_key"` - Auth auth.Config `json:"auth" yaml:"auth"` -} - -// NewPulsarConfig creates a new PulsarConfig with default values. -func NewPulsarConfig() PulsarConfig { - return PulsarConfig{ - URL: "", - Topic: "", - MaxInFlight: 64, - Key: "", - OrderingKey: "", - Auth: auth.New(), - } -} diff --git a/public/components/all/package.go b/public/components/all/package.go index 55b9b91395..c867fc4cbf 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -23,7 +23,6 @@ import ( _ "github.com/benthosdev/benthos/v4/internal/impl/nats" _ "github.com/benthosdev/benthos/v4/internal/impl/parquet" _ "github.com/benthosdev/benthos/v4/internal/impl/prometheus" - _ "github.com/benthosdev/benthos/v4/internal/impl/pulsar" _ "github.com/benthosdev/benthos/v4/internal/impl/redis" _ "github.com/benthosdev/benthos/v4/internal/impl/sql" _ "github.com/benthosdev/benthos/v4/internal/impl/statsd" diff --git a/public/components/pulsar/package.go b/public/components/pulsar/package.go new file mode 100644 index 0000000000..f3cf1d6df7 --- /dev/null +++ b/public/components/pulsar/package.go @@ -0,0 +1,3 @@ +package pulsar + +import _ "github.com/benthosdev/benthos/v4/internal/impl/pulsar" diff --git a/website/docs/components/inputs/pulsar.md b/website/docs/components/inputs/pulsar.md deleted file mode 100644 index 7a27b4d80e..0000000000 --- a/website/docs/components/inputs/pulsar.md +++ /dev/null @@ -1,218 +0,0 @@ ---- -title: pulsar -type: input -status: experimental -categories: ["Services"] ---- - - - -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; - -:::caution EXPERIMENTAL -This component is experimental and therefore subject to change or removal outside of major version releases. -::: -Reads messages from an Apache Pulsar server. - -Introduced in version 3.43.0. - - - - - - -```yml -# Common config fields, showing default values -input: - label: "" - pulsar: - url: "" - topics: [] - subscription_name: "" - subscription_type: "" -``` - - - - -```yml -# All config fields, showing default values -input: - label: "" - pulsar: - url: "" - topics: [] - subscription_name: "" - subscription_type: "" - auth: - oauth2: - enabled: false - audience: "" - issuer_url: "" - private_key_file: "" - token: - enabled: false - token: "" -``` - - - - -### Metadata - -This input adds the following metadata fields to each message: - -```text -- pulsar_message_id -- pulsar_key -- pulsar_ordering_key -- pulsar_event_time_unix -- pulsar_publish_time_unix -- pulsar_topic -- pulsar_producer_name -- pulsar_redelivery_count -- All properties of the message -``` - -You can access these metadata fields using -[function interpolation](/docs/configuration/interpolation#metadata). - -## Fields - -### `url` - -A URL to connect to. - - -Type: `string` -Default: `""` - -```yml -# Examples - -url: pulsar://localhost:6650 - -url: pulsar://pulsar.us-west.example.com:6650 - -url: pulsar+ssl://pulsar.us-west.example.com:6651 -``` - -### `topics` - -A list of topics to subscribe to. - - -Type: `array` -Default: `[]` - -### `subscription_name` - -Specify the subscription name for this consumer. - - -Type: `string` -Default: `""` - -### `subscription_type` - -Specify the subscription type for this consumer. - -> NOTE: Using a `key_shared` subscription type will __allow out-of-order delivery__ since nack-ing messages sets non-zero nack delivery delay - this can potentially cause consumers to stall. See [Pulsar documentation](https://pulsar.apache.org/docs/en/2.8.1/concepts-messaging/#negative-acknowledgement) and [this Github issue](https://github.com/apache/pulsar/issues/12208) for more details. - - -Type: `string` -Default: `"shared"` -Options: `shared`, `key_shared`, `failover`, `exclusive`. - -### `auth` - -Optional configuration of Pulsar authentication methods. - - -Type: `object` -Requires version 3.60.0 or newer - -### `auth.oauth2` - -Parameters for Pulsar OAuth2 authentication. - - -Type: `object` - -### `auth.oauth2.enabled` - -Whether OAuth2 is enabled. - - -Type: `bool` -Default: `false` - -```yml -# Examples - -enabled: true -``` - -### `auth.oauth2.audience` - -OAuth2 audience. - - -Type: `string` -Default: `""` - -### `auth.oauth2.issuer_url` - -OAuth2 issuer URL. - - -Type: `string` -Default: `""` - -### `auth.oauth2.private_key_file` - -File containing the private key. - - -Type: `string` -Default: `""` - -### `auth.token` - -Parameters for Pulsar Token authentication. - - -Type: `object` - -### `auth.token.enabled` - -Whether Token Auth is enabled. - - -Type: `bool` -Default: `false` - -```yml -# Examples - -enabled: true -``` - -### `auth.token.token` - -Actual base64 encoded token. - - -Type: `string` -Default: `""` - - diff --git a/website/docs/components/outputs/pulsar.md b/website/docs/components/outputs/pulsar.md deleted file mode 100644 index 10c9587b93..0000000000 --- a/website/docs/components/outputs/pulsar.md +++ /dev/null @@ -1,208 +0,0 @@ ---- -title: pulsar -type: output -status: experimental -categories: ["Services"] ---- - - - -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; - -:::caution EXPERIMENTAL -This component is experimental and therefore subject to change or removal outside of major version releases. -::: -Write messages to an Apache Pulsar server. - -Introduced in version 3.43.0. - - - - - - -```yml -# Common config fields, showing default values -output: - label: "" - pulsar: - url: "" - topic: "" - key: "" - ordering_key: "" - max_in_flight: 64 -``` - - - - -```yml -# All config fields, showing default values -output: - label: "" - pulsar: - url: "" - topic: "" - key: "" - ordering_key: "" - max_in_flight: 64 - auth: - oauth2: - enabled: false - audience: "" - issuer_url: "" - private_key_file: "" - token: - enabled: false - token: "" -``` - - - - -## Fields - -### `url` - -A URL to connect to. - - -Type: `string` -Default: `""` - -```yml -# Examples - -url: pulsar://localhost:6650 - -url: pulsar://pulsar.us-west.example.com:6650 - -url: pulsar+ssl://pulsar.us-west.example.com:6651 -``` - -### `topic` - -A topic to publish to. - - -Type: `string` -Default: `""` - -### `key` - -The key to publish messages with. -This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). - - -Type: `string` -Default: `""` - -### `ordering_key` - -The ordering key to publish messages with. -This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). - - -Type: `string` -Default: `""` - -### `max_in_flight` - -The maximum number of messages to have in flight at a given time. Increase this to improve throughput. - - -Type: `int` -Default: `64` - -### `auth` - -Optional configuration of Pulsar authentication methods. - - -Type: `object` -Requires version 3.60.0 or newer - -### `auth.oauth2` - -Parameters for Pulsar OAuth2 authentication. - - -Type: `object` - -### `auth.oauth2.enabled` - -Whether OAuth2 is enabled. - - -Type: `bool` -Default: `false` - -```yml -# Examples - -enabled: true -``` - -### `auth.oauth2.audience` - -OAuth2 audience. - - -Type: `string` -Default: `""` - -### `auth.oauth2.issuer_url` - -OAuth2 issuer URL. - - -Type: `string` -Default: `""` - -### `auth.oauth2.private_key_file` - -File containing the private key. - - -Type: `string` -Default: `""` - -### `auth.token` - -Parameters for Pulsar Token authentication. - - -Type: `object` - -### `auth.token.enabled` - -Whether Token Auth is enabled. - - -Type: `bool` -Default: `false` - -```yml -# Examples - -enabled: true -``` - -### `auth.token.token` - -Actual base64 encoded token. - - -Type: `string` -Default: `""` - - diff --git a/website/docs/guides/migration/v4.md b/website/docs/guides/migration/v4.md index 5106907501..009686989c 100644 --- a/website/docs/guides/migration/v4.md +++ b/website/docs/guides/migration/v4.md @@ -30,6 +30,28 @@ It should be pretty quick to update your imports, either using a tool or just ru grep "Jeffail/benthos/v3" . -Rl | xargs -I{} sed -i 's/Jeffail\/benthos\/v3/benthosdev\/benthos\/v4/g' {} ``` +## Pulsar Components Disabled (for now) + +There have been multiple issues with the Go Pulsar client libraries. Since some are still outstanding and causing problems with unrelated components the decision has been made to remove the `pulsar` input and output from standard builds. However, it is still possible to build custom versions of Benthos with them included by importing the package `./public/components/pulsar`: + +```go +package main + +import ( + "context" + + "github.com/Jeffail/benthos/v3/public/service" + + // Import all plugins defined within the repo. + _ "github.com/benthosdev/benthos/v4/public/components/all" + _ "github.com/benthosdev/benthos/v4/public/components/pulsar" +) + +func main() { + service.RunCLI(context.Background()) +} +``` + ## Pipeline Threads Behaviour Change https://github.com/benthosdev/benthos/issues/399