diff --git a/service/worker/indexer/esProcessor.go b/service/worker/indexer/esProcessor.go index a1df575f06d..a3f7938be44 100644 --- a/service/worker/indexer/esProcessor.go +++ b/service/worker/indexer/esProcessor.go @@ -146,6 +146,28 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka tag.WorkflowID(wid), tag.WorkflowRunID(rid), tag.WorkflowDomainID(domainID)) + + // check if it is a delete request and status code + // 404 means the document does not exist + // 409 means means the document's version does not match (or if the document has been updated or deleted by another process) + // this can happen during the data migration, the doc was deleted in the old index but not exists in the new index + if err.Status == 409 || err.Status == 404 { + status := err.Status + req, err := request.Source() + if err == nil { + if p.isDeleteRequest(req) { + p.logger.Info("Delete request encountered a version conflict. Acknowledging to prevent retry.", + tag.ESResponseStatus(status), tag.ESRequest(request.String()), + tag.WorkflowID(wid), + tag.WorkflowRunID(rid), + tag.WorkflowDomainID(domainID)) + p.ackKafkaMsg(key) + } + } else { + p.logger.Error("Get request source err.", tag.Error(err), tag.ESRequest(request.String())) + p.scope.IncCounter(metrics.ESProcessorCorruptedData) + } + } p.nackKafkaMsg(key) } else { p.logger.Error("ES request failed", tag.ESRequest(request.String())) @@ -224,7 +246,7 @@ func (p *ESProcessorImpl) retrieveKafkaKey(request bulk.GenericBulkableRequest) } var key string - if len(req) == 2 { // index or update requests + if !p.isDeleteRequest(req) { // index or update requests var body map[string]interface{} if err := json.Unmarshal([]byte(req[1]), &body); err != nil { p.logger.Error("Unmarshal index request body err.", tag.Error(err)) @@ -287,6 +309,16 @@ func (p *ESProcessorImpl) hashFn(key interface{}) uint32 { return uint32(common.WorkflowIDToHistoryShard(id, numOfShards)) } +func (p *ESProcessorImpl) isDeleteRequest(request []string) bool { + // The Source() method typically returns a slice of strings, where each string represents a part of the bulk request in JSON format. + // For delete operations, the Source() method typically returns only one part + // The metadata that specifies the delete action, including _index and _id. + // "{\"delete\":{\"_index\":\"my-index\",\"_id\":\"1\"}}" + // For index/update operations, the Source() method typically returns two parts + // reference: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-bulk.html + return len(request) == 1 +} + // 409 - Version Conflict // 404 - Not Found func isResponseSuccess(status int) bool { diff --git a/service/worker/indexer/esProcessor_test.go b/service/worker/indexer/esProcessor_test.go index 125b7a81d08..795b7d02ba2 100644 --- a/service/worker/indexer/esProcessor_test.go +++ b/service/worker/indexer/esProcessor_test.go @@ -278,6 +278,44 @@ func (s *esProcessorSuite) TestBulkAfterAction_Error() { s.esProcessor.bulkAfterAction(0, requests, response, &bulk.GenericError{Details: fmt.Errorf("some error")}) } +func (s *esProcessorSuite) TestBulkAfterAction_Error_Nack() { + version := int64(3) + testKey := "testKey" + request := &mocks2.GenericBulkableRequest{} + request.On("String").Return("") + request.On("Source").Return([]string{`{"delete":{"_index":"test-index","_id":"testKey"}}`}, nil) + requests := []bulk.GenericBulkableRequest{request} + + mFailed := map[string]*bulk.GenericBulkResponseItem{ + "delete": { + Index: testIndex, + Type: testType, + ID: testID, + Version: version, + Status: 409, + }, + } + response := &bulk.GenericBulkResponse{ + Took: 3, + Errors: true, + Items: []map[string]*bulk.GenericBulkResponseItem{mFailed}, + } + + wid := "test-workflowID" + rid := "test-runID" + domainID := "test-domainID" + payload := s.getEncodedMsg(wid, rid, domainID) + + mockKafkaMsg := &msgMocks.Message{} + mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch) + s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal) + mockKafkaMsg.On("Nack").Return(nil).Once() + mockKafkaMsg.On("Ack").Return(nil).Once() // Expect Ack to be called + mockKafkaMsg.On("Value").Return(payload).Once() + s.mockScope.On("IncCounter", metrics.ESProcessorFailures).Once() + s.esProcessor.bulkAfterAction(0, requests, response, &bulk.GenericError{Status: 404, Details: fmt.Errorf("some error")}) +} + func (s *esProcessorSuite) TestAckKafkaMsg() { key := "test-key" // no msg in map, nothing called @@ -402,3 +440,50 @@ func (s *esProcessorSuite) TestIsErrorRetriable() { s.Equal(test.expected, isResponseRetriable(test.input.Status)) } } + +func (s *esProcessorSuite) TestIsDeleteRequest() { + tests := []struct { + request bulk.GenericBulkableRequest + bIsDelete bool + }{ + { + request: bulk.NewBulkIndexRequest(). + ID("request.ID"). + Index("request.Index"). + Version(int64(0)). + VersionType("request.VersionType").Doc("request.Doc"), + bIsDelete: false, + }, + { + request: bulk.NewBulkDeleteRequest(). + ID("request.ID"). + Index("request.Index"), + bIsDelete: true, + }, + } + for _, test := range tests { + req, _ := test.request.Source() + s.Equal(test.bIsDelete, s.esProcessor.isDeleteRequest(req)) + } +} + +func (s *esProcessorSuite) TestIsDeleteRequest_Error() { + request := &MockBulkableRequest{} + s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return() + req, err := request.Source() + s.False(s.esProcessor.isDeleteRequest(req)) + s.Error(err) +} + +// MockBulkableRequest is a mock implementation of the GenericBulkableRequest interface +type MockBulkableRequest struct{} + +// String returns a mock string +func (m *MockBulkableRequest) String() string { + return "mock request" +} + +// Source returns an error to simulate a failure +func (m *MockBulkableRequest) Source() ([]string, error) { + return nil, fmt.Errorf("simulated source error") +}