Skip to content

Commit

Permalink
Implement CWLogs log group classes (aws#968)
Browse files Browse the repository at this point in the history
  • Loading branch information
movence authored Nov 29, 2023
1 parent 398295b commit 333b1c8
Show file tree
Hide file tree
Showing 46 changed files with 607 additions and 128 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ replace github.com/go-kit/kit => github.com/go-kit/kit v0.12.1-0.20220808180842-
// openshift removed all tags from their repo, use the pseudoversion from the release-3.9 branch HEAD
replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37

replace github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.47.12
replace github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.48.6

require (
github.com/BurntSushi/toml v1.3.2
github.com/Jeffail/gabs v1.4.0
github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20231023230448-f645697bf350
github.com/aws/aws-sdk-go v1.47.12
github.com/aws/aws-sdk-go v1.48.6
github.com/aws/aws-sdk-go-v2 v1.23.0
github.com/aws/aws-sdk-go-v2/config v1.25.1
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.47.12 h1:1daICVijigVEXCzhg27A5d7hbkR4wODPGn9GHyBclKM=
github.com/aws/aws-sdk-go v1.47.12/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.48.6 h1:hnL/TE3eRigirDLrdRE9AWE1ALZSVLAsC4wK8TGsMqk=
github.com/aws/aws-sdk-go v1.48.6/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.9.1/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.23.0 h1:PiHAzmiQQr6JULBUdvR8fKlA+UPKLT/8KbiqpFBWiAo=
Expand Down
6 changes: 4 additions & 2 deletions logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ type LogSrc interface {
Destination() string
Description() string
Retention() int
Class() string
Stop()
}

// A LogBackend is able to return a LogDest of a given name.
// The same name should always return the same LogDest.
type LogBackend interface {
CreateDest(string, string, int) LogDest
CreateDest(string, string, int, string) LogDest
}

// A LogDest represents a final endpoint where log events are published to.
Expand Down Expand Up @@ -114,13 +115,14 @@ func (l *LogAgent) Run(ctx context.Context) {
logStream := src.Stream()
description := src.Description()
retention := src.Retention()
logGroupClass := src.Class()
backend, ok := l.backends[dname]
if !ok {
log.Printf("E! [logagent] Failed to find destination %s for log source %s/%s(%s) ", dname, logGroup, logStream, description)
continue
}
retention = l.checkRetentionAlreadyAttempted(retention, logGroup)
dest := backend.CreateDest(logGroup, logStream, retention)
dest := backend.CreateDest(logGroup, logStream, retention, logGroupClass)
l.destNames[dest] = dname
log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", logGroup, logStream, description, dname, retention)
go l.runSrcToDest(src, dest)
Expand Down
2 changes: 2 additions & 0 deletions plugins/inputs/logfile/fileconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type FileConfig struct {
LogGroupName string `toml:"log_group_name"`
//log stream name
LogStreamName string `toml:"log_stream_name"`
//log group class
LogGroupClass string `toml:"log_group_class"`

//The regex of the timestampFromLogLine presents in the log entry
TimestampRegex string `toml:"timestamp_regex"`
Expand Down
33 changes: 33 additions & 0 deletions plugins/inputs/logfile/fileconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent/tool/util"
)

func TestFileConfigInit(t *testing.T) {
Expand All @@ -21,6 +23,7 @@ func TestFileConfigInit(t *testing.T) {
TimestampLayout: []string{"02 Jan 2006 15:04:05"},
Timezone: "UTC",
MultiLineStartPattern: "{timestamp_regex}",
LogGroupClass: util.StandardLogGroupClass,
}

err := fileConfig.init()
Expand All @@ -38,6 +41,7 @@ func TestFileConfigInit(t *testing.T) {
assert.True(t, fileConfig.MultiLineStartPatternP == fileConfig.TimestampRegexP, "The multiline start pattern should be the same as the timestampFromLogLine pattern.")

assert.Equal(t, time.UTC, fileConfig.TimezoneLoc, "The timezone location should be UTC.")
assert.Equal(t, util.StandardLogGroupClass, fileConfig.LogGroupClass)

assert.Nil(t, fileConfig.Filters)
}
Expand Down Expand Up @@ -70,6 +74,35 @@ func TestFileConfigInitFailureCase(t *testing.T) {
assert.Equal(t, "multi_line_start_pattern has issue, regexp: Compile( (\\d{2} \\w{3} \\d{4} \\d{2}:\\d{2}:\\d{2}+) ): error parsing regexp: invalid nested repetition operator: `{2}+`", err.Error())
}

func TestInfrequent_accessAndEmptyLogGroupClassInit(t *testing.T) {
fileConfig := &FileConfig{
FilePath: "/tmp/logfile.log",
LogGroupName: "logfile.log",
TimestampRegex: "(\\d{2} \\w{3} \\d{4} \\d{2}:\\d{2}:\\d{2}+)",
TimestampLayout: []string{"02 Jan 2006 15:04:05"},
Timezone: "UTC",
MultiLineStartPattern: "{timestamp_regex}",
LogGroupClass: util.InfrequentAccessLogGroupClass,
}

err := fileConfig.init()
assert.NotNil(t, err)
assert.Equal(t, util.InfrequentAccessLogGroupClass, fileConfig.LogGroupClass)

fileConfig = &FileConfig{
FilePath: "/tmp/logfile.log",
LogGroupName: "logfile.log",
TimestampRegex: "(\\d{2} \\w{3} \\d{4} \\d{2}:\\d{2}:\\d{2}+)",
TimestampLayout: []string{"02 Jan 2006 15:04:05"},
Timezone: "UTC",
MultiLineStartPattern: "{timestamp_regex}",
}

err = fileConfig.init()
assert.NotNil(t, err)
assert.Equal(t, "", fileConfig.LogGroupClass)
}

func TestLogGroupName(t *testing.T) {
filepath := "/tmp/logfile.log.2017-06-19-13"
expectLogGroup := "/tmp/logfile.log"
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/logfile/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {
groupName, streamName,
t.Destination,
t.getStateFilePath(filename),
fileconfig.LogGroupClass,
tailer,
fileconfig.AutoRemoval,
mlCheck,
Expand Down
11 changes: 9 additions & 2 deletions plugins/inputs/logfile/tailersrc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func (le LogEvent) Done() {
}

type tailerSrc struct {
group, stream string
group string
stream string
class string
destination string
stateFilePath string
tailer *tail.Tail
Expand All @@ -81,7 +83,7 @@ type tailerSrc struct {
var _ logs.LogSrc = (*tailerSrc)(nil)

func NewTailerSrc(
group, stream, destination, stateFilePath string,
group, stream, destination, stateFilePath, logClass string,
tailer *tail.Tail,
autoRemoval bool,
isMultilineStartFn func(string) bool,
Expand All @@ -97,6 +99,7 @@ func NewTailerSrc(
stream: stream,
destination: destination,
stateFilePath: stateFilePath,
class: logClass,
tailer: tailer,
autoRemoval: autoRemoval,
isMLStart: isMultilineStartFn,
Expand Down Expand Up @@ -141,6 +144,10 @@ func (ts *tailerSrc) Destination() string {
func (ts *tailerSrc) Retention() int {
return ts.retentionInDays
}

func (ts *tailerSrc) Class() string {
return ts.class
}
func (ts *tailerSrc) Done(offset fileOffset) {
// ts.offsetCh will only be blocked when the runSaveState func has exited,
// which only happens when the original file has been removed, thus making
Expand Down
7 changes: 5 additions & 2 deletions plugins/inputs/logfile/tailersrc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/tail"
"github.com/aws/amazon-cloudwatch-agent/profiler"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)

type tailerTestResources struct {
Expand Down Expand Up @@ -59,8 +60,8 @@ func TestTailerSrc(t *testing.T) {
require.Equal(t, beforeCount+1, tail.OpenFileCount.Load())
ts := NewTailerSrc(
"groupName", "streamName",
"destination",
statefile.Name(),
"destination", statefile.Name(),
util.InfrequentAccessLogGroupClass,
tailer,
false, // AutoRemoval
regexp.MustCompile("^[\\S]").MatchString,
Expand Down Expand Up @@ -171,6 +172,7 @@ func TestOffsetDoneCallBack(t *testing.T) {
"groupName", "streamName",
"destination",
statefile.Name(),
util.InfrequentAccessLogGroupClass,
tailer,
false, // AutoRemoval
regexp.MustCompile("^[\\S]").MatchString,
Expand Down Expand Up @@ -388,6 +390,7 @@ func setupTailer(t *testing.T, multiLineFn func(string) bool, maxEventSize int)
t.Name(),
t.Name(),
"destination",
util.InfrequentAccessLogGroupClass,
statefile.Name(),
tailer,
false, // AutoRemoval
Expand Down
2 changes: 2 additions & 0 deletions plugins/inputs/windows_event_log/windows_event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type EventConfig struct {
BatchReadSize int `toml:"batch_read_size"`
LogGroupName string `toml:"log_group_name"`
LogStreamName string `toml:"log_stream_name"`
LogGroupClass string `toml:"log_group_class"`
Destination string `toml:"destination"`
Retention int `toml:"retention_in_days"`
}
Expand Down Expand Up @@ -108,6 +109,7 @@ func (s *Plugin) Start(acc telegraf.Accumulator) error {
stateFilePath,
eventConfig.BatchReadSize,
eventConfig.Retention,
eventConfig.LogGroupClass,
)
err = eventLog.Init()
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion plugins/inputs/windows_event_log/wineventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type windowsEventLog struct {
levels []string
logGroupName string
logStreamName string
logGroupClass string
renderFormat string
maxToRead int // Maximum number returned in one read.
destination string
Expand All @@ -48,12 +49,13 @@ type windowsEventLog struct {
startOnce sync.Once
}

func NewEventLog(name string, levels []string, logGroupName, logStreamName, renderFormat, destination, stateFilePath string, maximumToRead int, retention int) *windowsEventLog {
func NewEventLog(name string, levels []string, logGroupName, logStreamName, renderFormat, destination, stateFilePath string, maximumToRead int, retention int, logGroupClass string) *windowsEventLog {
eventLog := &windowsEventLog{
name: name,
levels: levels,
logGroupName: logGroupName,
logStreamName: logStreamName,
logGroupClass: logGroupClass,
renderFormat: renderFormat,
maxToRead: maximumToRead,
destination: destination,
Expand Down Expand Up @@ -99,6 +101,11 @@ func (w *windowsEventLog) Destination() string {
func (w *windowsEventLog) Retention() int {
return w.retention
}

func (w *windowsEventLog) Class() string {
return w.logGroupClass
}

func (w *windowsEventLog) Stop() {
close(w.done)
}
Expand Down
17 changes: 9 additions & 8 deletions plugins/inputs/windows_event_log/wineventlog/wineventlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ var (
STATE_FILE_PATH = "fake"
BATCH_SIZE = 99
RETENTION = 42
LOG_GROUP_CLASS = "standard"
)

// TestNewEventLog verifies constructor's default values.
func TestNewEventLog(t *testing.T) {
elog := NewEventLog(NAME, LEVELS, GROUP_NAME, STREAM_NAME, RENDER_FMT, DEST,
STATE_FILE_PATH, BATCH_SIZE, RETENTION)
STATE_FILE_PATH, BATCH_SIZE, RETENTION, LOG_GROUP_CLASS)
assert.Equal(t, NAME, elog.name)
assert.Equal(t, uint64(0), elog.eventOffset)
assert.Zero(t, elog.eventHandle)
Expand All @@ -44,26 +45,26 @@ func TestNewEventLog(t *testing.T) {
func TestOpen(t *testing.T) {
// Happy path.
elog := NewEventLog(NAME, LEVELS, GROUP_NAME, STREAM_NAME, RENDER_FMT, DEST,
STATE_FILE_PATH, BATCH_SIZE, RETENTION)
STATE_FILE_PATH, BATCH_SIZE, RETENTION, LOG_GROUP_CLASS)
assert.NoError(t, elog.Open())
assert.NotZero(t, elog.eventHandle)
assert.NoError(t, elog.Close())
// Bad event log source name does not cause Open() to fail.
// But eventHandle will be 0 and Close() will fail because of it.
elog = NewEventLog("FakeBadElogName", LEVELS, GROUP_NAME, STREAM_NAME,
RENDER_FMT, DEST, STATE_FILE_PATH, BATCH_SIZE, RETENTION)
RENDER_FMT, DEST, STATE_FILE_PATH, BATCH_SIZE, RETENTION, LOG_GROUP_CLASS)
assert.NoError(t, elog.Open())
assert.Zero(t, elog.eventHandle)
assert.Error(t, elog.Close())
// bad LEVELS does not cause Open() to fail.
elog = NewEventLog(NAME, []string{"498"}, GROUP_NAME, STREAM_NAME,
RENDER_FMT, DEST, STATE_FILE_PATH, BATCH_SIZE, RETENTION)
RENDER_FMT, DEST, STATE_FILE_PATH, BATCH_SIZE, RETENTION, LOG_GROUP_CLASS)
assert.NoError(t, elog.Open())
assert.NotZero(t, elog.eventHandle)
assert.NoError(t, elog.Close())
// bad wlog.eventOffset does not cause Open() to fail.
elog = NewEventLog(NAME, []string{"498"}, GROUP_NAME, STREAM_NAME,
RENDER_FMT, DEST, STATE_FILE_PATH, BATCH_SIZE, RETENTION)
RENDER_FMT, DEST, STATE_FILE_PATH, BATCH_SIZE, RETENTION, LOG_GROUP_CLASS)
elog.eventOffset = 9987
assert.NoError(t, elog.Open())
assert.NotZero(t, elog.eventHandle)
Expand All @@ -74,7 +75,7 @@ func TestOpen(t *testing.T) {
// event log source.
func TestReadGoodSource(t *testing.T) {
elog := NewEventLog(NAME, LEVELS, GROUP_NAME, STREAM_NAME, RENDER_FMT, DEST,
STATE_FILE_PATH, BATCH_SIZE, RETENTION)
STATE_FILE_PATH, BATCH_SIZE, RETENTION, LOG_GROUP_CLASS)
assert.NoError(t, elog.Open())
seekToEnd(t, elog)
writeEvents(t, 10, true, "CWA_UnitTest111", 777)
Expand All @@ -87,7 +88,7 @@ func TestReadGoodSource(t *testing.T) {
// unregistered event log source.
func TestReadBadSource(t *testing.T) {
elog := NewEventLog(NAME, LEVELS, GROUP_NAME, STREAM_NAME, RENDER_FMT, DEST,
STATE_FILE_PATH, BATCH_SIZE, RETENTION)
STATE_FILE_PATH, BATCH_SIZE, RETENTION, LOG_GROUP_CLASS)
assert.NoError(t, elog.Open())
seekToEnd(t, elog)
writeEvents(t, 10, false, "CWA_UnitTest222", 888)
Expand All @@ -101,7 +102,7 @@ func TestReadBadSource(t *testing.T) {
// unregistered source too.
func TestReadWithBothSources(t *testing.T) {
elog := NewEventLog(NAME, LEVELS, GROUP_NAME, STREAM_NAME, RENDER_FMT, DEST,
STATE_FILE_PATH, BATCH_SIZE, RETENTION)
STATE_FILE_PATH, BATCH_SIZE, RETENTION, LOG_GROUP_CLASS)
assert.NoError(t, elog.Open())
seekToEnd(t, elog)
writeEvents(t, 10, true, "CWA_UnitTest111", 777)
Expand Down
11 changes: 7 additions & 4 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
"github.com/aws/amazon-cloudwatch-agent/internal"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)

const (
LogGroupNameTag = "log_group_name"
LogStreamNameTag = "log_stream_name"
LogGroupClassTag = "log_group_class"
LogTimestampField = "log_timestamp"
LogEntryField = "value"

Expand Down Expand Up @@ -103,7 +105,7 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
return nil
}

func (c *CloudWatchLogs) CreateDest(group, stream string, retention int) logs.LogDest {
func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGroupClass string) logs.LogDest {
if group == "" {
group = c.LogGroupName
}
Expand All @@ -118,6 +120,7 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int) logs.Lo
Group: group,
Stream: stream,
Retention: retention,
Class: logGroupClass,
}
return c.getDest(t)
}
Expand Down Expand Up @@ -203,7 +206,7 @@ func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error)
logStream = c.LogStreamName
}

return Target{logGroup, logStream, -1}, nil
return Target{logGroup, logStream, util.StandardLogGroupClass, -1}, nil
}

func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent {
Expand Down Expand Up @@ -354,8 +357,8 @@ func (cd *cwDest) setRetryer(r request.Retryer) {
}

type Target struct {
Group, Stream string
Retention int
Group, Stream, Class string
Retention int
}

// Description returns a one-sentence description on the Output
Expand Down
Loading

0 comments on commit 333b1c8

Please sign in to comment.