Skip to content

Commit

Permalink
Add CustomDomain and Operator as default indexed keys (uber#4825)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored May 11, 2022
1 parent 400bbe4 commit 45770c2
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 53 deletions.
6 changes: 2 additions & 4 deletions common/client/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,10 @@ func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersi
return nil
}
version, err := version.NewVersion(clientFeatureVersion)
if err != nil {
return &types.ClientVersionNotSupportedError{FeatureVersion: clientFeatureVersion, ClientImpl: clientImpl, SupportedVersions: supportedVersions.String()}
}
if !supportedVersions.Check(version) {
if err != nil || !supportedVersions.Check(version) {
return &types.ClientVersionNotSupportedError{FeatureVersion: clientFeatureVersion, ClientImpl: clientImpl, SupportedVersions: supportedVersions.String()}
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
TaskList = "TaskList"
IsCron = "IsCron"
NumClusters = "NumClusters"
CustomDomain = "CustomDomain" // to support batch workflow
Operator = "Operator" // to support batch workflow

CustomStringField = "CustomStringField"
CustomKeywordField = "CustomKeywordField"
Expand Down Expand Up @@ -70,6 +72,8 @@ func createDefaultIndexedKeys() map[string]interface{} {
CustomDatetimeField: shared.IndexedValueTypeDatetime,
CadenceChangeVersion: shared.IndexedValueTypeKeyword,
BinaryChecksums: shared.IndexedValueTypeKeyword,
CustomDomain: shared.IndexedValueTypeString,
Operator: shared.IndexedValueTypeString,
}
for k, v := range systemIndexedKeys {
defaultIndexedKeys[k] = v
Expand Down
41 changes: 23 additions & 18 deletions common/elasticsearch/validator/queryValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,19 @@ func (qv *VisibilityQueryValidator) validateComparisonExpr(expr sqlparser.Expr)
return errors.New("invalid comparison expression")
}
colNameStr := colName.Name.String()
if qv.isValidSearchAttributes(colNameStr) {
if !definition.IsSystemIndexedKey(colNameStr) { // add search attribute prefix
comparisonExpr.Left = &sqlparser.ColName{
Metadata: colName.Metadata,
Name: sqlparser.NewColIdent(definition.Attr + "." + colNameStr),
Qualifier: colName.Qualifier,
}
if !qv.isValidSearchAttributes(colNameStr) {
return fmt.Errorf("invalid search attribute %q", colNameStr)
}

if !definition.IsSystemIndexedKey(colNameStr) { // add search attribute prefix
comparisonExpr.Left = &sqlparser.ColName{
Metadata: colName.Metadata,
Name: sqlparser.NewColIdent(definition.Attr + "." + colNameStr),
Qualifier: colName.Qualifier,
}
return nil
}
return errors.New("invalid search attribute")

return nil
}

func (qv *VisibilityQueryValidator) validateRangeExpr(expr sqlparser.Expr) error {
Expand All @@ -156,17 +158,20 @@ func (qv *VisibilityQueryValidator) validateRangeExpr(expr sqlparser.Expr) error
return errors.New("invalid range expression")
}
colNameStr := colName.Name.String()
if qv.isValidSearchAttributes(colNameStr) {
if !definition.IsSystemIndexedKey(colNameStr) { // add search attribute prefix
rangeCond.Left = &sqlparser.ColName{
Metadata: colName.Metadata,
Name: sqlparser.NewColIdent(definition.Attr + "." + colNameStr),
Qualifier: colName.Qualifier,
}

if !qv.isValidSearchAttributes(colNameStr) {
return fmt.Errorf("invalid search attribute %q", colNameStr)
}

if !definition.IsSystemIndexedKey(colNameStr) { // add search attribute prefix
rangeCond.Left = &sqlparser.ColName{
Metadata: colName.Metadata,
Name: sqlparser.NewColIdent(definition.Attr + "." + colNameStr),
Qualifier: colName.Qualifier,
}
return nil
}
return errors.New("invalid search attribute")

return nil
}

func (qv *VisibilityQueryValidator) validateOrderByExpr(orderBy sqlparser.OrderBy) error {
Expand Down
4 changes: 2 additions & 2 deletions common/elasticsearch/validator/queryValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ func TestValidateQuery(t *testing.T) {
{
msg: "invalid search attribute in comparison",
query: "Invalid = 'a' and 1 < 2",
err: "invalid search attribute",
err: "invalid search attribute \"Invalid\"",
},
{
msg: "invalid search attribute in range",
query: "Invalid between 1 and 2 or WorkflowID = 'wid'",
err: "invalid search attribute",
err: "invalid search attribute \"Invalid\"",
},
{
msg: "only order by",
Expand Down
9 changes: 3 additions & 6 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
// executions store, and stores workflow execution records for visibility
// purposes.

type (
// ErrVisibilityOperationNotSupported is an error which indicates that operation is not supported in selected persistence
var ErrVisibilityOperationNotSupported = &types.BadRequestError{Message: "Operation is not supported. Please use ElasticSearch"}

type (
// RecordWorkflowExecutionStartedRequest is used to add a record of a newly
// started execution
RecordWorkflowExecutionStartedRequest struct {
Expand Down Expand Up @@ -202,11 +204,6 @@ type (
}
)

// NewOperationNotSupportErrorForVis create error for operation not support in visibility
func NewOperationNotSupportErrorForVis() error {
return &types.BadRequestError{Message: "Operation not support. Please use on ElasticSearch"}
}

// IsNopUpsertWorkflowRequest return whether upsert request should be no-op
func IsNopUpsertWorkflowRequest(request *InternalUpsertWorkflowExecutionRequest) bool {
_, exist := request.SearchAttributes[definition.CadenceChangeVersion]
Expand Down
20 changes: 10 additions & 10 deletions common/persistence/nosql/nosqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (v *nosqlVisibilityStore) UpsertWorkflowExecution(
if p.IsNopUpsertWorkflowRequest(request) {
return nil
}
return p.NewOperationNotSupportErrorForVis()
return p.ErrVisibilityOperationNotSupported
}

func (v *nosqlVisibilityStore) ListOpenWorkflowExecutions(
Expand Down Expand Up @@ -352,21 +352,21 @@ func (v *nosqlVisibilityStore) DeleteWorkflowExecution(
}

func (v *nosqlVisibilityStore) ListWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsByQueryRequest,
_ context.Context,
_ *p.ListWorkflowExecutionsByQueryRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
return nil, p.NewOperationNotSupportErrorForVis()
return nil, p.ErrVisibilityOperationNotSupported
}

func (v *nosqlVisibilityStore) ScanWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return nil, p.NewOperationNotSupportErrorForVis()
_ context.Context,
_ *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return nil, p.ErrVisibilityOperationNotSupported
}

func (v *nosqlVisibilityStore) CountWorkflowExecutions(
ctx context.Context,
request *p.CountWorkflowExecutionsRequest,
_ context.Context,
_ *p.CountWorkflowExecutionsRequest,
) (*p.CountWorkflowExecutionsResponse, error) {
return nil, p.NewOperationNotSupportErrorForVis()
return nil, p.ErrVisibilityOperationNotSupported
}
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ func (s *DBVisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
Memo: nil,
SearchAttributes: nil,
},
expected: p.NewOperationNotSupportErrorForVis(),
expected: p.ErrVisibilityOperationNotSupported,
},
}

Expand Down
22 changes: 11 additions & 11 deletions common/persistence/sql/sqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed(
}

func (s *sqlVisibilityStore) UpsertWorkflowExecution(
ctx context.Context,
_ context.Context,
request *p.InternalUpsertWorkflowExecutionRequest,
) error {
if p.IsNopUpsertWorkflowRequest(request) {
return nil
}
return p.NewOperationNotSupportErrorForVis()
return p.ErrVisibilityOperationNotSupported
}

func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(
Expand Down Expand Up @@ -293,24 +293,24 @@ func (s *sqlVisibilityStore) DeleteWorkflowExecution(
}

func (s *sqlVisibilityStore) ListWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsByQueryRequest,
_ context.Context,
_ *p.ListWorkflowExecutionsByQueryRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
return nil, p.NewOperationNotSupportErrorForVis()
return nil, p.ErrVisibilityOperationNotSupported
}

func (s *sqlVisibilityStore) ScanWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsByQueryRequest,
_ context.Context,
_ *p.ListWorkflowExecutionsByQueryRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
return nil, p.NewOperationNotSupportErrorForVis()
return nil, p.ErrVisibilityOperationNotSupported
}

func (s *sqlVisibilityStore) CountWorkflowExecutions(
ctx context.Context,
request *p.CountWorkflowExecutionsRequest,
_ context.Context,
_ *p.CountWorkflowExecutionsRequest,
) (*p.CountWorkflowExecutionsResponse, error) {
return nil, p.NewOperationNotSupportErrorForVis()
return nil, p.ErrVisibilityOperationNotSupported
}

func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.InternalVisibilityWorkflowExecutionInfo {
Expand Down
2 changes: 1 addition & 1 deletion tools/cli/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func getCurrentUserFromEnv() string {
return os.Getenv(n)
}
}
return "unkown"
return "unknown"
}

func prettyPrintJSONObject(o interface{}) {
Expand Down

0 comments on commit 45770c2

Please sign in to comment.