Skip to content

Commit

Permalink
Remove unused ListAllWorkflowExecutions method (uber#6204)
Browse files Browse the repository at this point in the history
  • Loading branch information
sankari165 authored Jul 31, 2024
1 parent a024af3 commit 8fc4d27
Show file tree
Hide file tree
Showing 34 changed files with 1 addition and 1,345 deletions.
1 change: 0 additions & 1 deletion common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ var (
StoreOperationGetClosedWorkflowExecution = storeOperation("get-closed-wf-execution")
StoreOperationVisibilityDeleteWorkflowExecution = storeOperation("vis-delete-wf-execution")
StoreOperationListWorkflowExecutions = storeOperation("list-wf-executions")
StoreOperationListAllWorkflowExecutions = storeOperation("list-all-wf-executions")
StoreOperationScanWorkflowExecutions = storeOperation("scan-wf-executions")
StoreOperationCountWorkflowExecutions = storeOperation("count-wf-executions")
StoreOperationDeleteUninitializedWorkflowExecution = storeOperation("delete-uninitialized-wf-execution")
Expand Down
9 changes: 0 additions & 9 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,6 @@ const (
PersistenceDeleteUninitializedWorkflowExecutionScope
// PersistenceListWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer
PersistenceListWorkflowExecutionsScope
// PersistenceListAllWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer
PersistenceListAllWorkflowExecutionsScope
// PersistenceScanWorkflowExecutionsScope tracks ScanWorkflowExecutions calls made by service to persistence layer
PersistenceScanWorkflowExecutionsScope
// PersistenceCountWorkflowExecutionsScope tracks CountWorkflowExecutions calls made by service to persistence layer
Expand Down Expand Up @@ -753,8 +751,6 @@ const (
ElasticsearchGetClosedWorkflowExecutionScope
// ElasticsearchListWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer
ElasticsearchListWorkflowExecutionsScope
// ElasticsearchListAllWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer
ElasticsearchListAllWorkflowExecutionsScope
// ElasticsearchScanWorkflowExecutionsScope tracks ScanWorkflowExecutions calls made by service to persistence layer
ElasticsearchScanWorkflowExecutionsScope
// ElasticsearchCountWorkflowExecutionsScope tracks CountWorkflowExecutions calls made by service to persistence layer
Expand Down Expand Up @@ -790,8 +786,6 @@ const (
PinotGetClosedWorkflowExecutionScope
// PinotListWorkflowExecutionsScope tracks ListWorkflowExecutions calls made by service to persistence layer
PinotListWorkflowExecutionsScope
// PinotListAllWorkflowExecutionsScope tracks ListAllWorkflowExecutions calls made by service to persistence layer
PinotListAllWorkflowExecutionsScope
// PinotScanWorkflowExecutionsScope tracks ScanWorkflowExecutions calls made by service to persistence layer
PinotScanWorkflowExecutionsScope
// PinotCountWorkflowExecutionsScope tracks CountWorkflowExecutions calls made by service to persistence layer
Expand Down Expand Up @@ -1441,7 +1435,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceVisibilityDeleteWorkflowExecutionScope: {operation: "VisibilityDeleteWorkflowExecution"},
PersistenceDeleteUninitializedWorkflowExecutionScope: {operation: "VisibilityDeleteUninitializedWorkflowExecution"},
PersistenceListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"},
PersistenceListAllWorkflowExecutionsScope: {operation: "ListAllWorkflowExecutions"},
PersistenceScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"},
PersistenceCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"},
PersistenceAppendHistoryNodesScope: {operation: "AppendHistoryNodes"},
Expand Down Expand Up @@ -1685,7 +1678,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
ElasticsearchListClosedWorkflowExecutionsByStatusScope: {operation: "ListClosedWorkflowExecutionsByStatus"},
ElasticsearchGetClosedWorkflowExecutionScope: {operation: "GetClosedWorkflowExecution"},
ElasticsearchListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"},
ElasticsearchListAllWorkflowExecutionsScope: {operation: "ListAllWorkflowExecutions"},
ElasticsearchScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"},
ElasticsearchCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"},
ElasticsearchDeleteWorkflowExecutionsScope: {operation: "DeleteWorkflowExecution"},
Expand All @@ -1703,7 +1695,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PinotListClosedWorkflowExecutionsByStatusScope: {operation: "ListClosedWorkflowExecutionsByStatus"},
PinotGetClosedWorkflowExecutionScope: {operation: "GetClosedWorkflowExecution"},
PinotListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"},
PinotListAllWorkflowExecutionsScope: {operation: "ListAllWorkflowExecutions"},
PinotScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"},
PinotCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"},
PinotDeleteWorkflowExecutionsScope: {operation: "DeleteWorkflowExecution"},
Expand Down
30 changes: 0 additions & 30 deletions common/mocks/VisibilityManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,6 @@ type (
NextPageToken []byte
}

// ListAllWorkflowExecutionsRequest is used to list all executions in a domain
ListAllWorkflowExecutionsRequest struct {
ListWorkflowExecutionsRequest
PartialMatch bool
StatusFilter []types.WorkflowExecutionCloseStatus
WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID
SortColumn string // This should be a valid search attribute
SortOrder string // DESC or ASC
}

// ListWorkflowExecutionsByQueryRequest is used to list executions in a domain
ListWorkflowExecutionsByQueryRequest struct {
DomainUUID string
Expand Down Expand Up @@ -235,7 +225,6 @@ type (
ListClosedWorkflowExecutionsByStatus(ctx context.Context, request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error)
DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*ListWorkflowExecutionsResponse, error)
ListAllWorkflowExecutions(ctx context.Context, request *ListAllWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*ListWorkflowExecutionsResponse, error)
CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
// NOTE: GetClosedWorkflowExecution is only for persistence testing, currently no index is supported for filtering by RunID
Expand Down
15 changes: 0 additions & 15 deletions common/persistence/dataVisibilityManagerInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ type (
GetClosedWorkflowExecution(ctx context.Context, request *InternalGetClosedWorkflowExecutionRequest) (*InternalGetClosedWorkflowExecutionResponse, error)
DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
ListAllWorkflowExecutions(ctx context.Context, request *InternalListAllWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error)
ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
Expand Down Expand Up @@ -702,16 +701,6 @@ type (
WorkflowTypeName string
}

// InternalListAllWorkflowExecutionsByTypeRequest is used to list all open and closed executions with specific filters in a domain
InternalListAllWorkflowExecutionsByTypeRequest struct {
InternalListWorkflowExecutionsRequest
StatusFilter []types.WorkflowExecutionCloseStatus
PartialMatch bool
WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID
SortColumn string // This should be a valid search attribute
SortOrder string // DESC or ASC
}

// InternalGetClosedWorkflowExecutionResponse is response from GetWorkflowExecution
InternalGetClosedWorkflowExecutionResponse struct {
Execution *InternalVisibilityWorkflowExecutionInfo
Expand Down
19 changes: 0 additions & 19 deletions common/persistence/elasticsearch/es_visibility_metric_clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,25 +276,6 @@ func (p *visibilityMetricsClient) GetClosedWorkflowExecution(
return response, err
}

func (p *visibilityMetricsClient) ListAllWorkflowExecutions(
ctx context.Context,
request *p.ListAllWorkflowExecutionsRequest,
) (*p.ListWorkflowExecutionsResponse, error) {

scopeWithDomainTag := p.metricClient.Scope(metrics.ElasticsearchListAllWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
scopeWithDomainTag.IncCounter(metrics.ElasticsearchRequestsPerDomain)

sw := scopeWithDomainTag.StartTimer(metrics.ElasticsearchLatencyPerDomain)
response, err := p.persistence.ListAllWorkflowExecutions(ctx, request)
sw.Stop()

if err != nil {
p.updateErrorMetric(scopeWithDomainTag, metrics.ElasticsearchListAllWorkflowExecutionsScope, err)
}

return response, err
}

func (p *visibilityMetricsClient) ListWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsByQueryRequest,
Expand Down
93 changes: 1 addition & 92 deletions common/persistence/elasticsearch/es_visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ import (
)

const (
esPersistenceName = "elasticsearch"
oneMicroSecondInNano = int64(time.Microsecond / time.Nanosecond)
esPersistenceName = "elasticsearch"
)

type (
Expand Down Expand Up @@ -443,41 +442,6 @@ func (v *esVisibilityStore) ListWorkflowExecutions(
return resp, nil
}

func (v *esVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

if request.PageSize == 0 {
request.PageSize = 1000
}

token, err := es.GetNextPageToken(request.NextPageToken)
if err != nil {
return nil, err
}

queryDSL, err := v.getESQueryDSLForListAll(request, token)
if err != nil {
return nil, &types.BadRequestError{Message: fmt.Sprintf("Error when building query: %v", err)}
}

resp, err := v.esClient.SearchByQuery(ctx, &es.SearchByQueryRequest{
Index: v.index,
Query: queryDSL,
NextPageToken: request.NextPageToken,
PageSize: request.PageSize,
Filter: isRecordValid,
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("ListAllWorkflowExecutions failed, %v", err),
}
}
return resp, nil
}

func (v *esVisibilityStore) ScanWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsByQueryRequest,
Expand Down Expand Up @@ -599,11 +563,6 @@ func (v *esVisibilityStore) getESQueryDSL(request *p.ListWorkflowExecutionsByQue
return v.processedDSLfromSQL(sql, request.DomainUUID, token)
}

func (v *esVisibilityStore) getESQueryDSLForListAll(request *p.InternalListAllWorkflowExecutionsByTypeRequest, token *es.ElasticVisibilityPageToken) (string, error) {
sql := getSQLFromListAllRequest(request)
return v.processedDSLfromSQL(sql, request.DomainUUID, token)
}

func (v *esVisibilityStore) processedDSLfromSQL(sql, domainUUID string, token *es.ElasticVisibilityPageToken) (string, error) {
dsl, err := getCustomizedDSLFromSQL(sql, domainUUID)
if err != nil {
Expand Down Expand Up @@ -643,56 +602,6 @@ func getSQLFromListRequest(request *p.ListWorkflowExecutionsByQueryRequest) stri
return sql
}

func getSQLFromListAllRequest(request *p.InternalListAllWorkflowExecutionsByTypeRequest) string {
sql := "select * from dummy "
var earliestTimeStr, latestTimeStr, timeRange, searchString, whereClause, orderByClause string

if !request.EarliestTime.IsZero() && !request.LatestTime.IsZero() {
earliestTimeStr = strconv.FormatInt(request.EarliestTime.UnixNano()-oneMicroSecondInNano, 10)
latestTimeStr = strconv.FormatInt(request.LatestTime.UnixNano()+oneMicroSecondInNano, 10)
timeRange = fmt.Sprintf("%s between %s and %s", es.StartTime, earliestTimeStr, latestTimeStr)
whereClause = addToWhereClause(whereClause, timeRange)
}

statusFilters := make([]string, 0, len(request.StatusFilter))
for _, status := range request.StatusFilter {
statusFilters = append(statusFilters, fmt.Sprintf("%s = %d", es.CloseStatus, status))
}
statusFilterString := strings.Join(statusFilters, " or ")
whereClause = addToWhereClause(whereClause, statusFilterString)

if request.WorkflowSearchValue != "" {
searchString = fmt.Sprintf("%s = \"%s\" or %s = \"%s\" or %s = \"%s\" ",
es.WorkflowID, request.WorkflowSearchValue,
es.RunID, request.WorkflowSearchValue,
es.WorkflowType, request.WorkflowSearchValue)
whereClause = addToWhereClause(whereClause, searchString)
}

if whereClause != "" {
sql += whereClause
}

if request.SortColumn != "" && request.SortOrder != "" {
orderByClause = fmt.Sprintf(" order by %s %s", request.SortColumn, request.SortOrder)
sql += orderByClause
}

return fmt.Sprintf("%s limit %d", sql, request.PageSize)
}

func addToWhereClause(whereClause, condition string) string {
if condition == "" {
return whereClause
}
if whereClause == "" {
return fmt.Sprintf("where (%s)", condition)
}

whereClause += fmt.Sprintf(" and (%s)", condition)
return whereClause
}

func getSQLFromCountRequest(request *p.CountWorkflowExecutionsRequest) string {
var sql string
if strings.TrimSpace(request.Query) == "" {
Expand Down
54 changes: 0 additions & 54 deletions common/persistence/elasticsearch/es_visibility_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,60 +359,6 @@ func (s *ESVisibilitySuite) TestListOpenWorkflowExecutionsByType() {
s.True(strings.Contains(err.Error(), "ListOpenWorkflowExecutionsByType failed"))
}

func (s *ESVisibilitySuite) TestListAllWorkflowExecutions() {
s.mockESClient.On("SearchByQuery", mock.Anything, mock.MatchedBy(func(input *es.SearchByQueryRequest) bool {
s.True(strings.Contains(input.Query, `{"match_phrase":{"WorkflowID":{"query":"123"}}}`))
s.True(strings.Contains(input.Query, `{"match_phrase":{"WorkflowType":{"query":"123"}}}`))
s.True(strings.Contains(input.Query, `{"match_phrase":{"RunID":{"query":"123"}}}`))
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

request := &p.InternalListAllWorkflowExecutionsByTypeRequest{
InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{
DomainUUID: testDomainID,
Domain: testDomain,
EarliestTime: time.UnixMilli(testEarliestTime),
LatestTime: time.UnixMilli(testLatestTime),
PageSize: 10,
},
StatusFilter: []types.WorkflowExecutionCloseStatus{types.WorkflowExecutionCloseStatusCanceled, types.WorkflowExecutionCloseStatusFailed},
WorkflowSearchValue: "123",
SortOrder: "DESC",
SortColumn: es.WorkflowID,
}

ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

_, err := s.visibilityStore.ListAllWorkflowExecutions(ctx, request)
s.NoError(err)

s.mockESClient.On("SearchByQuery", mock.Anything, mock.Anything).Return(nil, errTestESSearch).Once()
_, err = s.visibilityStore.ListAllWorkflowExecutions(ctx, request)
s.Error(err)
_, ok := err.(*types.InternalServiceError)
s.True(ok)
s.True(strings.Contains(err.Error(), "ListAllWorkflowExecutions failed"))

invalidRequest := &p.InternalListAllWorkflowExecutionsByTypeRequest{
InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{
DomainUUID: testDomainID,
Domain: testDomain,
EarliestTime: time.UnixMilli(testEarliestTime),
LatestTime: time.UnixMilli(testLatestTime),
},
SortColumn: ")",
SortOrder: "DESC",
}
_, err = s.visibilityStore.ListAllWorkflowExecutions(ctx, invalidRequest)
s.Error(err)
_, ok = err.(*types.BadRequestError)
s.True(ok)
s.True(strings.Contains(err.Error(), "Error when building query"))

}

func (s *ESVisibilitySuite) TestListClosedWorkflowExecutionsByType() {
s.mockESClient.On("Search", mock.Anything, mock.MatchedBy(func(input *es.SearchRequest) bool {
s.False(input.IsOpen)
Expand Down
7 changes: 0 additions & 7 deletions common/persistence/nosql/nosql_visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,6 @@ func (v *nosqlVisibilityStore) ListWorkflowExecutions(
return nil, persistence.ErrVisibilityOperationNotSupported
}

func (v *nosqlVisibilityStore) ListAllWorkflowExecutions(
ctx context.Context,
request *persistence.InternalListAllWorkflowExecutionsByTypeRequest,
) (*persistence.InternalListWorkflowExecutionsResponse, error) {
return nil, persistence.ErrVisibilityOperationNotSupported
}

func (v *nosqlVisibilityStore) ScanWorkflowExecutions(
_ context.Context,
_ *persistence.ListWorkflowExecutionsByQueryRequest) (*persistence.InternalListWorkflowExecutionsResponse, error) {
Expand Down
Loading

0 comments on commit 8fc4d27

Please sign in to comment.