diff --git a/common/log/tag/values.go b/common/log/tag/values.go index 51437e132c8..bc0b065cbbd 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -252,7 +252,6 @@ var ( StoreOperationGetClosedWorkflowExecution = storeOperation("get-closed-wf-execution") StoreOperationVisibilityDeleteWorkflowExecution = storeOperation("vis-delete-wf-execution") StoreOperationListWorkflowExecutions = storeOperation("list-wf-executions") - StoreOperationListAllWorkflowExecutions = storeOperation("list-all-wf-executions") StoreOperationScanWorkflowExecutions = storeOperation("scan-wf-executions") StoreOperationCountWorkflowExecutions = storeOperation("count-wf-executions") StoreOperationDeleteUninitializedWorkflowExecution = storeOperation("delete-uninitialized-wf-execution") diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 40ea006019e..8d443672d3b 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -270,8 +270,6 @@ const ( PersistenceDeleteUninitializedWorkflowExecutionScope // PersistenceListWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer PersistenceListWorkflowExecutionsScope - // PersistenceListAllWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer - PersistenceListAllWorkflowExecutionsScope // PersistenceScanWorkflowExecutionsScope tracks ScanWorkflowExecutions calls made by service to persistence layer PersistenceScanWorkflowExecutionsScope // PersistenceCountWorkflowExecutionsScope tracks CountWorkflowExecutions calls made by service to persistence layer @@ -753,8 +751,6 @@ const ( ElasticsearchGetClosedWorkflowExecutionScope // ElasticsearchListWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer ElasticsearchListWorkflowExecutionsScope - // ElasticsearchListAllWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer - ElasticsearchListAllWorkflowExecutionsScope // ElasticsearchScanWorkflowExecutionsScope tracks ScanWorkflowExecutions calls made by service to persistence layer ElasticsearchScanWorkflowExecutionsScope // ElasticsearchCountWorkflowExecutionsScope tracks CountWorkflowExecutions calls made by service to persistence layer @@ -790,8 +786,6 @@ const ( PinotGetClosedWorkflowExecutionScope // PinotListWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer PinotListWorkflowExecutionsScope - // PinotListAllWorkflowExecutionsScope tracks ListAllWorkflowExecutions calls made by service to persistence layer - PinotListAllWorkflowExecutionsScope // PinotScanWorkflowExecutionsScope tracks ScanWorkflowExecutions calls made by service to persistence layer PinotScanWorkflowExecutionsScope // PinotCountWorkflowExecutionsScope tracks CountWorkflowExecutions calls made by service to persistence layer @@ -1441,7 +1435,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ PersistenceVisibilityDeleteWorkflowExecutionScope: {operation: "VisibilityDeleteWorkflowExecution"}, PersistenceDeleteUninitializedWorkflowExecutionScope: {operation: "VisibilityDeleteUninitializedWorkflowExecution"}, PersistenceListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"}, - PersistenceListAllWorkflowExecutionsScope: {operation: "ListAllWorkflowExecutions"}, PersistenceScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"}, PersistenceCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"}, PersistenceAppendHistoryNodesScope: {operation: "AppendHistoryNodes"}, @@ -1685,7 +1678,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ ElasticsearchListClosedWorkflowExecutionsByStatusScope: {operation: "ListClosedWorkflowExecutionsByStatus"}, ElasticsearchGetClosedWorkflowExecutionScope: {operation: "GetClosedWorkflowExecution"}, ElasticsearchListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"}, - ElasticsearchListAllWorkflowExecutionsScope: {operation: "ListAllWorkflowExecutions"}, ElasticsearchScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"}, ElasticsearchCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"}, ElasticsearchDeleteWorkflowExecutionsScope: {operation: "DeleteWorkflowExecution"}, @@ -1703,7 +1695,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ PinotListClosedWorkflowExecutionsByStatusScope: {operation: "ListClosedWorkflowExecutionsByStatus"}, PinotGetClosedWorkflowExecutionScope: {operation: "GetClosedWorkflowExecution"}, PinotListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"}, - PinotListAllWorkflowExecutionsScope: {operation: "ListAllWorkflowExecutions"}, PinotScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"}, PinotCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"}, PinotDeleteWorkflowExecutionsScope: {operation: "DeleteWorkflowExecution"}, diff --git a/common/mocks/VisibilityManager.go b/common/mocks/VisibilityManager.go index f3f73adae40..e45c4f4aad0 100644 --- a/common/mocks/VisibilityManager.go +++ b/common/mocks/VisibilityManager.go @@ -156,36 +156,6 @@ func (_m *VisibilityManager) GetName() string { return r0 } -// ListAllWorkflowExecutions provides a mock function with given fields: ctx, request -func (_m *VisibilityManager) ListAllWorkflowExecutions(ctx context.Context, request *persistence.ListAllWorkflowExecutionsRequest) (*persistence.ListWorkflowExecutionsResponse, error) { - ret := _m.Called(ctx, request) - - if len(ret) == 0 { - panic("no return value specified for ListAllWorkflowExecutions") - } - - var r0 *persistence.ListWorkflowExecutionsResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *persistence.ListAllWorkflowExecutionsRequest) (*persistence.ListWorkflowExecutionsResponse, error)); ok { - return rf(ctx, request) - } - if rf, ok := ret.Get(0).(func(context.Context, *persistence.ListAllWorkflowExecutionsRequest) *persistence.ListWorkflowExecutionsResponse); ok { - r0 = rf(ctx, request) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*persistence.ListWorkflowExecutionsResponse) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *persistence.ListAllWorkflowExecutionsRequest) error); ok { - r1 = rf(ctx, request) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // ListClosedWorkflowExecutions provides a mock function with given fields: ctx, request func (_m *VisibilityManager) ListClosedWorkflowExecutions(ctx context.Context, request *persistence.ListWorkflowExecutionsRequest) (*persistence.ListWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) diff --git a/common/persistence/dataVisibilityManagerInterfaces.go b/common/persistence/dataVisibilityManagerInterfaces.go index c2fdc5ad604..0e15451ecbe 100644 --- a/common/persistence/dataVisibilityManagerInterfaces.go +++ b/common/persistence/dataVisibilityManagerInterfaces.go @@ -133,16 +133,6 @@ type ( NextPageToken []byte } - // ListAllWorkflowExecutionsRequest is used to list all executions in a domain - ListAllWorkflowExecutionsRequest struct { - ListWorkflowExecutionsRequest - PartialMatch bool - StatusFilter []types.WorkflowExecutionCloseStatus - WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID - SortColumn string // This should be a valid search attribute - SortOrder string // DESC or ASC - } - // ListWorkflowExecutionsByQueryRequest is used to list executions in a domain ListWorkflowExecutionsByQueryRequest struct { DomainUUID string @@ -235,7 +225,6 @@ type ( ListClosedWorkflowExecutionsByStatus(ctx context.Context, request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error) DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*ListWorkflowExecutionsResponse, error) - ListAllWorkflowExecutions(ctx context.Context, request *ListAllWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*ListWorkflowExecutionsResponse, error) CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error) // NOTE: GetClosedWorkflowExecution is only for persistence testing, currently no index is supported for filtering by RunID diff --git a/common/persistence/dataVisibilityManagerInterfaces_mock.go b/common/persistence/dataVisibilityManagerInterfaces_mock.go index 13bedc3bdda..4b6c49ba259 100644 --- a/common/persistence/dataVisibilityManagerInterfaces_mock.go +++ b/common/persistence/dataVisibilityManagerInterfaces_mock.go @@ -140,21 +140,6 @@ func (mr *MockVisibilityManagerMockRecorder) GetName() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetName", reflect.TypeOf((*MockVisibilityManager)(nil).GetName)) } -// ListAllWorkflowExecutions mocks base method. -func (m *MockVisibilityManager) ListAllWorkflowExecutions(arg0 context.Context, arg1 *ListAllWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListAllWorkflowExecutions", arg0, arg1) - ret0, _ := ret[0].(*ListWorkflowExecutionsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListAllWorkflowExecutions indicates an expected call of ListAllWorkflowExecutions. -func (mr *MockVisibilityManagerMockRecorder) ListAllWorkflowExecutions(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllWorkflowExecutions", reflect.TypeOf((*MockVisibilityManager)(nil).ListAllWorkflowExecutions), arg0, arg1) -} - // ListClosedWorkflowExecutions mocks base method. func (m *MockVisibilityManager) ListClosedWorkflowExecutions(arg0 context.Context, arg1 *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/data_store_interfaces.go b/common/persistence/data_store_interfaces.go index 42f1cf70bbb..c05a7c9c58e 100644 --- a/common/persistence/data_store_interfaces.go +++ b/common/persistence/data_store_interfaces.go @@ -175,7 +175,6 @@ type ( GetClosedWorkflowExecution(ctx context.Context, request *InternalGetClosedWorkflowExecutionRequest) (*InternalGetClosedWorkflowExecutionResponse, error) DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error) - ListAllWorkflowExecutions(ctx context.Context, request *InternalListAllWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error) ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error) CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error) DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error @@ -702,16 +701,6 @@ type ( WorkflowTypeName string } - // InternalListAllWorkflowExecutionsByTypeRequest is used to list all open and closed executions with specific filters in a domain - InternalListAllWorkflowExecutionsByTypeRequest struct { - InternalListWorkflowExecutionsRequest - StatusFilter []types.WorkflowExecutionCloseStatus - PartialMatch bool - WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID - SortColumn string // This should be a valid search attribute - SortOrder string // DESC or ASC - } - // InternalGetClosedWorkflowExecutionResponse is response from GetWorkflowExecution InternalGetClosedWorkflowExecutionResponse struct { Execution *InternalVisibilityWorkflowExecutionInfo diff --git a/common/persistence/elasticsearch/es_visibility_metric_clients.go b/common/persistence/elasticsearch/es_visibility_metric_clients.go index 17d755ead8c..d5d7687b0bb 100644 --- a/common/persistence/elasticsearch/es_visibility_metric_clients.go +++ b/common/persistence/elasticsearch/es_visibility_metric_clients.go @@ -276,25 +276,6 @@ func (p *visibilityMetricsClient) GetClosedWorkflowExecution( return response, err } -func (p *visibilityMetricsClient) ListAllWorkflowExecutions( - ctx context.Context, - request *p.ListAllWorkflowExecutionsRequest, -) (*p.ListWorkflowExecutionsResponse, error) { - - scopeWithDomainTag := p.metricClient.Scope(metrics.ElasticsearchListAllWorkflowExecutionsScope, metrics.DomainTag(request.Domain)) - scopeWithDomainTag.IncCounter(metrics.ElasticsearchRequestsPerDomain) - - sw := scopeWithDomainTag.StartTimer(metrics.ElasticsearchLatencyPerDomain) - response, err := p.persistence.ListAllWorkflowExecutions(ctx, request) - sw.Stop() - - if err != nil { - p.updateErrorMetric(scopeWithDomainTag, metrics.ElasticsearchListAllWorkflowExecutionsScope, err) - } - - return response, err -} - func (p *visibilityMetricsClient) ListWorkflowExecutions( ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest, diff --git a/common/persistence/elasticsearch/es_visibility_store.go b/common/persistence/elasticsearch/es_visibility_store.go index f2f889de7ce..c9a42f1c48d 100644 --- a/common/persistence/elasticsearch/es_visibility_store.go +++ b/common/persistence/elasticsearch/es_visibility_store.go @@ -50,8 +50,7 @@ import ( ) const ( - esPersistenceName = "elasticsearch" - oneMicroSecondInNano = int64(time.Microsecond / time.Nanosecond) + esPersistenceName = "elasticsearch" ) type ( @@ -443,41 +442,6 @@ func (v *esVisibilityStore) ListWorkflowExecutions( return resp, nil } -func (v *esVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) { - isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { - return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) - } - - if request.PageSize == 0 { - request.PageSize = 1000 - } - - token, err := es.GetNextPageToken(request.NextPageToken) - if err != nil { - return nil, err - } - - queryDSL, err := v.getESQueryDSLForListAll(request, token) - if err != nil { - return nil, &types.BadRequestError{Message: fmt.Sprintf("Error when building query: %v", err)} - } - - resp, err := v.esClient.SearchByQuery(ctx, &es.SearchByQueryRequest{ - Index: v.index, - Query: queryDSL, - NextPageToken: request.NextPageToken, - PageSize: request.PageSize, - Filter: isRecordValid, - MaxResultWindow: v.config.ESIndexMaxResultWindow(), - }) - if err != nil { - return nil, &types.InternalServiceError{ - Message: fmt.Sprintf("ListAllWorkflowExecutions failed, %v", err), - } - } - return resp, nil -} - func (v *esVisibilityStore) ScanWorkflowExecutions( ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest, @@ -599,11 +563,6 @@ func (v *esVisibilityStore) getESQueryDSL(request *p.ListWorkflowExecutionsByQue return v.processedDSLfromSQL(sql, request.DomainUUID, token) } -func (v *esVisibilityStore) getESQueryDSLForListAll(request *p.InternalListAllWorkflowExecutionsByTypeRequest, token *es.ElasticVisibilityPageToken) (string, error) { - sql := getSQLFromListAllRequest(request) - return v.processedDSLfromSQL(sql, request.DomainUUID, token) -} - func (v *esVisibilityStore) processedDSLfromSQL(sql, domainUUID string, token *es.ElasticVisibilityPageToken) (string, error) { dsl, err := getCustomizedDSLFromSQL(sql, domainUUID) if err != nil { @@ -643,56 +602,6 @@ func getSQLFromListRequest(request *p.ListWorkflowExecutionsByQueryRequest) stri return sql } -func getSQLFromListAllRequest(request *p.InternalListAllWorkflowExecutionsByTypeRequest) string { - sql := "select * from dummy " - var earliestTimeStr, latestTimeStr, timeRange, searchString, whereClause, orderByClause string - - if !request.EarliestTime.IsZero() && !request.LatestTime.IsZero() { - earliestTimeStr = strconv.FormatInt(request.EarliestTime.UnixNano()-oneMicroSecondInNano, 10) - latestTimeStr = strconv.FormatInt(request.LatestTime.UnixNano()+oneMicroSecondInNano, 10) - timeRange = fmt.Sprintf("%s between %s and %s", es.StartTime, earliestTimeStr, latestTimeStr) - whereClause = addToWhereClause(whereClause, timeRange) - } - - statusFilters := make([]string, 0, len(request.StatusFilter)) - for _, status := range request.StatusFilter { - statusFilters = append(statusFilters, fmt.Sprintf("%s = %d", es.CloseStatus, status)) - } - statusFilterString := strings.Join(statusFilters, " or ") - whereClause = addToWhereClause(whereClause, statusFilterString) - - if request.WorkflowSearchValue != "" { - searchString = fmt.Sprintf("%s = \"%s\" or %s = \"%s\" or %s = \"%s\" ", - es.WorkflowID, request.WorkflowSearchValue, - es.RunID, request.WorkflowSearchValue, - es.WorkflowType, request.WorkflowSearchValue) - whereClause = addToWhereClause(whereClause, searchString) - } - - if whereClause != "" { - sql += whereClause - } - - if request.SortColumn != "" && request.SortOrder != "" { - orderByClause = fmt.Sprintf(" order by %s %s", request.SortColumn, request.SortOrder) - sql += orderByClause - } - - return fmt.Sprintf("%s limit %d", sql, request.PageSize) -} - -func addToWhereClause(whereClause, condition string) string { - if condition == "" { - return whereClause - } - if whereClause == "" { - return fmt.Sprintf("where (%s)", condition) - } - - whereClause += fmt.Sprintf(" and (%s)", condition) - return whereClause -} - func getSQLFromCountRequest(request *p.CountWorkflowExecutionsRequest) string { var sql string if strings.TrimSpace(request.Query) == "" { diff --git a/common/persistence/elasticsearch/es_visibility_store_test.go b/common/persistence/elasticsearch/es_visibility_store_test.go index ec28b3c96d4..bd7a3d8987c 100644 --- a/common/persistence/elasticsearch/es_visibility_store_test.go +++ b/common/persistence/elasticsearch/es_visibility_store_test.go @@ -359,60 +359,6 @@ func (s *ESVisibilitySuite) TestListOpenWorkflowExecutionsByType() { s.True(strings.Contains(err.Error(), "ListOpenWorkflowExecutionsByType failed")) } -func (s *ESVisibilitySuite) TestListAllWorkflowExecutions() { - s.mockESClient.On("SearchByQuery", mock.Anything, mock.MatchedBy(func(input *es.SearchByQueryRequest) bool { - s.True(strings.Contains(input.Query, `{"match_phrase":{"WorkflowID":{"query":"123"}}}`)) - s.True(strings.Contains(input.Query, `{"match_phrase":{"WorkflowType":{"query":"123"}}}`)) - s.True(strings.Contains(input.Query, `{"match_phrase":{"RunID":{"query":"123"}}}`)) - s.Equal(esIndexMaxResultWindow, input.MaxResultWindow) - return true - })).Return(testSearchResult, nil).Once() - - request := &p.InternalListAllWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.UnixMilli(testEarliestTime), - LatestTime: time.UnixMilli(testLatestTime), - PageSize: 10, - }, - StatusFilter: []types.WorkflowExecutionCloseStatus{types.WorkflowExecutionCloseStatusCanceled, types.WorkflowExecutionCloseStatusFailed}, - WorkflowSearchValue: "123", - SortOrder: "DESC", - SortColumn: es.WorkflowID, - } - - ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout) - defer cancel() - - _, err := s.visibilityStore.ListAllWorkflowExecutions(ctx, request) - s.NoError(err) - - s.mockESClient.On("SearchByQuery", mock.Anything, mock.Anything).Return(nil, errTestESSearch).Once() - _, err = s.visibilityStore.ListAllWorkflowExecutions(ctx, request) - s.Error(err) - _, ok := err.(*types.InternalServiceError) - s.True(ok) - s.True(strings.Contains(err.Error(), "ListAllWorkflowExecutions failed")) - - invalidRequest := &p.InternalListAllWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.UnixMilli(testEarliestTime), - LatestTime: time.UnixMilli(testLatestTime), - }, - SortColumn: ")", - SortOrder: "DESC", - } - _, err = s.visibilityStore.ListAllWorkflowExecutions(ctx, invalidRequest) - s.Error(err) - _, ok = err.(*types.BadRequestError) - s.True(ok) - s.True(strings.Contains(err.Error(), "Error when building query")) - -} - func (s *ESVisibilitySuite) TestListClosedWorkflowExecutionsByType() { s.mockESClient.On("Search", mock.Anything, mock.MatchedBy(func(input *es.SearchRequest) bool { s.False(input.IsOpen) diff --git a/common/persistence/nosql/nosql_visibility_store.go b/common/persistence/nosql/nosql_visibility_store.go index 13e5d8e1043..78df3bc85de 100644 --- a/common/persistence/nosql/nosql_visibility_store.go +++ b/common/persistence/nosql/nosql_visibility_store.go @@ -372,13 +372,6 @@ func (v *nosqlVisibilityStore) ListWorkflowExecutions( return nil, persistence.ErrVisibilityOperationNotSupported } -func (v *nosqlVisibilityStore) ListAllWorkflowExecutions( - ctx context.Context, - request *persistence.InternalListAllWorkflowExecutionsByTypeRequest, -) (*persistence.InternalListWorkflowExecutionsResponse, error) { - return nil, persistence.ErrVisibilityOperationNotSupported -} - func (v *nosqlVisibilityStore) ScanWorkflowExecutions( _ context.Context, _ *persistence.ListWorkflowExecutionsByQueryRequest) (*persistence.InternalListWorkflowExecutionsResponse, error) { diff --git a/common/persistence/pinot/pinot_visibility_metric_clients.go b/common/persistence/pinot/pinot_visibility_metric_clients.go index 848aa4ebbc4..44f15d469d7 100644 --- a/common/persistence/pinot/pinot_visibility_metric_clients.go +++ b/common/persistence/pinot/pinot_visibility_metric_clients.go @@ -256,25 +256,6 @@ func (p *pinotVisibilityMetricsClient) ListClosedWorkflowExecutionsByStatus( return response, err } -func (p *pinotVisibilityMetricsClient) ListAllWorkflowExecutions( - ctx context.Context, - request *p.ListAllWorkflowExecutionsRequest, -) (*p.ListWorkflowExecutionsResponse, error) { - - scopeWithDomainTag := p.metricClient.Scope(metrics.PinotListAllWorkflowExecutionsScope, metrics.DomainTag(request.Domain)) - scopeWithDomainTag.IncCounter(metrics.PinotRequestsPerDomain) - - sw := scopeWithDomainTag.StartTimer(metrics.PinotLatencyPerDomain) - defer sw.Stop() - response, err := p.persistence.ListAllWorkflowExecutions(ctx, request) - - if err != nil { - p.updateErrorMetric(scopeWithDomainTag, metrics.PinotListAllWorkflowExecutionsScope, err) - } - - return response, err -} - func (p *pinotVisibilityMetricsClient) GetClosedWorkflowExecution( ctx context.Context, request *p.GetClosedWorkflowExecutionRequest, diff --git a/common/persistence/pinot/pinot_visibility_store.go b/common/persistence/pinot/pinot_visibility_store.go index cebcb65c78d..7ab433f5984 100644 --- a/common/persistence/pinot/pinot_visibility_store.go +++ b/common/persistence/pinot/pinot_visibility_store.go @@ -363,28 +363,6 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Co return v.pinotClient.Search(req) } -func (v *pinotVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) { - isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { - return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) - } - - query, err := v.getListAllWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request) - if err != nil { - v.logger.Error(fmt.Sprintf("failed to build list all workflow executions query %v", err)) - return nil, err - } - - req := &pnt.SearchRequest{ - Query: query, - IsOpen: true, - Filter: isRecordValid, - MaxResultWindow: v.config.ESIndexMaxResultWindow(), - ListRequest: &request.InternalListWorkflowExecutionsRequest, - } - - return v.pinotClient.Search(req) -} - func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx context.Context, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) @@ -685,7 +663,6 @@ func isTimeStruct(value []byte) ([]byte, error) { type PinotQuery struct { query string filters PinotQueryFilter - search PinotQuerySearchField sorters string limits string } @@ -694,50 +671,10 @@ type PinotQueryFilter struct { string } -type PinotQuerySearchField struct { - string -} - -func (s *PinotQuerySearchField) checkFirstSearchField() { - if s.string == "" { - s.string = "( " - } else { - s.string += "OR " - } -} - -func (s *PinotQuerySearchField) lastSearchField() { - if s.string != "" { - s.string += " )" - } -} - -func (s *PinotQuerySearchField) resetSearchField() { - s.string = "" -} - -func (s *PinotQuerySearchField) addEqual(obj string, val interface{}) { - s.checkFirstSearchField() - switch val.(type) { - case string: - s.string += fmt.Sprintf("%s = '%s'\n", obj, val) - case int32: - s.string += fmt.Sprintf("%s = %d\n", obj, val) - default: - s.string += fmt.Sprintf("%s = %v\n", obj, val) - } -} - -func (s *PinotQuerySearchField) addMatch(obj string, val interface{}) { - s.checkFirstSearchField() - s.string += fmt.Sprintf("REGEXP_LIKE(%s, '^.*%s.*$')\n", obj, val) -} - func NewPinotQuery(tableName string) PinotQuery { return PinotQuery{ query: fmt.Sprintf("SELECT *\nFROM %s\n", tableName), filters: PinotQueryFilter{}, - search: PinotQuerySearchField{}, sorters: "", limits: "", } @@ -773,16 +710,6 @@ func (q *PinotQuery) addOffsetAndLimits(offset int, limit int) { q.limits += fmt.Sprintf("LIMIT %d, %d\n", offset, limit) } -func (q *PinotQuery) addStatusFilters(status []types.WorkflowExecutionCloseStatus) { - for _, s := range status { - q.search.addEqual(CloseStatus, int32(s)) - } - - q.search.lastSearchField() - q.filters.addQuery(q.search.string) - q.search.resetSearchField() -} - func (f *PinotQueryFilter) checkFirstFilter() { if f.string == "" { f.string = "WHERE " @@ -1053,67 +980,6 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor return query.String(), nil } -func (v *pinotVisibilityStore) getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (string, error) { - if request == nil { - return "", nil - } - - query := NewPinotQuery(tableName) - - query.filters.addEqual(DomainID, request.DomainUUID) - query.filters.addEqual(IsDeleted, false) - - if !request.EarliestTime.IsZero() && !request.LatestTime.IsZero() { - earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano - latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano - query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds - } - - if v.validSortInput(request.SortColumn, request.SortOrder) { - query.addPinotSorter(request.SortColumn, request.SortOrder) - } else { - // fallback to sorting by StartTime in descending order - query.addPinotSorter(StartTime, DescendingOrder) - } - - token, err := pnt.GetNextPageToken(request.NextPageToken) - if err != nil { - return "", fmt.Errorf("next page token: %w", err) - } - - from := token.From - pageSize := request.PageSize - query.addOffsetAndLimits(from, pageSize) - - if request.StatusFilter != nil { - query.addStatusFilters(request.StatusFilter) - } - - if request.WorkflowSearchValue != "" { - if request.PartialMatch { - query.search.addMatch(WorkflowID, request.WorkflowSearchValue) - query.search.addMatch(WorkflowType, request.WorkflowSearchValue) - query.search.addMatch(RunID, request.WorkflowSearchValue) - } else { - query.search.addEqual(WorkflowID, request.WorkflowSearchValue) - query.search.addEqual(WorkflowType, request.WorkflowSearchValue) - query.search.addEqual(RunID, request.WorkflowSearchValue) - } - - query.search.lastSearchField() - query.filters.addQuery(query.search.string) - } - - return query.String(), nil -} - -func (v *pinotVisibilityStore) validSortInput(sortColumn, sortOrder string) bool { - validSortColumn := v.pinotQueryValidator.IsValidSearchAttributes(sortColumn) - validSortOrder := sortOrder == DescendingOrder || sortOrder == AscendingOrder - - return validSortColumn && validSortOrder -} - func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) { if request == nil { return "", nil diff --git a/common/persistence/pinot/pinot_visibility_store_test.go b/common/persistence/pinot/pinot_visibility_store_test.go index 6cdef6c11f2..1334a57e01e 100644 --- a/common/persistence/pinot/pinot_visibility_store_test.go +++ b/common/persistence/pinot/pinot_visibility_store_test.go @@ -938,89 +938,6 @@ func TestListWorkflowExecutions(t *testing.T) { } } -func TestListAllWorkflowExecutions(t *testing.T) { - tests := map[string]struct { - request *p.InternalListAllWorkflowExecutionsByTypeRequest - expectedResp *p.InternalListWorkflowExecutionsResponse - pinotClientMockAffordance func(mockPinotClient *pnt.MockGenericClient) - expectedError error - }{ - "successful request": { - request: &p.InternalListAllWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: 10, - NextPageToken: nil, - }, - PartialMatch: false, - WorkflowSearchValue: "", - }, - expectedResp: &p.InternalListWorkflowExecutionsResponse{ - Executions: []*p.InternalVisibilityWorkflowExecutionInfo{ - { - DomainID: DomainID, - }, - }, - }, - pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { - mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) - mockPinotClient.EXPECT().Search(gomock.Any()).Return(&pnt.SearchResponse{ - Executions: []*p.InternalVisibilityWorkflowExecutionInfo{ - { - DomainID: DomainID, - }, - }, - }, nil).Times(1) - }, - expectedError: nil, - }, - "error request": { - request: &p.InternalListAllWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: 10, - NextPageToken: []byte("error"), - }, - PartialMatch: false, - WorkflowSearchValue: "", - }, - expectedResp: nil, - pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { - mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) - }, - expectedError: errNextPageToken, - }, - } - for name, test := range tests { - t.Run(name, func(t *testing.T) { - ctrl := gomock.NewController(t) - mockPinotClient := pnt.NewMockGenericClient(ctrl) - mockProducer := &mocks.KafkaProducer{} - mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ - ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), - ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), - }, mockProducer, log.NewNoop()) - visibilityStore := mgr.(*pinotVisibilityStore) - - test.pinotClientMockAffordance(mockPinotClient) - - resp, err := visibilityStore.ListAllWorkflowExecutions(context.Background(), test.request) - assert.Equal(t, test.expectedResp, resp) - if test.expectedError != nil { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - func TestScanWorkflowExecutions(t *testing.T) { errorRequest := &p.ListWorkflowExecutionsByQueryRequest{ NextPageToken: []byte("error"), @@ -1771,159 +1688,6 @@ LIMIT 0, 10 }) } } -func TestGetListAllWorkflowExecutionsQuery(t *testing.T) { - tests := map[string]struct { - inputRequest *p.InternalListAllWorkflowExecutionsByTypeRequest - expectResult string - expectError error - }{ - "complete request with exact match and default sorting": { - inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: testPageSize, - NextPageToken: nil, - }, - PartialMatch: false, - WorkflowSearchValue: "123", - StatusFilter: []types.WorkflowExecutionCloseStatus{types.WorkflowExecutionCloseStatusCompleted, types.WorkflowExecutionCloseStatusTimedOut}, - }, - expectResult: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND StartTime BETWEEN 1547596871371 AND 2547596873371 -AND ( CloseStatus = 0 -OR CloseStatus = 5 - ) -AND ( WorkflowID = '123' -OR WorkflowType = '123' -OR RunID = '123' - ) -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - expectError: nil, - }, - "complete request with exact match with valid custom sorting": { - inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: testPageSize, - NextPageToken: nil, - }, - PartialMatch: false, - WorkflowSearchValue: "123", - StatusFilter: []types.WorkflowExecutionCloseStatus{types.WorkflowExecutionCloseStatusTerminated}, - SortColumn: CloseTime, - SortOrder: AscendingOrder, - }, - expectResult: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND StartTime BETWEEN 1547596871371 AND 2547596873371 -AND ( CloseStatus = 3 - ) -AND ( WorkflowID = '123' -OR WorkflowType = '123' -OR RunID = '123' - ) -Order BY CloseTime ASC -LIMIT 0, 10 -`, testTableName), - expectError: nil, - }, - "complete request with exact match with invalid custom sorting": { - inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: testPageSize, - NextPageToken: nil, - }, - PartialMatch: false, - WorkflowSearchValue: "123", - SortColumn: "EndTime", - SortOrder: AscendingOrder, - }, - expectResult: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND StartTime BETWEEN 1547596871371 AND 2547596873371 -AND ( WorkflowID = '123' -OR WorkflowType = '123' -OR RunID = '123' - ) -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - expectError: nil, - }, - "complete request with partial match and default sorting": { - inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: testPageSize, - NextPageToken: nil, - }, - PartialMatch: true, - WorkflowSearchValue: "123", - }, - expectResult: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND StartTime BETWEEN 1547596871371 AND 2547596873371 -AND ( REGEXP_LIKE(WorkflowID, '^.*123.*$') -OR REGEXP_LIKE(WorkflowType, '^.*123.*$') -OR REGEXP_LIKE(RunID, '^.*123.*$') - ) -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - expectError: nil, - }, - "empty request": { - inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{}, - expectResult: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = '' -AND IsDeleted = false -Order BY StartTime DESC -LIMIT 0, 0 -`, testTableName), - expectError: nil, - }, - } - ctrl := gomock.NewController(t) - mockPinotClient := pnt.NewMockGenericClient(ctrl) - mockProducer := &mocks.KafkaProducer{} - mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ - ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), - ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), - }, mockProducer, log.NewNoop()) - visibilityStore := mgr.(*pinotVisibilityStore) - for name, test := range tests { - t.Run(name, func(t *testing.T) { - actualResult, actualError := visibilityStore.getListAllWorkflowExecutionsQuery(testTableName, test.inputRequest) - assert.Equal(t, test.expectResult, actualResult) - assert.NoError(t, actualError) - }) - } -} func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { tests := map[string]struct { diff --git a/common/persistence/pinot_visibility_dual_manager.go b/common/persistence/pinot_visibility_dual_manager.go index 736688649c1..de4c80b8e50 100644 --- a/common/persistence/pinot_visibility_dual_manager.go +++ b/common/persistence/pinot_visibility_dual_manager.go @@ -299,14 +299,6 @@ func (v *pinotVisibilityDualManager) ListWorkflowExecutions( return manager.ListWorkflowExecutions(ctx, request) } -func (v *pinotVisibilityDualManager) ListAllWorkflowExecutions( - ctx context.Context, - request *ListAllWorkflowExecutionsRequest, -) (*ListWorkflowExecutionsResponse, error) { - manager := v.chooseVisibilityManagerForRead(request.Domain) - return manager.ListAllWorkflowExecutions(ctx, request) -} - func (v *pinotVisibilityDualManager) ScanWorkflowExecutions( ctx context.Context, request *ListWorkflowExecutionsByQueryRequest, diff --git a/common/persistence/pinot_visibility_dual_manager_test.go b/common/persistence/pinot_visibility_dual_manager_test.go index 40a94725d08..385696da8b4 100644 --- a/common/persistence/pinot_visibility_dual_manager_test.go +++ b/common/persistence/pinot_visibility_dual_manager_test.go @@ -724,85 +724,6 @@ func TestPinotDualListClosedWorkflowExecutions(t *testing.T) { } } -func TestPinotDualListAllWorkflowExecutions(t *testing.T) { - request := &ListAllWorkflowExecutionsRequest{ - ListWorkflowExecutionsRequest: ListWorkflowExecutionsRequest{ - Domain: "test-domain", - }, - PartialMatch: false, - WorkflowSearchValue: "", - } - - // put this outside because need to use it as an input of the table tests - ctrl := gomock.NewController(t) - - tests := map[string]struct { - request *ListAllWorkflowExecutionsRequest - mockDBVisibilityManager VisibilityManager - mockPinotVisibilityManager VisibilityManager - mockDBVisibilityManagerAccordance func(mockDBVisibilityManager *MockVisibilityManager) - mockPinotVisibilityManagerAccordance func(mockPinotVisibilityManager *MockVisibilityManager) - readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter - expectedError error - }{ - "Case1-1: success case with DB visibility is not nil": { - request: request, - mockDBVisibilityManager: NewMockVisibilityManager(ctrl), - mockDBVisibilityManagerAccordance: func(mockDBVisibilityManager *MockVisibilityManager) { - mockDBVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - expectedError: nil, - }, - "Case1-2: success case with Pinot visibility is not nil": { - request: request, - mockPinotVisibilityManager: NewMockVisibilityManager(ctrl), - mockPinotVisibilityManagerAccordance: func(mockPinotVisibilityManager *MockVisibilityManager) { - mockPinotVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - expectedError: nil, - }, - "Case2-1: success case with DB visibility is not nil and read mod is false": { - request: request, - mockDBVisibilityManager: NewMockVisibilityManager(ctrl), - mockDBVisibilityManagerAccordance: func(mockDBVisibilityManager *MockVisibilityManager) { - mockDBVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - expectedError: nil, - }, - "Case2-2: success case with Pinot visibility is not nil and read mod is false": { - request: request, - mockPinotVisibilityManager: NewMockVisibilityManager(ctrl), - mockPinotVisibilityManagerAccordance: func(mockPinotVisibilityManager *MockVisibilityManager) { - mockPinotVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - expectedError: nil, - }, - } - for name, test := range tests { - t.Run(name, func(t *testing.T) { - if test.mockDBVisibilityManager != nil { - test.mockDBVisibilityManagerAccordance(test.mockDBVisibilityManager.(*MockVisibilityManager)) - } - if test.mockPinotVisibilityManager != nil { - test.mockPinotVisibilityManagerAccordance(test.mockPinotVisibilityManager.(*MockVisibilityManager)) - } - visibilityManager := NewPinotVisibilityDualManager(test.mockDBVisibilityManager, - test.mockPinotVisibilityManager, test.readModeIsFromES, nil, log.NewNoop()) - - _, err := visibilityManager.ListAllWorkflowExecutions(context.Background(), test.request) - if test.expectedError != nil { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - func TestPinotDualListOpenWorkflowExecutionsByType(t *testing.T) { request := &ListWorkflowExecutionsByTypeRequest{ ListWorkflowExecutionsRequest: ListWorkflowExecutionsRequest{ diff --git a/common/persistence/sql/sql_visibility_store.go b/common/persistence/sql/sql_visibility_store.go index 89e7073b3cf..678d80c929a 100644 --- a/common/persistence/sql/sql_visibility_store.go +++ b/common/persistence/sql/sql_visibility_store.go @@ -319,13 +319,6 @@ func (s *sqlVisibilityStore) ListWorkflowExecutions( return nil, p.ErrVisibilityOperationNotSupported } -func (s *sqlVisibilityStore) ListAllWorkflowExecutions( - _ context.Context, - _ *p.InternalListAllWorkflowExecutionsByTypeRequest, -) (*p.InternalListWorkflowExecutionsResponse, error) { - return nil, p.ErrVisibilityOperationNotSupported -} - func (s *sqlVisibilityStore) ScanWorkflowExecutions( _ context.Context, _ *p.ListWorkflowExecutionsByQueryRequest, diff --git a/common/persistence/visibility_dual_manager.go b/common/persistence/visibility_dual_manager.go index 6a5c9b9b7c7..464bf97dfcc 100644 --- a/common/persistence/visibility_dual_manager.go +++ b/common/persistence/visibility_dual_manager.go @@ -299,14 +299,6 @@ func (v *visibilityDualManager) ListWorkflowExecutions( return manager.ListWorkflowExecutions(ctx, request) } -func (v *visibilityDualManager) ListAllWorkflowExecutions( - ctx context.Context, - request *ListAllWorkflowExecutionsRequest, -) (*ListWorkflowExecutionsResponse, error) { - manager := v.chooseVisibilityManagerForRead(request.Domain) - return manager.ListAllWorkflowExecutions(ctx, request) -} - func (v *visibilityDualManager) ScanWorkflowExecutions( ctx context.Context, request *ListWorkflowExecutionsByQueryRequest, diff --git a/common/persistence/visibility_dual_manager_test.go b/common/persistence/visibility_dual_manager_test.go index d1deb4c35c5..901faaf719d 100644 --- a/common/persistence/visibility_dual_manager_test.go +++ b/common/persistence/visibility_dual_manager_test.go @@ -713,64 +713,6 @@ func TestDualListClosedWorkflowExecutions(t *testing.T) { } } -func TestDualListAllWorkflowExecutions(t *testing.T) { - request := &ListAllWorkflowExecutionsRequest{ - ListWorkflowExecutionsRequest: ListWorkflowExecutionsRequest{Domain: "test-domain"}, - PartialMatch: false, - WorkflowSearchValue: "", - } - - ctrl := gomock.NewController(t) - - tests := map[string]struct { - request *ListAllWorkflowExecutionsRequest - mockDBVisibilityManager VisibilityManager - mockESVisibilityManager VisibilityManager - mockDBVisibilityManagerAccordance func(mockDBVisibilityManager *MockVisibilityManager) - mockESVisibilityManagerAccordance func(mockESVisibilityManager *MockVisibilityManager) - readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter - expectedError error - }{ - "Case1-1: success case with DB visibility is not nil": { - request: request, - mockDBVisibilityManager: NewMockVisibilityManager(ctrl), - mockDBVisibilityManagerAccordance: func(mockDBVisibilityManager *MockVisibilityManager) { - mockDBVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - expectedError: nil, - }, - "Case1-2: success case with ES visibility is not nil": { - request: request, - mockESVisibilityManager: NewMockVisibilityManager(ctrl), - mockESVisibilityManagerAccordance: func(mockESVisibilityManager *MockVisibilityManager) { - mockESVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - expectedError: nil, - }, - } - for name, test := range tests { - t.Run(name, func(t *testing.T) { - if test.mockDBVisibilityManager != nil { - test.mockDBVisibilityManagerAccordance(test.mockDBVisibilityManager.(*MockVisibilityManager)) - } - if test.mockESVisibilityManager != nil { - test.mockESVisibilityManagerAccordance(test.mockESVisibilityManager.(*MockVisibilityManager)) - } - visibilityManager := NewVisibilityDualManager(test.mockDBVisibilityManager, - test.mockESVisibilityManager, test.readModeIsFromES, nil, log.NewNoop()) - - _, err := visibilityManager.ListAllWorkflowExecutions(context.Background(), test.request) - if test.expectedError != nil { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - func TestDualListOpenWorkflowExecutionsByType(t *testing.T) { request := &ListWorkflowExecutionsByTypeRequest{ ListWorkflowExecutionsRequest: ListWorkflowExecutionsRequest{ diff --git a/common/persistence/visibility_single_manager.go b/common/persistence/visibility_single_manager.go index 2f35c3e6732..503f29ff621 100644 --- a/common/persistence/visibility_single_manager.go +++ b/common/persistence/visibility_single_manager.go @@ -262,28 +262,6 @@ func (v *visibilityManagerImpl) ListClosedWorkflowExecutionsByStatus( return v.convertInternalListResponse(internalResp), nil } -func (v *visibilityManagerImpl) ListAllWorkflowExecutions( - ctx context.Context, - request *ListAllWorkflowExecutionsRequest, -) (*ListWorkflowExecutionsResponse, error) { - internalListRequest := v.toInternalListWorkflowExecutionsRequest(&request.ListWorkflowExecutionsRequest) - internalRequest := &InternalListAllWorkflowExecutionsByTypeRequest{ - PartialMatch: request.PartialMatch, - WorkflowSearchValue: request.WorkflowSearchValue, - SortColumn: request.SortColumn, - SortOrder: request.SortOrder, - } - copy(internalRequest.StatusFilter, request.StatusFilter) - if internalListRequest != nil { - internalRequest.InternalListWorkflowExecutionsRequest = *internalListRequest - } - internalResp, err := v.persistence.ListAllWorkflowExecutions(ctx, internalRequest) - if err != nil { - return nil, err - } - return v.convertInternalListResponse(internalResp), nil -} - func (v *visibilityManagerImpl) GetClosedWorkflowExecution( ctx context.Context, request *GetClosedWorkflowExecutionRequest, diff --git a/common/persistence/visibility_single_manager_test.go b/common/persistence/visibility_single_manager_test.go index d1e4cea24c1..b347780f152 100644 --- a/common/persistence/visibility_single_manager_test.go +++ b/common/persistence/visibility_single_manager_test.go @@ -523,46 +523,6 @@ func TestListClosedWorkflowExecutions(t *testing.T) { } } -func TestListAllWorkflowExecutions(t *testing.T) { - tests := map[string]struct { - request *ListAllWorkflowExecutionsRequest - visibilityStoreAffordance func(mockVisibilityStore *MockVisibilityStore) - expectedError error - }{ - "Case1: error case": { - request: &ListAllWorkflowExecutionsRequest{}, - visibilityStoreAffordance: func(mockVisibilityStore *MockVisibilityStore) { - mockVisibilityStore.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).Times(1) - }, - expectedError: fmt.Errorf("error"), - }, - "Case2: normal case": { - request: &ListAllWorkflowExecutionsRequest{}, - visibilityStoreAffordance: func(mockVisibilityStore *MockVisibilityStore) { - mockVisibilityStore.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - expectedError: nil, - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - ctrl := gomock.NewController(t) - mockVisibilityStore := NewMockVisibilityStore(ctrl) - visibilityManager := NewVisibilityManagerImpl(mockVisibilityStore, log.NewNoop()) - - test.visibilityStoreAffordance(mockVisibilityStore) - - _, err := visibilityManager.ListAllWorkflowExecutions(context.Background(), test.request) - if test.expectedError != nil { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - func TestListOpenWorkflowExecutionsByType(t *testing.T) { errorRequest := &ListWorkflowExecutionsByTypeRequest{} diff --git a/common/persistence/visibility_store_mock.go b/common/persistence/visibility_store_mock.go index 6ac08582301..699590a31d6 100644 --- a/common/persistence/visibility_store_mock.go +++ b/common/persistence/visibility_store_mock.go @@ -140,21 +140,6 @@ func (mr *MockVisibilityStoreMockRecorder) GetName() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetName", reflect.TypeOf((*MockVisibilityStore)(nil).GetName)) } -// ListAllWorkflowExecutions mocks base method. -func (m *MockVisibilityStore) ListAllWorkflowExecutions(arg0 context.Context, arg1 *InternalListAllWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListAllWorkflowExecutions", arg0, arg1) - ret0, _ := ret[0].(*InternalListWorkflowExecutionsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListAllWorkflowExecutions indicates an expected call of ListAllWorkflowExecutions. -func (mr *MockVisibilityStoreMockRecorder) ListAllWorkflowExecutions(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllWorkflowExecutions", reflect.TypeOf((*MockVisibilityStore)(nil).ListAllWorkflowExecutions), arg0, arg1) -} - // ListClosedWorkflowExecutions mocks base method. func (m *MockVisibilityStore) ListClosedWorkflowExecutions(arg0 context.Context, arg1 *InternalListWorkflowExecutionsRequest) (*InternalListWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/visibility_triple_manager.go b/common/persistence/visibility_triple_manager.go index a528dcfaf44..1119a3b27b9 100644 --- a/common/persistence/visibility_triple_manager.go +++ b/common/persistence/visibility_triple_manager.go @@ -616,30 +616,6 @@ func (v *visibilityTripleManager) ListWorkflowExecutions( return manager.ListWorkflowExecutions(ctx, request) } -func (v *visibilityTripleManager) ListAllWorkflowExecutions( - ctx context.Context, - request *ListAllWorkflowExecutionsRequest, -) (*ListWorkflowExecutionsResponse, error) { - override := ctx.Value(ContextKey) - v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - closeStatus: 6, // 6 means not set closeStatus. - earliestTime: request.EarliestTime, - latestTime: request.LatestTime, - }, request.Domain, override != nil) - - // get another manager for double read - shadowMgr := v.getShadowMgrForDoubleRead(request.Domain) - // call the API for latency comparison - if shadowMgr != nil { - go shadow(shadowMgr.ListAllWorkflowExecutions, request, v.logger) - } - - manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) - return manager.ListAllWorkflowExecutions(ctx, request) -} - func (v *visibilityTripleManager) ScanWorkflowExecutions( ctx context.Context, request *ListWorkflowExecutionsByQueryRequest, diff --git a/common/persistence/visibility_triple_manager_test.go b/common/persistence/visibility_triple_manager_test.go index 4e3b0a02583..8f50aae38bc 100644 --- a/common/persistence/visibility_triple_manager_test.go +++ b/common/persistence/visibility_triple_manager_test.go @@ -1090,207 +1090,6 @@ func TestPinotTripleListOpenWorkflowExecutions(t *testing.T) { } } -func TestPinotTripleListAllWorkflowExecutions(t *testing.T) { - request := &ListAllWorkflowExecutionsRequest{ - ListWorkflowExecutionsRequest: ListWorkflowExecutionsRequest{Domain: "test-domain"}, - PartialMatch: false, - WorkflowSearchValue: "", - } - - // put this outside because need to use it as an input of the table tests - ctrl := gomock.NewController(t) - - tests := map[string]struct { - request *ListAllWorkflowExecutionsRequest - mockDBVisibilityManager VisibilityManager - mockESVisibilityManager VisibilityManager - mockPinotVisibilityManager VisibilityManager - mockDBVisibilityManagerAffordance func(mockDBVisibilityManager *MockVisibilityManager) - mockPinotVisibilityManagerAffordance func(wg *sync.WaitGroup, mockPinotVisibilityManager *MockVisibilityManager) - mockESVisibilityManagerAffordance func(wg *sync.WaitGroup, mockESVisibilityManager *MockVisibilityManager) - readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter - readModeIsFromPinot dynamicconfig.BoolPropertyFnWithDomainFilter - readModeIsDouble dynamicconfig.BoolPropertyFnWithDomainFilter - wgCount int - expectedError error - }{ - "Case1-1: success case with DB visibility is not nil": { - request: request, - mockDBVisibilityManager: NewMockVisibilityManager(ctrl), - mockDBVisibilityManagerAffordance: func(mockDBVisibilityManager *MockVisibilityManager) { - mockDBVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - wgCount: 0, - expectedError: nil, - }, - "Case1-2: success case with Pinot visibility is not nil": { - request: request, - mockPinotVisibilityManager: NewMockVisibilityManager(ctrl), - mockPinotVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockPinotVisibilityManager *MockVisibilityManager) { - mockPinotVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsFromPinot: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - wgCount: 0, - expectedError: nil, - }, - "Case2-1: success case with double read": { - request: request, - mockPinotVisibilityManager: NewMockVisibilityManager(ctrl), - mockPinotVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockPinotVisibilityManager *MockVisibilityManager) { - mockPinotVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - mockESVisibilityManager: NewMockVisibilityManager(ctrl), - mockESVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockESVisibilityManager *MockVisibilityManager) { - mockESVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).DoAndReturn(func( - ctx context.Context, request *ListAllWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { - wg.Done() - return nil, nil - }).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsFromPinot: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - wgCount: 1, - expectedError: nil, - }, - "Case2-2: both ES and Pinot nil case with double read": { - request: request, - mockDBVisibilityManager: NewMockVisibilityManager(ctrl), - mockDBVisibilityManagerAffordance: func(mockDBVisibilityManager *MockVisibilityManager) { - mockDBVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsFromPinot: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - wgCount: 0, - expectedError: nil, - }, - "Case2-3: Pinot nil case with double read": { - request: request, - mockESVisibilityManager: NewMockVisibilityManager(ctrl), - mockESVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockESVisibilityManager *MockVisibilityManager) { - mockESVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - readModeIsFromPinot: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - wgCount: 0, - expectedError: nil, - }, - "Case2-4: Read mode is from ES with double read": { - request: request, - mockPinotVisibilityManager: NewMockVisibilityManager(ctrl), - mockPinotVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockPinotVisibilityManager *MockVisibilityManager) { - mockPinotVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).DoAndReturn(func( - ctx context.Context, request *ListAllWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { - wg.Done() - return nil, nil - }).Times(1) - }, - mockESVisibilityManager: NewMockVisibilityManager(ctrl), - mockESVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockESVisibilityManager *MockVisibilityManager) { - mockESVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - readModeIsFromPinot: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - wgCount: 1, - expectedError: nil, - }, - "Case2-5: Read modes' are false with double read": { - request: request, - mockPinotVisibilityManager: NewMockVisibilityManager(ctrl), - mockPinotVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockPinotVisibilityManager *MockVisibilityManager) {}, - mockESVisibilityManager: NewMockVisibilityManager(ctrl), - mockESVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockESVisibilityManager *MockVisibilityManager) {}, - mockDBVisibilityManager: NewMockVisibilityManager(ctrl), - mockDBVisibilityManagerAffordance: func(mockDBVisibilityManager *MockVisibilityManager) { - mockDBVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsFromPinot: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - wgCount: 0, - expectedError: nil, - }, - "Case2-6: double read with an error": { - request: request, - mockPinotVisibilityManager: NewMockVisibilityManager(ctrl), - mockPinotVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockPinotVisibilityManager *MockVisibilityManager) { - mockPinotVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).DoAndReturn(func( - ctx context.Context, request *ListAllWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { - wg.Done() - return nil, fmt.Errorf("test error") - }).Times(1) - }, - mockESVisibilityManager: NewMockVisibilityManager(ctrl), - mockESVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockESVisibilityManager *MockVisibilityManager) { - mockESVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()). - Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - readModeIsFromPinot: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - wgCount: 1, - expectedError: nil, - }, - "Case2-7: double read with panic": { - request: request, - mockPinotVisibilityManager: NewMockVisibilityManager(ctrl), - mockPinotVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockPinotVisibilityManager *MockVisibilityManager) { - mockPinotVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).DoAndReturn(func( - ctx context.Context, request *ListAllWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { - wg.Done() - panic("test panic") - }).Times(1) - }, - mockESVisibilityManager: NewMockVisibilityManager(ctrl), - mockESVisibilityManagerAffordance: func(wg *sync.WaitGroup, mockESVisibilityManager *MockVisibilityManager) { - mockESVisibilityManager.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()). - Return(nil, nil).Times(1) - }, - readModeIsFromES: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - readModeIsFromPinot: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), - readModeIsDouble: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), - wgCount: 1, - expectedError: nil, - }, - } - for name, test := range tests { - t.Run(name, func(t *testing.T) { - wg := sync.WaitGroup{} - wg.Add(test.wgCount) - - if test.mockDBVisibilityManager != nil { - test.mockDBVisibilityManagerAffordance(test.mockDBVisibilityManager.(*MockVisibilityManager)) - } - if test.mockPinotVisibilityManager != nil { - test.mockPinotVisibilityManagerAffordance(&wg, test.mockPinotVisibilityManager.(*MockVisibilityManager)) - } - if test.mockESVisibilityManager != nil { - test.mockESVisibilityManagerAffordance(&wg, test.mockESVisibilityManager.(*MockVisibilityManager)) - } - - visibilityManager := NewVisibilityTripleManager(test.mockDBVisibilityManager, test.mockPinotVisibilityManager, - test.mockESVisibilityManager, test.readModeIsFromPinot, test.readModeIsFromES, - nil, dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), test.readModeIsDouble, log.NewNoop()) - - _, err := visibilityManager.ListAllWorkflowExecutions(context.Background(), test.request) - - wg.Wait() - if test.expectedError != nil { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - func TestPinotTripleListClosedWorkflowExecutions(t *testing.T) { request := &ListWorkflowExecutionsRequest{ Domain: "test-domain", diff --git a/common/persistence/wrappers/errorinjectors/injectors_test.go b/common/persistence/wrappers/errorinjectors/injectors_test.go index d3eced6b645..b21f640dda4 100644 --- a/common/persistence/wrappers/errorinjectors/injectors_test.go +++ b/common/persistence/wrappers/errorinjectors/injectors_test.go @@ -260,7 +260,6 @@ func builderForPassThrough(t *testing.T, injector any, errorRate float64, logger mocked.EXPECT().ListOpenWorkflowExecutionsByType(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr) mocked.EXPECT().ListOpenWorkflowExecutionsByWorkflowID(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr) mocked.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr) - mocked.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr) mocked.EXPECT().RecordWorkflowExecutionStarted(gomock.Any(), gomock.Any()).Return(expectedErr) mocked.EXPECT().RecordWorkflowExecutionClosed(gomock.Any(), gomock.Any()).Return(expectedErr) mocked.EXPECT().RecordWorkflowExecutionUninitialized(gomock.Any(), gomock.Any()).Return(expectedErr) diff --git a/common/persistence/wrappers/errorinjectors/utils.go b/common/persistence/wrappers/errorinjectors/utils.go index 961725c7e6b..c741a82dd2b 100644 --- a/common/persistence/wrappers/errorinjectors/utils.go +++ b/common/persistence/wrappers/errorinjectors/utils.go @@ -284,8 +284,6 @@ func visibilityManagerTags(op string) *tag.Tag { return &tag.StoreOperationDeleteWorkflowExecution case "VisibilityManager.ListWorkflowExecutions": return &tag.StoreOperationListWorkflowExecutions - case "VisibilityManager.ListAllWorkflowExecutions": - return &tag.StoreOperationListAllWorkflowExecutions case "VisibilityManager.ScanWorkflowExecutions": return &tag.StoreOperationScanWorkflowExecutions case "VisibilityManager.CountWorkflowExecutions": diff --git a/common/persistence/wrappers/errorinjectors/visibility_generated.go b/common/persistence/wrappers/errorinjectors/visibility_generated.go index e2cfb340f82..b2e0f27368c 100644 --- a/common/persistence/wrappers/errorinjectors/visibility_generated.go +++ b/common/persistence/wrappers/errorinjectors/visibility_generated.go @@ -122,21 +122,6 @@ func (c *injectorVisibilityManager) GetName() (s1 string) { return c.wrapped.GetName() } -func (c *injectorVisibilityManager) ListAllWorkflowExecutions(ctx context.Context, request *persistence.ListAllWorkflowExecutionsRequest) (lp1 *persistence.ListWorkflowExecutionsResponse, err error) { - fakeErr := generateFakeError(c.errorRate) - var forwardCall bool - if forwardCall = shouldForwardCallToPersistence(fakeErr); forwardCall { - lp1, err = c.wrapped.ListAllWorkflowExecutions(ctx, request) - } - - if fakeErr != nil { - logErr(c.logger, "VisibilityManager.ListAllWorkflowExecutions", fakeErr, forwardCall, err) - err = fakeErr - return - } - return -} - func (c *injectorVisibilityManager) ListClosedWorkflowExecutions(ctx context.Context, request *persistence.ListWorkflowExecutionsRequest) (lp1 *persistence.ListWorkflowExecutionsResponse, err error) { fakeErr := generateFakeError(c.errorRate) var forwardCall bool diff --git a/common/persistence/wrappers/metered/metered_test.go b/common/persistence/wrappers/metered/metered_test.go index 952a12f82a4..9232b68d229 100644 --- a/common/persistence/wrappers/metered/metered_test.go +++ b/common/persistence/wrappers/metered/metered_test.go @@ -250,7 +250,6 @@ func prepareMockForTest(t *testing.T, input interface{}, expectedErr error) { mocked.EXPECT().ListOpenWorkflowExecutionsByType(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr).Times(1) mocked.EXPECT().ListOpenWorkflowExecutionsByWorkflowID(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr).Times(1) mocked.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr).Times(1) - mocked.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr).Times(1) mocked.EXPECT().RecordWorkflowExecutionStarted(gomock.Any(), gomock.Any()).Return(expectedErr).Times(1) mocked.EXPECT().RecordWorkflowExecutionClosed(gomock.Any(), gomock.Any()).Return(expectedErr).Times(1) mocked.EXPECT().RecordWorkflowExecutionUninitialized(gomock.Any(), gomock.Any()).Return(expectedErr).Times(1) diff --git a/common/persistence/wrappers/metered/visibility_generated.go b/common/persistence/wrappers/metered/visibility_generated.go index 57eeae7d81b..7769b8b7887 100644 --- a/common/persistence/wrappers/metered/visibility_generated.go +++ b/common/persistence/wrappers/metered/visibility_generated.go @@ -111,17 +111,6 @@ func (c *meteredVisibilityManager) GetName() (s1 string) { return c.wrapped.GetName() } -func (c *meteredVisibilityManager) ListAllWorkflowExecutions(ctx context.Context, request *persistence.ListAllWorkflowExecutionsRequest) (lp1 *persistence.ListWorkflowExecutionsResponse, err error) { - op := func() error { - lp1, err = c.wrapped.ListAllWorkflowExecutions(ctx, request) - c.emptyMetric("VisibilityManager.ListAllWorkflowExecutions", request, lp1, err) - return err - } - - err = c.call(metrics.PersistenceListAllWorkflowExecutionsScope, op, getCustomMetricTags(request)...) - return -} - func (c *meteredVisibilityManager) ListClosedWorkflowExecutions(ctx context.Context, request *persistence.ListWorkflowExecutionsRequest) (lp1 *persistence.ListWorkflowExecutionsResponse, err error) { op := func() error { lp1, err = c.wrapped.ListClosedWorkflowExecutions(ctx, request) diff --git a/common/persistence/wrappers/ratelimited/visibility_generated.go b/common/persistence/wrappers/ratelimited/visibility_generated.go index 85bcb3be93b..cb66591ef2e 100644 --- a/common/persistence/wrappers/ratelimited/visibility_generated.go +++ b/common/persistence/wrappers/ratelimited/visibility_generated.go @@ -91,14 +91,6 @@ func (c *ratelimitedVisibilityManager) GetName() (s1 string) { return c.wrapped.GetName() } -func (c *ratelimitedVisibilityManager) ListAllWorkflowExecutions(ctx context.Context, request *persistence.ListAllWorkflowExecutionsRequest) (lp1 *persistence.ListWorkflowExecutionsResponse, err error) { - if ok := c.rateLimiter.Allow(); !ok { - err = ErrPersistenceLimitExceeded - return - } - return c.wrapped.ListAllWorkflowExecutions(ctx, request) -} - func (c *ratelimitedVisibilityManager) ListClosedWorkflowExecutions(ctx context.Context, request *persistence.ListWorkflowExecutionsRequest) (lp1 *persistence.ListWorkflowExecutionsResponse, err error) { if ok := c.rateLimiter.Allow(); !ok { err = ErrPersistenceLimitExceeded diff --git a/common/persistence/wrappers/ratelimited/wrappers_test.go b/common/persistence/wrappers/ratelimited/wrappers_test.go index 2c55c0715be..93c3f0ad76d 100644 --- a/common/persistence/wrappers/ratelimited/wrappers_test.go +++ b/common/persistence/wrappers/ratelimited/wrappers_test.go @@ -245,7 +245,6 @@ func builderForPassThrough(t *testing.T, injector any, limiter quotas.Limiter, e mocked.EXPECT().ListOpenWorkflowExecutionsByType(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr) mocked.EXPECT().ListOpenWorkflowExecutionsByWorkflowID(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr) mocked.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr) - mocked.EXPECT().ListAllWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&persistence.ListWorkflowExecutionsResponse{}, expectedErr) mocked.EXPECT().RecordWorkflowExecutionStarted(gomock.Any(), gomock.Any()).Return(expectedErr) mocked.EXPECT().RecordWorkflowExecutionClosed(gomock.Any(), gomock.Any()).Return(expectedErr) mocked.EXPECT().RecordWorkflowExecutionUninitialized(gomock.Any(), gomock.Any()).Return(expectedErr) diff --git a/common/persistence/wrappers/sampled/visibility_manager.go b/common/persistence/wrappers/sampled/visibility_manager.go index 0182abdfcfc..395b688bf72 100644 --- a/common/persistence/wrappers/sampled/visibility_manager.go +++ b/common/persistence/wrappers/sampled/visibility_manager.go @@ -232,17 +232,6 @@ func (p *visibilityManager) ListClosedWorkflowExecutionsByStatus( return p.persistence.ListClosedWorkflowExecutionsByStatus(ctx, request) } -func (p *visibilityManager) ListAllWorkflowExecutions( - ctx context.Context, - request *persistence.ListAllWorkflowExecutionsRequest, -) (*persistence.ListWorkflowExecutionsResponse, error) { - if err := p.tryConsumeListToken(request.Domain, "ListAllWorkflowExecutions"); err != nil { - return nil, err - } - - return p.persistence.ListAllWorkflowExecutions(ctx, request) -} - func (p *visibilityManager) RecordWorkflowExecutionUninitialized( ctx context.Context, request *persistence.RecordWorkflowExecutionUninitializedRequest, diff --git a/common/types/shared.go b/common/types/shared.go index 2c31d056adf..9b49ac7cdc5 100644 --- a/common/types/shared.go +++ b/common/types/shared.go @@ -3482,64 +3482,6 @@ func (v *ListOpenWorkflowExecutionsResponse) GetExecutions() (o []*WorkflowExecu return } -// ListAllWorkflowExecutionsRequest is the request to ListAllWorkflowExecutions -type ListAllWorkflowExecutionsRequest struct { - Domain string `json:"domain,omitempty"` - MaximumPageSize int32 `json:"maximumPageSize,omitempty"` - NextPageToken []byte `json:"nextPageToken,omitempty"` - StartTimeFilter *StartTimeFilter `json:"StartTimeFilter,omitempty"` - PartialMatch bool `json:"partialMatch,omitempty"` - StatusFilter []WorkflowExecutionCloseStatus `json:"closeStatus,omitempty"` - WorkflowSearchValue string `json:"workflowSearchValue,omitempty"` - SortColumn string `json:"sortColumn,omitempty"` - SortOrder string `json:"sortOrder,omitempty"` -} - -func (v *ListAllWorkflowExecutionsRequest) SerializeForLogging() (string, error) { - if v == nil { - return "", nil - } - return SerializeRequest(v) -} - -// GetDomain gets the domain -func (v *ListAllWorkflowExecutionsRequest) GetDomain() (o string) { - if v != nil { - return v.Domain - } - return -} - -// GetMaximumPageSize returns the max page size -func (v *ListAllWorkflowExecutionsRequest) GetMaximumPageSize() (o int32) { - if v != nil { - return v.MaximumPageSize - } - return -} - -// ListAllWorkflowExecutionsResponse is the response for ListAllWorkflowExecutions -type ListAllWorkflowExecutionsResponse struct { - Executions []*WorkflowExecutionInfo `json:"executions,omitempty"` - NextPageToken []byte `json:"nextPageToken,omitempty"` -} - -// GetExecutions returns the workflow execution info -func (v *ListAllWorkflowExecutionsResponse) GetExecutions() (o []*WorkflowExecutionInfo) { - if v != nil && v.Executions != nil { - return v.Executions - } - return -} - -// GetNextPageToken returns the next page token) -func (v *ListAllWorkflowExecutionsResponse) GetNextPageToken() (o []byte) { - if v != nil && v.NextPageToken != nil { - return v.NextPageToken - } - return -} - // ListTaskListPartitionsRequest is an internal type (TBD...) type ListTaskListPartitionsRequest struct { Domain string `json:"domain,omitempty"` diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index d5fbf8f4182..ad4a1106a54 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -3075,93 +3075,6 @@ func (wh *WorkflowHandler) ListWorkflowExecutions( return resp, nil } -// ListAllWorkflowExecutions - retrieves info for all workflow executions in a domain -func (wh *WorkflowHandler) ListAllWorkflowExecutions( - ctx context.Context, - listRequest *types.ListAllWorkflowExecutionsRequest, -) (resp *types.ListAllWorkflowExecutionsResponse, retError error) { - if wh.isShuttingDown() { - return nil, validate.ErrShuttingDown - } - - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return nil, err - } - - if listRequest == nil { - return nil, validate.ErrRequestNotSet - } - - if listRequest.GetDomain() == "" { - return nil, validate.ErrDomainNotSet - } - - if listRequest.StartTimeFilter == nil { - return nil, &types.BadRequestError{Message: "StartTimeFilter is required"} - } - - if listRequest.StartTimeFilter.EarliestTime == nil { - return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"} - } - - if listRequest.StartTimeFilter.LatestTime == nil { - return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"} - } - - if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() { - return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"} - } - - if listRequest.GetMaximumPageSize() <= 0 { - listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())) - } - - if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) { - return nil, &types.BadRequestError{ - Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())} - } - - domain := listRequest.GetDomain() - domainID, err := wh.GetDomainCache().GetDomainID(domain) - if err != nil { - return nil, err - } - - baseReq := persistence.ListWorkflowExecutionsRequest{ - DomainUUID: domainID, - Domain: domain, - PageSize: int(listRequest.GetMaximumPageSize()), - NextPageToken: listRequest.NextPageToken, - EarliestTime: listRequest.StartTimeFilter.GetEarliestTime(), - LatestTime: listRequest.StartTimeFilter.GetLatestTime(), - } - listallrequest := &persistence.ListAllWorkflowExecutionsRequest{ - ListWorkflowExecutionsRequest: baseReq, - PartialMatch: listRequest.PartialMatch, - WorkflowSearchValue: listRequest.WorkflowSearchValue, - SortColumn: listRequest.SortColumn, - SortOrder: listRequest.SortOrder, - } - copy(listallrequest.StatusFilter, listRequest.StatusFilter) - var persistenceResp *persistence.ListWorkflowExecutionsResponse - - persistenceResp, err = wh.GetVisibilityManager().ListAllWorkflowExecutions( - ctx, - listallrequest, - ) - if err != nil { - return nil, err - } - - wh.GetLogger().Debug("List all workflows", - tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType) - - resp = &types.ListAllWorkflowExecutionsResponse{} - resp.Executions = persistenceResp.Executions - resp.NextPageToken = persistenceResp.NextPageToken - return resp, nil -} - // RestartWorkflowExecution - retrieves info for an existing workflow then restarts it func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) { if wh.isShuttingDown() { diff --git a/service/frontend/api/handler_test.go b/service/frontend/api/handler_test.go index ffcc31e043f..ba77d385b36 100644 --- a/service/frontend/api/handler_test.go +++ b/service/frontend/api/handler_test.go @@ -1786,56 +1786,6 @@ func (s *workflowHandlerSuite) TestListWorkflowExecutions() { s.NotNil(err) } -func (s *workflowHandlerSuite) TestListAllWorkflowExecutions() { - config := s.newConfig(dc.NewInMemoryClient()) - wh := s.getWorkflowHandler(config) - - s.mockDomainCache.EXPECT().GetDomainID(gomock.Any()).Return(s.testDomainID, nil).AnyTimes() - s.mockVisibilityMgr.On("ListAllWorkflowExecutions", mock.Anything, mock.Anything).Return(&persistence.ListWorkflowExecutionsResponse{}, nil).Once() - - listRequest := &types.ListAllWorkflowExecutionsRequest{ - Domain: s.testDomain, - MaximumPageSize: 0, - NextPageToken: nil, - StartTimeFilter: &types.StartTimeFilter{ - EarliestTime: common.Int64Ptr(0), - LatestTime: common.Int64Ptr(time.Now().UnixNano()), - }, - PartialMatch: false, - WorkflowSearchValue: "test", - } - ctx := context.Background() - - // valid request - _, err := wh.ListAllWorkflowExecutions(ctx, listRequest) - s.NoError(err) - - // nil request - listRequest = nil - _, err = wh.ListAllWorkflowExecutions(ctx, listRequest) - s.NotNil(err) - s.Equal(err, validate.ErrRequestNotSet) - - // invalid request - start time filter not set - listRequest = &types.ListAllWorkflowExecutionsRequest{ - Domain: s.testDomain, - StartTimeFilter: nil, - } - _, err = wh.ListAllWorkflowExecutions(ctx, listRequest) - s.NotNil(err) - - // invalid request - earliest time > latest time - listRequest = &types.ListAllWorkflowExecutionsRequest{ - Domain: s.testDomain, - StartTimeFilter: &types.StartTimeFilter{ - EarliestTime: common.Int64Ptr(time.Now().UnixNano()), - LatestTime: common.Int64Ptr(0), - }, - } - _, err = wh.ListAllWorkflowExecutions(ctx, listRequest) - s.NotNil(err) -} - func (s *workflowHandlerSuite) TestScantWorkflowExecutions() { config := s.newConfig(dc.NewInMemoryClient()) wh := s.getWorkflowHandler(config)