Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/observer/k8sobserver] add k8s.ingress endpoint #33005

Merged
merged 12 commits into from
Jun 27, 2024
27 changes: 27 additions & 0 deletions .chloggen/feat_k8sobserver_ingress.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobserver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for k8s.ingress endpoint.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32971]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
39 changes: 39 additions & 0 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
PodType EndpointType = "pod"
// K8sServiceType is a service endpoint.
K8sServiceType EndpointType = "k8s.service"
// K8sIngressType is a ingress endpoint.
K8sIngressType EndpointType = "k8s.ingress"
// K8sNodeType is a Kubernetes Node endpoint.
K8sNodeType EndpointType = "k8s.node"
// HostPortType is a hostport endpoint.
Expand Down Expand Up @@ -129,6 +131,43 @@ func (s *K8sService) Type() EndpointType {
return K8sServiceType
}

// K8sIngress is a discovered k8s ingress.
type K8sIngress struct {
// Name of the ingress.
Name string
// UID is the unique ID in the cluster for the ingress.
UID string
// Labels is a map of user-specified metadata.
Labels map[string]string
// Annotations is a map of user-specified metadata.
Annotations map[string]string
// Namespace must be unique for ingress with same name.
Namespace string
// Scheme represents whether the ingress path is accessible via HTTPS or HTTP.
Scheme string
// Host is the fully qualified domain name of a network host
Host string
// Path that map requests to backends
Path string
}

func (s *K8sIngress) Env() EndpointEnv {
return map[string]any{
"uid": s.UID,
"name": s.Name,
"labels": s.Labels,
"annotations": s.Annotations,
"namespace": s.Namespace,
"scheme": s.Scheme,
"host": s.Host,
"path": s.Path,
}
}

func (s *K8sIngress) Type() EndpointType {
return K8sIngressType
}

// Pod is a discovered k8s pod.
type Pod struct {
// Name of the pod.
Expand Down
6 changes: 4 additions & 2 deletions extension/observer/k8sobserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<!-- end autogenerated section -->

The `k8s_observer` is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
Kubernetes pod, port, service and node endpoints via the Kubernetes API.
Kubernetes pod, port, service, ingress and node endpoints via the Kubernetes API.

## Example Config

Expand All @@ -25,6 +25,7 @@ extensions:
observe_pods: true
observe_nodes: true
observe_services: true
observe_ingresses: true

receivers:
receiver_creator:
Expand Down Expand Up @@ -70,4 +71,5 @@ All fields are optional.
| node | string | <no value> | The node name to limit the discovery of pod, port, and node endpoints. Providing no value (the default) results in discovering endpoints for all available nodes. |
| observe_pods | bool | `true` | Whether to report observer pod and port endpoints. If `true` and `node` is specified it will only discover pod and port endpoints whose `spec.nodeName` matches the provided node name. If `true` and `node` isn't specified, it will discover all available pod and port endpoints. Please note that Collector connectivity to pods from other nodes is dependent on your cluster configuration and isn't guaranteed. |
| observe_nodes | bool | `false` | Whether to report observer k8s.node endpoints. If `true` and `node` is specified it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and `node` isn't specified, it will discover all available node endpoints. Please note that Collector connectivity to nodes is dependent on your cluster configuration and isn't guaranteed.|
| observe_services | bool | `false` | Whether to report observer k8s.service endpoints.|
| observe_services | bool | `false` | Whether to report observer k8s.service endpoints.|
| observe_ingresses | bool | `false` | Whether to report observer k8s.ingress endpoints.|
6 changes: 4 additions & 2 deletions extension/observer/k8sobserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ type Config struct {
ObserveNodes bool `mapstructure:"observe_nodes"`
// ObserveServices determines whether to report observer service and port endpoints. `false` by default.
ObserveServices bool `mapstructure:"observe_services"`
// ObserveIngresses determines whether to report observer ingress. `false` by default.
ObserveIngresses bool `mapstructure:"observe_ingresses"`
}

// Validate checks if the extension configuration is valid
func (cfg *Config) Validate() error {
if !cfg.ObservePods && !cfg.ObserveNodes && !cfg.ObserveServices {
return fmt.Errorf("one of observe_pods, observe_nodes and observe_services must be true")
if !cfg.ObservePods && !cfg.ObserveNodes && !cfg.ObserveServices && !cfg.ObserveIngresses {
return fmt.Errorf("one of observe_pods, observe_nodes, observe_services and observe_ingresses must be true")
}
return nil
}
13 changes: 7 additions & 6 deletions extension/observer/k8sobserver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "observe-all"),
expected: &Config{
Node: "",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
ObservePods: true,
ObserveNodes: true,
ObserveServices: true,
Node: "",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
ObservePods: true,
ObserveNodes: true,
ObserveServices: true,
ObserveIngresses: true,
},
},
{
Expand All @@ -52,7 +53,7 @@ func TestLoadConfig(t *testing.T) {
},
{
id: component.NewIDWithName(metadata.Type, "invalid_no_observing"),
expectedErr: "one of observe_pods, observe_nodes and observe_services must be true",
expectedErr: "one of observe_pods, observe_nodes, observe_services and observe_ingresses must be true",
},
}
for _, tt := range tests {
Expand Down
20 changes: 20 additions & 0 deletions extension/observer/k8sobserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
Expand Down Expand Up @@ -49,6 +50,8 @@ func (h *handler) OnAdd(objectInterface any, _ bool) {
endpoints = convertPodToEndpoints(h.idNamespace, object)
case *v1.Service:
endpoints = convertServiceToEndpoints(h.idNamespace, object)
case *networkingv1.Ingress:
endpoints = convertIngressToEndpoints(h.idNamespace, object)
case *v1.Node:
endpoints = append(endpoints, convertNodeToEndpoint(h.idNamespace, object))
default: // unsupported
Expand Down Expand Up @@ -92,6 +95,19 @@ func (h *handler) OnUpdate(oldObjectInterface, newObjectInterface any) {
newEndpoints[e.ID] = e
}

case *networkingv1.Ingress:
newIngress, ok := newObjectInterface.(*networkingv1.Ingress)
if !ok {
h.logger.Warn("skip updating endpoint for ingress as the update is of different type", zap.Any("oldIngress", oldObjectInterface), zap.Any("newObject", newObjectInterface))
return
}
for _, e := range convertIngressToEndpoints(h.idNamespace, oldObject) {
oldEndpoints[e.ID] = e
}
for _, e := range convertIngressToEndpoints(h.idNamespace, newIngress) {
newEndpoints[e.ID] = e
}

case *v1.Node:
newNode, ok := newObjectInterface.(*v1.Node)
if !ok {
Expand Down Expand Up @@ -165,6 +181,10 @@ func (h *handler) OnDelete(objectInterface any) {
if object != nil {
endpoints = convertServiceToEndpoints(h.idNamespace, object)
}
case *networkingv1.Ingress:
if object != nil {
endpoints = convertIngressToEndpoints(h.idNamespace, object)
}
case *v1.Node:
if object != nil {
endpoints = append(endpoints, convertNodeToEndpoint(h.idNamespace, object))
Expand Down
64 changes: 64 additions & 0 deletions extension/observer/k8sobserver/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,70 @@ func TestServiceEndpointsChanged(t *testing.T) {
}, th.ListEndpoints())
}

func TestIngressEndpointsAdded(t *testing.T) {
th := newTestHandler()
th.OnAdd(ingress, true)
assert.ElementsMatch(t, []observer.Endpoint{
{
ID: "test-1/ingress-1-UID/host-1/",
Target: "https://host-1/",
Details: &observer.K8sIngress{
Name: "application-ingress",
Namespace: "default",
UID: "test-1/ingress-1-UID/host-1/",
Labels: map[string]string{"env": "prod"},
Scheme: "https",
Host: "host-1",
Path: "/",
},
}}, th.ListEndpoints())
}

func TestIngressEndpointsRemoved(t *testing.T) {
th := newTestHandler()
th.OnAdd(ingress, true)
th.OnDelete(ingress)
assert.Empty(t, th.ListEndpoints())
}

func TestIngressEndpointsChanged(t *testing.T) {
th := newTestHandler()
// Nothing changed.
th.OnUpdate(ingress, ingress)
require.Empty(t, th.ListEndpoints())

// Labels changed.
changedLabels := ingress.DeepCopy()
changedLabels.Labels["new-label"] = "value"
th.OnUpdate(ingress, changedLabels)

endpoints := th.ListEndpoints()
require.ElementsMatch(t,
[]observer.EndpointID{"test-1/ingress-1-UID/host-1/"},
[]observer.EndpointID{endpoints[0].ID},
)

// Running state changed, one added and one removed.
updatedIngress := ingress.DeepCopy()
updatedIngress.Labels["updated-label"] = "true"
th.OnUpdate(ingress, updatedIngress)
require.ElementsMatch(t, []observer.Endpoint{
{
ID: "test-1/ingress-1-UID/host-1/",
Target: "https://host-1/",
Details: &observer.K8sIngress{
Name: "application-ingress",
Namespace: "default",
UID: "test-1/ingress-1-UID/host-1/",
Labels: map[string]string{"env": "prod", "updated-label": "true"},
Scheme: "https",
Host: "host-1",
Path: "/",
},
},
}, th.ListEndpoints())
}

func TestNodeEndpointsAdded(t *testing.T) {
th := newTestHandler()
th.OnAdd(node1V1, true)
Expand Down
103 changes: 103 additions & 0 deletions extension/observer/k8sobserver/ingress_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"

import (
"fmt"
"net/url"
"strings"

v1 "k8s.io/api/networking/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

// convertIngressToEndpoints converts a ingress instance into a slice of endpoints. The endpoints
// include an endpoint for each path that is mapped to an ingress.
func convertIngressToEndpoints(idNamespace string, ingress *v1.Ingress) []observer.Endpoint {
endpoints := []observer.Endpoint{}

// Loop through every ingress rule to get every defined path.
for _, rule := range ingress.Spec.Rules {
scheme := getScheme(rule.Host, getTLSHosts(ingress))

if rule.HTTP != nil {
// Create endpoint for each ingress rule.
for _, path := range rule.HTTP.Paths {
endpointID := observer.EndpointID(fmt.Sprintf("%s/%s/%s%s", idNamespace, ingress.UID, rule.Host, path.Path))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to escape the backslashes here and be explicit with the ID format to avoid any kind of unlikely ID collision?

Suggested change
endpointID := observer.EndpointID(fmt.Sprintf("%s/%s/%s%s", idNamespace, ingress.UID, rule.Host, path.Path))
endpointID := observer.EndpointID(fmt.Sprintf("%s/%s/%s/%s", idNamespace, ingress.UID, rule.Host, escBackslashes(path.Path)))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know; that was my initial question. How do other components deal with that ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just need to ensure the uniqueness of the endpoint. I don't think the escaping is required. We shouldn't run into any conflicts without it

endpoints = append(endpoints, observer.Endpoint{
ID: endpointID,
Target: (&url.URL{
Scheme: scheme,
Host: rule.Host,
Path: path.Path,
}).String(),
Details: &observer.K8sIngress{
Name: ingress.Name,
UID: string(endpointID),
Labels: ingress.Labels,
Annotations: ingress.Annotations,
Namespace: ingress.Namespace,
Scheme: scheme,
Host: rule.Host,
Path: path.Path,
},
})
}
}

}

return endpoints
}

// getTLSHosts return a list of tls hosts for an ingress ressource.
func getTLSHosts(i *v1.Ingress) []string {
var hosts []string

for _, tls := range i.Spec.TLS {
hosts = append(hosts, tls.Hosts...)
}

return hosts
}

// matchesHostPattern returns true if the host matches the host pattern or wildcard pattern.
func matchesHostPattern(pattern string, host string) bool {
// if host match the pattern (host pattern).
if pattern == host {
return true
}

// if string does not contains any dot, don't do the next part as it's for wildcard pattern.
if !strings.Contains(host, ".") {
return false
}

patternParts := strings.Split(pattern, ".")
hostParts := strings.Split(host, ".")

// If the first part of the pattern is not a wildcard pattern.
if patternParts[0] != "*" {
return false
}

// If host and pattern without wildcard part does not match.
if strings.Join(patternParts[1:], ".") != strings.Join(hostParts[1:], ".") {
return false
}

return true
}

// getScheme return the scheme of an ingress host based on tls configuration.
func getScheme(host string, tlsHosts []string) string {
for _, pattern := range tlsHosts {
if matchesHostPattern(pattern, host) {
return "https"
}
}

return "http"
}
Loading