Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - running CI #42793

Draft
wants to merge 39 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
1bef168
wip
AndersonQ Jan 30, 2025
c1802e8
wip
AndersonQ Jan 31, 2025
5734db9
WIP - metrics added to input_metrics.json on filestream
AndersonQ Feb 3, 2025
df0bb94
wip - remove logs when publishing events
AndersonQ Feb 4, 2025
3520ff9
wip
AndersonQ Feb 4, 2025
f275100
wip
AndersonQ Feb 5, 2025
dbd124e
WIP - read and write to the beat instance scoped metric namespace
AndersonQ Feb 5, 2025
44d316f
wip
AndersonQ Feb 6, 2025
534fd00
clean up
AndersonQ Feb 6, 2025
632c1a9
fix tests
AndersonQ Feb 6, 2025
5544f4b
license headers
AndersonQ Feb 6, 2025
007b2d0
add tests
AndersonQ Feb 7, 2025
9266d0b
clean up
AndersonQ Feb 7, 2025
8c0d9da
using stream_id instead of input_id
AndersonQ Feb 7, 2025
bcb0925
add TODO
AndersonQ Feb 10, 2025
9752245
add bench
AndersonQ Feb 10, 2025
2a69020
.
AndersonQ Feb 10, 2025
c4afef9
passing down inputID
AndersonQ Feb 10, 2025
514287d
add integration test
AndersonQ Feb 11, 2025
75dcabe
add total events per input, clean up and refactor
AndersonQ Feb 11, 2025
e4ffbf1
add tests for the inputs/ monitoring endpoint
AndersonQ Feb 11, 2025
689d62b
add inputID when creating the pipeline client
AndersonQ Feb 11, 2025
a93b906
passing down the input context on awss3 input
AndersonQ Feb 11, 2025
4281c7e
wip
AndersonQ Feb 11, 2025
7f2c5b4
wip - where to add tests
AndersonQ Feb 11, 2025
6984ad3
wip
AndersonQ Feb 12, 2025
4bd2b6e
wip: cel: integration tests
AndersonQ Feb 13, 2025
56cbe18
add cel integration tests and move tests to x-pack
AndersonQ Feb 14, 2025
6a0d179
add httpjson test
AndersonQ Feb 14, 2025
f48ee59
clean up
AndersonQ Feb 17, 2025
8d09ef5
fix tests
AndersonQ Feb 17, 2025
b9991e2
add license header
AndersonQ Feb 17, 2025
c0348b4
add changelog
AndersonQ Feb 17, 2025
9fc4e2e
add docs for each input
AndersonQ Feb 17, 2025
999c1e2
making the linter happy
AndersonQ Feb 19, 2025
a504e9f
add eventID to publisher.Event
AndersonQ Feb 19, 2025
f1568b3
add new methods to collect metric by input
AndersonQ Feb 19, 2025
6ddf4bb
added metrics to ES output - so far so good
AndersonQ Feb 19, 2025
3330e9a
add ObserverInputAware interface
AndersonQ Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
  • Loading branch information
AndersonQ committed Feb 17, 2025
commit f27510075924cf2c453f68b0c60f0e7edbcd870c
40 changes: 28 additions & 12 deletions libbeat/monitoring/inputmon/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@ const (
)

type handler struct {
registry *monitoring.Registry
registryDataset *monitoring.Registry
registryInternal *monitoring.Registry
}

// AttachHandler attaches an HTTP handler to the given mux.Router to handle
// requests to /inputs.
func AttachHandler(r *mux.Router) error {
return attachHandler(r, globalRegistry())
return attachHandler(r, globalDatasetRegistry(), globalInternalRegistry())
}

func attachHandler(r *mux.Router, registry *monitoring.Registry) error {
h := &handler{registry: registry}
func attachHandler(r *mux.Router, datasetReg, internalReg *monitoring.Registry) error {
h := &handler{registryDataset: datasetReg, registryInternal: internalReg}
r = r.PathPrefix(route).Subrouter()
return r.StrictSlash(true).Handle("/", validationHandler("GET", []string{"pretty", "type"}, h.allInputs)).GetError()
}
Expand All @@ -64,17 +65,15 @@ func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) {
return
}

filtered := filteredSnapshot(h.registry, requestedType)
// global := filteredSnapshot(globalRegistry(), "")
//
// logp.L().Info("global:", global)
// logp.L().Info("filtered:", filtered)
filtered := filteredSnapshot(
h.registryDataset, h.registryInternal, requestedType)

w.Header().Set(contentType, applicationJSON)
serveJSON(w, filtered, requestedPretty)
}

func filteredSnapshot(r *monitoring.Registry, requestedType string) []map[string]any {
metrics := monitoring.CollectStructSnapshot(r, monitoring.Full, false)
func filteredSnapshot(dataset, internal *monitoring.Registry, requestedType string) []map[string]any {
metrics := monitoring.CollectStructSnapshot(dataset, monitoring.Full, false)

filtered := make([]map[string]any, 0, len(metrics))
for _, ifc := range metrics {
Expand All @@ -84,7 +83,8 @@ func filteredSnapshot(r *monitoring.Registry, requestedType string) []map[string
}

// Require all entries to have an 'input' and 'id' to be accessed through this API.
if id, ok := m["id"].(string); !ok || id == "" {
id, ok := m["id"].(string)
if !ok || id == "" {
continue
}

Expand All @@ -93,11 +93,27 @@ func filteredSnapshot(r *monitoring.Registry, requestedType string) []map[string
continue
}

// merge the internal namespace if found
mergeInternalMetrics(internal, id, m)

filtered = append(filtered, m)
}
return filtered
}

// mergeInternalMetrics looks
func mergeInternalMetrics(internal *monitoring.Registry, id string, m map[string]any) {
intInput := monitoring.CollectStructSnapshot(internal, monitoring.Full, false)
if rawInputPublishMetrics, found := intInput[id]; found {
inputPublishMetrics, inputMetrincsOk := rawInputPublishMetrics.(map[string]any)
if inputMetrincsOk {
for k, v := range inputPublishMetrics {
m[k] = v
}
}
}
}

func serveJSON(w http.ResponseWriter, value any, pretty bool) {
w.Header().Set(contentType, applicationJSON)
enc := json.NewEncoder(w)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/monitoring/inputmon/httphandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func BenchmarkHandlers(b *testing.B) {
monitoring.NewInt(reg, "gauge").Set(int64(i))
}

h := &handler{registry: reg}
h := &handler{registryDataset: reg}

b.Run("allInputs", func(b *testing.B) {
req := httptest.NewRequest(http.MethodGet, "/inputs/", nil)
Expand Down
24 changes: 17 additions & 7 deletions libbeat/monitoring/inputmon/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (

"github.com/gofrs/uuid/v5"

libbeatmonitoring "github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

// NewInputRegistry returns a new monitoring.Registry for metrics related to
// an input instance. The returned registry will be initialized with a static
// an input instance. The returned registryDataset will be initialized with a static
// string values for the input and id. When the input stops it should invoke
// the returned cancel function to unregister the metrics. For testing purposes
// an optional monitoring.Registry may be provided as an alternative to using
Expand All @@ -39,12 +40,12 @@ func NewInputRegistry(inputType, id string, optionalParent *monitoring.Registry)
// Use the default registry unless one was provided (this would be for testing).
parentRegistry := optionalParent
if parentRegistry == nil {
parentRegistry = globalRegistry()
parentRegistry = globalDatasetRegistry()
}

// If an ID has not been assigned to an input then metrics cannot be exposed
// in the global metric registry. The returned registry still behaves the same.
if (id == "" || inputType == "") && parentRegistry == globalRegistry() {
if (id == "" || inputType == "") && parentRegistry == globalDatasetRegistry() {
// Null route metrics without ID or input type.
parentRegistry = monitoring.NewRegistry()
}
Expand Down Expand Up @@ -76,12 +77,21 @@ func sanitizeID(id string) string {
return strings.ReplaceAll(id, ".", "_")
}

func globalRegistry() *monitoring.Registry {
return monitoring.GetNamespace("dataset").GetRegistry()
func globalDatasetRegistry() *monitoring.Registry {
return globalRegistry("dataset")
}

func globalInternalRegistry() *monitoring.Registry {
return globalRegistry(libbeatmonitoring.RegistryNameInternalInputs)
}

func globalRegistry(namespace string) *monitoring.Registry {
return monitoring.GetNamespace(namespace).GetRegistry()
}

// MetricSnapshotJSON returns a snapshot of the input metric values from the
// global 'dataset' monitoring namespace encoded as a JSON array (pretty formatted).
// global 'dataset' and 'internal' monitoring namespace merged and encoded as a
// JSON array (pretty formatted).
func MetricSnapshotJSON() ([]byte, error) {
return json.MarshalIndent(filteredSnapshot(globalRegistry(), ""), "", " ")
return json.MarshalIndent(filteredSnapshot(globalDatasetRegistry(), globalInternalRegistry(), ""), "", " ")
}
16 changes: 8 additions & 8 deletions libbeat/monitoring/inputmon/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func TestNewInputMonitor(t *testing.T) {
{Input: inputType, ID: "", PublicMetrics: false},
{Input: "", ID: "", PublicMetrics: false},

{Input: inputType, ID: id, OptionalParent: globalRegistry(), PublicMetrics: true},
{Input: "", ID: id, OptionalParent: globalRegistry(), PublicMetrics: false},
{Input: inputType, ID: "", OptionalParent: globalRegistry(), PublicMetrics: false},
{Input: "", ID: "", OptionalParent: globalRegistry(), PublicMetrics: false},
{Input: inputType, ID: id, OptionalParent: globalDatasetRegistry(), PublicMetrics: true},
{Input: "", ID: id, OptionalParent: globalDatasetRegistry(), PublicMetrics: false},
{Input: inputType, ID: "", OptionalParent: globalDatasetRegistry(), PublicMetrics: false},
{Input: "", ID: "", OptionalParent: globalDatasetRegistry(), PublicMetrics: false},

{Input: inputType, ID: id, OptionalParent: monitoring.NewRegistry(), PublicMetrics: false},
{Input: "", ID: id, OptionalParent: monitoring.NewRegistry(), PublicMetrics: false},
Expand All @@ -66,12 +66,12 @@ func TestNewInputMonitor(t *testing.T) {
assert.NotNil(t, reg)

// Verify that metrics are registered when a custom parent registry is given.
if tc.OptionalParent != nil && tc.OptionalParent != globalRegistry() {
if tc.OptionalParent != nil && tc.OptionalParent != globalDatasetRegistry() {
assert.NotNil(t, tc.OptionalParent.Get(tc.ID))
}

// Verify whether the metrics are exposed in the global registry which makes the public.
parent := globalRegistry().GetRegistry(tc.ID)
parent := globalDatasetRegistry().GetRegistry(tc.ID)
if tc.PublicMetrics {
assert.NotNil(t, parent)
} else {
Expand All @@ -82,9 +82,9 @@ func TestNewInputMonitor(t *testing.T) {
}

func TestMetricSnapshotJSON(t *testing.T) {
require.NoError(t, globalRegistry().Clear())
require.NoError(t, globalDatasetRegistry().Clear())
t.Cleanup(func() {
require.NoError(t, globalRegistry().Clear())
require.NoError(t, globalDatasetRegistry().Clear())
})

r, cancel := NewInputRegistry("test", "my-id", nil)
Expand Down
5 changes: 5 additions & 0 deletions libbeat/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package monitoring

const (
RegistryNameInternalInputs = "internal.inputs"
)
40 changes: 18 additions & 22 deletions libbeat/publisher/pipeline/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"github.com/elastic/beats/v7/libbeat/beat"
libbeatmonitoring "github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand Down Expand Up @@ -201,6 +202,7 @@ func (o *metricsObserver) failedPublishEvent(e beat.Event) {
input.inputEventsDropped.Inc()
}

// TODO: remove metricName
func (o *metricsObserver) ensureInputMetric(e beat.Event, metricName string) *inputVars {
// TODO:
// - find the right global registry to add the metrics to. dataset.inputID sanitized. See inputmon.NewInputRegistry()
Expand All @@ -217,24 +219,7 @@ func (o *metricsObserver) ensureInputMetric(e beat.Event, metricName string) *in
return nil
}

// rawFieldInput, err := e.Fields.GetValue(beat.FieldsKeyInput)
// if err != nil {
// return nil // again, nothing we can do about it
// }
// fieldInput, ok := rawFieldInput.(mapstr.M)
// if !ok {
// // again, nothing we can do about it
// return nil
// }
// rawType, err := fieldInput.GetValue("type")
// if err != nil {
// return nil // again, nothing we can do about it
// }
// fieldType, ok := rawType.(string)
// if !ok {
// return nil // again, nothing we can do about it
// }

// for debug, remove it
datasetReg := monitoring.GetNamespace("dataset").GetRegistry()
sanatizedID := strings.ReplaceAll(inputID, ".", "_")
inputReg := datasetReg.GetRegistry(sanatizedID)
Expand All @@ -247,14 +232,25 @@ func (o *metricsObserver) ensureInputMetric(e beat.Event, metricName string) *in

input, found := o.vars.inputs[inputID]
if !found {
reg := o.metrics.GetRegistry("pipeline")
intReg := monitoring.
GetNamespace(libbeatmonitoring.RegistryNameInternalInputs).
GetRegistry()
reg := intReg.GetRegistry(inputID)
if reg == nil {
reg = intReg.NewRegistry(inputID)
}

input = inputVars{
inputEventsFiltered: monitoring.NewUint(reg, "inputs."+inputID+".events.filtered"),
inputEventsPublished: monitoring.NewUint(reg, "inputs."+inputID+".events.published"),
inputEventsDropped: monitoring.NewUint(reg, "events_dropped_total"),
inputEventsFiltered: monitoring.NewUint(reg, "events_filtered_total"),
inputEventsPublished: monitoring.NewUint(reg, "events_published_total"),
}
o.vars.inputs[inputID] = input
}

reg := monitoring.GetRegistry(libbeatmonitoring.RegistryNameInternalInputs)
if reg != nil {
_ = reg
}
return &input
}

Expand Down