Skip to content

Commit

Permalink
Add method to list all workflow executions with support for partial m…
Browse files Browse the repository at this point in the history
…atch and search params (#6017)

* try

* update

* Update dataVisibilityManagerInterfaces.go

* updated

* Update pinot_visibility_store_test.go

* Update visibility_store_mock.go

* Update es_visibility_store_test.go

* Update pinot_visibility_store.go
  • Loading branch information
sankari165 committed May 22, 2024
1 parent bbe87aa commit 1eb24b3
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 11 deletions.
2 changes: 1 addition & 1 deletion common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
// purposes.

// ErrVisibilityOperationNotSupported is an error which indicates that operation is not supported in selected persistence
var ErrVisibilityOperationNotSupported = &types.BadRequestError{Message: "Operation is not supported. Please use ElasticSearch"}
var ErrVisibilityOperationNotSupported = &types.BadRequestError{Message: "Operation is not supported"}

type (
// RecordWorkflowExecutionStartedRequest is used to add a record of a newly
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ type (
GetClosedWorkflowExecution(ctx context.Context, request *InternalGetClosedWorkflowExecutionRequest) (*InternalGetClosedWorkflowExecutionResponse, error)
DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
ListAllWorkflowExecutions(ctx context.Context, request *InternalListAllWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error)
ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
Expand Down Expand Up @@ -706,6 +707,13 @@ type (
WorkflowTypeName string
}

// InternalListAllWorkflowExecutionsByTypeRequest is used to list all open and closed executions with specific filters in a domain
InternalListAllWorkflowExecutionsByTypeRequest struct {
InternalListWorkflowExecutionsRequest
PartialMatch bool
WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID
}

// InternalGetClosedWorkflowExecutionResponse is response from GetWorkflowExecution
InternalGetClosedWorkflowExecutionResponse struct {
Execution *InternalVisibilityWorkflowExecutionInfo
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/elasticsearch/es_visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (

const (
esPersistenceName = "elasticsearch"
DomainID = "DomainID"
StartTime = "StartTime"
)

type (
Expand Down Expand Up @@ -442,6 +444,10 @@ func (v *esVisibilityStore) ListWorkflowExecutions(
return resp, nil
}

func (v *esVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return nil, p.ErrVisibilityOperationNotSupported
}

func (v *esVisibilityStore) ScanWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsByQueryRequest,
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/elasticsearch/es_visibility_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ func (s *ESVisibilitySuite) TestListOpenWorkflowExecutionsByType() {
s.True(strings.Contains(err.Error(), "ListOpenWorkflowExecutionsByType failed"))
}

func (s *ESVisibilitySuite) TestListAllWorkflowExecutions() {
_, err := s.visibilityStore.ListAllWorkflowExecutions(context.Background(), &p.InternalListAllWorkflowExecutionsByTypeRequest{})
s.Error(err)
s.Equal(p.ErrVisibilityOperationNotSupported, err)
}

func (s *ESVisibilitySuite) TestListClosedWorkflowExecutionsByType() {
s.mockESClient.On("Search", mock.Anything, mock.MatchedBy(func(input *es.SearchRequest) bool {
s.False(input.IsOpen)
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/nosql/nosql_visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ func (v *nosqlVisibilityStore) ListWorkflowExecutions(
return nil, persistence.ErrVisibilityOperationNotSupported
}

func (v *nosqlVisibilityStore) ListAllWorkflowExecutions(
ctx context.Context,
request *persistence.InternalListAllWorkflowExecutionsByTypeRequest,
) (*persistence.InternalListWorkflowExecutionsResponse, error) {
return nil, persistence.ErrVisibilityOperationNotSupported
}

func (v *nosqlVisibilityStore) ScanWorkflowExecutions(
_ context.Context,
_ *persistence.ListWorkflowExecutionsByQueryRequest) (*persistence.InternalListWorkflowExecutionsResponse, error) {
Expand Down
105 changes: 105 additions & 0 deletions common/persistence/pinot/pinot_visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,28 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Co
return v.pinotClient.Search(req)
}

func (v *pinotVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

query, err := getListAllWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list all workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
IsOpen: true,
Filter: isRecordValid,
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
ListRequest: &request.InternalListWorkflowExecutionsRequest,
}

return v.pinotClient.Search(req)
}

func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx context.Context, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
Expand Down Expand Up @@ -659,6 +681,7 @@ func isTimeStruct(value []byte) ([]byte, error) {
type PinotQuery struct {
query string
filters PinotQueryFilter
search PinotQuerySearchField
sorters string
limits string
}
Expand All @@ -667,10 +690,47 @@ type PinotQueryFilter struct {
string
}

type PinotQuerySearchField struct {
string
}

func (s *PinotQuerySearchField) checkFirstSearchField() {
if s.string == "" {
s.string = "( "
} else {
s.string += "OR "
}
}

func (s *PinotQuerySearchField) lastSearchField() {
if s.string != "" {
s.string += " )"
}
}

func (s *PinotQuerySearchField) addEqual(obj string, val interface{}) {
s.checkFirstSearchField()
if _, ok := val.(string); ok {
val = fmt.Sprintf("'%s'", val)
} else {
val = fmt.Sprintf("%v", val)
}

quotedVal := fmt.Sprintf("%s", val)
s.string += fmt.Sprintf("%s = %s\n", obj, quotedVal)
}

func (s *PinotQuerySearchField) addMatch(obj string, val interface{}) {
s.checkFirstSearchField()

s.string += fmt.Sprintf("text_match(%s, '\"%s\"')\n", obj, val)
}

func NewPinotQuery(tableName string) PinotQuery {
return PinotQuery{
query: fmt.Sprintf("SELECT *\nFROM %s\n", tableName),
filters: PinotQueryFilter{},
search: PinotQuerySearchField{},
sorters: "",
limits: "",
}
Expand Down Expand Up @@ -976,6 +1036,51 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor
return query.String(), nil
}

func getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (string, error) {
if request == nil {
return "", nil
}

query := NewPinotQuery(tableName)

query.filters.addEqual(DomainID, request.DomainUUID)
query.filters.addEqual(IsDeleted, false)

if !request.EarliestTime.IsZero() && !request.LatestTime.IsZero() {
earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano
latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano
query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds
}

query.addPinotSorter(StartTime, DescendingOrder)

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

if request.WorkflowSearchValue != "" {
if request.PartialMatch {
query.search.addMatch(WorkflowID, request.WorkflowSearchValue)
query.search.addMatch(WorkflowType, request.WorkflowSearchValue)
query.search.addMatch(RunID, request.WorkflowSearchValue)
} else {
query.search.addEqual(WorkflowID, request.WorkflowSearchValue)
query.search.addEqual(WorkflowType, request.WorkflowSearchValue)
query.search.addEqual(RunID, request.WorkflowSearchValue)
}

query.search.lastSearchField()
query.filters.addQuery(query.search.string)
}

return query.String(), nil
}

func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) {
if request == nil {
return "", nil
Expand Down
Loading

0 comments on commit 1eb24b3

Please sign in to comment.