Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug related to wrong state.ID in awss3 direct s3 input #32164

Merged
merged 12 commits into from
Jul 11, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

*Filebeat*

- Fix wrong state ID in states registry for awss3 s3 direct input. {pull}32164[32164]
- cisco/asa: fix handling of user names when there are Security Group Tags present. {issue}32009[32009] {pull}32196[32196]

*Heartbeat*
Expand Down
33 changes: 24 additions & 9 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"

"github.com/elastic/beats/v7/libbeat/beat"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
Expand Down Expand Up @@ -66,17 +67,22 @@ func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqs.Messag
}

// s3PagerConstant is a benchmark s3Pager that serves pages out of a fixed,
// in-memory list of objects.
type s3PagerConstant struct {
// mutex is locked by Next, CurrentPage and Err before they touch the
// fields below.
mutex *sync.Mutex
// objects is the full listing the pager serves pages from.
objects []s3.Object
// currentIndex is the paging position within objects; Next reports true
// while it is below len(objects), and Err rewinds it to 0 once it has
// reached the end.
currentIndex int
}

var _ s3Pager = (*s3PagerConstant)(nil)

// Next reports whether the pager still has objects left to serve, i.e.
// whether the current index has not yet reached the end of the object list.
// The check is performed while holding the pager's mutex.
func (c *s3PagerConstant) Next(ctx context.Context) bool {
	c.mutex.Lock()
	hasMore := c.currentIndex < len(c.objects)
	c.mutex.Unlock()

	return hasMore
}

func (c *s3PagerConstant) CurrentPage() *s3.ListObjectsOutput {
c.mutex.Lock()
defer c.mutex.Unlock()
ret := &s3.ListObjectsOutput{}
pageSize := 1000
if len(c.objects) < c.currentIndex+pageSize {
Expand All @@ -90,6 +96,8 @@ func (c *s3PagerConstant) CurrentPage() *s3.ListObjectsOutput {
}

func (c *s3PagerConstant) Err() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.currentIndex >= len(c.objects) {
c.currentIndex = 0
}
Expand All @@ -99,6 +107,7 @@ func (c *s3PagerConstant) Err() error {
func newS3PagerConstant(listPrefix string) *s3PagerConstant {
lastModified := time.Now()
ret := &s3PagerConstant{
mutex: new(sync.Mutex),
currentIndex: 0,
}

Expand Down Expand Up @@ -259,21 +268,19 @@ func TestBenchmarkInputSQS(t *testing.T) {
func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult {
return testing.Benchmark(func(b *testing.B) {
log := logp.NewLogger(inputName)
log.Infof("benchmark with %d number of workers", numberOfWorkers)
metricRegistry := monitoring.NewRegistry()
metrics := newInputMetrics(metricRegistry, "test_id")

client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) {
event.Private.(*awscommon.EventACKTracker).ACK()
})

defer close(client.Channel)
conf := makeBenchmarkConfig(t)
defer func() {
_ = client.Close()
}()

storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
store, err := storeReg.Get("test")
if err != nil {
t.Fatalf("Failed to access store: %v", err)
}
config := makeBenchmarkConfig(t)

b.ResetTimer()
start := time.Now()
Expand All @@ -296,13 +303,21 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
listPrefix := fmt.Sprintf("list_prefix_%d", i)
s3API := newConstantS3(t)
s3API.pagerConstant = newS3PagerConstant(listPrefix)

storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
store, err := storeReg.Get("test")
if err != nil {
errChan <- fmt.Errorf("Failed to access store: %w", err)
return
}

err = store.Set(awsS3WriteCommitPrefix+"bucket"+listPrefix, &commitWriteState{time.Time{}})
if err != nil {
errChan <- err
return
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, config.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)

if err := s3Poller.Poll(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newS3Poller(log *logp.Logger,
}

func (p *s3Poller) handlePurgingLock(info s3ObjectInfo, isStored bool) {
id := info.name + info.key
id := stateID(info.name, info.key, info.etag, info.lastModified)
previousState := p.states.FindPreviousByID(id)
if !previousState.IsEmpty() {
if isStored {
Expand Down
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/awss3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type state struct {
Error bool `json:"error" struct:"error"`
}

// stateID builds the identifier of an S3 object state by concatenating, in
// order and without any separator, the bucket name, the object key, the etag
// and the textual form of the last-modified time (time.Time.String).
//
// NOTE(review): the identifier embeds lastModified.String() verbatim, so two
// times that denote the same instant but print differently would yield
// different IDs - confirm callers always pass consistently built times.
func stateID(bucket, key, etag string, lastModified time.Time) string {
	objectPart := bucket + key
	versionPart := etag + lastModified.String()

	return objectPart + versionPart
}

// newState creates a new s3 object state
func newState(bucket, key, etag, listPrefix string, lastModified time.Time) state {
s := state{
Expand All @@ -41,7 +45,7 @@ func newState(bucket, key, etag, listPrefix string, lastModified time.Time) stat
Error: false,
}

s.ID = s.Bucket + s.Key + s.Etag + s.LastModified.String()
s.ID = stateID(s.Bucket, s.Key, s.Etag, s.LastModified)

return s
}
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *states) Update(newState state, listingID string) {
s.Lock()
defer s.Unlock()

id := newState.Bucket + newState.Key
id := newState.ID
index := s.findPrevious(id)

if index >= 0 {
Expand Down Expand Up @@ -204,7 +204,7 @@ func (s *states) Update(newState state, listingID string) {
func (s *states) FindPrevious(newState state) state {
s.RLock()
defer s.RUnlock()
id := newState.Bucket + newState.Key
id := newState.ID
i := s.findPrevious(id)
if i < 0 {
return state{}
Expand All @@ -227,7 +227,7 @@ func (s *states) FindPreviousByID(id string) state {
func (s *states) IsNew(state state) bool {
s.RLock()
defer s.RUnlock()
id := state.Bucket + state.Key
id := state.ID
i := s.findPrevious(id)

if i < 0 {
Expand Down
87 changes: 75 additions & 12 deletions x-pack/filebeat/input/awss3/states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,77 @@ var inputCtx = v2.Context{
Cancelation: context.Background(),
}

// TestStatesIsNew checks states.IsNew against registries in various shapes:
// an empty registry, a registry holding an unrelated state, a registry
// holding the very same state, and registries whose stored state differs from
// the probed one only by etag or only by last-modified time.
func TestStatesIsNew(t *testing.T) {
	type isNewCase struct {
		states   func() *states
		state    state
		expected bool
	}

	modTime := time.Date(2022, time.June, 30, 14, 13, 0, 0, time.UTC)

	// seeded returns a builder producing a fresh registry that already
	// contains exactly one state created from the given fields and modTime.
	seeded := func(bucket, key, etag, listPrefix string) func() *states {
		return func() *states {
			registry := newStates(inputCtx)
			registry.Update(newState(bucket, key, etag, listPrefix, modTime), "")
			return registry
		}
	}

	cases := map[string]isNewCase{
		"with empty states": {
			states:   func() *states { return newStates(inputCtx) },
			state:    newState("bucket", "key", "etag", "listPrefix", modTime),
			expected: true,
		},
		"not existing state": {
			states:   seeded("bucket", "key", "etag", "listPrefix"),
			state:    newState("bucket1", "key1", "etag1", "listPrefix1", modTime),
			expected: true,
		},
		"existing state": {
			states:   seeded("bucket", "key", "etag", "listPrefix"),
			state:    newState("bucket", "key", "etag", "listPrefix", modTime),
			expected: false,
		},
		"with different etag": {
			states:   seeded("bucket", "key", "etag1", "listPrefix"),
			state:    newState("bucket", "key", "etag2", "listPrefix", modTime),
			expected: true,
		},
		"with different lastmodified": {
			states:   seeded("bucket", "key", "etag", "listPrefix"),
			state:    newState("bucket", "key", "etag", "listPrefix", modTime.Add(1*time.Second)),
			expected: true,
		},
	}

	for label, tc := range cases {
		tc := tc // keep a per-iteration copy for the subtest closure
		t.Run(label, func(t *testing.T) {
			registry := tc.states()
			assert.Equal(t, tc.expected, registry.IsNew(tc.state))
		})
	}
}

func TestStatesDelete(t *testing.T) {
type stateTestCase struct {
states func() *states
deleteID string
expected []state
}

lastModified := time.Date(2021, time.July, 22, 18, 38, 0o0, 0, time.UTC)
lastModified := time.Date(2021, time.July, 22, 18, 38, 00, 0, time.UTC)
tests := map[string]stateTestCase{
"delete empty states": {
states: func() *states {
Expand All @@ -45,7 +108,7 @@ func TestStatesDelete(t *testing.T) {
deleteID: "an id",
expected: []state{
{
ID: "bucketkeyetag" + lastModified.String(),
ID: stateID("bucket", "key", "etag", lastModified),
Bucket: "bucket",
Key: "key",
Etag: "etag",
Expand All @@ -60,7 +123,7 @@ func TestStatesDelete(t *testing.T) {
states.Update(newState("bucket", "key", "etag", "listPrefix", lastModified), "")
return states
},
deleteID: "bucketkey",
deleteID: stateID("bucket", "key", "etag", lastModified),
expected: []state{},
},
"delete first": {
Expand All @@ -71,18 +134,18 @@ func TestStatesDelete(t *testing.T) {
states.Update(newState("bucket", "key3", "etag3", "listPrefix", lastModified), "")
return states
},
deleteID: "bucketkey1",
deleteID: "bucketkey1etag1" + lastModified.String(),
expected: []state{
{
ID: "bucketkey3etag3" + lastModified.String(),
ID: stateID("bucket", "key3", "etag3", lastModified),
Bucket: "bucket",
Key: "key3",
Etag: "etag3",
ListPrefix: "listPrefix",
LastModified: lastModified,
},
{
ID: "bucketkey2etag2" + lastModified.String(),
ID: stateID("bucket", "key2", "etag2", lastModified),
Bucket: "bucket",
Key: "key2",
Etag: "etag2",
Expand All @@ -99,18 +162,18 @@ func TestStatesDelete(t *testing.T) {
states.Update(newState("bucket", "key3", "etag3", "listPrefix", lastModified), "")
return states
},
deleteID: "bucketkey3",
deleteID: "bucketkey3etag3" + lastModified.String(),
expected: []state{
{
ID: "bucketkey1etag1" + lastModified.String(),
ID: stateID("bucket", "key1", "etag1", lastModified),
Bucket: "bucket",
Key: "key1",
Etag: "etag1",
ListPrefix: "listPrefix",
LastModified: lastModified,
},
{
ID: "bucketkey2etag2" + lastModified.String(),
ID: stateID("bucket", "key2", "etag2", lastModified),
Bucket: "bucket",
Key: "key2",
Etag: "etag2",
Expand All @@ -127,18 +190,18 @@ func TestStatesDelete(t *testing.T) {
states.Update(newState("bucket", "key3", "etag3", "listPrefix", lastModified), "")
return states
},
deleteID: "bucketkey2",
deleteID: "bucketkey2etag2" + lastModified.String(),
expected: []state{
{
ID: "bucketkey1etag1" + lastModified.String(),
ID: stateID("bucket", "key1", "etag1", lastModified),
Bucket: "bucket",
Key: "key1",
Etag: "etag1",
ListPrefix: "listPrefix",
LastModified: lastModified,
},
{
ID: "bucketkey3etag3" + lastModified.String(),
ID: stateID("bucket", "key3", "etag3", lastModified),
Bucket: "bucket",
Key: "key3",
Etag: "etag3",
Expand Down