diff --git a/common/persistence/dataVisibilityManagerInterfaces.go b/common/persistence/dataVisibilityManagerInterfaces.go index 3a77c2a6233..c2fdc5ad604 100644 --- a/common/persistence/dataVisibilityManagerInterfaces.go +++ b/common/persistence/dataVisibilityManagerInterfaces.go @@ -137,6 +137,7 @@ type ( ListAllWorkflowExecutionsRequest struct { ListWorkflowExecutionsRequest PartialMatch bool + StatusFilter []types.WorkflowExecutionCloseStatus WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID SortColumn string // This should be a valid search attribute SortOrder string // DESC or ASC diff --git a/common/persistence/data_store_interfaces.go b/common/persistence/data_store_interfaces.go index 509ab435a1f..af440ccc2ae 100644 --- a/common/persistence/data_store_interfaces.go +++ b/common/persistence/data_store_interfaces.go @@ -710,6 +710,7 @@ type ( // InternalListAllWorkflowExecutionsByTypeRequest is used to list all open and closed executions with specific filters in a domain InternalListAllWorkflowExecutionsByTypeRequest struct { InternalListWorkflowExecutionsRequest + StatusFilter []types.WorkflowExecutionCloseStatus PartialMatch bool WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID SortColumn string // This should be a valid search attribute diff --git a/common/persistence/pinot/pinot_visibility_store.go b/common/persistence/pinot/pinot_visibility_store.go index 4a441d94f5c..e15e3b5f3b5 100644 --- a/common/persistence/pinot/pinot_visibility_store.go +++ b/common/persistence/pinot/pinot_visibility_store.go @@ -708,6 +708,10 @@ func (s *PinotQuerySearchField) lastSearchField() { } } +func (s *PinotQuerySearchField) resetSearchField() { + s.string = "" +} + func (s *PinotQuerySearchField) addEqual(obj string, val interface{}) { s.checkFirstSearchField() if _, ok := val.(string); ok { @@ -766,6 +770,16 @@ func (q *PinotQuery) addOffsetAndLimits(offset int, limit int) { q.limits += fmt.Sprintf("LIMIT %d, %d\n", offset, limit) } +func (q *PinotQuery) addStatusFilters(status []types.WorkflowExecutionCloseStatus) { + for _, s := range status { + q.search.addEqual(CloseStatus, s.String()) + } + + q.search.lastSearchField() + q.filters.addQuery(q.search.string) + q.search.resetSearchField() +} + func (f *PinotQueryFilter) checkFirstFilter() { if f.string == "" { f.string = "WHERE " @@ -1068,6 +1082,10 @@ func (v *pinotVisibilityStore) getListAllWorkflowExecutionsQuery(tableName strin pageSize := request.PageSize query.addOffsetAndLimits(from, pageSize) + if request.StatusFilter != nil { + query.addStatusFilters(request.StatusFilter) + } + if request.WorkflowSearchValue != "" { if request.PartialMatch { query.search.addMatch(WorkflowID, request.WorkflowSearchValue) diff --git a/common/persistence/pinot/pinot_visibility_store_test.go b/common/persistence/pinot/pinot_visibility_store_test.go index bb63f1f27ce..87610974ac8 100644 --- a/common/persistence/pinot/pinot_visibility_store_test.go +++ b/common/persistence/pinot/pinot_visibility_store_test.go @@ -1769,12 +1769,16 @@ func TestGetListAllWorkflowExecutionsQuery(t *testing.T) { }, PartialMatch: false, WorkflowSearchValue: "123", + StatusFilter: []types.WorkflowExecutionCloseStatus{types.WorkflowExecutionCloseStatusCompleted, types.WorkflowExecutionCloseStatusTimedOut}, }, expectResult: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' AND IsDeleted = false AND StartTime BETWEEN 1547596871371 AND 2547596873371 +AND ( CloseStatus = 'COMPLETED' +OR CloseStatus = 'TIMED_OUT' + ) AND ( WorkflowID = '123' OR WorkflowType = '123' OR RunID = '123' @@ -1796,6 +1800,7 @@ LIMIT 0, 10 }, PartialMatch: false, WorkflowSearchValue: "123", + StatusFilter: []types.WorkflowExecutionCloseStatus{types.WorkflowExecutionCloseStatusTerminated}, SortColumn: CloseTime, SortOrder: AscendingOrder, }, @@ -1804,6 +1809,8 @@ FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' AND IsDeleted = false AND StartTime BETWEEN 1547596871371 AND 2547596873371 +AND ( CloseStatus = 'TERMINATED' + ) AND ( WorkflowID = '123' OR WorkflowType = '123' OR RunID = '123' diff --git a/common/persistence/visibility_single_manager.go b/common/persistence/visibility_single_manager.go index 9aad96c6e76..2f35c3e6732 100644 --- a/common/persistence/visibility_single_manager.go +++ b/common/persistence/visibility_single_manager.go @@ -273,6 +273,7 @@ func (v *visibilityManagerImpl) ListAllWorkflowExecutions( SortColumn: request.SortColumn, SortOrder: request.SortOrder, } + copy(internalRequest.StatusFilter, request.StatusFilter) if internalListRequest != nil { internalRequest.InternalListWorkflowExecutionsRequest = *internalListRequest } diff --git a/common/types/shared.go b/common/types/shared.go index 5e85cbc55da..59fe5d0dcfc 100644 --- a/common/types/shared.go +++ b/common/types/shared.go @@ -3482,14 +3482,15 @@ func (v *ListOpenWorkflowExecutionsResponse) GetExecutions() (o []*WorkflowExecu // ListAllWorkflowExecutionsRequest is the request to ListAllWorkflowExecutions type ListAllWorkflowExecutionsRequest struct { - Domain string `json:"domain,omitempty"` - MaximumPageSize int32 `json:"maximumPageSize,omitempty"` - NextPageToken []byte `json:"nextPageToken,omitempty"` - StartTimeFilter *StartTimeFilter `json:"StartTimeFilter,omitempty"` - PartialMatch bool `json:"partialMatch,omitempty"` - WorkflowSearchValue string `json:"workflowSearchValue,omitempty"` - SortColumn string `json:"sortColumn,omitempty"` - SortOrder string `json:"sortOrder,omitempty"` + Domain string `json:"domain,omitempty"` + MaximumPageSize int32 `json:"maximumPageSize,omitempty"` + NextPageToken []byte `json:"nextPageToken,omitempty"` + StartTimeFilter *StartTimeFilter `json:"StartTimeFilter,omitempty"` + PartialMatch bool `json:"partialMatch,omitempty"` + StatusFilter []WorkflowExecutionCloseStatus `json:"closeStatus,omitempty"` + WorkflowSearchValue string `json:"workflowSearchValue,omitempty"` + SortColumn string `json:"sortColumn,omitempty"` + SortOrder string `json:"sortOrder,omitempty"` } func (v *ListAllWorkflowExecutionsRequest) SerializeForLogging() (string, error) { diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index c2fb44cd1e2..62fd51eccd9 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -3135,18 +3135,19 @@ func (wh *WorkflowHandler) ListAllWorkflowExecutions( EarliestTime: listRequest.StartTimeFilter.GetEarliestTime(), LatestTime: listRequest.StartTimeFilter.GetLatestTime(), } - + listallrequest := &persistence.ListAllWorkflowExecutionsRequest{ + ListWorkflowExecutionsRequest: baseReq, + PartialMatch: listRequest.PartialMatch, + WorkflowSearchValue: listRequest.WorkflowSearchValue, + SortColumn: listRequest.SortColumn, + SortOrder: listRequest.SortOrder, + } + copy(listallrequest.StatusFilter, listRequest.StatusFilter) var persistenceResp *persistence.ListWorkflowExecutionsResponse persistenceResp, err = wh.GetVisibilityManager().ListAllWorkflowExecutions( ctx, - &persistence.ListAllWorkflowExecutionsRequest{ - ListWorkflowExecutionsRequest: baseReq, - PartialMatch: listRequest.PartialMatch, - WorkflowSearchValue: listRequest.WorkflowSearchValue, - SortColumn: listRequest.SortColumn, - SortOrder: listRequest.SortOrder, - }, + listallrequest, ) if err != nil { return nil, err