10 changes: 6 additions & 4 deletions internal/generator/vector/conf/complex.toml
@@ -725,10 +725,12 @@ source = '''
} else {
._internal.event = parsed
if exists(._internal.event.event) && is_object(._internal.event.event) {
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = ._internal.event.verb
._internal.message = del(._internal.kubernetes.event.message)
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = del(._internal.event.verb)
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
}
10 changes: 6 additions & 4 deletions internal/generator/vector/conf/complex_http_receiver.toml
@@ -756,10 +756,12 @@ source = '''
} else {
._internal.event = parsed
if exists(._internal.event.event) && is_object(._internal.event.event) {
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = ._internal.event.verb
._internal.message = del(._internal.kubernetes.event.message)
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = del(._internal.event.verb)
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
}
10 changes: 6 additions & 4 deletions internal/generator/vector/conf/container.toml
@@ -305,10 +305,12 @@ source = '''
} else {
._internal.event = parsed
if exists(._internal.event.event) && is_object(._internal.event.event) {
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = ._internal.event.verb
._internal.message = del(._internal.kubernetes.event.message)
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = del(._internal.event.verb)
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
}
10 changes: 6 additions & 4 deletions internal/generator/vector/filter/openshift/viaq/v1/normalize.go
@@ -4,15 +4,18 @@ const (
HandleEventRouterLog = `

if exists(._internal.kubernetes.pod_name) && starts_with(string!(._internal.kubernetes.pod_name), "eventrouter-") {

parsed, err = parse_json(._internal.message)
if err != null {
log("Unable to process EventRouter log: " + err, level: "info")
} else {
._internal.event = parsed
if exists(._internal.event.event) && is_object(._internal.event.event) {
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = ._internal.event.verb
._internal.message = del(._internal.kubernetes.event.message)
._internal.kubernetes.event.verb = del(._internal.event.verb)
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
@@ -147,5 +150,4 @@ if !exists(._internal.structured) {
SetOpenShiftOnRoot = `
if exists(._internal.openshift) {.openshift = ._internal.openshift}
if exists(._internal.dedot_openshift_labels) {.openshift.labels = del(._internal.dedot_openshift_labels) }
`
)
`)
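
For context on the change above: the new VRL lines stringify the event message and replace literal newlines with the two-character sequence `\n`. A minimal Go sketch of that escaping, for illustration only (the `escapeNewlines` helper is hypothetical, not part of the operator):

```go
package main

import (
	"fmt"
	"strings"
)

// escapeNewlines mirrors the effect of the VRL step
// `._internal.message = replace(msg, "\n", s'\n')`: literal newline
// characters in the EventRouter message are rewritten as the two-character
// sequence `\n`, so a line-oriented sink such as RFC 5424 syslog keeps the
// whole record on one line (see LOG-8090).
func escapeNewlines(msg string) string {
	return strings.ReplaceAll(msg, "\n", `\n`)
}

func main() {
	in := "syslog message\n with new line"
	fmt.Printf("%q\n", escapeNewlines(in)) // "syslog message\\n with new line"
}
```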
77 changes: 52 additions & 25 deletions test/functional/normalization/eventrouter_test.go
@@ -2,11 +2,13 @@ package normalization

import (
"encoding/json"
"github.com/openshift/cluster-logging-operator/test/framework/functional"
testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
"strings"
"time"

"github.com/openshift/cluster-logging-operator/test/framework/functional"
"github.com/openshift/cluster-logging-operator/test/helpers/syslog"
testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

@@ -28,10 +30,10 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func()
templateForAnyKubernetesWithEvents = types.KubernetesWithEvent{
Kubernetes: functional.TemplateForAnyKubernetes,
}
NewEventDataBuilder = func(verb string, podRef *corev1.ObjectReference) types.EventData {
newEvent := types.NewEvent(podRef, corev1.EventTypeNormal, "reason", "amessage")
NewEventDataBuilder = func(verb, message string, podRef *corev1.ObjectReference) types.EventData {
newEvent := types.NewEvent(podRef, corev1.EventTypeNormal, "reason", message)
if verb == "UPDATED" {
oldEvent := types.NewEvent(podRef, corev1.EventTypeWarning, "old_reason", "old_message")
oldEvent := types.NewEvent(podRef, corev1.EventTypeWarning, "old_reason", "old_" + message)
return types.EventData{Verb: "UPDATED", Event: newEvent, OldEvent: oldEvent}
} else {
return types.EventData{Verb: "ADDED", Event: newEvent}
@@ -71,47 +73,72 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func()

return tmpl
}

parseLogs = func(raw []string, outputType obs.OutputType) ([]types.EventRouterLog, error) {
var logs []types.EventRouterLog
if outputType == obs.OutputTypeHTTP {
err := types.StrictlyParseLogs(utils.ToJsonLogs(raw), &logs)
return logs, err
} else if outputType == obs.OutputTypeSyslog {
jsStr := make([]string, len(raw))
for i, s := range raw {
s, _ := syslog.ParseRFC5424SyslogLogs(s)
jsStr[i] = s.MessagePayload
}
err := types.StrictlyParseLogs(utils.ToJsonLogs(jsStr), &logs)
return logs, err
}
return nil, nil
}
)

BeforeEach(func() {
DescribeTable("should be normalized to the ViaQ data model when sinking to different outputs", func(outputType obs.OutputType, verb, message, expectedMessage string) {
framework = functional.NewCollectorFunctionalFramework()
testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
FromInput(obs.InputTypeApplication).
ToHttpOutput()
// vector only collects logs using pods, namespaces, containers it knows about.
builder := testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
FromInput(obs.InputTypeApplication)
if outputType == obs.OutputTypeHTTP {
builder.ToHttpOutput()
}
if outputType == obs.OutputTypeSyslog {
builder.ToSyslogOutput(obs.SyslogRFC5424)
}

writeMsg = func(msg string) error {
return framework.WriteMessagesToApplicationLog(msg, 1)
}
framework.VisitConfig = func(conf string) string {
return strings.Replace(conf, `"eventrouter-"`, `"functional"`, 1)
}
Expect(framework.Deploy()).To(BeNil())

})
AfterEach(func() {
framework.Cleanup()
})

DescribeTable("should be normalized to the VIAQ data model", func(verb string) {
podRef, err := reference.GetReference(scheme.Scheme, types.NewMockPod())
Expect(err).To(BeNil())
newEventData := NewEventDataBuilder(verb, podRef)
newEventData := NewEventDataBuilder(verb, message, podRef)
jsonBytes, _ := json.Marshal(newEventData)
jsonStr := string(jsonBytes)
msg := functional.NewCRIOLogMessage(timestamp, jsonStr, false)
err = writeMsg(msg)
Expect(err).To(BeNil())

raw, err := framework.ReadRawApplicationLogsFrom(string(obs.OutputTypeHTTP))
raw, err := framework.ReadRawApplicationLogsFrom(string(outputType))
Expect(err).To(BeNil(), "Expected no errors reading the logs")
var logs []types.EventRouterLog
err = types.StrictlyParseLogs(utils.ToJsonLogs(raw), &logs)
logs, err := parseLogs(raw, outputType)
Expect(err).To(BeNil(), "Expected no errors parsing the logs")
var expectedLogTemplate = ExpectedLogTemplateBuilder(newEventData.Event, newEventData.OldEvent)
expectedEventData := newEventData
expectedEventData.Event.Message = expectedMessage
var expectedLogTemplate = ExpectedLogTemplateBuilder(expectedEventData.Event, expectedEventData.OldEvent)
Expect(logs[0]).To(matchers.FitLogFormatTemplate(expectedLogTemplate))
},
Entry("for ADDED events", "ADDED"),
Entry("for UPDATED events", "UPDATED"),
Entry("with HTTP output for ADDED events", obs.OutputTypeHTTP, "ADDED", "simple syslog message", "simple syslog message"),
Entry("with HTTP output for UPDATED events", obs.OutputTypeHTTP, "UPDATED", "simple syslog message", "simple syslog message"),
Entry("with Syslog output for ADDED events", obs.OutputTypeSyslog, "ADDED", "simple syslog message", "simple syslog message"),
Entry("with Syslog output for UPDATED events", obs.OutputTypeSyslog, "UPDATED", "simple syslog message", "simple syslog message"),
Entry("with Syslog output for ADDED events and new line symbol", obs.OutputTypeSyslog, "ADDED", "syslog message\n with new line", "syslog message\\n with new line"),
Entry("with Syslog output for UPDATED events and new line symbol", obs.OutputTypeSyslog, "UPDATED", "syslog message\n with new line", "syslog message\\n with new line"),
)

})
AfterEach(func() {
if framework != nil {
framework.Cleanup()
}
})
})
63 changes: 63 additions & 0 deletions test/helpers/syslog/parser.go
@@ -0,0 +1,63 @@
package syslog

import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
)
type SyslogMessage struct {
Priority int
Version int
Timestamp time.Time
Hostname string
AppName string
ProcID string
MsgID string
StructuredData string
MessagePayload string
}

// ParseRFC5424SyslogLogs takes raw output and returns structured SyslogMessage objects.
func ParseRFC5424SyslogLogs(rawLog string) (*SyslogMessage, error) {
if rawLog == "" {
return &SyslogMessage{}, nil
}
rawLog = strings.TrimSpace(rawLog)
syslogRegex := regexp.MustCompile(
`^<(\d{1,3})>1\s` + //PRI VERSION
`([^\s]+)\s` + // TIMESTAMP
`([^\s]+)\s` + // HOSTNAME
`([^\s]+)\s` + // APP-NAME
`([^\s]+)\s` + // PROCID
`([^\s]+)\s` + // MSGID
`(-|\[[^\]]+\])\s` + // STRUCTURED-DATA
`(.*)$`) // MESSAGE-PAYLOAD

matches := syslogRegex.FindStringSubmatch(rawLog)

if len(matches) != 9 {
return nil, fmt.Errorf("failed to parse Syslog line: expected 9 submatches (got %d) in line: %s", len(matches), rawLog)
}
priority, err := strconv.Atoi(matches[1])
if err != nil {
return nil, err
}
ts, err := time.Parse(time.RFC3339Nano, matches[2])
if err != nil {
return nil, err
}
msg := &SyslogMessage{
Priority: priority,
Version: 1,
Timestamp: ts,
Hostname: matches[3],
AppName: matches[4],
ProcID: matches[5],
MsgID: matches[6],
StructuredData: matches[7],
MessagePayload: matches[8],
}
return msg, nil
}
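
A minimal usage sketch of the new helper, assuming it is imported from the same path the functional test uses; the raw line below is a hypothetical example that matches the RFC 5424 shape the regex expects:

```go
package main

import (
	"fmt"

	"github.com/openshift/cluster-logging-operator/test/helpers/syslog"
)

func main() {
	raw := `<14>1 2025-11-24T10:30:00.123456Z my-test-node eventrouter - - - {"key":"value","event":"added"}`
	msg, err := syslog.ParseRFC5424SyslogLogs(raw)
	if err != nil {
		panic(err)
	}
	// MessagePayload is the JSON body that the functional test feeds back
	// into types.StrictlyParseLogs.
	fmt.Println(msg.AppName, msg.Priority, msg.MessagePayload)
}
```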
63 changes: 63 additions & 0 deletions test/helpers/syslog/parser_test.go
@@ -0,0 +1,63 @@
package syslog

import (
"fmt"
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("[test][helpers][syslog] ParseRFC5424SyslogLogs", func() {

const (
testTime = "2025-11-24T10:30:00.123456Z"
expectedAppName = "eventrouter"
expectedHost = "my-test-node"
)

DescribeTable("should correctly parse valid RFC 5424 log lines",
func(rawLogLine, expectedJSONPayload, expectedStructuredData string, expectedPriority int) {
msg, err := ParseRFC5424SyslogLogs(rawLogLine)

Expect(err).NotTo(HaveOccurred(), "Parsing should succeed without error")
Expect(msg.Priority).To(Equal(expectedPriority), "Priority should match")
Expect(msg.Version).To(Equal(1), "Version should be 1")
Expect(msg.Hostname).To(Equal(expectedHost), "Hostname should match")
Expect(msg.AppName).To(Equal(expectedAppName), "App-Name should match")
Expect(msg.StructuredData).To(Equal(expectedStructuredData), "Structured Data should match")
Expect(msg.MessagePayload).To(Equal(expectedJSONPayload), "Message payload should match")

expectedTime, _ := time.Parse(time.RFC3339Nano, testTime)
Expect(msg.Timestamp).To(BeTemporally("~", expectedTime, time.Second), "Timestamp should be correctly parsed")

},
Entry("when SD is hyphen and payload is simple JSON",
fmt.Sprintf("<14>1 %s %s %s - - - %s", testTime, expectedHost, expectedAppName, `{"key":"value","event":"added"}`),
`{"key":"value","event":"added"}`,
"-",
14,
),
)

It("should return an error for malformed Syslog lines", func() {
// A log line missing the version number
malformedLog := fmt.Sprintf("<14>%s %s %s - - - %s", testTime, expectedHost, expectedAppName, `{"key":"value"}`)
msg, err := ParseRFC5424SyslogLogs(malformedLog)
Expect(err).To(HaveOccurred(), "Parsing should fail for a malformed line")
Expect(msg).To(BeNil(), "Should not return any parsed messages")
Expect(err.Error()).To(ContainSubstring("expected 9 submatches"), "Error message should indicate regex failure")
})

It("should return an error for badly formatted timestamps", func() {
badTimeLog := fmt.Sprintf("<14>1 %s %s %s - - - %s", "24/11/2025 10:30", expectedHost, expectedAppName, `{"key":"value"}`)
msg, err := ParseRFC5424SyslogLogs(badTimeLog)
Expect(err).To(HaveOccurred(), "Parsing should fail due to bad timestamp format")
Expect(msg).To(BeNil(), "Should not return any parsed messages")
})
})

func TestSyslogParser(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Syslog Parser Unit Test Suite")
}
1 change: 1 addition & 0 deletions test/helpers/syslog/sender.go
@@ -3,6 +3,7 @@ package syslog
import (
_ "embed"
"fmt"

obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
"github.com/openshift/cluster-logging-operator/internal/runtime"
"github.com/openshift/cluster-logging-operator/test/framework/functional"