Skip to content

Commit

Permalink
Allow filtering by CloseStatus in ListAllWorkflowExecutions api (uber…
Browse files Browse the repository at this point in the history
  • Loading branch information
sankari165 committed Jun 14, 2024
1 parent ef1105c commit ceff8ca
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 16 deletions.
1 change: 1 addition & 0 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type (
ListAllWorkflowExecutionsRequest struct {
ListWorkflowExecutionsRequest
PartialMatch bool
StatusFilter []types.WorkflowExecutionCloseStatus
WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID
SortColumn string // This should be a valid search attribute
SortOrder string // DESC or ASC
Expand Down
1 change: 1 addition & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ type (
// InternalListAllWorkflowExecutionsByTypeRequest is used to list all open and closed executions with specific filters in a domain
InternalListAllWorkflowExecutionsByTypeRequest struct {
InternalListWorkflowExecutionsRequest
StatusFilter []types.WorkflowExecutionCloseStatus
PartialMatch bool
WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID
SortColumn string // This should be a valid search attribute
Expand Down
18 changes: 18 additions & 0 deletions common/persistence/pinot/pinot_visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,10 @@ func (s *PinotQuerySearchField) lastSearchField() {
}
}

func (s *PinotQuerySearchField) resetSearchField() {
s.string = ""
}

func (s *PinotQuerySearchField) addEqual(obj string, val interface{}) {
s.checkFirstSearchField()
if _, ok := val.(string); ok {
Expand Down Expand Up @@ -766,6 +770,16 @@ func (q *PinotQuery) addOffsetAndLimits(offset int, limit int) {
q.limits += fmt.Sprintf("LIMIT %d, %d\n", offset, limit)
}

func (q *PinotQuery) addStatusFilters(status []types.WorkflowExecutionCloseStatus) {
for _, s := range status {
q.search.addEqual(CloseStatus, s.String())
}

q.search.lastSearchField()
q.filters.addQuery(q.search.string)
q.search.resetSearchField()
}

func (f *PinotQueryFilter) checkFirstFilter() {
if f.string == "" {
f.string = "WHERE "
Expand Down Expand Up @@ -1068,6 +1082,10 @@ func (v *pinotVisibilityStore) getListAllWorkflowExecutionsQuery(tableName strin
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

if request.StatusFilter != nil {
query.addStatusFilters(request.StatusFilter)
}

if request.WorkflowSearchValue != "" {
if request.PartialMatch {
query.search.addMatch(WorkflowID, request.WorkflowSearchValue)
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/pinot/pinot_visibility_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1769,12 +1769,16 @@ func TestGetListAllWorkflowExecutionsQuery(t *testing.T) {
},
PartialMatch: false,
WorkflowSearchValue: "123",
StatusFilter: []types.WorkflowExecutionCloseStatus{types.WorkflowExecutionCloseStatusCompleted, types.WorkflowExecutionCloseStatusTimedOut},
},
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND StartTime BETWEEN 1547596871371 AND 2547596873371
AND ( CloseStatus = 'COMPLETED'
OR CloseStatus = 'TIMED_OUT'
)
AND ( WorkflowID = '123'
OR WorkflowType = '123'
OR RunID = '123'
Expand All @@ -1796,6 +1800,7 @@ LIMIT 0, 10
},
PartialMatch: false,
WorkflowSearchValue: "123",
StatusFilter: []types.WorkflowExecutionCloseStatus{types.WorkflowExecutionCloseStatusTerminated},
SortColumn: CloseTime,
SortOrder: AscendingOrder,
},
Expand All @@ -1804,6 +1809,8 @@ FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND StartTime BETWEEN 1547596871371 AND 2547596873371
AND ( CloseStatus = 'TERMINATED'
)
AND ( WorkflowID = '123'
OR WorkflowType = '123'
OR RunID = '123'
Expand Down
1 change: 1 addition & 0 deletions common/persistence/visibility_single_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (v *visibilityManagerImpl) ListAllWorkflowExecutions(
SortColumn: request.SortColumn,
SortOrder: request.SortOrder,
}
copy(internalRequest.StatusFilter, request.StatusFilter)
if internalListRequest != nil {
internalRequest.InternalListWorkflowExecutionsRequest = *internalListRequest
}
Expand Down
17 changes: 9 additions & 8 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3482,14 +3482,15 @@ func (v *ListOpenWorkflowExecutionsResponse) GetExecutions() (o []*WorkflowExecu

// ListAllWorkflowExecutionsRequest is the request to ListAllWorkflowExecutions
type ListAllWorkflowExecutionsRequest struct {
Domain string `json:"domain,omitempty"`
MaximumPageSize int32 `json:"maximumPageSize,omitempty"`
NextPageToken []byte `json:"nextPageToken,omitempty"`
StartTimeFilter *StartTimeFilter `json:"StartTimeFilter,omitempty"`
PartialMatch bool `json:"partialMatch,omitempty"`
WorkflowSearchValue string `json:"workflowSearchValue,omitempty"`
SortColumn string `json:"sortColumn,omitempty"`
SortOrder string `json:"sortOrder,omitempty"`
Domain string `json:"domain,omitempty"`
MaximumPageSize int32 `json:"maximumPageSize,omitempty"`
NextPageToken []byte `json:"nextPageToken,omitempty"`
StartTimeFilter *StartTimeFilter `json:"StartTimeFilter,omitempty"`
PartialMatch bool `json:"partialMatch,omitempty"`
StatusFilter []WorkflowExecutionCloseStatus `json:"closeStatus,omitempty"`
WorkflowSearchValue string `json:"workflowSearchValue,omitempty"`
SortColumn string `json:"sortColumn,omitempty"`
SortOrder string `json:"sortOrder,omitempty"`
}

func (v *ListAllWorkflowExecutionsRequest) SerializeForLogging() (string, error) {
Expand Down
17 changes: 9 additions & 8 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3135,18 +3135,19 @@ func (wh *WorkflowHandler) ListAllWorkflowExecutions(
EarliestTime: listRequest.StartTimeFilter.GetEarliestTime(),
LatestTime: listRequest.StartTimeFilter.GetLatestTime(),
}

listallrequest := &persistence.ListAllWorkflowExecutionsRequest{
ListWorkflowExecutionsRequest: baseReq,
PartialMatch: listRequest.PartialMatch,
WorkflowSearchValue: listRequest.WorkflowSearchValue,
SortColumn: listRequest.SortColumn,
SortOrder: listRequest.SortOrder,
}
copy(listallrequest.StatusFilter, listRequest.StatusFilter)
var persistenceResp *persistence.ListWorkflowExecutionsResponse

persistenceResp, err = wh.GetVisibilityManager().ListAllWorkflowExecutions(
ctx,
&persistence.ListAllWorkflowExecutionsRequest{
ListWorkflowExecutionsRequest: baseReq,
PartialMatch: listRequest.PartialMatch,
WorkflowSearchValue: listRequest.WorkflowSearchValue,
SortColumn: listRequest.SortColumn,
SortOrder: listRequest.SortOrder,
},
listallrequest,
)
if err != nil {
return nil, err
Expand Down

0 comments on commit ceff8ca

Please sign in to comment.