Skip to content

Commit

Permalink
Use internal IndexedValueType (uber#5016)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Nov 10, 2022
1 parent 0c8e16c commit 197bd16
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 70 deletions.
48 changes: 24 additions & 24 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

package definition

import "github.com/uber/cadence/.gen/go/shared"
import "github.com/uber/cadence/common/types"

// valid indexed fields on ES
const (
Expand Down Expand Up @@ -65,16 +65,16 @@ var defaultIndexedKeys = createDefaultIndexedKeys()

func createDefaultIndexedKeys() map[string]interface{} {
defaultIndexedKeys := map[string]interface{}{
CustomStringField: shared.IndexedValueTypeString,
CustomKeywordField: shared.IndexedValueTypeKeyword,
CustomIntField: shared.IndexedValueTypeInt,
CustomBoolField: shared.IndexedValueTypeBool,
CustomDoubleField: shared.IndexedValueTypeDouble,
CustomDatetimeField: shared.IndexedValueTypeDatetime,
CadenceChangeVersion: shared.IndexedValueTypeKeyword,
BinaryChecksums: shared.IndexedValueTypeKeyword,
CustomDomain: shared.IndexedValueTypeString,
Operator: shared.IndexedValueTypeString,
CustomStringField: types.IndexedValueTypeString,
CustomKeywordField: types.IndexedValueTypeKeyword,
CustomIntField: types.IndexedValueTypeInt,
CustomBoolField: types.IndexedValueTypeBool,
CustomDoubleField: types.IndexedValueTypeDouble,
CustomDatetimeField: types.IndexedValueTypeDatetime,
CadenceChangeVersion: types.IndexedValueTypeKeyword,
BinaryChecksums: types.IndexedValueTypeKeyword,
CustomDomain: types.IndexedValueTypeString,
Operator: types.IndexedValueTypeString,
}
for k, v := range systemIndexedKeys {
defaultIndexedKeys[k] = v
Expand All @@ -89,19 +89,19 @@ func GetDefaultIndexedKeys() map[string]interface{} {

// systemIndexedKeys is Cadence created visibility keys
var systemIndexedKeys = map[string]interface{}{
DomainID: shared.IndexedValueTypeKeyword,
WorkflowID: shared.IndexedValueTypeKeyword,
RunID: shared.IndexedValueTypeKeyword,
WorkflowType: shared.IndexedValueTypeKeyword,
StartTime: shared.IndexedValueTypeInt,
ExecutionTime: shared.IndexedValueTypeInt,
CloseTime: shared.IndexedValueTypeInt,
CloseStatus: shared.IndexedValueTypeInt,
HistoryLength: shared.IndexedValueTypeInt,
TaskList: shared.IndexedValueTypeKeyword,
IsCron: shared.IndexedValueTypeBool,
NumClusters: shared.IndexedValueTypeInt,
UpdateTime: shared.IndexedValueTypeInt,
DomainID: types.IndexedValueTypeKeyword,
WorkflowID: types.IndexedValueTypeKeyword,
RunID: types.IndexedValueTypeKeyword,
WorkflowType: types.IndexedValueTypeKeyword,
StartTime: types.IndexedValueTypeInt,
ExecutionTime: types.IndexedValueTypeInt,
CloseTime: types.IndexedValueTypeInt,
CloseStatus: types.IndexedValueTypeInt,
HistoryLength: types.IndexedValueTypeInt,
TaskList: types.IndexedValueTypeKeyword,
IsCron: types.IndexedValueTypeBool,
NumClusters: types.IndexedValueTypeInt,
UpdateTime: types.IndexedValueTypeInt,
}

// IsSystemIndexedKey return true is key is system added
Expand Down
2 changes: 1 addition & 1 deletion common/elasticsearch/validator/searchAttrValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (sv *SearchAttributesValidator) isValidSearchAttributesValue(
key string,
value []byte,
) bool {
valueType := common.ConvertIndexedValueTypeToThriftType(validAttr[key], sv.logger)
valueType := common.ConvertIndexedValueTypeToInternalType(validAttr[key], sv.logger)
_, err := common.DeserializeSearchAttributeValue(value, valueType)
return err == nil
}
12 changes: 6 additions & 6 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func (v *esVisibilityStore) processSortField(dsl *fastjson.Value) (string, error
obj.Visit(func(k []byte, v *fastjson.Value) { // visit is only way to get object key in fastjson
sortField = string(k)
})
if v.getFieldType(sortField) == workflow.IndexedValueTypeString {
if v.getFieldType(sortField) == types.IndexedValueTypeString {
return "", errors.New("not able to sort by IndexedValueTypeString field, use IndexedValueTypeKeyword field")
}
// add RunID as tie-breaker
Expand All @@ -673,7 +673,7 @@ func (v *esVisibilityStore) processSortField(dsl *fastjson.Value) (string, error
return sortField, nil
}

func (v *esVisibilityStore) getFieldType(fieldName string) workflow.IndexedValueType {
func (v *esVisibilityStore) getFieldType(fieldName string) types.IndexedValueType {
if strings.HasPrefix(fieldName, definition.Attr) {
fieldName = fieldName[len(definition.Attr)+1:] // remove prefix
}
Expand All @@ -682,14 +682,14 @@ func (v *esVisibilityStore) getFieldType(fieldName string) workflow.IndexedValue
if !ok {
v.logger.Error("Unknown fieldName, validation should be done in frontend already", tag.Value(fieldName))
}
return common.ConvertIndexedValueTypeToThriftType(fieldType, v.logger)
return common.ConvertIndexedValueTypeToInternalType(fieldType, v.logger)
}

func (v *esVisibilityStore) getValueOfSearchAfterInJSON(token *es.ElasticVisibilityPageToken, sortField string) (string, error) {
var sortVal interface{}
var err error
switch v.getFieldType(sortField) {
case workflow.IndexedValueTypeInt, workflow.IndexedValueTypeDatetime, workflow.IndexedValueTypeBool:
case types.IndexedValueTypeInt, types.IndexedValueTypeDatetime, types.IndexedValueTypeBool:
sortVal, err = token.SortValue.(json.Number).Int64()
if err != nil {
err, ok := err.(*strconv.NumError) // field not present, ES will return big int +-9223372036854776000
Expand All @@ -702,7 +702,7 @@ func (v *esVisibilityStore) getValueOfSearchAfterInJSON(token *es.ElasticVisibil
sortVal = math.MaxInt64
}
}
case workflow.IndexedValueTypeDouble:
case types.IndexedValueTypeDouble:
switch token.SortValue.(type) {
case json.Number:
sortVal, err = token.SortValue.(json.Number).Float64()
Expand All @@ -712,7 +712,7 @@ func (v *esVisibilityStore) getValueOfSearchAfterInJSON(token *es.ElasticVisibil
case string: // field not present, ES will return "-Infinity" or "Infinity"
sortVal = fmt.Sprintf(`"%s"`, token.SortValue.(string))
}
case workflow.IndexedValueTypeKeyword:
case types.IndexedValueTypeKeyword:
if token.SortValue != nil {
sortVal = fmt.Sprintf(`"%s"`, token.SortValue.(string))
} else { // field not present, ES will return null (so token.SortValue is nil)
Expand Down
15 changes: 6 additions & 9 deletions common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/valyala/fastjson"

"github.com/uber/cadence/.gen/go/indexer"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/dynamicconfig"
Expand All @@ -46,7 +45,6 @@ import (
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)

type ESVisibilitySuite struct {
Expand Down Expand Up @@ -182,8 +180,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
memoBytes := []byte(`test bytes`)
request.Memo = p.NewDataBlob(memoBytes, common.EncodingTypeThriftRW)
request.CloseTimestamp = time.Unix(0, int64(999))
closeStatus := workflow.WorkflowExecutionCloseStatusTerminated
request.Status = *thrift.ToWorkflowExecutionCloseStatus(&closeStatus)
request.Status = types.WorkflowExecutionCloseStatusTerminated
request.HistoryLength = int64(20)
request.IsCron = false
request.NumClusters = 2
Expand All @@ -200,7 +197,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
s.Equal(memoBytes, fields[es.Memo].GetBinaryData())
s.Equal(string(common.EncodingTypeThriftRW), fields[es.Encoding].GetStringData())
s.Equal(request.CloseTimestamp.UnixNano(), fields[es.CloseTime].GetIntData())
s.Equal(int64(closeStatus), fields[es.CloseStatus].GetIntData())
s.Equal(int64(request.Status), fields[es.CloseStatus].GetIntData())
s.Equal(request.HistoryLength, fields[es.HistoryLength].GetIntData())
s.Equal(request.IsCron, fields[es.IsCron].GetBoolData())
s.Equal((int64)(request.NumClusters), fields[es.NumClusters].GetIntData())
Expand Down Expand Up @@ -439,10 +436,10 @@ func (s *ESVisibilitySuite) TestListClosedWorkflowExecutionsByStatus() {
return true
})).Return(testSearchResult, nil).Once()

closeStatus := workflow.WorkflowExecutionCloseStatus(testCloseStatus)
closeStatus := types.WorkflowExecutionCloseStatus(testCloseStatus)
request := &p.InternalListClosedWorkflowExecutionsByStatusRequest{
InternalListWorkflowExecutionsRequest: *testRequest,
Status: *thrift.ToWorkflowExecutionCloseStatus(&closeStatus),
Status: closeStatus,
}

ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
Expand Down Expand Up @@ -978,8 +975,8 @@ func (s *ESVisibilitySuite) TestProcessAllValuesForKey() {
}

func (s *ESVisibilitySuite) TestGetFieldType() {
s.Equal(workflow.IndexedValueTypeInt, s.visibilityStore.getFieldType("StartTime"))
s.Equal(workflow.IndexedValueTypeDatetime, s.visibilityStore.getFieldType("Attr.CustomDatetimeField"))
s.Equal(types.IndexedValueTypeInt, s.visibilityStore.getFieldType("StartTime"))
s.Equal(types.IndexedValueTypeDatetime, s.visibilityStore.getFieldType("Attr.CustomDatetimeField"))
}

func (s *ESVisibilitySuite) TestGetValueOfSearchAfterInJSON() {
Expand Down
39 changes: 26 additions & 13 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pborman/uuid"
"go.uber.org/yarpc/yarpcerrors"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -788,60 +787,74 @@ func IsJustOrderByClause(clause string) bool {
return strings.HasPrefix(whereClause, "order by")
}

// ConvertIndexedValueTypeToThriftType takes fieldType as interface{} and convert to IndexedValueType.
// ConvertIndexedValueTypeToInternalType takes fieldType as interface{} and convert to IndexedValueType.
// Because different implementation of dynamic config client may lead to different types
func ConvertIndexedValueTypeToThriftType(fieldType interface{}, logger log.Logger) workflow.IndexedValueType {
func ConvertIndexedValueTypeToInternalType(fieldType interface{}, logger log.Logger) types.IndexedValueType {
switch t := fieldType.(type) {
case float64:
return workflow.IndexedValueType(t)
return types.IndexedValueType(t)
case int:
return workflow.IndexedValueType(t)
case workflow.IndexedValueType:
return types.IndexedValueType(t)
case types.IndexedValueType:
return t
case []byte:
var result types.IndexedValueType
if err := result.UnmarshalText(t); err != nil {
logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t), tag.Error(err))
return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
}
return result
case string:
var result types.IndexedValueType
if err := result.UnmarshalText([]byte(t)); err != nil {
logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t), tag.Error(err))
return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
}
return result
default:
// Unknown fieldType, please make sure dynamic config return correct value type
logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t))
return fieldType.(workflow.IndexedValueType) // it will panic and been captured by logger
return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
}
}

// DeserializeSearchAttributeValue takes json encoded search attribute value and it's type as input, then
// unmarshal the value into a concrete type and return the value
func DeserializeSearchAttributeValue(value []byte, valueType workflow.IndexedValueType) (interface{}, error) {
func DeserializeSearchAttributeValue(value []byte, valueType types.IndexedValueType) (interface{}, error) {
switch valueType {
case workflow.IndexedValueTypeString, workflow.IndexedValueTypeKeyword:
case types.IndexedValueTypeString, types.IndexedValueTypeKeyword:
var val string
if err := json.Unmarshal(value, &val); err != nil {
var listVal []string
err = json.Unmarshal(value, &listVal)
return listVal, err
}
return val, nil
case workflow.IndexedValueTypeInt:
case types.IndexedValueTypeInt:
var val int64
if err := json.Unmarshal(value, &val); err != nil {
var listVal []int64
err = json.Unmarshal(value, &listVal)
return listVal, err
}
return val, nil
case workflow.IndexedValueTypeDouble:
case types.IndexedValueTypeDouble:
var val float64
if err := json.Unmarshal(value, &val); err != nil {
var listVal []float64
err = json.Unmarshal(value, &listVal)
return listVal, err
}
return val, nil
case workflow.IndexedValueTypeBool:
case types.IndexedValueTypeBool:
var val bool
if err := json.Unmarshal(value, &val); err != nil {
var listVal []bool
err = json.Unmarshal(value, &listVal)
return listVal, err
}
return val, nil
case workflow.IndexedValueTypeDatetime:
case types.IndexedValueTypeDatetime:
var val time.Time
if err := json.Unmarshal(value, &val); err != nil {
var listVal []time.Time
Expand Down
16 changes: 10 additions & 6 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/yarpc/yarpcerrors"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -272,11 +271,16 @@ func TestCreateHistoryStartWorkflowRequest_ExpirationTimeWithoutCron(t *testing.
require.True(t, delta < 62*time.Second)
}

func TestConvertIndexedValueTypeToThriftType(t *testing.T) {
expected := workflow.IndexedValueType_Values()
for i := 0; i < len(expected); i++ {
require.Equal(t, expected[i], ConvertIndexedValueTypeToThriftType(i, nil))
require.Equal(t, expected[i], ConvertIndexedValueTypeToThriftType(float64(i), nil))
func TestConvertIndexedValueTypeToInternalType(t *testing.T) {
values := []types.IndexedValueType{types.IndexedValueTypeString, types.IndexedValueTypeKeyword, types.IndexedValueTypeInt, types.IndexedValueTypeDouble, types.IndexedValueTypeBool, types.IndexedValueTypeDatetime}
for _, expected := range values {
require.Equal(t, expected, ConvertIndexedValueTypeToInternalType(int(expected), nil))
require.Equal(t, expected, ConvertIndexedValueTypeToInternalType(float64(expected), nil))

buffer, err := expected.MarshalText()
require.NoError(t, err)
require.Equal(t, expected, ConvertIndexedValueTypeToInternalType(buffer, nil))
require.Equal(t, expected, ConvertIndexedValueTypeToInternalType(string(buffer), nil))
}
}

Expand Down
3 changes: 1 addition & 2 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)

const (
Expand Down Expand Up @@ -4466,7 +4465,7 @@ func (wh *WorkflowHandler) getArchivedHistory(
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
converted := make(map[string]types.IndexedValueType)
for k, v := range keys {
converted[k] = thrift.ToIndexedValueType(common.ConvertIndexedValueTypeToThriftType(v, wh.GetLogger()))
converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
}
return converted
}
Expand Down
25 changes: 18 additions & 7 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
Expand Down Expand Up @@ -1410,12 +1409,18 @@ func (s *workflowHandlerSuite) TestConvertIndexedKeyToThrift() {
"key4i": 3,
"key5i": 4,
"key6i": 5,
"key1t": shared.IndexedValueTypeString,
"key2t": shared.IndexedValueTypeKeyword,
"key3t": shared.IndexedValueTypeInt,
"key4t": shared.IndexedValueTypeDouble,
"key5t": shared.IndexedValueTypeBool,
"key6t": shared.IndexedValueTypeDatetime,
"key1t": types.IndexedValueTypeString,
"key2t": types.IndexedValueTypeKeyword,
"key3t": types.IndexedValueTypeInt,
"key4t": types.IndexedValueTypeDouble,
"key5t": types.IndexedValueTypeBool,
"key6t": types.IndexedValueTypeDatetime,
"key1s": "STRING",
"key2s": "KEYWORD",
"key3s": "INT",
"key4s": "DOUBLE",
"key5s": "BOOL",
"key6s": "DATETIME",
}
result := wh.convertIndexedKeyToThrift(m)
s.Equal(types.IndexedValueTypeString, result["key1"])
Expand All @@ -1436,6 +1441,12 @@ func (s *workflowHandlerSuite) TestConvertIndexedKeyToThrift() {
s.Equal(types.IndexedValueTypeDouble, result["key4t"])
s.Equal(types.IndexedValueTypeBool, result["key5t"])
s.Equal(types.IndexedValueTypeDatetime, result["key6t"])
s.Equal(types.IndexedValueTypeString, result["key1s"])
s.Equal(types.IndexedValueTypeKeyword, result["key2s"])
s.Equal(types.IndexedValueTypeInt, result["key3s"])
s.Equal(types.IndexedValueTypeDouble, result["key4s"])
s.Equal(types.IndexedValueTypeBool, result["key5s"])
s.Equal(types.IndexedValueTypeDatetime, result["key6s"])
s.Panics(func() {
wh.convertIndexedKeyToThrift(map[string]interface{}{
"invalidType": "unknown",
Expand Down
3 changes: 1 addition & 2 deletions tools/cli/workflowCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
"github.com/uber/cadence/service/history/execution"
)

Expand Down Expand Up @@ -965,7 +964,7 @@ func convertSearchAttributesToMapOfInterface(searchAttributes *types.SearchAttri
indexedFields := searchAttributes.GetIndexedFields()
for k, v := range indexedFields {
valueType := validKeys[k]
deserializedValue, err := common.DeserializeSearchAttributeValue(v, thrift.FromIndexedValueType(valueType))
deserializedValue, err := common.DeserializeSearchAttributeValue(v, valueType)
if err != nil {
ErrorAndExit("Error deserializing search attribute value", err)
}
Expand Down

0 comments on commit 197bd16

Please sign in to comment.