Skip to content

Commit

Permalink
Log workflow info also when visibility data goes to DLQ (#3138)
Browse files Browse the repository at this point in the history
* Log workflow info also when visibility data goes to DLQ

* Log workflow info also when visibility data goes to DLQ

* Log workflow info also when visibility data goes to DLQ

* addressed comments

* addressed comments

Co-authored-by: Venkat <venkat@uber.com>
  • Loading branch information
mkolodezny and venkat1109 authored Mar 27, 2020
1 parent 9b46c2e commit a933ae0
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 10 deletions.
44 changes: 36 additions & 8 deletions service/worker/indexer/esProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"github.com/olivere/elastic"
"github.com/uber-go/tally"

"github.com/uber/cadence/.gen/go/indexer"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/collection"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -65,6 +67,7 @@ type (
config *Config
logger log.Logger
metricsClient metrics.Client
msgEncoder codec.BinaryEncoder
}

kafkaMessageWithMetrics struct { // value of esProcessorImpl.mapToKafkaMsg
Expand All @@ -84,11 +87,12 @@ const (

// NewESProcessorAndStart create new ESProcessor and start
func NewESProcessorAndStart(config *Config, client es.Client, processorName string,
logger log.Logger, metricsClient metrics.Client) (ESProcessor, error) {
logger log.Logger, metricsClient metrics.Client, msgEncoder codec.BinaryEncoder) (ESProcessor, error) {
p := &esProcessorImpl{
config: config,
logger: logger.WithTags(tag.ComponentIndexerESProcessor),
metricsClient: metricsClient,
msgEncoder: msgEncoder,
}

params := &es.BulkProcessorParameters{
Expand Down Expand Up @@ -161,8 +165,10 @@ func (p *esProcessorImpl) bulkAfterAction(id int64, requests []elastic.BulkableR
case isResponseSuccess(resp.Status):
p.ackKafkaMsg(key)
case !isResponseRetriable(resp.Status):
wid, rid, domainID := p.getMsgWithInfo(key)
p.logger.Error("ES request failed.",
tag.ESResponseStatus(resp.Status), tag.ESResponseError(getErrorMsgFromESResp(resp)))
tag.ESResponseStatus(resp.Status), tag.ESResponseError(getErrorMsgFromESResp(resp)), tag.WorkflowID(wid), tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
p.nackKafkaMsg(key)
default: // bulk processor will retry
p.logger.Info("ES request retried.", tag.ESResponseStatus(resp.Status))
Expand All @@ -181,13 +187,9 @@ func (p *esProcessorImpl) nackKafkaMsg(key string) {
}

func (p *esProcessorImpl) ackKafkaMsgHelper(key string, nack bool) {
msg, ok := p.mapToKafkaMsg.Get(key)
kafkaMsg, ok := p.getKafkaMsg(key)
if !ok {
return // duplicate kafka message
}
kafkaMsg, ok := msg.(*kafkaMessageWithMetrics)
if !ok { // must be bug in code and bad deployment
p.logger.Fatal("Message is not kafka message.", tag.ESKey(key))
return
}

if nack {
Expand All @@ -199,6 +201,32 @@ func (p *esProcessorImpl) ackKafkaMsgHelper(key string, nack bool) {
p.mapToKafkaMsg.Remove(key)
}

// getKafkaMsg looks up the kafka message tracked under key and asserts its
// concrete type. ok is false when the key is unknown (a duplicate kafka
// message that was already acked and removed). A value of the wrong concrete
// type indicates a code bug / bad deployment and terminates via Fatal.
func (p *esProcessorImpl) getKafkaMsg(key string) (kafkaMsg *kafkaMessageWithMetrics, ok bool) {
	var raw interface{}
	if raw, ok = p.mapToKafkaMsg.Get(key); !ok {
		// nothing tracked under this key: duplicate kafka message
		return
	}
	if kafkaMsg, ok = raw.(*kafkaMessageWithMetrics); !ok {
		// must be bug in code and bad deployment
		p.logger.Fatal("Message is not kafka message.", tag.ESKey(key))
	}
	return kafkaMsg, ok
}

// getMsgWithInfo decodes the kafka message tracked under key and returns the
// workflow identifiers embedded in it. All results are empty strings when the
// key is unknown or the payload cannot be deserialized; decode failures are
// logged rather than propagated.
func (p *esProcessorImpl) getMsgWithInfo(key string) (wid string, rid string, domainID string) {
	kafkaMsg, ok := p.getKafkaMsg(key)
	if !ok {
		return
	}

	var decoded indexer.Message
	err := p.msgEncoder.Decode(kafkaMsg.message.Value(), &decoded)
	if err != nil {
		p.logger.Error("failed to deserialize kafka message.", tag.Error(err))
		return
	}
	wid, rid, domainID = decoded.GetWorkflowID(), decoded.GetRunID(), decoded.GetDomainID()
	return
}

func (p *esProcessorImpl) hashFn(key interface{}) uint32 {
id, ok := key.(string)
if !ok {
Expand Down
52 changes: 51 additions & 1 deletion service/worker/indexer/esProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

"github.com/uber/cadence/.gen/go/indexer"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/collection"
es "github.com/uber/cadence/common/elasticsearch"
esMocks "github.com/uber/cadence/common/elasticsearch/mocks"
Expand Down Expand Up @@ -86,6 +89,7 @@ func (s *esProcessorSuite) SetupTest() {
config: config,
logger: loggerimpl.NewLogger(zapLogger),
metricsClient: s.mockMetricClient,
msgEncoder: codec.NewThriftRWEncoder(),
}
p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
p.processor = s.mockBulkProcessor
Expand Down Expand Up @@ -120,7 +124,7 @@ func (s *esProcessorSuite) TestNewESProcessorAndStart() {
s.NotNil(input.AfterFunc)
return true
})).Return(&elastic.BulkProcessor{}, nil).Once()
p, err := NewESProcessorAndStart(config, s.mockESClient, processorName, s.esProcessor.logger, &mmocks.Client{})
p, err := NewESProcessorAndStart(config, s.mockESClient, processorName, s.esProcessor.logger, &mmocks.Client{}, codec.NewThriftRWEncoder())
s.NoError(err)

processor, ok := p.(*esProcessorImpl)
Expand Down Expand Up @@ -240,10 +244,16 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack() {
Items: []map[string]*elastic.BulkResponseItem{mFailed},
}

wid := "test-workflowID"
rid := "test-runID"
domainID := "test-domainID"
payload := s.getEncodedMsg(wid, rid, domainID)

mockKafkaMsg := &msgMocks.Message{}
mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)
mockKafkaMsg.On("Nack").Return(nil).Once()
mockKafkaMsg.On("Value").Return(payload).Once()
s.esProcessor.bulkAfterAction(0, requests, response, nil)
mockKafkaMsg.AssertExpectations(s.T())
}
Expand Down Expand Up @@ -318,6 +328,46 @@ func (s *esProcessorSuite) TestHashFn() {
s.NotEqual(uint32(0), s.esProcessor.hashFn("test"))
}

// getEncodedMsg builds an indexer.Message carrying the given workflow
// identifiers and returns its encoded payload, failing the suite assertion
// on encode errors.
func (s *esProcessorSuite) getEncodedMsg(wid string, rid string, domainID string) []byte {
	msg := &indexer.Message{
		WorkflowID: common.StringPtr(wid),
		RunID:      common.StringPtr(rid),
		DomainID:   common.StringPtr(domainID),
	}
	encoded, err := s.esProcessor.msgEncoder.Encode(msg)
	s.NoError(err)
	return encoded
}

// TestGetMsgWithInfo verifies that workflow identifiers are recovered from a
// kafka message payload tracked in the processor's map.
func (s *esProcessorSuite) TestGetMsgWithInfo() {
	key := "test-key"
	wantWid, wantRid, wantDomainID := "test-workflowID", "test-runID", "test-domainID"
	payload := s.getEncodedMsg(wantWid, wantRid, wantDomainID)

	kafkaMsg := &msgMocks.Message{}
	kafkaMsg.On("Value").Return(payload).Once()
	s.esProcessor.mapToKafkaMsg.Put(key, newKafkaMessageWithMetrics(kafkaMsg, &testStopWatch))

	wid, rid, domainID := s.esProcessor.getMsgWithInfo(key)
	s.Equal(wantWid, wid)
	s.Equal(wantRid, rid)
	s.Equal(wantDomainID, domainID)
}

// TestGetMsgWithInfo_Error verifies that getMsgWithInfo returns empty
// identifiers when the tracked payload cannot be deserialized.
// Renamed from TestGetMsgInfo_Error to match the method under test and the
// sibling TestGetMsgWithInfo.
func (s *esProcessorSuite) TestGetMsgWithInfo_Error() {
	testKey := "test-key"
	mockKafkaMsg := &msgMocks.Message{}
	// An empty byte slice is not a valid encoded indexer.Message, so the
	// decode inside getMsgWithInfo fails and zero values are returned.
	mockKafkaMsg.On("Value").Return([]byte{}).Once()
	mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
	s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)
	wid, rid, domainID := s.esProcessor.getMsgWithInfo(testKey)
	s.Equal("", wid)
	s.Equal("", rid)
	s.Equal("", domainID)
	// Consistent with sibling tests: ensure the mocked Value() call fired.
	mockKafkaMsg.AssertExpectations(s.T())
}

func (s *esProcessorSuite) TestGetKeyForKafkaMsg() {
request := elastic.NewBulkIndexRequest()
s.PanicsWithValue("KafkaKey not found", func() { s.esProcessor.getKeyForKafkaMsg(request) })
Expand Down
2 changes: 1 addition & 1 deletion service/worker/indexer/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (p *indexProcessor) Start() error {
return err
}

esProcessor, err := NewESProcessorAndStart(p.config, p.esClient, p.esProcessorName, p.logger, p.metricsClient)
esProcessor, err := NewESProcessorAndStart(p.config, p.esClient, p.esProcessorName, p.logger, p.metricsClient, p.msgEncoder)
if err != nil {
p.logger.Info("", tag.LifeCycleStartFailed, tag.Error(err))
return err
Expand Down

0 comments on commit a933ae0

Please sign in to comment.