-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add observer support to receiver_creator (#173)
* Add observer notification interface (k8s observer will be in separate PR) * Refactor receiver_creator to be more easily testable and organized * receiver.go mostly implements OT interface and delegates to the new files * observerhandler.go responds to observer events and manages the starting/stopping of receivers * rules.go implements rules evaluation (not currently implemented) * runner.go contains a runner interface that handles the details of how to start and stop a receiver instance that the observer handler wants to start/stop * Implement basic add/remove/change response in receiver_creator to observer events
- Loading branch information
Showing
19 changed files
with
759 additions
and
157 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
module github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer | ||
|
||
go 1.14 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
// Copyright 2020, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package observer | ||
|
||
import ( | ||
"fmt" | ||
) | ||
|
||
// Protocol defines network protocol for container ports. | ||
type Protocol string | ||
|
||
const ( | ||
// ProtocolTCP is the TCP protocol. | ||
ProtocolTCP Protocol = "TCP" | ||
// ProtocolUDP is the UDP protocol. | ||
ProtocolUDP Protocol = "UDP" | ||
) | ||
|
||
// Endpoint is a service that can be contacted remotely. | ||
type Endpoint interface { | ||
// ID uniquely identifies this endpoint. | ||
ID() string | ||
// Target is an IP address or hostname of the endpoint. | ||
Target() string | ||
// String pretty formats the endpoint. | ||
String() string | ||
// Labels is a map of arbitrary metadata. | ||
Labels() map[string]string | ||
} | ||
|
||
// endpointBase is common endpoint data used across all endpoint types. | ||
type endpointBase struct { | ||
id string | ||
target string | ||
labels map[string]string | ||
} | ||
|
||
func (e *endpointBase) ID() string { | ||
return e.id | ||
} | ||
|
||
func (e *endpointBase) Target() string { | ||
return e.target | ||
} | ||
|
||
func (e *endpointBase) Labels() map[string]string { | ||
return e.labels | ||
} | ||
|
||
// HostEndpoint is an endpoint that just has a target but no identifying port information. | ||
type HostEndpoint struct { | ||
endpointBase | ||
} | ||
|
||
func (h *HostEndpoint) String() string { | ||
return fmt.Sprintf("HostEndpoint{id: %v, Target: %v, Labels: %v}", h.ID(), h.target, h.labels) | ||
} | ||
|
||
// NewHostEndpoint creates a HostEndpoint | ||
func NewHostEndpoint(id string, target string, labels map[string]string) *HostEndpoint { | ||
return &HostEndpoint{endpointBase{ | ||
id: id, | ||
target: target, | ||
labels: labels, | ||
}} | ||
} | ||
|
||
var _ Endpoint = (*HostEndpoint)(nil) | ||
|
||
// PortEndpoint is an endpoint that has a target as well as a port. | ||
type PortEndpoint struct { | ||
endpointBase | ||
Port uint16 | ||
} | ||
|
||
func (p *PortEndpoint) String() string { | ||
return fmt.Sprintf("PortEndpoint{ID: %v, Target: %v, Port: %d, Labels: %v}", p.ID(), p.target, p.Port, p.labels) | ||
} | ||
|
||
// NewPortEndpoint creates a PortEndpoint. | ||
func NewPortEndpoint(id string, target string, port uint16, labels map[string]string) *PortEndpoint { | ||
return &PortEndpoint{endpointBase: endpointBase{ | ||
id: id, | ||
target: target, | ||
labels: labels, | ||
}, Port: port} | ||
} | ||
|
||
var _ Endpoint = (*PortEndpoint)(nil) | ||
|
||
// Observable is an interface that provides notification of endpoint changes. | ||
type Observable interface { | ||
// TODO: Stopping. | ||
// ListAndWatch provides initial state sync as well as change notification. | ||
// notify.OnAdd will be called one or more times if there are endpoints discovered. | ||
// (It would not be called if there are no endpoints present.) The endpoint synchronization | ||
// happens asynchronously to this call. | ||
ListAndWatch(notify Notify) | ||
} | ||
|
||
// Notify is the callback for Observer events. | ||
type Notify interface { | ||
// OnAdd is called once or more initially for state sync as well as when further endpoints are added. | ||
OnAdd(added []Endpoint) | ||
// OnRemove is called when one or more endpoints are removed. | ||
OnRemove(removed []Endpoint) | ||
// OnChange is called when one ore more endpoints are modified but the identity is not changed | ||
// (e.g. labels). | ||
OnChange(changed []Endpoint) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
// Copyright 2020, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package receivercreator | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector/component/componenterror" | ||
"go.uber.org/zap" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" | ||
) | ||
|
||
var _ observer.Notify = (*observerHandler)(nil) | ||
|
||
// observerHandler manages endpoint change notifications. | ||
type observerHandler struct { | ||
sync.Mutex | ||
logger *zap.Logger | ||
// receiverTemplates maps receiver template full name to a receiverTemplate value. | ||
receiverTemplates map[string]receiverTemplate | ||
// receiversByEndpointID is a map of endpoint IDs to a receiver instance. | ||
receiversByEndpointID receiverMap | ||
// runner starts and stops receiver instances. | ||
runner runner | ||
} | ||
|
||
// Shutdown all receivers started at runtime. | ||
func (obs *observerHandler) Shutdown() error { | ||
obs.Lock() | ||
defer obs.Unlock() | ||
|
||
var errs []error | ||
|
||
for _, rcvr := range obs.receiversByEndpointID.Values() { | ||
if err := obs.runner.shutdown(rcvr); err != nil { | ||
// TODO: Should keep track of which receiver the error is associated with | ||
// but require some restructuring. | ||
errs = append(errs, err) | ||
} | ||
} | ||
|
||
if len(errs) > 0 { | ||
return fmt.Errorf("shutdown on %d receivers failed: %v", len(errs), componenterror.CombineErrors(errs)) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// OnAdd responds to endpoint add notifications. | ||
func (obs *observerHandler) OnAdd(added []observer.Endpoint) { | ||
obs.Lock() | ||
defer obs.Unlock() | ||
|
||
for _, e := range added { | ||
for _, template := range obs.receiverTemplates { | ||
if !ruleMatches(template.Rule, e) { | ||
continue | ||
} | ||
rcvr, err := obs.runner.start(template.receiverConfig, userConfigMap{ | ||
endpointConfigKey: e.Target(), | ||
}) | ||
if err != nil { | ||
obs.logger.Error("failed to start receiver", zap.String("receiver", template.fullName)) | ||
continue | ||
} | ||
|
||
obs.receiversByEndpointID.Put(e.ID(), rcvr) | ||
} | ||
} | ||
} | ||
|
||
// OnRemove responds to endpoint removal notifications. | ||
func (obs *observerHandler) OnRemove(removed []observer.Endpoint) { | ||
obs.Lock() | ||
defer obs.Unlock() | ||
|
||
for _, e := range removed { | ||
for _, rcvr := range obs.receiversByEndpointID.Get(e.ID()) { | ||
if err := obs.runner.shutdown(rcvr); err != nil { | ||
obs.logger.Error("failed to stop receiver", zap.Reflect("receiver", rcvr)) | ||
continue | ||
} | ||
} | ||
obs.receiversByEndpointID.RemoveAll(e.ID()) | ||
} | ||
} | ||
|
||
// OnChange responds to endpoint change notifications. | ||
func (obs *observerHandler) OnChange(changed []observer.Endpoint) { | ||
// TODO: optimize to only restart if effective config has changed. | ||
obs.OnRemove(changed) | ||
obs.OnAdd(changed) | ||
} |
Oops, something went wrong.