From 9352e0d36f91b7d4bbc01943014e06b12737a63a Mon Sep 17 00:00:00 2001 From: "Mengyi Zhou (bjrara)" Date: Tue, 9 Mar 2021 11:53:19 -0800 Subject: [PATCH] Fix concurrency issues in emf exporter (#2571) --- exporter/awsemfexporter/cwlog_client_test.go | 4 +- exporter/awsemfexporter/metric_translator.go | 2 +- exporter/awsemfexporter/pusher.go | 288 ++++++++++--------- exporter/awsemfexporter/pusher_test.go | 185 +++++++----- 4 files changed, 272 insertions(+), 207 deletions(-) diff --git a/exporter/awsemfexporter/cwlog_client_test.go b/exporter/awsemfexporter/cwlog_client_test.go index 3cd2542a34ed..b22a01f294a3 100644 --- a/exporter/awsemfexporter/cwlog_client_test.go +++ b/exporter/awsemfexporter/cwlog_client_test.go @@ -32,14 +32,14 @@ import ( "go.uber.org/zap" ) -func NewAlwaysPassMockLogClient() LogClient { +func NewAlwaysPassMockLogClient(putLogEventsFunc func(args mock.Arguments)) LogClient { logger := zap.NewNop() svc := new(mockCloudWatchLogsClient) svc.On("PutLogEvents", mock.Anything).Return( &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: &expectedNextSequenceToken}, - nil) + nil).Run(putLogEventsFunc) svc.On("CreateLogGroup", mock.Anything).Return(new(cloudwatchlogs.CreateLogGroupOutput), nil) diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index c1bc4fdafae0..fbaeaa4bed9f 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -315,7 +315,7 @@ func translateCWMetricToEMF(cWMetric *CWMetrics) *LogEvent { } metricCreationTime := cWMetric.TimestampMs - logEvent := NewLogEvent( + logEvent := newLogEvent( metricCreationTime, string(pleMsg), ) diff --git a/exporter/awsemfexporter/pusher.go b/exporter/awsemfexporter/pusher.go index 5f9668b8dc9b..a5be6c33bc2a 100644 --- a/exporter/awsemfexporter/pusher.go +++ b/exporter/awsemfexporter/pusher.go @@ -15,7 +15,9 @@ package awsemfexporter import ( + "errors" "sort" + "sync" "time" 
"github.com/aws/aws-sdk-go/aws" @@ -24,93 +26,140 @@ import ( ) const ( - //http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html - //In truncation logic, it assuming this constant value is larger than PerEventHeaderBytes + len(TruncatedSuffix) - MaxEventPayloadBytes = 1024 * 256 //256KB + // http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html + // In truncation logic, it assuming this constant value is larger than PerEventHeaderBytes + len(TruncatedSuffix) + DefaultMaxEventPayloadBytes = 1024 * 256 //256KB // http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html MaxRequestEventCount = 10000 PerEventHeaderBytes = 26 MaxRequestPayloadBytes = 1024 * 1024 * 1 - logEventChanBufferSize = 10000 // 1 request can handle max 10000 log entries - minPusherIntervalInMillis = 200 // 5 TPS + minPusherIntervalMs = 200 // 5 TPS - logEventBatchPushChanBufferSize = 2 // processing part does not need to be blocked by the current put log event request - TruncatedSuffix = "[Truncated...]" + TruncatedSuffix = "[Truncated...]" LogEventTimestampLimitInPast = 14 * 24 * time.Hour //None of the log events in the batch can be older than 14 days LogEventTimestampLimitInFuture = -2 * time.Hour //None of the log events in the batch can be more than 2 hours in the future. ) -//Struct to present a log event. +var ( + maxEventPayloadBytes = DefaultMaxEventPayloadBytes +) + +// Struct to present a log event. type LogEvent struct { InputLogEvent *cloudwatchlogs.InputLogEvent - //The time which log generated. + // The time which log generated. LogGeneratedTime time.Time } -//Calculate the log event payload bytes. 
-func (logEvent *LogEvent) eventPayloadBytes() int { - return len(*logEvent.InputLogEvent.Message) + PerEventHeaderBytes +// newLogEvent creates a log event from a timestamp in milliseconds and a message. +// LogGeneratedTime is left unset. +func newLogEvent(timestampMs int64, message string) *LogEvent { + logEvent := &LogEvent{ + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(timestampMs), + Message: aws.String(message)}, + } + return logEvent } -func (logEvent *LogEvent) truncateIfNeeded(logger *zap.Logger) bool { - if logEvent.eventPayloadBytes() > MaxEventPayloadBytes { +func (logEvent *LogEvent) Validate(logger *zap.Logger) error { + if logEvent.eventPayloadBytes() > maxEventPayloadBytes { logger.Warn("logpusher: the single log event size is larger than the max event payload allowed. Truncate the log event.", - zap.Int("SingleLogEventSize", logEvent.eventPayloadBytes()), zap.Int("MaxEventPayloadBytes", MaxEventPayloadBytes)) - newPayload := (*logEvent.InputLogEvent.Message)[0:(MaxEventPayloadBytes - PerEventHeaderBytes - len(TruncatedSuffix))] + zap.Int("SingleLogEventSize", logEvent.eventPayloadBytes()), zap.Int("maxEventPayloadBytes", maxEventPayloadBytes)) + + newPayload := (*logEvent.InputLogEvent.Message)[0:(maxEventPayloadBytes - PerEventHeaderBytes - len(TruncatedSuffix))] newPayload += TruncatedSuffix logEvent.InputLogEvent.Message = &newPayload - return true } - return false -} -//Create a new log event -//logType will be propagated to logEventBatch and used by pusher to determine which client to call PutLogEvent -func NewLogEvent(timestampInMillis int64, message string) *LogEvent { - logEvent := &LogEvent{ - InputLogEvent: &cloudwatchlogs.InputLogEvent{ - Timestamp: aws.Int64(timestampInMillis), - Message: aws.String(message)}, + if *logEvent.InputLogEvent.Timestamp == int64(0) { + logEvent.InputLogEvent.Timestamp = aws.Int64(logEvent.LogGeneratedTime.UnixNano() / int64(time.Millisecond)) } - return logEvent 
+ if len(*logEvent.InputLogEvent.Message) == 0 { + return errors.New("empty log event message") + } + + //http://docs.aws.amazon.com/goto/SdkForGoV1/logs-2014-03-28/PutLogEvents + //* None of the log events in the batch can be more than 2 hours in the + //future. + //* None of the log events in the batch can be older than 14 days or the + //retention period of the log group. + currentTime := time.Now().UTC() + utcTime := time.Unix(0, *logEvent.InputLogEvent.Timestamp*int64(time.Millisecond)).UTC() + duration := currentTime.Sub(utcTime) + if duration > LogEventTimestampLimitInPast || duration < LogEventTimestampLimitInFuture { + err := errors.New("the log entry's timestamp is older than 14 days or more than 2 hours in the future") + logger.Error("discard log entry with invalid timestamp", + zap.Error(err), zap.String("LogEventTimestamp", utcTime.String()), zap.String("CurrentTime", currentTime.String())) + return err + } + return nil +} + +// Calculate the log event payload bytes. +func (logEvent *LogEvent) eventPayloadBytes() int { + return len(*logEvent.InputLogEvent.Message) + PerEventHeaderBytes } -//Struct to present a log event batch +// Struct to present a log event batch type LogEventBatch struct { PutLogEventsInput *cloudwatchlogs.PutLogEventsInput //the total bytes already in this log event batch byteTotal int //min timestamp recorded in this log event batch (ms) - minTimestampInMillis int64 + minTimestampMs int64 //max timestamp recorded in this log event batch (ms) - maxTimestampInMillis int64 + maxTimestampMs int64 +} - creationTime time.Time +// Create a new log event batch if needed. 
+func newLogEventBatch(logGroupName, logStreamName *string) *LogEventBatch { + return &LogEventBatch{ + PutLogEventsInput: &cloudwatchlogs.PutLogEventsInput{ + LogGroupName: logGroupName, + LogStreamName: logStreamName, + LogEvents: make([]*cloudwatchlogs.InputLogEvent, 0, MaxRequestEventCount)}, + } } -/** - * A batch of log events in a single request cannot span more than 24 hours. - * Otherwise, the operation fails. - */ -func (logEventBatch *LogEventBatch) timestampWithin24Hours(targetInMillis *int64) bool { - //new log event batch - if logEventBatch.minTimestampInMillis == 0 || logEventBatch.maxTimestampInMillis == 0 { +func (batch LogEventBatch) exceedsLimit(nextByteTotal int) bool { + return len(batch.PutLogEventsInput.LogEvents) == cap(batch.PutLogEventsInput.LogEvents) || + batch.byteTotal+nextByteTotal > maxEventPayloadBytes +} + +// isActive reports whether an event with the given timestamp (ms) can join this +// batch without making it span more than 24 hours. It returns true for a batch with +// no recorded min/max timestamp, and false when the 24-hour span would be exceeded. +func (batch *LogEventBatch) isActive(targetTimestampMs *int64) bool { + // new log event batch + if batch.minTimestampMs == 0 || batch.maxTimestampMs == 0 { return true } - if *targetInMillis-logEventBatch.minTimestampInMillis > 24*3600*1e3 { + if *targetTimestampMs-batch.minTimestampMs > 24*3600*1e3 { return false } - if logEventBatch.maxTimestampInMillis-*targetInMillis > 24*3600*1e3 { + if batch.maxTimestampMs-*targetTimestampMs > 24*3600*1e3 { return false } return true } -//Sort the log events based on the timestamp. 
-func (logEventBatch *LogEventBatch) sortLogEvents() { - inputLogEvents := logEventBatch.PutLogEventsInput.LogEvents +func (batch *LogEventBatch) append(event *LogEvent) { + batch.PutLogEventsInput.LogEvents = append(batch.PutLogEventsInput.LogEvents, event.InputLogEvent) + batch.byteTotal += event.eventPayloadBytes() + if batch.minTimestampMs == 0 || batch.minTimestampMs > *event.InputLogEvent.Timestamp { + batch.minTimestampMs = *event.InputLogEvent.Timestamp + } + if batch.maxTimestampMs == 0 || batch.maxTimestampMs < *event.InputLogEvent.Timestamp { + batch.maxTimestampMs = *event.InputLogEvent.Timestamp + } +} + +// Sort the log events based on the timestamp. +func (batch *LogEventBatch) sortLogEvents() { + inputLogEvents := batch.PutLogEventsInput.LogEvents sort.Stable(ByTimestamp(inputLogEvents)) } @@ -128,31 +177,30 @@ func (inputLogEvents ByTimestamp) Less(i, j int) bool { return *inputLogEvents[i].Timestamp < *inputLogEvents[j].Timestamp } -//Pusher is one per log group +// Pusher is created by log group and log stream type Pusher interface { AddLogEntry(logEvent *LogEvent) error ForceFlush() error } -//Struct of pusher implemented Pusher interface. +// Struct of pusher implemented Pusher interface. 
type pusher struct { logger *zap.Logger - //log group name for the current pusher + // log group name of the current pusher logGroupName *string - //log stream name for the current pusher + // log stream name of the current pusher logStreamName *string - svcStructuredLog LogClient - streamToken string //no init value - - logEventChan chan *LogEvent - pushChan chan *LogEventBatch + batchUpdateLock sync.Mutex + logEventBatch *LogEventBatch - logEventBatch *LogEventBatch - retryCnt int + pushLock sync.Mutex + streamToken string // no init value + svcStructuredLog LogClient + retryCnt int } -//Create a pusher instance and start the instance afterwards +// NewPusher creates a pusher instance func NewPusher(logGroupName, logStreamName *string, retryCnt int, svcStructuredLog LogClient, logger *zap.Logger) Pusher { @@ -162,22 +210,21 @@ func NewPusher(logGroupName, logStreamName *string, retryCnt int, if retryCnt > 0 { pusher.retryCnt = retryCnt } + return pusher } -//Only create a pusher, but not start the instance. +// newPusher creates a pusher that holds an empty log event batch. 
func newPusher(logGroupName, logStreamName *string, svcStructuredLog LogClient, logger *zap.Logger) *pusher { pusher := &pusher{ logGroupName: logGroupName, logStreamName: logStreamName, svcStructuredLog: svcStructuredLog, - logEventChan: make(chan *LogEvent, logEventChanBufferSize), - pushChan: make(chan *LogEventBatch, logEventBatchPushChanBufferSize), logger: logger, } + pusher.logEventBatch = newLogEventBatch(logGroupName, logStreamName) - pusher.logEventBatch = pusher.newLogEventBatch() return pusher } @@ -190,32 +237,48 @@ func newPusher(logGroupName, logStreamName *string, func (p *pusher) AddLogEntry(logEvent *LogEvent) error { var err error if logEvent != nil { - logEvent.truncateIfNeeded(p.logger) - if *logEvent.InputLogEvent.Timestamp == int64(0) { - logEvent.InputLogEvent.Timestamp = aws.Int64(logEvent.LogGeneratedTime.UnixNano() / int64(time.Millisecond)) + err = logEvent.Validate(p.logger) + if err != nil { + return err + } + prevBatch := p.addLogEvent(logEvent) + if prevBatch != nil { + err = p.pushLogEventBatch(prevBatch) } - err = p.addLogEvent(logEvent) } return err } func (p *pusher) ForceFlush() error { - return p.flushLogEventBatch() + prevBatch := p.renewLogEventBatch() + if prevBatch != nil { + return p.pushLogEventBatch(prevBatch) + } + return nil } func (p *pusher) pushLogEventBatch(req interface{}) error { - //http://docs.aws.amazon.com/goto/SdkForGoV1/logs-2014-03-28/PutLogEvents - //* The log events in the batch must be in chronological ordered by their - //timestamp (the time the event occurred, expressed as the number of milliseconds - //since Jan 1, 1970 00:00:00 UTC). + p.pushLock.Lock() + defer p.pushLock.Unlock() + + // http://docs.aws.amazon.com/goto/SdkForGoV1/logs-2014-03-28/PutLogEvents + // The log events in the batch must be chronologically ordered by their + // timestamp (the time the event occurred, expressed as the number of milliseconds + // since Jan 1, 1970 00:00:00 UTC). 
logEventBatch := req.(*LogEventBatch) logEventBatch.sortLogEvents() putLogEventsInput := logEventBatch.PutLogEventsInput if p.streamToken == "" { - //log part and retry logic are already done inside the CreateStream + var err error + // log part and retry logic are already done inside the CreateStream // when the error is not nil, the stream token is "", which is handled in the below logic. - p.streamToken, _ = p.svcStructuredLog.CreateStream(p.logGroupName, p.logStreamName) + p.streamToken, err = p.svcStructuredLog.CreateStream(p.logGroupName, p.logStreamName) + // TODO Known issue: createStream will fail if the corresponding logGroup and logStream has been created. + // The retry mechanism helps get the first stream token, yet the first batch will be sent twice in this situation. + if err != nil { + p.logger.Warn("Failed to create stream token", zap.Error(err)) + } } if p.streamToken != "" { @@ -241,80 +304,41 @@ func (p *pusher) pushLogEventBatch(req interface{}) error { p.streamToken = *tmpToken } diff := time.Since(startTime) - if timeLeft := minPusherIntervalInMillis*time.Millisecond - diff; timeLeft > 0 { + if timeLeft := minPusherIntervalMs*time.Millisecond - diff; timeLeft > 0 { time.Sleep(timeLeft) } return nil } -//Create a new log event batch if needed. -func (p *pusher) newLogEventBatch() *LogEventBatch { - logEventBatch := &LogEventBatch{ - PutLogEventsInput: &cloudwatchlogs.PutLogEventsInput{ - LogGroupName: p.logGroupName, - LogStreamName: p.logStreamName, - LogEvents: make([]*cloudwatchlogs.InputLogEvent, 0, MaxRequestEventCount)}, - creationTime: time.Now(), +func (p *pusher) addLogEvent(logEvent *LogEvent) *LogEventBatch { + if logEvent == nil { + return nil } - return logEventBatch -} -//Determine if a new log event batch is needed. 
-func (p *pusher) newLogEventBatchIfNeeded(logEvent *LogEvent) error { - var err error - logEventBatch := p.logEventBatch - if len(logEventBatch.PutLogEventsInput.LogEvents) == cap(logEventBatch.PutLogEventsInput.LogEvents) || - logEvent != nil && (logEventBatch.byteTotal+logEvent.eventPayloadBytes() > MaxRequestPayloadBytes || !logEventBatch.timestampWithin24Hours(logEvent.InputLogEvent.Timestamp)) { - err = p.pushLogEventBatch(logEventBatch) - p.logEventBatch = p.newLogEventBatch() - } - return err -} + p.batchUpdateLock.Lock() + defer p.batchUpdateLock.Unlock() -func (p *pusher) flushLogEventBatch() error { - var err error - if len(p.logEventBatch.PutLogEventsInput.LogEvents) > 0 { - logEventBatch := p.logEventBatch - err = p.pushLogEventBatch(logEventBatch) - p.logEventBatch = p.newLogEventBatch() + var prevBatch *LogEventBatch + currentBatch := p.logEventBatch + if currentBatch.exceedsLimit(logEvent.eventPayloadBytes()) || !currentBatch.isActive(logEvent.InputLogEvent.Timestamp) { + prevBatch = currentBatch + currentBatch = newLogEventBatch(p.logGroupName, p.logStreamName) } - return err -} + currentBatch.append(logEvent) + p.logEventBatch = currentBatch -//Add the log event onto the log event batch -func (p *pusher) addLogEvent(logEvent *LogEvent) error { - var err error - if len(*logEvent.InputLogEvent.Message) == 0 { - return nil - } + return prevBatch +} - //http://docs.aws.amazon.com/goto/SdkForGoV1/logs-2014-03-28/PutLogEvents - //* None of the log events in the batch can be more than 2 hours in the - //future. - //* None of the log events in the batch can be older than 14 days or the - //retention period of the log group. 
- currentTime := time.Now().UTC() - utcTime := time.Unix(0, *logEvent.InputLogEvent.Timestamp*int64(time.Millisecond)).UTC() - duration := currentTime.Sub(utcTime) - if duration > LogEventTimestampLimitInPast || duration < LogEventTimestampLimitInFuture { - p.logger.Error("logpusher: the log entry's timestamp is older than 14 days or more than 2 hours in the future. Discard the log entry.", - zap.String("LogGroupName", *p.logGroupName), zap.String("LogEventTimestamp", utcTime.String()), zap.String("CurrentTime", currentTime.String())) - return err - } +func (p *pusher) renewLogEventBatch() *LogEventBatch { + p.batchUpdateLock.Lock() + defer p.batchUpdateLock.Unlock() - err = p.newLogEventBatchIfNeeded(logEvent) - if err != nil { - return err + var prevBatch *LogEventBatch + if len(p.logEventBatch.PutLogEventsInput.LogEvents) > 0 { + prevBatch = p.logEventBatch + p.logEventBatch = newLogEventBatch(p.logGroupName, p.logStreamName) } - logEventBatch := p.logEventBatch - logEventBatch.PutLogEventsInput.LogEvents = append(logEventBatch.PutLogEventsInput.LogEvents, logEvent.InputLogEvent) - logEventBatch.byteTotal += logEvent.eventPayloadBytes() - if logEventBatch.minTimestampInMillis == 0 || logEventBatch.minTimestampInMillis > *logEvent.InputLogEvent.Timestamp { - logEventBatch.minTimestampInMillis = *logEvent.InputLogEvent.Timestamp - } - if logEventBatch.maxTimestampInMillis == 0 || logEventBatch.maxTimestampInMillis < *logEvent.InputLogEvent.Timestamp { - logEventBatch.maxTimestampInMillis = *logEvent.InputLogEvent.Timestamp - } - return nil + return prevBatch } diff --git a/exporter/awsemfexporter/pusher_test.go b/exporter/awsemfexporter/pusher_test.go index b61a975d3720..bb6906b8758c 100644 --- a/exporter/awsemfexporter/pusher_test.go +++ b/exporter/awsemfexporter/pusher_test.go @@ -20,24 +20,101 @@ import ( "math/rand" "os" "strings" + "sync" "testing" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "go.uber.org/zap" ) +func TestConcurrentPushAndFlush(t *testing.T) { + maxEventPayloadBytes = 128 + + concurrency := 10 + current := time.Now().UnixNano() / 1e6 + collection := map[string]interface{}{} + + pusher, _ := newMockPusherWithEventCheck(func(msg string) { + if _, ok := collection[msg]; ok { + t.Errorf("Sending duplicated event message %s", msg) + } else { + collection[msg] = struct{}{} + } + }) + + wg := sync.WaitGroup{} + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func(ii int) { + for j := 0; j < 10; j++ { + pusher.AddLogEntry(newLogEvent(current, fmt.Sprintf("batch-%d-%d", ii, j))) + } + time.Sleep(1000 * time.Millisecond) + pusher.ForceFlush() + wg.Done() + }(i) + } + wg.Wait() + assert.Equal(t, concurrency*10, len(collection)) + + maxEventPayloadBytes = DefaultMaxEventPayloadBytes +} + +func newMockPusherWithEventCheck(check func(msg string)) (Pusher, string) { + logger := zap.NewNop() + tmpfolder, _ := ioutil.TempDir("", "") + svc := NewAlwaysPassMockLogClient(func(args mock.Arguments) { + input := args.Get(0).(*cloudwatchlogs.PutLogEventsInput) + for _, event := range input.LogEvents { + eventMsg := *event.Message + check(eventMsg) + } + }) + p := newPusher(&logGroup, &logStreamName, svc, logger) + return p, tmpfolder +} + // // LogEvent Tests // func TestLogEvent_eventPayloadBytes(t *testing.T) { testMessage := "test message" - logEvent := NewLogEvent(0, testMessage) + logEvent := newLogEvent(0, testMessage) assert.Equal(t, len(testMessage)+PerEventHeaderBytes, logEvent.eventPayloadBytes()) } +func TestValidateLogEventWithMutating(t *testing.T) { + maxEventPayloadBytes = 64 + + logger := zap.NewNop() + logEvent := newLogEvent(0, "abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789") + logEvent.LogGeneratedTime = time.Now() + err := logEvent.Validate(logger) + assert.Nil(t, err) + assert.True(t, 
*logEvent.InputLogEvent.Timestamp > int64(0)) + assert.Equal(t, 64-PerEventHeaderBytes, len(*logEvent.InputLogEvent.Message)) + + maxEventPayloadBytes = DefaultMaxEventPayloadBytes +} + +func TestValidateLogEventFailed(t *testing.T) { + logger := zap.NewNop() + logEvent := newLogEvent(0, "") + err := logEvent.Validate(logger) + assert.NotNil(t, err) + assert.Equal(t, "empty log event message", err.Error()) + + invalidTimestamp := time.Now().AddDate(0, -1, 0) + logEvent = newLogEvent(invalidTimestamp.Unix()*1e3, "test") + err = logEvent.Validate(logger) + assert.NotNil(t, err) + assert.Equal(t, "the log entry's timestamp is older than 14 days or more than 2 hours in the future", err.Error()) +} + // // LogEventBatch Tests // @@ -45,27 +122,27 @@ func TestLogEventBatch_timestampWithin24Hours(t *testing.T) { min := time.Date(2017, time.June, 20, 23, 38, 0, 0, time.Local) max := min.Add(23 * time.Hour) logEventBatch := &LogEventBatch{ - maxTimestampInMillis: max.UnixNano() / 1e6, - minTimestampInMillis: min.UnixNano() / 1e6, + maxTimestampMs: max.UnixNano() / 1e6, + minTimestampMs: min.UnixNano() / 1e6, } //less than the min target := min.Add(-1 * time.Hour) - assert.True(t, logEventBatch.timestampWithin24Hours(aws.Int64(target.UnixNano()/1e6))) + assert.True(t, logEventBatch.isActive(aws.Int64(target.UnixNano()/1e6))) target = target.Add(-1 * time.Millisecond) - assert.False(t, logEventBatch.timestampWithin24Hours(aws.Int64(target.UnixNano()/1e6))) + assert.False(t, logEventBatch.isActive(aws.Int64(target.UnixNano()/1e6))) //more than the max target = max.Add(1 * time.Hour) - assert.True(t, logEventBatch.timestampWithin24Hours(aws.Int64(target.UnixNano()/1e6))) + assert.True(t, logEventBatch.isActive(aws.Int64(target.UnixNano()/1e6))) target = target.Add(1 * time.Millisecond) - assert.False(t, logEventBatch.timestampWithin24Hours(aws.Int64(target.UnixNano()/1e6))) + assert.False(t, logEventBatch.isActive(aws.Int64(target.UnixNano()/1e6))) //in between min and max 
target = min.Add(2 * time.Hour) - assert.True(t, logEventBatch.timestampWithin24Hours(aws.Int64(target.UnixNano()/1e6))) + assert.True(t, logEventBatch.isActive(aws.Int64(target.UnixNano()/1e6))) } func TestLogEventBatch_sortLogEvents(t *testing.T) { @@ -76,7 +153,7 @@ func TestLogEventBatch_sortLogEvents(t *testing.T) { for i := 0; i < totalEvents; i++ { timestamp := rand.Int() - logEvent := NewLogEvent( + logEvent := newLogEvent( int64(timestamp), fmt.Sprintf("message%v", timestamp)) fmt.Printf("logEvents[%d].Timestamp=%d.\n", i, timestamp) @@ -100,7 +177,7 @@ func TestLogEventBatch_sortLogEvents(t *testing.T) { func newMockPusher() (*pusher, string) { logger := zap.NewNop() tmpfolder, _ := ioutil.TempDir("", "") - svc := NewAlwaysPassMockLogClient() + svc := NewAlwaysPassMockLogClient(func(args mock.Arguments) {}) p := newPusher(&logGroup, &logStreamName, svc, logger) return p, tmpfolder } @@ -109,16 +186,16 @@ func newMockPusher() (*pusher, string) { // Pusher Tests // -var timestampInMillis = time.Now().UnixNano() / 1e6 +var timestampMs = time.Now().UnixNano() / 1e6 var msg = "test log message" func TestPusher_newLogEventBatch(t *testing.T) { p, tmpFolder := newMockPusher() defer os.RemoveAll(tmpFolder) - logEventBatch := p.newLogEventBatch() - assert.Equal(t, int64(0), logEventBatch.maxTimestampInMillis) - assert.Equal(t, int64(0), logEventBatch.minTimestampInMillis) + logEventBatch := newLogEventBatch(p.logGroupName, p.logStreamName) + assert.Equal(t, int64(0), logEventBatch.maxTimestampMs) + assert.Equal(t, int64(0), logEventBatch.minTimestampMs) assert.Equal(t, 0, logEventBatch.byteTotal) assert.Equal(t, 0, len(logEventBatch.PutLogEventsInput.LogEvents)) assert.Equal(t, p.logStreamName, logEventBatch.PutLogEventsInput.LogStreamName) @@ -126,12 +203,12 @@ func TestPusher_newLogEventBatch(t *testing.T) { assert.Equal(t, (*string)(nil), logEventBatch.PutLogEventsInput.SequenceToken) } -func TestPusher_newLogEventBatchIfNeeded(t *testing.T) { +func 
TestPusher_addLogEventBatch(t *testing.T) { p, tmpFolder := newMockPusher() defer os.RemoveAll(tmpFolder) cap := cap(p.logEventBatch.PutLogEventsInput.LogEvents) - logEvent := NewLogEvent(timestampInMillis, msg) + logEvent := newLogEvent(timestampMs, msg) for i := 0; i < cap; i++ { p.logEventBatch.PutLogEventsInput.LogEvents = append(p.logEventBatch.PutLogEventsInput.LogEvents, logEvent.InputLogEvent) @@ -139,77 +216,41 @@ func TestPusher_newLogEventBatchIfNeeded(t *testing.T) { assert.Equal(t, cap, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - p.newLogEventBatchIfNeeded(logEvent) + assert.NotNil(t, p.addLogEvent(logEvent)) //the actual log event add operation happens after the func newLogEventBatchIfNeeded - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) + assert.Equal(t, 1, len(p.logEventBatch.PutLogEventsInput.LogEvents)) p.logEventBatch.byteTotal = MaxRequestPayloadBytes - logEvent.eventPayloadBytes() + 1 - p.newLogEventBatchIfNeeded(logEvent) - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) + assert.NotNil(t, p.addLogEvent(logEvent)) + assert.Equal(t, 1, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - p.logEventBatch.minTimestampInMillis, p.logEventBatch.maxTimestampInMillis = timestampInMillis, timestampInMillis - p.newLogEventBatchIfNeeded(NewLogEvent(timestampInMillis+(time.Hour*24+time.Millisecond*1).Nanoseconds()/1e6, msg)) - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) + p.logEventBatch.minTimestampMs, p.logEventBatch.maxTimestampMs = timestampMs, timestampMs + assert.NotNil(t, p.addLogEvent(newLogEvent(timestampMs+(time.Hour*24+time.Millisecond*1).Nanoseconds()/1e6, msg))) + assert.Equal(t, 1, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - //even the event batch is expired, the total byte is sitll 0 at this time. 
- p.newLogEventBatchIfNeeded(nil) - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) + assert.Nil(t, p.addLogEvent(nil)) + assert.Equal(t, 1, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - //even the event batch is expired, the total byte is sitll 0 at this time. - p.newLogEventBatchIfNeeded(logEvent) - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) + assert.NotNil(t, p.addLogEvent(logEvent)) + assert.Equal(t, 1, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - //the previous sleep is still in effect at this step. p.logEventBatch.byteTotal = 1 - p.newLogEventBatchIfNeeded(nil) - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - -} - -func TestPusher_addLogEvent(t *testing.T) { - p, tmpFolder := newMockPusher() - defer os.RemoveAll(tmpFolder) - - p.addLogEvent(NewLogEvent(time.Now().Add(-(14*24+1)*time.Hour).UnixNano()/1e6, msg)) - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - assert.Equal(t, int64(0), p.logEventBatch.minTimestampInMillis) - assert.Equal(t, int64(0), p.logEventBatch.maxTimestampInMillis) - assert.Equal(t, 0, p.logEventBatch.byteTotal) - - p.addLogEvent(NewLogEvent(time.Now().Add((2+1)*time.Hour).UnixNano()/1e6, msg)) - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - assert.Equal(t, int64(0), p.logEventBatch.minTimestampInMillis) - assert.Equal(t, int64(0), p.logEventBatch.maxTimestampInMillis) - assert.Equal(t, 0, p.logEventBatch.byteTotal) - - p.addLogEvent(NewLogEvent(timestampInMillis, "")) - assert.Equal(t, 0, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - assert.Equal(t, int64(0), p.logEventBatch.minTimestampInMillis) - assert.Equal(t, int64(0), p.logEventBatch.maxTimestampInMillis) - assert.Equal(t, 0, p.logEventBatch.byteTotal) - - p.addLogEvent(NewLogEvent(timestampInMillis, msg)) + assert.Nil(t, p.addLogEvent(nil)) assert.Equal(t, 1, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - assert.Equal(t, timestampInMillis, 
p.logEventBatch.minTimestampInMillis) - assert.Equal(t, timestampInMillis, p.logEventBatch.maxTimestampInMillis) - assert.Equal(t, len(msg)+PerEventHeaderBytes, p.logEventBatch.byteTotal) - - p.addLogEvent(NewLogEvent(timestampInMillis+1, msg+"1")) - assert.Equal(t, 2, len(p.logEventBatch.PutLogEventsInput.LogEvents)) - assert.Equal(t, timestampInMillis, p.logEventBatch.minTimestampInMillis) - assert.Equal(t, timestampInMillis+1, p.logEventBatch.maxTimestampInMillis) - assert.Equal(t, len(msg)+len(msg+"1")+2*PerEventHeaderBytes, p.logEventBatch.byteTotal) + } -func TestPusher_truncateLogEvent(t *testing.T) { +func TestAddLogEventWithValidation(t *testing.T) { p, tmpFolder := newMockPusher() defer os.RemoveAll(tmpFolder) - largeEventContent := strings.Repeat("a", MaxEventPayloadBytes) + largeEventContent := strings.Repeat("a", DefaultMaxEventPayloadBytes) - logEvent := NewLogEvent(timestampInMillis, largeEventContent) - expectedTruncatedContent := (*logEvent.InputLogEvent.Message)[0:(MaxEventPayloadBytes-PerEventHeaderBytes-len(TruncatedSuffix))] + TruncatedSuffix + logEvent := newLogEvent(timestampMs, largeEventContent) + expectedTruncatedContent := (*logEvent.InputLogEvent.Message)[0:(DefaultMaxEventPayloadBytes-PerEventHeaderBytes-len(TruncatedSuffix))] + TruncatedSuffix p.AddLogEntry(logEvent) - assert.Equal(t, expectedTruncatedContent, *logEvent.InputLogEvent.Message) + + logEvent = newLogEvent(timestampMs, "") + assert.NotNil(t, p.addLogEvent(logEvent)) }