Skip to content

Commit

Permalink
Debug es-analyzer II: CadenceChangeVersion workflow (uber#6209)
Browse files Browse the repository at this point in the history
* testing branch, will not commit to main

* add debug messages

* print workflow count type

* find workflowCountType 02

* change workflowTypeCount to float64

* change cadence version query

* change read mode logic back

* update tests

* add a comment

* add comment
  • Loading branch information
bowenxia authored and sankari165 committed Aug 9, 2024
1 parent 7b27e4f commit a520a8a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
32 changes: 16 additions & 16 deletions service/worker/esanalyzer/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test0", 1},
{"test1", 2},
{"test0", float64(1)},
{"test1", float64(2)},
}, nil).Times(1)
},
expectedErr: nil,
Expand Down Expand Up @@ -657,16 +657,16 @@ func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-type0", 100},
{"test-wf-type1", 200},
{"test-wf-type0", float64(100)},
{"test-wf-type1", float64(200)},
}, nil).Times(1)
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-version0", 1},
{"test-wf-version1", 20},
{"test-wf-version0", float64(10)},
{"test-wf-version1", float64(20)},
}, nil).Times(1)
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-version3", 10},
{"test-wf-version4", 2},
{"test-wf-version3", float64(10)},
{"test-wf-version4", float64(2)},
}, nil).Times(1)
},
expectedErr: nil,
Expand Down Expand Up @@ -718,8 +718,8 @@ func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-type0", 100},
{"test-wf-type1", 200},
{"test-wf-type0", float64(100)},
{"test-wf-type1", float64(200)},
}, nil).Times(1)
},
expectedErr: fmt.Errorf("error querying workflow versions for workflow type: test-wf-type0: error: domain error"),
Expand All @@ -731,8 +731,8 @@ func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-type0", 100},
{"test-wf-type1", 200},
{"test-wf-type0", float64(100)},
{"test-wf-type1", float64(200)},
}, nil).Times(1)
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1)
},
Expand All @@ -745,16 +745,16 @@ func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-type0", 100},
{"test-wf-type1", 200},
{"test-wf-type0", float64(100)},
{"test-wf-type1", float64(200)},
}, nil).Times(1)
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-version0", 1.5},
{"test-wf-version0", float64(3.14)},
{"test-wf-version1", 20},
}, nil).Times(1)
},
expectedErr: fmt.Errorf("error querying workflow versions for workflow type: " +
"test-wf-type0: error: error parsing workflow count for workflow version test-wf-version0"),
"test-wf-type0: error: error parsing workflow count for cadence version test-wf-version1"),
},
}

Expand Down
7 changes: 6 additions & 1 deletion service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,17 @@ func (w *Workflow) emitWorkflowTypeCountMetricsPinot(domainName string, logger *
var domainWorkflowTypeCount DomainWorkflowTypeCount
for _, row := range response {
workflowType := row[0].(string)
workflowCount, ok := row[1].(int)

// even though the count is a int, it is returned as a float64, need to pay attention to this
workflowCount, ok := row[1].(float64)
if !ok {
logger.Error("Error parsing workflow count",
zap.Error(err),
zap.String("WorkflowType", workflowType),
zap.String("DomainName", domainName),
zap.Float64("WorkflowCount", workflowCount),
zap.String("WorkflowCountType", fmt.Sprintf("%T", row[1])),
zap.String("raw data", fmt.Sprintf("%#v", response)),
)
return fmt.Errorf("error parsing workflow count for workflow type %s", workflowType)
}
Expand Down
18 changes: 12 additions & 6 deletions service/worker/esanalyzer/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ WHERE DomainID = '%s'
AND CloseStatus = -1
AND StartTime > 0
AND WorkflowType = '%s'
GROUP BY JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') AS CadenceChangeVersion
GROUP BY JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY')
ORDER BY count DESC
LIMIT 10
`, w.analyzer.pinotTableName, domain.GetInfo().ID, wfType), nil
Expand Down Expand Up @@ -246,12 +246,15 @@ func (w *Workflow) emitWorkflowVersionMetricsPinot(domainName string, logger *za
var domainWorkflowVersionCount DomainWorkflowVersionCount
for _, row := range response {
workflowType := row[0].(string)
workflowCount, ok := row[1].(int)
workflowCount, ok := row[1].(float64)
if !ok {
logger.Error("Error parsing workflow count",
logger.Error("error parsing workflow count for cadence version",
zap.Error(err),
zap.String("WorkflowType", workflowType),
zap.String("DomainName", domainName),
zap.Float64("WorkflowCount", workflowCount),
zap.String("WorkflowCountType", fmt.Sprintf("%T", row[1])),
zap.String("raw data", fmt.Sprintf("%#v", response)),
)
return fmt.Errorf("error parsing workflow count for workflow type %s", workflowType)
}
Expand Down Expand Up @@ -314,14 +317,17 @@ func (w *Workflow) queryWorkflowVersionsWithType(domainName string, wfType strin
var workflowVersions WorkflowVersionCount
for _, row := range response {
workflowVersion := row[0].(string)
workflowCount, ok := row[1].(int)
workflowCount, ok := row[1].(float64)
if !ok {
logger.Error("Error parsing workflow count",
logger.Error("error parsing workflow count for cadence version",
zap.Error(err),
zap.String("WorkflowVersion", workflowVersion),
zap.String("DomainName", domainName),
zap.Float64("WorkflowCount", workflowCount),
zap.String("WorkflowCountType", fmt.Sprintf("%T", row[1])),
zap.String("raw data", fmt.Sprintf("%#v", response)),
)
return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow count for workflow version %s", workflowVersion)
return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow count for cadence version %s", workflowVersion)
}
workflowVersions.WorkflowVersions = append(workflowVersions.WorkflowVersions, EsAggregateCount{
AggregateKey: workflowVersion,
Expand Down

0 comments on commit a520a8a

Please sign in to comment.