Skip to content

Commit

Permalink
[chore] Add pipeline module (#11209)
Browse files Browse the repository at this point in the history
#### Description
To facilitate the work in
#11204 as
some breaking changes and some deprecations, this PR adds the new
pipeline module separately so that future PRs can handle the breaking
changes and deprecations.

In order to make `Signal` uninstantiable outside of this repo, while
still being extendable in places like `componentprofiles`, a new
internal module is added to handle the `Signal` logic. To reduce the
dependency sprawl that would happen if `signal` was an internal package
in `go.opentelemetry.io/collector`, I made it a module, similar to
`globalgates`.

<!-- Issue number if applicable -->
#### Link to tracking issue
Related to
#10947

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Added unit tests

<!--Describe the documentation added.-->
#### Documentation
Added godoc comments
  • Loading branch information
TylerHelmuth authored Sep 20, 2024
1 parent 1391fab commit 0c7d347
Show file tree
Hide file tree
Showing 15 changed files with 484 additions and 0 deletions.
25 changes: 25 additions & 0 deletions .chloggen/add-pipeline-module.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pipeline

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds new `pipeline` module to house the concept of pipeline ID and Signal.

# One or more tracking issues or pull requests related to the change
issues: [11209]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,13 @@ check-contrib:
-replace go.opentelemetry.io/collector/extension/zpagesextension=$(CURDIR)/extension/zpagesextension \
-replace go.opentelemetry.io/collector/featuregate=$(CURDIR)/featuregate \
-replace go.opentelemetry.io/collector/internal/globalgates=$(CURDIR)/internal/globalgates \
-replace go.opentelemetry.io/collector/internal/globalsignal=$(CURDIR)/internal/globalsignal \
-replace go.opentelemetry.io/collector/otelcol=$(CURDIR)/otelcol \
-replace go.opentelemetry.io/collector/otelcol/otelcoltest=$(CURDIR)/otelcol/otelcoltest \
-replace go.opentelemetry.io/collector/pdata=$(CURDIR)/pdata \
-replace go.opentelemetry.io/collector/pdata/testdata=$(CURDIR)/pdata/testdata \
-replace go.opentelemetry.io/collector/pdata/pprofile=$(CURDIR)/pdata/pprofile \
-replace go.opentelemetry.io/collector/pipeline=$(CURDIR)/pipeline \
-replace go.opentelemetry.io/collector/processor=$(CURDIR)/processor \
-replace go.opentelemetry.io/collector/processor/batchprocessor=$(CURDIR)/processor/batchprocessor \
-replace go.opentelemetry.io/collector/processor/memorylimiterprocessor=$(CURDIR)/processor/memorylimiterprocessor \
Expand Down Expand Up @@ -369,11 +371,13 @@ restore-contrib:
-dropreplace go.opentelemetry.io/collector/extension/zpagesextension \
-dropreplace go.opentelemetry.io/collector/featuregate \
-dropreplace go.opentelemetry.io/collector/internal/globalgates \
-dropreplace go.opentelemetry.io/collector/internal/globalsignal \
-dropreplace go.opentelemetry.io/collector/otelcol \
-dropreplace go.opentelemetry.io/collector/otelcol/otelcoltest \
-dropreplace go.opentelemetry.io/collector/pdata \
-dropreplace go.opentelemetry.io/collector/pdata/testdata \
-dropreplace go.opentelemetry.io/collector/pdata/pprofile \
-dropreplace go.opentelemetry.io/collector/pipeline \
-dropreplace go.opentelemetry.io/collector/processor \
-dropreplace go.opentelemetry.io/collector/processor/batchprocessor \
-dropreplace go.opentelemetry.io/collector/processor/memorylimiterprocessor \
Expand Down
1 change: 1 addition & 0 deletions internal/globalsignal/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
11 changes: 11 additions & 0 deletions internal/globalsignal/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module go.opentelemetry.io/collector/internal/globalsignal

go 1.22.0

require github.com/stretchr/testify v1.9.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions internal/globalsignal/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions internal/globalsignal/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package globalsignal // import "go.opentelemetry.io/collector/internal/globalsignal"

import (
"fmt"
"regexp"
)

// Signal represents the signals supported by the collector.
type Signal struct {
name string
}

// String returns the string representation of the signal.
func (s Signal) String() string {
return s.name
}

// MarshalText marshals the Signal.
func (s Signal) MarshalText() (text []byte, err error) {
return []byte(s.name), nil
}

// signalRegex is used to validate the signal.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
var signalRegex = regexp.MustCompile(`^[a-z]{1,62}$`)

// NewSignal creates a Signal. It returns an error if the Signal is invalid.
// A Signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func NewSignal(signal string) (Signal, error) {
if len(signal) == 0 {
return Signal{}, fmt.Errorf("signal must not be empty")
}
if !signalRegex.MatchString(signal) {
return Signal{}, fmt.Errorf("invalid character(s) in type %q", signal)
}
return Signal{name: signal}, nil
}

// MustNewSignal creates a Signal. It panics if the Signal is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func MustNewSignal(signal string) Signal {
s, err := NewSignal(signal)
if err != nil {
panic(err)
}
return s
}
41 changes: 41 additions & 0 deletions internal/globalsignal/signal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package globalsignal

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_NewSignal(t *testing.T) {
s, err := NewSignal("traces")
require.NoError(t, err)
assert.Equal(t, Signal{name: "traces"}, s)
}

func Test_NewSignal_Invalid(t *testing.T) {
_, err := NewSignal("")
require.Error(t, err)
_, err = NewSignal("TRACES")
require.Error(t, err)
}

func Test_MustNewSignal(t *testing.T) {
s := MustNewSignal("traces")
assert.Equal(t, Signal{name: "traces"}, s)
}

func Test_Signal_String(t *testing.T) {
s := MustNewSignal("traces")
assert.Equal(t, "traces", s.String())
}

func Test_Signal_MarshalText(t *testing.T) {
s := MustNewSignal("traces")
b, err := s.MarshalText()
require.NoError(t, err)
assert.Equal(t, []byte("traces"), b)
}
1 change: 1 addition & 0 deletions pipeline/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../Makefile.Common
16 changes: 16 additions & 0 deletions pipeline/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module go.opentelemetry.io/collector/pipeline

go 1.22.0

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/internal/globalsignal v0.109.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/collector/internal/globalsignal => ../internal/globalsignal
10 changes: 10 additions & 0 deletions pipeline/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 131 additions & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package pipeline // import "go.opentelemetry.io/collector/pipeline"
import (
"errors"
"fmt"
"regexp"
"strings"

"go.opentelemetry.io/collector/internal/globalsignal"
)

// typeAndNameSeparator is the separator that is used between type and name in type/name composite keys.
const typeAndNameSeparator = "/"

// ID represents the identity for a pipeline. It combines two values:
// * signal - the Signal of the pipeline.
// * name - the name of that pipeline.
type ID struct {
signal Signal `mapstructure:"-"`
name string `mapstructure:"-"`
}

// NewID returns a new ID with the given Signal and empty name.
func NewID(signal Signal) ID {
return ID{signal: signal}
}

// MustNewID builds a Signal and returns a new ID with the given Signal and empty name.
// It panics if the Signal is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func MustNewID(signal string) ID {
return ID{signal: globalsignal.MustNewSignal(signal)}
}

// NewIDWithName returns a new ID with the given Signal and name.
func NewIDWithName(signal Signal, name string) ID {
return ID{signal: signal, name: name}
}

// MustNewIDWithName builds a Signal and returns a new ID with the given Signal and name.
// It panics if the Signal is invalid or name is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
// A name must consist of 1 to 1024 unicode characters excluding whitespace, control characters, and symbols.
func MustNewIDWithName(signal string, name string) ID {
id := ID{signal: globalsignal.MustNewSignal(signal)}
err := validateName(name)
if err != nil {
panic(err)
}
id.name = name
return id
}

// Signal returns the Signal of the ID.
func (i ID) Signal() Signal {
return i.signal
}

// Name returns the name of the ID.
func (i ID) Name() string {
return i.name
}

// MarshalText implements the encoding.TextMarshaler interface.
// This marshals the Signal and name as one string in the config.
func (i ID) MarshalText() (text []byte, err error) {
return []byte(i.String()), nil
}

// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (i *ID) UnmarshalText(text []byte) error {
idStr := string(text)
items := strings.SplitN(idStr, typeAndNameSeparator, 2)
var signalStr, nameStr string
if len(items) >= 1 {
signalStr = strings.TrimSpace(items[0])
}

if len(items) == 1 && signalStr == "" {
return errors.New("id must not be empty")
}

if signalStr == "" {
return fmt.Errorf("in %q id: the part before %s should not be empty", idStr, typeAndNameSeparator)
}

if len(items) > 1 {
// "name" part is present.
nameStr = strings.TrimSpace(items[1])
if nameStr == "" {
return fmt.Errorf("in %q id: the part after %s should not be empty", idStr, typeAndNameSeparator)
}
if err := validateName(nameStr); err != nil {
return fmt.Errorf("in %q id: %w", nameStr, err)
}
}

var err error
if i.signal, err = globalsignal.NewSignal(signalStr); err != nil {
return fmt.Errorf("in %q id: %w", idStr, err)
}
i.name = nameStr

return nil
}

// String returns the ID string representation as "signal[/name]" format.
func (i ID) String() string {
if i.name == "" {
return i.signal.String()
}

return i.signal.String() + typeAndNameSeparator + i.name
}

// nameRegexp is used to validate the name of an ID. A name can consist of
// 1 to 1024 unicode characters excluding whitespace, control characters, and
// symbols.
var nameRegexp = regexp.MustCompile(`^[^\pZ\pC\pS]+$`)

func validateName(nameStr string) error {
if len(nameStr) > 1024 {
return fmt.Errorf("name %q is longer than 1024 characters (%d characters)", nameStr, len(nameStr))
}
if !nameRegexp.MatchString(nameStr) {
return fmt.Errorf("invalid character(s) in name %q", nameStr)
}
return nil
}
Loading

0 comments on commit 0c7d347

Please sign in to comment.