Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug es-analyzer II: CadenceChangeVersion workflow #6209

Merged
merged 11 commits into from
Aug 7, 2024
32 changes: 16 additions & 16 deletions service/worker/esanalyzer/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test0", 1},
{"test1", 2},
{"test0", float64(1)},
{"test1", float64(2)},
}, nil).Times(1)
},
expectedErr: nil,
Expand Down Expand Up @@ -657,16 +657,16 @@ func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-type0", 100},
{"test-wf-type1", 200},
{"test-wf-type0", float64(100)},
{"test-wf-type1", float64(200)},
}, nil).Times(1)
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-version0", 1},
{"test-wf-version1", 20},
{"test-wf-version0", float64(10)},
{"test-wf-version1", float64(20)},
}, nil).Times(1)
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-version3", 10},
{"test-wf-version4", 2},
{"test-wf-version3", float64(10)},
{"test-wf-version4", float64(2)},
}, nil).Times(1)
},
expectedErr: nil,
Expand Down Expand Up @@ -718,8 +718,8 @@ func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-type0", 100},
{"test-wf-type1", 200},
{"test-wf-type0", float64(100)},
{"test-wf-type1", float64(200)},
}, nil).Times(1)
},
expectedErr: fmt.Errorf("error querying workflow versions for workflow type: test-wf-type0: error: domain error"),
Expand All @@ -731,8 +731,8 @@ func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-type0", 100},
{"test-wf-type1", 200},
{"test-wf-type0", float64(100)},
{"test-wf-type1", float64(200)},
}, nil).Times(1)
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1)
},
Expand All @@ -745,16 +745,16 @@ func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-type0", 100},
{"test-wf-type1", 200},
{"test-wf-type0", float64(100)},
{"test-wf-type1", float64(200)},
}, nil).Times(1)
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test-wf-version0", 1.5},
{"test-wf-version0", float64(3.14)},
{"test-wf-version1", 20},
}, nil).Times(1)
},
expectedErr: fmt.Errorf("error querying workflow versions for workflow type: " +
"test-wf-type0: error: error parsing workflow count for workflow version test-wf-version0"),
"test-wf-type0: error: error parsing workflow count for cadence version test-wf-version1"),
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,17 @@ func (w *Workflow) emitWorkflowTypeCountMetricsPinot(domainName string, logger *
var domainWorkflowTypeCount DomainWorkflowTypeCount
for _, row := range response {
workflowType := row[0].(string)
workflowCount, ok := row[1].(int)

// even though the count is a int, it is returned as a float64, need to pay attention to this
workflowCount, ok := row[1].(float64)
if !ok {
logger.Error("Error parsing workflow count",
zap.Error(err),
zap.String("WorkflowType", workflowType),
zap.String("DomainName", domainName),
zap.Float64("WorkflowCount", workflowCount),
zap.String("WorkflowCountType", fmt.Sprintf("%T", row[1])),
zap.String("raw data", fmt.Sprintf("%#v", response)),
)
return fmt.Errorf("error parsing workflow count for workflow type %s", workflowType)
}
Expand Down
18 changes: 12 additions & 6 deletions service/worker/esanalyzer/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ WHERE DomainID = '%s'
AND CloseStatus = -1
AND StartTime > 0
AND WorkflowType = '%s'
GROUP BY JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') AS CadenceChangeVersion
GROUP BY JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY')
ORDER BY count DESC
LIMIT 10
`, w.analyzer.pinotTableName, domain.GetInfo().ID, wfType), nil
Expand Down Expand Up @@ -246,12 +246,15 @@ func (w *Workflow) emitWorkflowVersionMetricsPinot(domainName string, logger *za
var domainWorkflowVersionCount DomainWorkflowVersionCount
for _, row := range response {
workflowType := row[0].(string)
workflowCount, ok := row[1].(int)
workflowCount, ok := row[1].(float64)
if !ok {
logger.Error("Error parsing workflow count",
logger.Error("error parsing workflow count for cadence version",
zap.Error(err),
zap.String("WorkflowType", workflowType),
zap.String("DomainName", domainName),
zap.Float64("WorkflowCount", workflowCount),
zap.String("WorkflowCountType", fmt.Sprintf("%T", row[1])),
zap.String("raw data", fmt.Sprintf("%#v", response)),
)
return fmt.Errorf("error parsing workflow count for workflow type %s", workflowType)
}
Expand Down Expand Up @@ -314,14 +317,17 @@ func (w *Workflow) queryWorkflowVersionsWithType(domainName string, wfType strin
var workflowVersions WorkflowVersionCount
for _, row := range response {
workflowVersion := row[0].(string)
workflowCount, ok := row[1].(int)
workflowCount, ok := row[1].(float64)
if !ok {
logger.Error("Error parsing workflow count",
logger.Error("error parsing workflow count for cadence version",
zap.Error(err),
zap.String("WorkflowVersion", workflowVersion),
zap.String("DomainName", domainName),
zap.Float64("WorkflowCount", workflowCount),
zap.String("WorkflowCountType", fmt.Sprintf("%T", row[1])),
zap.String("raw data", fmt.Sprintf("%#v", response)),
)
return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow count for workflow version %s", workflowVersion)
return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow count for cadence version %s", workflowVersion)
}
workflowVersions.WorkflowVersions = append(workflowVersions.WorkflowVersions, EsAggregateCount{
AggregateKey: workflowVersion,
Expand Down
Loading