diff --git a/.gen/go/indexer/idl.go b/.gen/go/indexer/idl.go new file mode 100644 index 00000000000..dbf829a06bd --- /dev/null +++ b/.gen/go/indexer/idl.go @@ -0,0 +1,43 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by thriftrw v1.13.1. DO NOT EDIT. +// @generated + +package indexer + +import ( + "github.com/uber/cadence/.gen/go/shared" + "go.uber.org/thriftrw/thriftreflect" +) + +// ThriftModule represents the IDL file used to generate this package. 
+var ThriftModule = &thriftreflect.ThriftModule{ + Name: "indexer", + Package: "github.com/uber/cadence/.gen/go/indexer", + FilePath: "indexer.thrift", + SHA1: "957d45f87bfa8baf14af0e071c30083fb9cb66f8", + Includes: []*thriftreflect.ThriftModule{ + shared.ThriftModule, + }, + Raw: rawIDL, +} + +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.indexer\n\ninclude \"shared.thrift\"\n\nenum MessageType {\n Index\n Delete\n}\n\nenum FieldType {\n String\n Int\n Bool\n}\n\nstruct Field {\n 10: optional FieldType type\n 20: optional string stringData\n 30: optional i64 (js.type = \"Long\") intData\n 40: optional bool boolData\n}\n\nstruct IndexAttributes {\n 10: optional map fields\n}\n\nstruct Message {\n 10: optional MessageType messageType\n 20: optional string domainID\n 30: optional string workflowID\n 40: optional string runID\n 50: optional i64 (js.type = \"Long\") version\n 60: optional IndexAttributes indexAttributes\n}" diff --git a/.gen/go/indexer/types.go b/.gen/go/indexer/types.go new file mode 100644 index 00000000000..52bad30e44b --- /dev/null +++ b/.gen/go/indexer/types.go @@ -0,0 +1,1266 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by thriftrw v1.13.1. DO NOT EDIT. +// @generated + +package indexer + +import ( + "bytes" + "encoding/json" + "fmt" + "go.uber.org/multierr" + "go.uber.org/thriftrw/wire" + "go.uber.org/zap/zapcore" + "math" + "strconv" + "strings" +) + +type Field struct { + Type *FieldType `json:"type,omitempty"` + StringData *string `json:"stringData,omitempty"` + IntData *int64 `json:"intData,omitempty"` + BoolData *bool `json:"boolData,omitempty"` +} + +// ToWire translates a Field struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. 
+// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *Field) ToWire() (wire.Value, error) { + var ( + fields [4]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.Type != nil { + w, err = v.Type.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.StringData != nil { + w, err = wire.NewValueString(*(v.StringData)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 20, Value: w} + i++ + } + if v.IntData != nil { + w, err = wire.NewValueI64(*(v.IntData)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 30, Value: w} + i++ + } + if v.BoolData != nil { + w, err = wire.NewValueBool(*(v.BoolData)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 40, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +func _FieldType_Read(w wire.Value) (FieldType, error) { + var v FieldType + err := v.FromWire(w) + return v, err +} + +// FromWire deserializes a Field struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a Field struct +// from the provided intermediate representation. 
+// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v Field +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *Field) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TI32 { + var x FieldType + x, err = _FieldType_Read(field.Value) + v.Type = &x + if err != nil { + return err + } + + } + case 20: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.StringData = &x + if err != nil { + return err + } + + } + case 30: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.IntData = &x + if err != nil { + return err + } + + } + case 40: + if field.Value.Type() == wire.TBool { + var x bool + x, err = field.Value.GetBool(), error(nil) + v.BoolData = &x + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a Field +// struct. 
+func (v *Field) String() string { + if v == nil { + return "" + } + + var fields [4]string + i := 0 + if v.Type != nil { + fields[i] = fmt.Sprintf("Type: %v", *(v.Type)) + i++ + } + if v.StringData != nil { + fields[i] = fmt.Sprintf("StringData: %v", *(v.StringData)) + i++ + } + if v.IntData != nil { + fields[i] = fmt.Sprintf("IntData: %v", *(v.IntData)) + i++ + } + if v.BoolData != nil { + fields[i] = fmt.Sprintf("BoolData: %v", *(v.BoolData)) + i++ + } + + return fmt.Sprintf("Field{%v}", strings.Join(fields[:i], ", ")) +} + +func _FieldType_EqualsPtr(lhs, rhs *FieldType) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return x.Equals(y) + } + return lhs == nil && rhs == nil +} + +func _String_EqualsPtr(lhs, rhs *string) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return (x == y) + } + return lhs == nil && rhs == nil +} + +func _I64_EqualsPtr(lhs, rhs *int64) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return (x == y) + } + return lhs == nil && rhs == nil +} + +func _Bool_EqualsPtr(lhs, rhs *bool) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return (x == y) + } + return lhs == nil && rhs == nil +} + +// Equals returns true if all the fields of this Field match the +// provided Field. +// +// This function performs a deep comparison. +func (v *Field) Equals(rhs *Field) bool { + if v == nil { + return rhs == nil + } else if rhs == nil { + return false + } + if !_FieldType_EqualsPtr(v.Type, rhs.Type) { + return false + } + if !_String_EqualsPtr(v.StringData, rhs.StringData) { + return false + } + if !_I64_EqualsPtr(v.IntData, rhs.IntData) { + return false + } + if !_Bool_EqualsPtr(v.BoolData, rhs.BoolData) { + return false + } + + return true +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, enabling +// fast logging of Field. 
+func (v *Field) MarshalLogObject(enc zapcore.ObjectEncoder) (err error) { + if v == nil { + return nil + } + if v.Type != nil { + err = multierr.Append(err, enc.AddObject("type", *v.Type)) + } + if v.StringData != nil { + enc.AddString("stringData", *v.StringData) + } + if v.IntData != nil { + enc.AddInt64("intData", *v.IntData) + } + if v.BoolData != nil { + enc.AddBool("boolData", *v.BoolData) + } + return err +} + +// GetType returns the value of Type if it is set or its +// zero value if it is unset. +func (v *Field) GetType() (o FieldType) { + if v.Type != nil { + return *v.Type + } + + return +} + +// GetStringData returns the value of StringData if it is set or its +// zero value if it is unset. +func (v *Field) GetStringData() (o string) { + if v.StringData != nil { + return *v.StringData + } + + return +} + +// GetIntData returns the value of IntData if it is set or its +// zero value if it is unset. +func (v *Field) GetIntData() (o int64) { + if v.IntData != nil { + return *v.IntData + } + + return +} + +// GetBoolData returns the value of BoolData if it is set or its +// zero value if it is unset. +func (v *Field) GetBoolData() (o bool) { + if v.BoolData != nil { + return *v.BoolData + } + + return +} + +type FieldType int32 + +const ( + FieldTypeString FieldType = 0 + FieldTypeInt FieldType = 1 + FieldTypeBool FieldType = 2 +) + +// FieldType_Values returns all recognized values of FieldType. +func FieldType_Values() []FieldType { + return []FieldType{ + FieldTypeString, + FieldTypeInt, + FieldTypeBool, + } +} + +// UnmarshalText tries to decode FieldType from a byte slice +// containing its name. 
+// +// var v FieldType +// err := v.UnmarshalText([]byte("String")) +func (v *FieldType) UnmarshalText(value []byte) error { + switch s := string(value); s { + case "String": + *v = FieldTypeString + return nil + case "Int": + *v = FieldTypeInt + return nil + case "Bool": + *v = FieldTypeBool + return nil + default: + val, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return fmt.Errorf("unknown enum value %q for %q: %v", s, "FieldType", err) + } + *v = FieldType(val) + return nil + } +} + +// MarshalText encodes FieldType to text. +// +// If the enum value is recognized, its name is returned. Otherwise, +// its integer value is returned. +// +// This implements the TextMarshaler interface. +func (v FieldType) MarshalText() ([]byte, error) { + switch int32(v) { + case 0: + return []byte("String"), nil + case 1: + return []byte("Int"), nil + case 2: + return []byte("Bool"), nil + } + return []byte(strconv.FormatInt(int64(v), 10)), nil +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, enabling +// fast logging of FieldType. +// Enums are logged as objects, where the value is logged with key "value", and +// if this value's name is known, the name is logged with key "name". +func (v FieldType) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddInt32("value", int32(v)) + switch int32(v) { + case 0: + enc.AddString("name", "String") + case 1: + enc.AddString("name", "Int") + case 2: + enc.AddString("name", "Bool") + } + return nil +} + +// Ptr returns a pointer to this enum value. +func (v FieldType) Ptr() *FieldType { + return &v +} + +// ToWire translates FieldType into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// Enums are represented as 32-bit integers over the wire. 
+func (v FieldType) ToWire() (wire.Value, error) { + return wire.NewValueI32(int32(v)), nil +} + +// FromWire deserializes FieldType from its Thrift-level +// representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TI32) +// if err != nil { +// return FieldType(0), err +// } +// +// var v FieldType +// if err := v.FromWire(x); err != nil { +// return FieldType(0), err +// } +// return v, nil +func (v *FieldType) FromWire(w wire.Value) error { + *v = (FieldType)(w.GetI32()) + return nil +} + +// String returns a readable string representation of FieldType. +func (v FieldType) String() string { + w := int32(v) + switch w { + case 0: + return "String" + case 1: + return "Int" + case 2: + return "Bool" + } + return fmt.Sprintf("FieldType(%d)", w) +} + +// Equals returns true if this FieldType value matches the provided +// value. +func (v FieldType) Equals(rhs FieldType) bool { + return v == rhs +} + +// MarshalJSON serializes FieldType into JSON. +// +// If the enum value is recognized, its name is returned. Otherwise, +// its integer value is returned. +// +// This implements json.Marshaler. +func (v FieldType) MarshalJSON() ([]byte, error) { + switch int32(v) { + case 0: + return ([]byte)("\"String\""), nil + case 1: + return ([]byte)("\"Int\""), nil + case 2: + return ([]byte)("\"Bool\""), nil + } + return ([]byte)(strconv.FormatInt(int64(v), 10)), nil +} + +// UnmarshalJSON attempts to decode FieldType from its JSON +// representation. +// +// This implementation supports both, numeric and string inputs. If a +// string is provided, it must be a known enum name. +// +// This implements json.Unmarshaler. 
+func (v *FieldType) UnmarshalJSON(text []byte) error { + d := json.NewDecoder(bytes.NewReader(text)) + d.UseNumber() + t, err := d.Token() + if err != nil { + return err + } + + switch w := t.(type) { + case json.Number: + x, err := w.Int64() + if err != nil { + return err + } + if x > math.MaxInt32 { + return fmt.Errorf("enum overflow from JSON %q for %q", text, "FieldType") + } + if x < math.MinInt32 { + return fmt.Errorf("enum underflow from JSON %q for %q", text, "FieldType") + } + *v = (FieldType)(x) + return nil + case string: + return v.UnmarshalText([]byte(w)) + default: + return fmt.Errorf("invalid JSON value %q (%T) to unmarshal into %q", t, t, "FieldType") + } +} + +type IndexAttributes struct { + Fields map[string]*Field `json:"fields,omitempty"` +} + +type _Map_String_Field_MapItemList map[string]*Field + +func (m _Map_String_Field_MapItemList) ForEach(f func(wire.MapItem) error) error { + for k, v := range m { + if v == nil { + return fmt.Errorf("invalid [%v]: value is nil", k) + } + kw, err := wire.NewValueString(k), error(nil) + if err != nil { + return err + } + + vw, err := v.ToWire() + if err != nil { + return err + } + err = f(wire.MapItem{Key: kw, Value: vw}) + if err != nil { + return err + } + } + return nil +} + +func (m _Map_String_Field_MapItemList) Size() int { + return len(m) +} + +func (_Map_String_Field_MapItemList) KeyType() wire.Type { + return wire.TBinary +} + +func (_Map_String_Field_MapItemList) ValueType() wire.Type { + return wire.TStruct +} + +func (_Map_String_Field_MapItemList) Close() {} + +// ToWire translates a IndexAttributes struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. 
+// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *IndexAttributes) ToWire() (wire.Value, error) { + var ( + fields [1]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.Fields != nil { + w, err = wire.NewValueMap(_Map_String_Field_MapItemList(v.Fields)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +func _Field_Read(w wire.Value) (*Field, error) { + var v Field + err := v.FromWire(w) + return &v, err +} + +func _Map_String_Field_Read(m wire.MapItemList) (map[string]*Field, error) { + if m.KeyType() != wire.TBinary { + return nil, nil + } + + if m.ValueType() != wire.TStruct { + return nil, nil + } + + o := make(map[string]*Field, m.Size()) + err := m.ForEach(func(x wire.MapItem) error { + k, err := x.Key.GetString(), error(nil) + if err != nil { + return err + } + + v, err := _Field_Read(x.Value) + if err != nil { + return err + } + + o[k] = v + return nil + }) + m.Close() + return o, err +} + +// FromWire deserializes a IndexAttributes struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a IndexAttributes struct +// from the provided intermediate representation. 
+// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v IndexAttributes +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *IndexAttributes) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TMap { + v.Fields, err = _Map_String_Field_Read(field.Value.GetMap()) + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a IndexAttributes +// struct. +func (v *IndexAttributes) String() string { + if v == nil { + return "" + } + + var fields [1]string + i := 0 + if v.Fields != nil { + fields[i] = fmt.Sprintf("Fields: %v", v.Fields) + i++ + } + + return fmt.Sprintf("IndexAttributes{%v}", strings.Join(fields[:i], ", ")) +} + +func _Map_String_Field_Equals(lhs, rhs map[string]*Field) bool { + if len(lhs) != len(rhs) { + return false + } + + for lk, lv := range lhs { + rv, ok := rhs[lk] + if !ok { + return false + } + if !lv.Equals(rv) { + return false + } + } + return true +} + +// Equals returns true if all the fields of this IndexAttributes match the +// provided IndexAttributes. +// +// This function performs a deep comparison. +func (v *IndexAttributes) Equals(rhs *IndexAttributes) bool { + if v == nil { + return rhs == nil + } else if rhs == nil { + return false + } + if !((v.Fields == nil && rhs.Fields == nil) || (v.Fields != nil && rhs.Fields != nil && _Map_String_Field_Equals(v.Fields, rhs.Fields))) { + return false + } + + return true +} + +type _Map_String_Field_Zapper map[string]*Field + +// MarshalLogObject implements zapcore.ObjectMarshaler, enabling +// fast logging of _Map_String_Field_Zapper. 
+func (m _Map_String_Field_Zapper) MarshalLogObject(enc zapcore.ObjectEncoder) (err error) { + for k, v := range m { + err = multierr.Append(err, enc.AddObject((string)(k), v)) + } + return err +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, enabling +// fast logging of IndexAttributes. +func (v *IndexAttributes) MarshalLogObject(enc zapcore.ObjectEncoder) (err error) { + if v == nil { + return nil + } + if v.Fields != nil { + err = multierr.Append(err, enc.AddObject("fields", (_Map_String_Field_Zapper)(v.Fields))) + } + return err +} + +// GetFields returns the value of Fields if it is set or its +// zero value if it is unset. +func (v *IndexAttributes) GetFields() (o map[string]*Field) { + if v.Fields != nil { + return v.Fields + } + + return +} + +type Message struct { + MessageType *MessageType `json:"messageType,omitempty"` + DomainID *string `json:"domainID,omitempty"` + WorkflowID *string `json:"workflowID,omitempty"` + RunID *string `json:"runID,omitempty"` + Version *int64 `json:"version,omitempty"` + IndexAttributes *IndexAttributes `json:"indexAttributes,omitempty"` +} + +// ToWire translates a Message struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. 
+// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *Message) ToWire() (wire.Value, error) { + var ( + fields [6]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.MessageType != nil { + w, err = v.MessageType.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.DomainID != nil { + w, err = wire.NewValueString(*(v.DomainID)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 20, Value: w} + i++ + } + if v.WorkflowID != nil { + w, err = wire.NewValueString(*(v.WorkflowID)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 30, Value: w} + i++ + } + if v.RunID != nil { + w, err = wire.NewValueString(*(v.RunID)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 40, Value: w} + i++ + } + if v.Version != nil { + w, err = wire.NewValueI64(*(v.Version)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 50, Value: w} + i++ + } + if v.IndexAttributes != nil { + w, err = v.IndexAttributes.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 60, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +func _MessageType_Read(w wire.Value) (MessageType, error) { + var v MessageType + err := v.FromWire(w) + return v, err +} + +func _IndexAttributes_Read(w wire.Value) (*IndexAttributes, error) { + var v IndexAttributes + err := v.FromWire(w) + return &v, err +} + +// FromWire deserializes a Message struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a Message struct +// from the provided intermediate representation. 
+// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v Message +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *Message) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TI32 { + var x MessageType + x, err = _MessageType_Read(field.Value) + v.MessageType = &x + if err != nil { + return err + } + + } + case 20: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.DomainID = &x + if err != nil { + return err + } + + } + case 30: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.WorkflowID = &x + if err != nil { + return err + } + + } + case 40: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.RunID = &x + if err != nil { + return err + } + + } + case 50: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.Version = &x + if err != nil { + return err + } + + } + case 60: + if field.Value.Type() == wire.TStruct { + v.IndexAttributes, err = _IndexAttributes_Read(field.Value) + if err != nil { + return err + } + + } + } + } + + return nil +} + +// String returns a readable string representation of a Message +// struct. 
+func (v *Message) String() string { + if v == nil { + return "" + } + + var fields [6]string + i := 0 + if v.MessageType != nil { + fields[i] = fmt.Sprintf("MessageType: %v", *(v.MessageType)) + i++ + } + if v.DomainID != nil { + fields[i] = fmt.Sprintf("DomainID: %v", *(v.DomainID)) + i++ + } + if v.WorkflowID != nil { + fields[i] = fmt.Sprintf("WorkflowID: %v", *(v.WorkflowID)) + i++ + } + if v.RunID != nil { + fields[i] = fmt.Sprintf("RunID: %v", *(v.RunID)) + i++ + } + if v.Version != nil { + fields[i] = fmt.Sprintf("Version: %v", *(v.Version)) + i++ + } + if v.IndexAttributes != nil { + fields[i] = fmt.Sprintf("IndexAttributes: %v", v.IndexAttributes) + i++ + } + + return fmt.Sprintf("Message{%v}", strings.Join(fields[:i], ", ")) +} + +func _MessageType_EqualsPtr(lhs, rhs *MessageType) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return x.Equals(y) + } + return lhs == nil && rhs == nil +} + +// Equals returns true if all the fields of this Message match the +// provided Message. +// +// This function performs a deep comparison. +func (v *Message) Equals(rhs *Message) bool { + if v == nil { + return rhs == nil + } else if rhs == nil { + return false + } + if !_MessageType_EqualsPtr(v.MessageType, rhs.MessageType) { + return false + } + if !_String_EqualsPtr(v.DomainID, rhs.DomainID) { + return false + } + if !_String_EqualsPtr(v.WorkflowID, rhs.WorkflowID) { + return false + } + if !_String_EqualsPtr(v.RunID, rhs.RunID) { + return false + } + if !_I64_EqualsPtr(v.Version, rhs.Version) { + return false + } + if !((v.IndexAttributes == nil && rhs.IndexAttributes == nil) || (v.IndexAttributes != nil && rhs.IndexAttributes != nil && v.IndexAttributes.Equals(rhs.IndexAttributes))) { + return false + } + + return true +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, enabling +// fast logging of Message. 
+func (v *Message) MarshalLogObject(enc zapcore.ObjectEncoder) (err error) { + if v == nil { + return nil + } + if v.MessageType != nil { + err = multierr.Append(err, enc.AddObject("messageType", *v.MessageType)) + } + if v.DomainID != nil { + enc.AddString("domainID", *v.DomainID) + } + if v.WorkflowID != nil { + enc.AddString("workflowID", *v.WorkflowID) + } + if v.RunID != nil { + enc.AddString("runID", *v.RunID) + } + if v.Version != nil { + enc.AddInt64("version", *v.Version) + } + if v.IndexAttributes != nil { + err = multierr.Append(err, enc.AddObject("indexAttributes", v.IndexAttributes)) + } + return err +} + +// GetMessageType returns the value of MessageType if it is set or its +// zero value if it is unset. +func (v *Message) GetMessageType() (o MessageType) { + if v.MessageType != nil { + return *v.MessageType + } + + return +} + +// GetDomainID returns the value of DomainID if it is set or its +// zero value if it is unset. +func (v *Message) GetDomainID() (o string) { + if v.DomainID != nil { + return *v.DomainID + } + + return +} + +// GetWorkflowID returns the value of WorkflowID if it is set or its +// zero value if it is unset. +func (v *Message) GetWorkflowID() (o string) { + if v.WorkflowID != nil { + return *v.WorkflowID + } + + return +} + +// GetRunID returns the value of RunID if it is set or its +// zero value if it is unset. +func (v *Message) GetRunID() (o string) { + if v.RunID != nil { + return *v.RunID + } + + return +} + +// GetVersion returns the value of Version if it is set or its +// zero value if it is unset. +func (v *Message) GetVersion() (o int64) { + if v.Version != nil { + return *v.Version + } + + return +} + +// GetIndexAttributes returns the value of IndexAttributes if it is set or its +// zero value if it is unset. 
+func (v *Message) GetIndexAttributes() (o *IndexAttributes) { + if v.IndexAttributes != nil { + return v.IndexAttributes + } + + return +} + +type MessageType int32 + +const ( + MessageTypeIndex MessageType = 0 + MessageTypeDelete MessageType = 1 +) + +// MessageType_Values returns all recognized values of MessageType. +func MessageType_Values() []MessageType { + return []MessageType{ + MessageTypeIndex, + MessageTypeDelete, + } +} + +// UnmarshalText tries to decode MessageType from a byte slice +// containing its name. +// +// var v MessageType +// err := v.UnmarshalText([]byte("Index")) +func (v *MessageType) UnmarshalText(value []byte) error { + switch s := string(value); s { + case "Index": + *v = MessageTypeIndex + return nil + case "Delete": + *v = MessageTypeDelete + return nil + default: + val, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return fmt.Errorf("unknown enum value %q for %q: %v", s, "MessageType", err) + } + *v = MessageType(val) + return nil + } +} + +// MarshalText encodes MessageType to text. +// +// If the enum value is recognized, its name is returned. Otherwise, +// its integer value is returned. +// +// This implements the TextMarshaler interface. +func (v MessageType) MarshalText() ([]byte, error) { + switch int32(v) { + case 0: + return []byte("Index"), nil + case 1: + return []byte("Delete"), nil + } + return []byte(strconv.FormatInt(int64(v), 10)), nil +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, enabling +// fast logging of MessageType. +// Enums are logged as objects, where the value is logged with key "value", and +// if this value's name is known, the name is logged with key "name". +func (v MessageType) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddInt32("value", int32(v)) + switch int32(v) { + case 0: + enc.AddString("name", "Index") + case 1: + enc.AddString("name", "Delete") + } + return nil +} + +// Ptr returns a pointer to this enum value. 
+func (v MessageType) Ptr() *MessageType { + return &v +} + +// ToWire translates MessageType into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// Enums are represented as 32-bit integers over the wire. +func (v MessageType) ToWire() (wire.Value, error) { + return wire.NewValueI32(int32(v)), nil +} + +// FromWire deserializes MessageType from its Thrift-level +// representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TI32) +// if err != nil { +// return MessageType(0), err +// } +// +// var v MessageType +// if err := v.FromWire(x); err != nil { +// return MessageType(0), err +// } +// return v, nil +func (v *MessageType) FromWire(w wire.Value) error { + *v = (MessageType)(w.GetI32()) + return nil +} + +// String returns a readable string representation of MessageType. +func (v MessageType) String() string { + w := int32(v) + switch w { + case 0: + return "Index" + case 1: + return "Delete" + } + return fmt.Sprintf("MessageType(%d)", w) +} + +// Equals returns true if this MessageType value matches the provided +// value. +func (v MessageType) Equals(rhs MessageType) bool { + return v == rhs +} + +// MarshalJSON serializes MessageType into JSON. +// +// If the enum value is recognized, its name is returned. Otherwise, +// its integer value is returned. +// +// This implements json.Marshaler. +func (v MessageType) MarshalJSON() ([]byte, error) { + switch int32(v) { + case 0: + return ([]byte)("\"Index\""), nil + case 1: + return ([]byte)("\"Delete\""), nil + } + return ([]byte)(strconv.FormatInt(int64(v), 10)), nil +} + +// UnmarshalJSON attempts to decode MessageType from its JSON +// representation. +// +// This implementation supports both, numeric and string inputs. If a +// string is provided, it must be a known enum name. +// +// This implements json.Unmarshaler. 
+func (v *MessageType) UnmarshalJSON(text []byte) error { + d := json.NewDecoder(bytes.NewReader(text)) + d.UseNumber() + t, err := d.Token() + if err != nil { + return err + } + + switch w := t.(type) { + case json.Number: + x, err := w.Int64() + if err != nil { + return err + } + if x > math.MaxInt32 { + return fmt.Errorf("enum overflow from JSON %q for %q", text, "MessageType") + } + if x < math.MinInt32 { + return fmt.Errorf("enum underflow from JSON %q for %q", text, "MessageType") + } + *v = (MessageType)(x) + return nil + case string: + return v.UnmarshalText([]byte(w)) + default: + return fmt.Errorf("invalid JSON value %q (%T) to unmarshal into %q", t, t, "MessageType") + } +} diff --git a/Gopkg.lock b/Gopkg.lock index ad1494c28c7..7ba9d4e51bf 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -209,6 +209,19 @@ revision = "5c8c8bd35d3832f5d134ae1e1e375b69a4d25242" version = "v1.0.1" +[[projects]] + branch = "master" + digest = "1:212bebc561f4f654a653225868b2a97353cd5e160dc0b0bbc7232b06608474ec" + name = "github.com/mailru/easyjson" + packages = [ + ".", + "buffer", + "jlexer", + "jwriter", + ] + pruneopts = "" + revision = "60711f1a8329503b04e1c88535f419d0bb440bff" + [[projects]] digest = "1:9ea83adf8e96d6304f394d40436f2eb44c1dc3250d223b74088cc253a6cd0a1c" name = "github.com/mattn/go-colorable" @@ -249,6 +262,18 @@ pruneopts = "" revision = "be2c049b30ccd4d3fd795d6bf7dce74e42eeedaa" +[[projects]] + digest = "1:0384ecd644553551aeef4b938f83dd6c1ac3c489da5a04238080af116890ee62" + name = "github.com/olivere/elastic" + packages = [ + ".", + "config", + "uritemplates", + ] + pruneopts = "" + revision = "1619150b007041b6dba8aa447f0e2d151cc2b4c5" + version = "v6.2.14" + [[projects]] digest = "1:78fb99d6011c2ae6c72f3293a83951311147b12b06a5ffa43abf750c4fab6ac5" name = "github.com/opentracing/opentracing-go" @@ -280,6 +305,14 @@ revision = "635575b42742856941dbc767b44905bb9ba083f6" version = "v2.0.7" +[[projects]] + digest = 
"1:7365acd48986e205ccb8652cc746f09c8b7876030d53710ea6ef7d0bd0dcd7ca" + name = "github.com/pkg/errors" + packages = ["."] + pruneopts = "" + revision = "645ef00459ed84a119197bfb8d8205042c6df63d" + version = "v0.8.0" + [[projects]] digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" name = "github.com/pmezard/go-difflib" @@ -704,6 +737,7 @@ "github.com/iancoleman/strcase", "github.com/jmoiron/sqlx", "github.com/olekukonko/tablewriter", + "github.com/olivere/elastic", "github.com/pborman/uuid", "github.com/robfig/cron", "github.com/sirupsen/logrus", diff --git a/Makefile b/Makefile index d1992ed200f..090cfda9693 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,7 @@ THRIFTRW_SRCS = \ idl/github.com/uber/cadence/history.thrift \ idl/github.com/uber/cadence/matching.thrift \ idl/github.com/uber/cadence/replicator.thrift \ + idl/github.com/uber/cadence/indexer.thrift \ idl/github.com/uber/cadence/shared.thrift \ idl/github.com/uber/cadence/admin.thrift \ diff --git a/cmd/server/server.go b/cmd/server/server.go index e37d896acb0..a0a31a14b37 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/service/matching" "github.com/uber/cadence/service/worker" + "github.com/uber/cadence/common/elasticsearch" "github.com/uber/cadence/common/messaging" "go.uber.org/zap" ) @@ -133,11 +134,12 @@ func (s *server) startService() common.Daemon { // TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger)) - enableVisibilityToKafka := dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, dynamicconfig.DefaultEnableVisibilityToKafka) + params.ESConfig = &s.cfg.ElasticSearch + enableVisibilityToKafka := dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, params.ESConfig.Enable)() if params.ClusterMetadata.IsGlobalDomainEnabled() { - params.MessagingClient 
= messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, true) - } else if enableVisibilityToKafka() { - params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, false) + params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, true, enableVisibilityToKafka) + } else if enableVisibilityToKafka { + params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, false, enableVisibilityToKafka) } else { params.MessagingClient = nil } @@ -149,6 +151,22 @@ func (s *server) startService() common.Daemon { } } + // enable visibility to kafka and enable visibility to elastic search are using one config + if enableVisibilityToKafka { + esFactory := elasticsearch.NewFactory(&s.cfg.ElasticSearch) + esClient, err := esFactory.NewClient() + if err != nil { + log.Fatalf("error creating elastic search client: %v", err) + } + params.ESClient = esClient + + indexName, ok := params.ESConfig.Indices[common.VisibilityAppName] + if !ok || len(indexName) == 0 { + log.Fatalf("elastic search config missing visibility index") + } + params.ESConfig.Enable = enableVisibilityToKafka // force to use dynamic config + } + params.Logger.Info("Starting service " + s.name) var daemon common.Daemon diff --git a/common/constants.go b/common/constants.go index c99da217cd7..009d86c9706 100644 --- a/common/constants.go +++ b/common/constants.go @@ -73,3 +73,8 @@ const ( // GetHistoryMaxPageSize is the max page size for get history GetHistoryMaxPageSize = 1000 ) + +const ( + // VisibilityAppName is used to find kafka topics and ES indexName for visibility + VisibilityAppName = "visibility" +) diff --git a/common/messaging/message.go b/common/elasticsearch/config.go similarity index 82% rename from common/messaging/message.go 
rename to common/elasticsearch/config.go index 2bd803f85fe..a104ca04bc6 100644 --- a/common/messaging/message.go +++ b/common/elasticsearch/config.go @@ -18,12 +18,17 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package messaging +package elasticsearch -// OpenWorkflowMsg is visibility data for open workflow -type OpenWorkflowMsg struct { - Domain string - WorkflowID string - RunID string - StartTime int64 -} +import ( + "net/url" +) + +// Config for connecting to ElasticSearch +type ( + Config struct { + Enable bool `yaml:enable` + URL url.URL `yaml:url` + Indices map[string]string `yaml:indices` + } +) diff --git a/common/elasticsearch/defs.go b/common/elasticsearch/defs.go new file mode 100644 index 00000000000..10570e0fa11 --- /dev/null +++ b/common/elasticsearch/defs.go @@ -0,0 +1,64 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package elasticsearch + +import "github.com/uber/cadence/.gen/go/indexer" + +// All legal fields allowed in elastic search index +const ( + DomainID = "DomainID" + WorkflowID = "WorkflowID" + RunID = "RunID" + WorkflowType = "WorkflowType" + StartTime = "StartTime" + CloseTime = "CloseTime" + CloseStatus = "CloseStatus" + HistoryLength = "HistoryLength" + + KafkaKey = "KafkaKey" +) + +// Supported field types +var ( + FieldTypeString = indexer.FieldTypeString + FieldTypeInt = indexer.FieldTypeInt + FieldTypeBool = indexer.FieldTypeBool +) + +var ( + validFieldName = map[string]interface{}{ + DomainID: struct{}{}, + WorkflowID: struct{}{}, + RunID: struct{}{}, + WorkflowType: struct{}{}, + StartTime: struct{}{}, + CloseTime: struct{}{}, + CloseStatus: struct{}{}, + HistoryLength: struct{}{}, + KafkaKey: struct{}{}, + } +) + +// IsFieldNameValid return true if given field name are allowed to index in elastic search +func IsFieldNameValid(name string) bool { + _, ok := validFieldName[name] + return ok +} diff --git a/common/elasticsearch/factory.go b/common/elasticsearch/factory.go new file mode 100644 index 00000000000..796f7419704 --- /dev/null +++ b/common/elasticsearch/factory.go @@ -0,0 +1,45 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package elasticsearch + +import "github.com/olivere/elastic" + +// Factory is interface to create ElasticSearch client +type Factory interface { + NewClient() (*elastic.Client, error) +} + +type clientFactory struct { + Config *Config +} + +var _ Factory = (*clientFactory)(nil) + +// NewFactory create a new ElasticSearch client +func NewFactory(config *Config) Factory { + return &clientFactory{Config: config} +} + +func (f *clientFactory) NewClient() (*elastic.Client, error) { + return elastic.NewClient( + elastic.SetURL(f.Config.URL.String()), + ) +} diff --git a/common/logging/events.go b/common/logging/events.go index 1344891d58e..2a5579ca33d 100644 --- a/common/logging/events.go +++ b/common/logging/events.go @@ -88,6 +88,12 @@ const ( ReplicationTaskProcessorShutdown = 7104 ReplicationTaskProcessorShutdownTimedout = 7105 ReplicationTaskProcessingFailed = 7106 + IndexProcessorStarting = 7200 + IndexProcessorStarted = 7201 + IndexProcessorStartFailed = 7202 + IndexProcessorShuttingDown = 7203 + IndexProcessorShutDown = 7204 + IndexProcessorShuttingDownTimedout = 7205 // General purpose events OperationFailed = 9000 diff --git a/common/logging/helpers.go b/common/logging/helpers.go index 389110aeef7..a659b98399d 100644 --- a/common/logging/helpers.go +++ b/common/logging/helpers.go @@ -488,3 +488,45 @@ func LogListClosedWorkflowByFilter(logger bark.Logger, domain, filter string) { "FilterType": filter, }).Info("List closed workflow with filter") } + +// 
LogIndexProcessorStartingEvent is used to log index processor starting +func LogIndexProcessorStartingEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: IndexProcessorStarting, + }).Info("Index processor starting.") +} + +// LogIndexProcessorStartedEvent is used to log index processor starting +func LogIndexProcessorStartedEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: IndexProcessorStarted, + }).Info("Index processor started.") +} + +// LogIndexProcessorStartFailedEvent is used to log index processor started +func LogIndexProcessorStartFailedEvent(logger bark.Logger, err error) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: IndexProcessorStartFailed, + }).WithError(err).Warn("Index processor failed to start.") +} + +// LogIndexProcessorShuttingDownEvent is used to log index processing shutting down +func LogIndexProcessorShuttingDownEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: IndexProcessorShuttingDown, + }).Info("Index processor shutting down.") +} + +// LogIndexProcessorShutDownEvent is used to log index processing shutting down +func LogIndexProcessorShutDownEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: IndexProcessorShutDown, + }).Info("Index processor shut down.") +} + +// LogIndexProcessorShutDownTimedoutEvent is used to log index processing shutting down +func LogIndexProcessorShutDownTimedoutEvent(logger bark.Logger) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: IndexProcessorShuttingDownTimedout, + }).Info("Index processor shut down timedout.") +} diff --git a/common/logging/tags.go b/common/logging/tags.go index 4dee28e4444..fd39a5dc81f 100644 --- a/common/logging/tags.go +++ b/common/logging/tags.go @@ -79,6 +79,9 @@ const ( TagCursorTimestamp = "cursor-timestamp" TagHistorySize = "history-size" TagEventCount = "event-count" + TagESRequest = "es-request" + TagESKey = "es-mapping-key" + 
TagESField = "es-field" // workflow logging tag values // TagWorkflowComponent Values @@ -93,6 +96,9 @@ const ( TagValueReplicatorComponent = "replicator" TagValueReplicationTaskProcessorComponent = "replication-task-processor" TagValueHistoryReplicatorComponent = "history-replicator" + TagValueIndexerComponent = "indexer" + TagValueIndexerProcessorComponent = "indexer-processor" + TagValueIndexerESProcessorComponent = "indexer-es-processor" // TagHistoryBuilderAction values TagValueActionWorkflowStarted = "add-workflowexecution-started-event" diff --git a/common/messaging/interface.go b/common/messaging/interface.go index 3f011ca0986..4d142b1bfe7 100644 --- a/common/messaging/interface.go +++ b/common/messaging/interface.go @@ -23,8 +23,9 @@ package messaging type ( // Client is the interface used to abstract out interaction with messaging system for replication Client interface { - NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error) - NewProducer(topic string) (Producer, error) + NewConsumer(appName, consumerName string, concurrency int) (Consumer, error) + NewConsumerWithClusterName(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error) + NewProducer(appName string) (Producer, error) NewProducerWithClusterName(sourceCluster string) (Producer, error) } diff --git a/common/messaging/kafkaClient.go b/common/messaging/kafkaClient.go index 61dcdb6d7d0..c26cd5ceb6e 100644 --- a/common/messaging/kafkaClient.go +++ b/common/messaging/kafkaClient.go @@ -45,8 +45,9 @@ type ( var _ Client = (*kafkaClient)(nil) // NewKafkaClient is used to create an instance of KafkaClient -func NewKafkaClient(kc *KafkaConfig, metricsClient metrics.Client, zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope, checkCluster bool) Client { - kc.Validate(checkCluster) +func NewKafkaClient(kc *KafkaConfig, metricsClient metrics.Client, zLogger *zap.Logger, logger bark.Logger, metricScope tally.Scope, + 
checkCluster, checkApp bool) Client { + kc.Validate(checkCluster, checkApp) // mapping from cluster name to list of broker ip addresses brokers := map[string][]string{} @@ -76,7 +77,36 @@ func NewKafkaClient(kc *KafkaConfig, metricsClient metrics.Client, zLogger *zap. } // NewConsumer is used to create a Kafka consumer -func (c *kafkaClient) NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error) { +func (c *kafkaClient) NewConsumer(app, consumerName string, concurrency int) (Consumer, error) { + topics := c.config.getTopicsForApplication(app) + kafkaClusterNameForTopic := c.config.getKafkaClusterForTopic(topics.Topic) + kafkaClusterNameForDLQTopic := c.config.getKafkaClusterForTopic(topics.DLQTopic) + topicList := uberKafka.ConsumerTopicList{ + uberKafka.ConsumerTopic{ + Topic: uberKafka.Topic{ + Name: topics.Topic, + Cluster: kafkaClusterNameForTopic, + }, + DLQ: uberKafka.Topic{ + Name: topics.DLQTopic, + Cluster: kafkaClusterNameForDLQTopic, + }, + }, + } + + consumerConfig := uberKafka.NewConsumerConfig(consumerName, topicList) + consumerConfig.Concurrency = concurrency + consumerConfig.Offsets.Initial.Offset = uberKafka.OffsetOldest + + uConsumer, err := c.client.NewConsumer(consumerConfig) + if err != nil { + return nil, err + } + return newKafkaConsumer(uConsumer, c.logger), nil +} + +// NewConsumerWithClusterName is used to create a Kafka consumer for consuming replication tasks +func (c *kafkaClient) NewConsumerWithClusterName(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error) { currentTopics := c.config.getTopicsForCadenceCluster(currentCluster) sourceTopics := c.config.getTopicsForCadenceCluster(sourceCluster) @@ -108,8 +138,9 @@ func (c *kafkaClient) NewConsumer(currentCluster, sourceCluster, consumerName st } // NewProducer is used to create a Kafka producer -func (c *kafkaClient) NewProducer(topic string) (Producer, error) { - kafkaClusterName := 
c.config.getKafkaClusterForTopic(topic) +func (c *kafkaClient) NewProducer(app string) (Producer, error) { + topics := c.config.getTopicsForApplication(app) + kafkaClusterName := c.config.getKafkaClusterForTopic(topics.Topic) brokers := c.config.getBrokersForKafkaCluster(kafkaClusterName) producer, err := sarama.NewSyncProducer(brokers, nil) @@ -118,9 +149,9 @@ func (c *kafkaClient) NewProducer(topic string) (Producer, error) { } if c.metricsClient != nil { - return NewMetricProducer(NewKafkaProducer(topic, producer, c.logger), c.metricsClient), nil + return NewMetricProducer(NewKafkaProducer(topics.Topic, producer, c.logger), c.metricsClient), nil } - return NewKafkaProducer(topic, producer, c.logger), nil + return NewKafkaProducer(topics.Topic, producer, c.logger), nil } // NewProducerWithClusterName is used to create a Kafka producer for shipping replication tasks diff --git a/common/messaging/kafkaConfig.go b/common/messaging/kafkaConfig.go index 2182229ca46..1c9466cf696 100644 --- a/common/messaging/kafkaConfig.go +++ b/common/messaging/kafkaConfig.go @@ -30,6 +30,7 @@ type ( Clusters map[string]ClusterConfig `yaml:"clusters"` Topics map[string]TopicConfig `yaml:"topics"` ClusterToTopic map[string]TopicList `yaml:"cadence-cluster-topics"` + Applications map[string]TopicList `yaml:"applications"` } // ClusterConfig describes the configuration for a single Kafka cluster @@ -50,11 +51,8 @@ type ( } ) -// VisibilityTopicName for visibility data to kafka -const VisibilityTopicName = "visibility-topic" - // Validate will validate config for kafka -func (k *KafkaConfig) Validate(checkCluster bool) { +func (k *KafkaConfig) Validate(checkCluster bool, checkApp bool) { if len(k.Clusters) == 0 { panic("Empty Kafka Cluster Config") } @@ -83,8 +81,15 @@ func (k *KafkaConfig) Validate(checkCluster bool) { validateTopicsFn(topics.RetryTopic) validateTopicsFn(topics.DLQTopic) } - } else { - validateTopicsFn(VisibilityTopicName) + } + if checkApp { + if len(k.Applications) == 
0 { + panic("Empty Applications Config") + } + for _, topics := range k.Applications { + validateTopicsFn(topics.Topic) + validateTopicsFn(topics.DLQTopic) + } } } @@ -99,3 +104,7 @@ func (k *KafkaConfig) getKafkaClusterForTopic(topic string) string { func (k *KafkaConfig) getBrokersForKafkaCluster(kafkaCluster string) []string { return k.Clusters[kafkaCluster].Brokers } + +func (k *KafkaConfig) getTopicsForApplication(app string) TopicList { + return k.Applications[app] +} diff --git a/common/messaging/kafkaProducer.go b/common/messaging/kafkaProducer.go index bbd9cb27214..bef9d5be77a 100644 --- a/common/messaging/kafkaProducer.go +++ b/common/messaging/kafkaProducer.go @@ -24,6 +24,7 @@ import ( "errors" "github.com/Shopify/sarama" "github.com/uber-common/bark" + "github.com/uber/cadence/.gen/go/indexer" "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/common/codec" "github.com/uber/cadence/common/codec/gob" @@ -103,12 +104,12 @@ func (p *kafkaProducer) Close() error { return p.producer.Close() } -func (p *kafkaProducer) serializeTask(task *replicator.ReplicationTask) ([]byte, error) { - payload, err := p.msgEncoder.Encode(task) +func (p *kafkaProducer) serializeThrift(input codec.ThriftObject) ([]byte, error) { + payload, err := p.msgEncoder.Encode(input) if err != nil { p.logger.WithFields(bark.Fields{ logging.TagErr: err, - }).Error("Failed to serialize replication task") + }).Error("Failed to serialize thrift object") return nil, err } @@ -116,7 +117,7 @@ func (p *kafkaProducer) serializeTask(task *replicator.ReplicationTask) ([]byte, return payload, nil } -func (p *kafkaProducer) getKey(task *replicator.ReplicationTask) sarama.Encoder { +func (p *kafkaProducer) getKeyForReplicationTask(task *replicator.ReplicationTask) sarama.Encoder { if task == nil { return nil } @@ -143,26 +144,26 @@ func (p *kafkaProducer) getProducerMessage(message interface{}) (*sarama.Produce switch message.(type) { case *replicator.ReplicationTask: task := 
message.(*replicator.ReplicationTask) - payload, err := p.serializeTask(task) + payload, err := p.serializeThrift(task) if err != nil { return nil, err } - partitionKey := p.getKey(task) + partitionKey := p.getKeyForReplicationTask(task) msg := &sarama.ProducerMessage{ Topic: p.topic, Key: partitionKey, Value: sarama.ByteEncoder(payload), } return msg, nil - case *OpenWorkflowMsg: - openRecord := message.(*OpenWorkflowMsg) - payload, err := p.gobEncoder.Encode(openRecord) + case *indexer.Message: + indexMsg := message.(*indexer.Message) + payload, err := p.serializeThrift(indexMsg) if err != nil { return nil, err } msg := &sarama.ProducerMessage{ - Topic: VisibilityTopicName, - Key: sarama.StringEncoder(openRecord.WorkflowID), + Topic: p.topic, + Key: sarama.StringEncoder(indexMsg.GetWorkflowID()), Value: sarama.ByteEncoder(payload), } return msg, nil diff --git a/common/messaging/mocks/Message.go b/common/messaging/mocks/Message.go new file mode 100644 index 00000000000..8152d518c99 --- /dev/null +++ b/common/messaging/mocks/Message.go @@ -0,0 +1,100 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Message is an autogenerated mock type for the Message type +type Message struct { + mock.Mock +} + +// Ack provides a mock function with given fields: +func (_m *Message) Ack() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Nack provides a mock function with given fields: +func (_m *Message) Nack() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Offset provides a mock function with given fields: +func (_m *Message) Offset() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// Partition provides a mock function with given fields: +func (_m *Message) Partition() int32 { + ret := _m.Called() + + var r0 int32 + if rf, ok := ret.Get(0).(func() int32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int32) + } + + return r0 +} + +// Value provides a mock function with given fields: +func (_m *Message) Value() []byte { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 1b3cedb7c38..6263b8273a6 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -618,6 +618,10 @@ const ( SyncShardTaskScope // SyncActivityTaskScope is the scope used by sync activity information processing 
SyncActivityTaskScope + // ESProcessorScope is scope used by all metric emitted by esProcessor + ESProcessorScope + // IndexProcessorScope is scope used by all metric emitted by index processor + IndexProcessorScope NumWorkerScopes ) @@ -875,6 +879,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ HistoryReplicationTaskScope: {operation: "HistoryReplicationTask"}, SyncShardTaskScope: {operation: "SyncShardTask"}, SyncActivityTaskScope: {operation: "SyncActivityTask"}, + ESProcessorScope: {operation: "ESProcessor"}, + IndexProcessorScope: {operation: "IndexProcessor"}, }, } @@ -1055,6 +1061,9 @@ const ( ReplicatorMessages = iota + NumCommonMetrics ReplicatorFailures ReplicatorLatency + ESProcessorFailures + ESProcessorCorruptedData + IndexProcessorCorruptedData NumWorkerMetrics ) @@ -1219,9 +1228,12 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ SyncMatchLatency: {metricName: "syncmatch.latency", metricType: Timer}, }, Worker: { - ReplicatorMessages: {metricName: "replicator.messages"}, - ReplicatorFailures: {metricName: "replicator.errors"}, - ReplicatorLatency: {metricName: "replicator.latency"}, + ReplicatorMessages: {metricName: "replicator.messages"}, + ReplicatorFailures: {metricName: "replicator.errors"}, + ReplicatorLatency: {metricName: "replicator.latency"}, + ESProcessorFailures: {metricName: "es-processor.errors"}, + ESProcessorCorruptedData: {metricName: "es-processor.corrupted-data"}, + IndexProcessorCorruptedData: {metricName: "index-processor.corrupted-data"}, }, } diff --git a/common/mocks/MessagingClient.go b/common/mocks/MessagingClient.go index d106b9da236..edb12cc3fbe 100644 --- a/common/mocks/MessagingClient.go +++ b/common/mocks/MessagingClient.go @@ -43,12 +43,17 @@ func NewMockMessagingClient(publisher messaging.Producer, consumer messaging.Con } // NewConsumer generates a dummy implementation of kafka consumer -func (c *MessagingClient) NewConsumer(currentCluster, sourceCluster, consumerName string, concurrency int) 
(messaging.Consumer, error) { +func (c *MessagingClient) NewConsumer(appName, consumerName string, concurrency int) (messaging.Consumer, error) { + return c.consumerMock, nil +} + +// NewConsumerWithClusterName generates a dummy implementation of kafka consumer +func (c *MessagingClient) NewConsumerWithClusterName(currentCluster, sourceCluster, consumerName string, concurrency int) (messaging.Consumer, error) { return c.consumerMock, nil } // NewProducer generates a dummy implementation of kafka producer -func (c *MessagingClient) NewProducer(topic string) (messaging.Producer, error) { +func (c *MessagingClient) NewProducer(appName string) (messaging.Producer, error) { return c.publisherMock, nil } diff --git a/common/service/config/config.go b/common/service/config/config.go index dfb8d614981..69edac1919b 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -26,6 +26,7 @@ import ( "time" "github.com/uber-go/tally/m3" + "github.com/uber/cadence/common/elasticsearch" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/service/dynamicconfig" "github.com/uber/ringpop-go/discovery" @@ -48,6 +49,8 @@ type ( Kafka messaging.KafkaConfig `yaml:"kafka"` // Archival is the config for archival Archival Archival `yaml:"archival"` + // ElasticSearch if config for connecting to ElasticSearch + ElasticSearch elasticsearch.Config `yaml:elasticsearch` } // Service contains the service specific config items diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index e88ca0ffe5f..a8269a16fb0 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -155,6 +155,11 @@ var keys = map[Key]string{ WorkerReplicatorConcurrency: "worker.replicatorConcurrency", WorkerReplicatorBufferRetryCount: "worker.replicatorBufferRetryCount", WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry", + WorkerIndexerConcurrency: 
"worker.indexerConcurrency", + WorkerESProcessorNumOfWorkers: "worker.ESProcessorNumOfWorkers", + WorkerESProcessorBulkActions: "worker.ESProcessorBulkActions", + WorkerESProcessorBulkSize: "worker.ESProcessorBulkSize", + WorkerESProcessorFlushInterval: "worker.ESProcessorFlushInterval", } const ( @@ -375,16 +380,26 @@ const ( // EnableEventsV2 is whether to use eventsV2 EnableEventsV2 - // key for history worker + // key for worker // WorkerPersistenceMaxQPS is the max qps worker host can query DB WorkerPersistenceMaxQPS - // WorkerReplicatorConcurrency is the max concurrenct tasks to be processed at any given time + // WorkerReplicatorConcurrency is the max concurrent tasks to be processed at any given time WorkerReplicatorConcurrency // WorkerReplicatorBufferRetryCount is the retry attempt when encounter retry error WorkerReplicatorBufferRetryCount // WorkerReplicationTaskMaxRetry is the max retry for any task WorkerReplicationTaskMaxRetry + // WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time + WorkerIndexerConcurrency + // WorkerESProcessorNumOfWorkers is num of workers for esProcessor + WorkerESProcessorNumOfWorkers + // WorkerESProcessorBulkActions is max number of requests in bulk for esProcessor + WorkerESProcessorBulkActions + // WorkerESProcessorBulkSize is max total size of bulk in bytes for esProcessor + WorkerESProcessorBulkSize + // WorkerESProcessorFlushInterval is flush interval for esProcessor + WorkerESProcessorFlushInterval // lastKeyForTest must be the last one in this const group for testing purpose lastKeyForTest @@ -443,6 +458,3 @@ func TaskTypeFilter(taskType int) FilterOption { filterMap[TaskType] = taskType } } - -// DefaultEnableVisibilityToKafka default value for config EnableVisibilityToKafka -const DefaultEnableVisibilityToKafka = false diff --git a/common/service/service.go b/common/service/service.go index 2cd0d47594a..567e0ef830a 100644 --- a/common/service/service.go +++ 
b/common/service/service.go @@ -38,8 +38,10 @@ import ( "github.com/uber/cadence/common/service/config" "github.com/uber/cadence/common/service/dynamicconfig" + "github.com/olivere/elastic" "github.com/uber-common/bark" "github.com/uber-go/tally" + "github.com/uber/cadence/common/elasticsearch" ringpop "github.com/uber/ringpop-go" "go.uber.org/yarpc" ) @@ -66,6 +68,8 @@ type ( ReplicatorConfig config.Replicator MetricsClient metrics.Client MessagingClient messaging.Client + ESClient *elastic.Client + ESConfig *elasticsearch.Config DynamicConfig dynamicconfig.Client DispatcherProvider client.DispatcherProvider BlobstoreClient blobstore.Client diff --git a/config/development.yaml b/config/development.yaml index 8352a2c0065..d6d535ff0a1 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -99,5 +99,20 @@ kafka: brokers: - 127.0.0.1:9092 topics: - visibility-topic: + cadence-visibility: cluster: test + cadence-visibility-dlq: + cluster: test + applications: + visibility: + topic: cadence-visibility + dlq-topic: cadence-visibility-dlq + +elasticsearch: + enable: false + url: + scheme: "http" + host: "127.0.0.1:9200" + indices: + visibility: visibility-dev + diff --git a/host/client_integration_test.go b/host/client_integration_test.go index afb6709104d..ec36fe1b43d 100644 --- a/host/client_integration_test.go +++ b/host/client_integration_test.go @@ -188,7 +188,7 @@ func (s *clientIntegrationSuite) setupSuite(enableGlobalDomain bool, isMasterClu s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil) s.host = NewCadence(s.ClusterMetadata, server.NewIPYarpcDispatcherProvider(), s.mockMessagingClient, s.MetadataProxy, s.MetadataManagerV2, s.ShardMgr, s.HistoryMgr, s.HistoryV2Mgr, s.ExecutionMgrFactory, s.TaskMgr, - s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, 0, false, s.enableEventsV2) + s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, 0, false, s.enableEventsV2, false) 
s.host.Start() s.engine = s.host.GetFrontendClient() diff --git a/host/integration_cross_dc_domain_test.go b/host/integration_cross_dc_domain_test.go index 2e973fbf7b8..cfdea66d683 100644 --- a/host/integration_cross_dc_domain_test.go +++ b/host/integration_cross_dc_domain_test.go @@ -114,7 +114,7 @@ func (s *integrationCrossDCSuite) setupTest(enableGlobalDomain bool, isMasterClu s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil) s.host = NewCadence(s.ClusterMetadata, client.NewIPYarpcDispatcherProvider(), s.mockMessagingClient, s.MetadataProxy, s.MetadataManagerV2, s.ShardMgr, s.HistoryMgr, s.HistoryV2Mgr, s.ExecutionMgrFactory, s.TaskMgr, - s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, 0, false, s.enableEventsV2) + s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, 0, false, s.enableEventsV2, false) s.host.Start() diff --git a/host/integration_test.go b/host/integration_test.go index 97bac13ddf9..f47b0bfb770 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -137,7 +137,7 @@ func (s *integrationSuite) setupSuite(enableGlobalDomain bool, isMasterCluster b s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil) s.host = NewCadence(s.ClusterMetadata, client.NewIPYarpcDispatcherProvider(), s.mockMessagingClient, s.MetadataProxy, s.MetadataManagerV2, s.ShardMgr, s.HistoryMgr, s.HistoryV2Mgr, s.ExecutionMgrFactory, s.TaskMgr, - s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, 0, false, s.enableEventsV2) + s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, 0, false, s.enableEventsV2, false) s.host.Start() s.engine = s.host.GetFrontendClient() diff --git a/host/onebox.go b/host/onebox.go index 999bb2ce3bc..ff81fe78080 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -84,31 +84,32 @@ type Cadence interface { type ( cadenceImpl struct { - adminHandler *frontend.AdminHandler - 
frontendHandler *frontend.WorkflowHandler - matchingHandler *matching.Handler - historyHandlers []*history.Handler - numberOfHistoryShards int - numberOfHistoryHosts int - logger bark.Logger - clusterMetadata cluster.Metadata - dispatcherProvider client.DispatcherProvider - messagingClient messaging.Client - metadataMgr persistence.MetadataManager - metadataMgrV2 persistence.MetadataManager - shardMgr persistence.ShardManager - historyMgr persistence.HistoryManager - historyV2Mgr persistence.HistoryV2Manager - taskMgr persistence.TaskManager - visibilityMgr persistence.VisibilityManager - executionMgrFactory persistence.ExecutionManagerFactory - shutdownCh chan struct{} - shutdownWG sync.WaitGroup - frontEndService service.Service - clusterNo int // cluster number - replicator *replicator.Replicator - enableWorkerService bool // tmp flag used to tell if onbox should create worker service - enableEventsV2 bool + adminHandler *frontend.AdminHandler + frontendHandler *frontend.WorkflowHandler + matchingHandler *matching.Handler + historyHandlers []*history.Handler + numberOfHistoryShards int + numberOfHistoryHosts int + logger bark.Logger + clusterMetadata cluster.Metadata + dispatcherProvider client.DispatcherProvider + messagingClient messaging.Client + metadataMgr persistence.MetadataManager + metadataMgrV2 persistence.MetadataManager + shardMgr persistence.ShardManager + historyMgr persistence.HistoryManager + historyV2Mgr persistence.HistoryV2Manager + taskMgr persistence.TaskManager + visibilityMgr persistence.VisibilityManager + executionMgrFactory persistence.ExecutionManagerFactory + shutdownCh chan struct{} + shutdownWG sync.WaitGroup + frontEndService service.Service + clusterNo int // cluster number + replicator *replicator.Replicator + enableWorkerService bool // tmp flag used to tell if onebox should create worker service + enableEventsV2 bool + enableVisibilityToKafka bool } ringpopFactoryImpl struct { @@ -121,27 +122,28 @@ func 
NewCadence(clusterMetadata cluster.Metadata, dispatcherProvider client.Disp metadataMgrV2 persistence.MetadataManager, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, historyV2Mgr persistence.HistoryV2Manager, executionMgrFactory persistence.ExecutionManagerFactory, taskMgr persistence.TaskManager, visibilityMgr persistence.VisibilityManager, numberOfHistoryShards, numberOfHistoryHosts int, - logger bark.Logger, clusterNo int, enableWorker, enableEventsV2 bool) Cadence { + logger bark.Logger, clusterNo int, enableWorker, enableEventsV2, enableVisibilityToKafka bool) Cadence { return &cadenceImpl{ - numberOfHistoryShards: numberOfHistoryShards, - numberOfHistoryHosts: numberOfHistoryHosts, - logger: logger, - clusterMetadata: clusterMetadata, - dispatcherProvider: dispatcherProvider, - messagingClient: messagingClient, - metadataMgr: metadataMgr, - metadataMgrV2: metadataMgrV2, - visibilityMgr: visibilityMgr, - shardMgr: shardMgr, - historyMgr: historyMgr, - historyV2Mgr: historyV2Mgr, - taskMgr: taskMgr, - executionMgrFactory: executionMgrFactory, - shutdownCh: make(chan struct{}), - clusterNo: clusterNo, - enableWorkerService: enableWorker, - enableEventsV2: enableEventsV2, + numberOfHistoryShards: numberOfHistoryShards, + numberOfHistoryHosts: numberOfHistoryHosts, + logger: logger, + clusterMetadata: clusterMetadata, + dispatcherProvider: dispatcherProvider, + messagingClient: messagingClient, + metadataMgr: metadataMgr, + metadataMgrV2: metadataMgrV2, + visibilityMgr: visibilityMgr, + shardMgr: shardMgr, + historyMgr: historyMgr, + historyV2Mgr: historyV2Mgr, + taskMgr: taskMgr, + executionMgrFactory: executionMgrFactory, + shutdownCh: make(chan struct{}), + clusterNo: clusterNo, + enableWorkerService: enableWorker, + enableEventsV2: enableEventsV2, + enableVisibilityToKafka: enableVisibilityToKafka, } } @@ -356,7 +358,7 @@ func (c *cadenceImpl) startHistory(rpHosts []string, startWG *sync.WaitGroup, en } params.MetricsClient = 
metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger)) service := service.New(params) - historyConfig := history.NewConfig(dynamicconfig.NewNopCollection(), c.numberOfHistoryShards) + historyConfig := history.NewConfig(dynamicconfig.NewNopCollection(), c.numberOfHistoryShards, c.enableVisibilityToKafka) historyConfig.HistoryMgrNumConns = dynamicconfig.GetIntPropertyFn(c.numberOfHistoryShards) historyConfig.ExecutionMgrNumConns = dynamicconfig.GetIntPropertyFn(c.numberOfHistoryShards) historyConfig.EnableEventsV2 = dynamicconfig.GetBoolPropertyFnFilteredByDomain(enableEventsV2) diff --git a/hostxdc/Integration_domain_failover_test.go b/hostxdc/Integration_domain_failover_test.go index e0bb287a08f..dd58e69a8ec 100644 --- a/hostxdc/Integration_domain_failover_test.go +++ b/hostxdc/Integration_domain_failover_test.go @@ -149,7 +149,7 @@ func (s *testCluster) setupCluster(no int, enableEventsV2 bool) { messagingClient := s.createMessagingClient() testNumberOfHistoryShards := 1 // use 1 shard so we can be sure when failover completed in standby cluster s.host = host.NewCadence(s.ClusterMetadata, client.NewIPYarpcDispatcherProvider(), messagingClient, s.MetadataProxy, s.MetadataManagerV2, s.ShardMgr, s.HistoryMgr, s.HistoryV2Mgr, s.ExecutionMgrFactory, s.TaskMgr, - s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, no, true, enableEventsV2) + s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, no, true, enableEventsV2, false) s.host.Start() } @@ -191,7 +191,7 @@ func (s *testCluster) createMessagingClient() messaging.Client { Topics: topics, ClusterToTopic: clusterToTopic, } - return messaging.NewKafkaClient(&kafkaConfig, nil, zap.NewNop(), s.logger, tally.NoopScope, true) + return messaging.NewKafkaClient(&kafkaConfig, nil, zap.NewNop(), s.logger, tally.NoopScope, true, false) } func getTopicList(topicName string) messaging.TopicList { diff --git 
a/idl/github.com/uber/cadence/indexer.thrift b/idl/github.com/uber/cadence/indexer.thrift new file mode 100644 index 00000000000..a456f4a0ac7 --- /dev/null +++ b/idl/github.com/uber/cadence/indexer.thrift @@ -0,0 +1,54 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +namespace java com.uber.cadence.indexer + +include "shared.thrift" + +enum MessageType { + Index + Delete +} + +enum FieldType { + String + Int + Bool +} + +struct Field { + 10: optional FieldType type + 20: optional string stringData + 30: optional i64 (js.type = "Long") intData + 40: optional bool boolData +} + +struct IndexAttributes { + 10: optional map<string, Field> fields +} + +struct Message { + 10: optional MessageType messageType + 20: optional string domainID + 30: optional string workflowID + 40: optional string runID + 50: optional i64 (js.type = "Long") version + 60: optional IndexAttributes indexAttributes +} \ No newline at end of file diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index be1a131aa6e..c5435214d90 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -169,7 +169,7 @@ func NewEngineWithShardContext( } var visibilityProducer messaging.Producer if config.EnableVisibilityToKafka() { - visibilityProducer = getVisibilityProducer(messagingClient) + visibilityProducer = getVisibilityProducer(messagingClient, shard.GetMetricsClient()) } txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, visibilityProducer, matching, historyClient, logger) historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, matching, logger) @@ -3314,13 +3314,16 @@ func getWorkflowAlreadyStartedError(errMsg string, createRequestID string, workf } } -func getVisibilityProducer(messagingClient messaging.Client) messaging.Producer { +func getVisibilityProducer(messagingClient messaging.Client, metricsClient metrics.Client) messaging.Producer { if messagingClient == nil { return nil } - visibilityProducer, err := messagingClient.NewProducer(messaging.VisibilityTopicName) + visibilityProducer, err := messagingClient.NewProducer(common.VisibilityAppName) if err != nil { panic(err) } + if metricsClient != nil { + visibilityProducer = 
messaging.NewMetricProducer(visibilityProducer, metricsClient) + } return visibilityProducer } diff --git a/service/history/historyTestBase.go b/service/history/historyTestBase.go index 7720402732f..7ea4745ccec 100644 --- a/service/history/historyTestBase.go +++ b/service/history/historyTestBase.go @@ -475,14 +475,14 @@ func (s *TestShardContext) GetCurrentTime(cluster string) time.Time { // NewDynamicConfigForTest return dc for test func NewDynamicConfigForTest() *Config { dc := dynamicconfig.NewNopCollection() - config := NewConfig(dc, 1) + config := NewConfig(dc, 1, false) return config } // NewDynamicConfigForEventsV2Test with enableEventsV2 = true func NewDynamicConfigForEventsV2Test() *Config { dc := dynamicconfig.NewNopCollection() - config := NewConfig(dc, 1) + config := NewConfig(dc, 1, false) config.EnableEventsV2 = dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, true) return config } diff --git a/service/history/service.go b/service/history/service.go index cdb91757459..c7ca553bd91 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -23,6 +23,7 @@ package history import ( "time" + "fmt" "github.com/uber/cadence/common" "github.com/uber/cadence/common/metrics" persistencefactory "github.com/uber/cadence/common/persistence/persistence-factory" @@ -133,7 +134,7 @@ type Config struct { } // NewConfig returns new service config with default values -func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { +func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilityToKafka bool) *Config { return &Config{ NumberOfShards: numberOfShards, EnableSyncActivityHeartbeat: dc.GetBoolProperty(dynamicconfig.EnableSyncActivityHeartbeat, false), @@ -144,7 +145,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { EnableVisibilitySampling: dc.GetBoolProperty(dynamicconfig.EnableVisibilitySampling, true), VisibilityOpenMaxQPS: 
dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryVisibilityOpenMaxQPS, 300), VisibilityClosedMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryVisibilityClosedMaxQPS, 300), - EnableVisibilityToKafka: dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, dynamicconfig.DefaultEnableVisibilityToKafka), + EnableVisibilityToKafka: dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, enableVisibilityToKafka), HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128), HistoryCacheMaxSize: dc.GetIntProperty(dynamicconfig.HistoryCacheMaxSize, 512), HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour), @@ -227,12 +228,14 @@ type Service struct { // NewService builds a new cadence-history service func NewService(params *service.BootstrapParams) common.Daemon { params.UpdateLoggerWithServiceName(common.HistoryServiceName) + fmt.Println(params.ESConfig) return &Service{ params: params, stopC: make(chan struct{}), config: NewConfig( dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), params.PersistenceConfig.NumHistoryShards, + params.ESConfig.Enable, ), } } diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index 59c67d16199..00e6c66acc1 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -318,7 +318,7 @@ func (t *transferQueueActiveProcessorImpl) processDecisionTask(task *persistence // the rest of logic is making RPC call, which takes time. 
release(nil) if task.ScheduleID <= common.FirstEventID+2 { - err = t.recordWorkflowStarted(task.DomainID, execution, wfTypeName, startTimestamp.UnixNano(), workflowTimeout) + err = t.recordWorkflowStarted(task.DomainID, execution, wfTypeName, startTimestamp.UnixNano(), workflowTimeout, executionInfo.NextEventID) if err != nil { return err } @@ -383,7 +383,7 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(task *persisten // the rest of logic is making RPC call, which takes time. release(nil) err = t.recordWorkflowClosed( - domainID, execution, workflowTypeName, workflowStartTimestamp, workflowCloseTimestamp, workflowCloseStatus, workflowHistoryLength, + domainID, execution, workflowTypeName, workflowStartTimestamp, workflowCloseTimestamp, workflowCloseStatus, workflowHistoryLength, executionInfo.NextEventID, ) if err != nil { return err diff --git a/service/history/transferQueueActiveProcessor_test.go b/service/history/transferQueueActiveProcessor_test.go index 1efcab90a23..9205368d2ec 100644 --- a/service/history/transferQueueActiveProcessor_test.go +++ b/service/history/transferQueueActiveProcessor_test.go @@ -331,7 +331,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_FirstDecisio s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockMatchingClient.On("AddDecisionTask", nil, s.createAddDecisionTaskRequest(transferTask, msBuilder)).Once().Return(nil) s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", s.createRecordWorkflowExecutionStartedRequest(transferTask, msBuilder)).Once().Return(nil) - s.mockProducer.On("Publish", mock.Anything).Return(nil) + s.mockProducer.On("Publish", mock.Anything).Return(nil).Once() _, err := s.transferQueueActiveProcessor.process(transferTask) s.Nil(err) @@ -562,6 +562,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCloseExecution_HasParent( CompletionEvent: event, 
}).Return(nil).Once() s.mockVisibilityMgr.On("RecordWorkflowExecutionClosed", mock.Anything).Return(nil).Once() + s.mockProducer.On("Publish", mock.Anything).Return(nil).Once() _, err := s.transferQueueActiveProcessor.process(transferTask) s.Nil(err) @@ -613,6 +614,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCloseExecution_NoParent() persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockVisibilityMgr.On("RecordWorkflowExecutionClosed", mock.Anything).Return(nil).Once() + s.mockProducer.On("Publish", mock.Anything).Return(nil).Once() _, err := s.transferQueueActiveProcessor.process(transferTask) s.Nil(err) diff --git a/service/history/transferQueueProcessorBase.go b/service/history/transferQueueProcessorBase.go index 27a01f0f4c3..fffe767161c 100644 --- a/service/history/transferQueueProcessorBase.go +++ b/service/history/transferQueueProcessorBase.go @@ -22,10 +22,12 @@ package history import ( "github.com/uber-common/bark" + "github.com/uber/cadence/.gen/go/indexer" m "github.com/uber/cadence/.gen/go/matching" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" + es "github.com/uber/cadence/common/elasticsearch" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/persistence" @@ -139,11 +141,10 @@ func (t *transferQueueProcessorBase) pushDecision(task *persistence.TransferTask func (t *transferQueueProcessorBase) recordWorkflowStarted( domainID string, execution workflow.WorkflowExecution, workflowTypeName string, - startTimeUnixNano int64, workflowTimeout int32) error { + startTimeUnixNano int64, workflowTimeout int32, nextEventID int64) error { domain := defaultDomainName isSampledEnabled := false wid := execution.GetWorkflowId() - rid := execution.GetRunId() 
domainEntry, err := t.shard.GetDomainCache().GetDomainByID(domainID) if err != nil { @@ -162,12 +163,7 @@ func (t *transferQueueProcessorBase) recordWorkflowStarted( // publish to kafka if t.visibilityProducer != nil { - msg := &messaging.OpenWorkflowMsg{ - Domain: domain, - WorkflowID: wid, - RunID: rid, - StartTime: startTimeUnixNano, - } + msg := getVisibilityMessageForOpenExecution(domainID, execution, workflowTypeName, startTimeUnixNano, nextEventID) err := t.visibilityProducer.Publish(msg) if err != nil { return err @@ -187,7 +183,7 @@ func (t *transferQueueProcessorBase) recordWorkflowStarted( func (t *transferQueueProcessorBase) recordWorkflowClosed( domainID string, execution workflow.WorkflowExecution, workflowTypeName string, startTimeUnixNano int64, endTimeUnixNano int64, closeStatus workflow.WorkflowExecutionCloseStatus, - historyLength int64) error { + historyLength int64, nextEventID int64) error { // Record closing in visibility store retentionSeconds := int64(0) domain := defaultDomainName @@ -212,6 +208,16 @@ func (t *transferQueueProcessorBase) recordWorkflowClosed( return nil } + // publish to kafka + if t.visibilityProducer != nil { + msg := getVisibilityMessageForCloseExecution(domainID, execution, workflowTypeName, + startTimeUnixNano, endTimeUnixNano, closeStatus, historyLength, nextEventID) + err := t.visibilityProducer.Publish(msg) + if err != nil { + return err + } + } + return t.visibilityMgr.RecordWorkflowExecutionClosed(&persistence.RecordWorkflowExecutionClosedRequest{ DomainUUID: domainID, Domain: domain, @@ -224,3 +230,51 @@ func (t *transferQueueProcessorBase) recordWorkflowClosed( RetentionSeconds: retentionSeconds, }) } + +func getVisibilityMessageForOpenExecution(domainID string, execution workflow.WorkflowExecution, workflowTypeName string, + startTimeUnixNano int64, nextEventID int64) *indexer.Message { + + msgType := indexer.MessageTypeIndex + fields := map[string]*indexer.Field{ + es.WorkflowType: {Type: &es.FieldTypeString, 
StringData: common.StringPtr(workflowTypeName)}, + es.StartTime: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(startTimeUnixNano)}, + } + + msg := &indexer.Message{ + MessageType: &msgType, + DomainID: common.StringPtr(domainID), + WorkflowID: common.StringPtr(execution.GetWorkflowId()), + RunID: common.StringPtr(execution.GetRunId()), + Version: common.Int64Ptr(nextEventID), + IndexAttributes: &indexer.IndexAttributes{ + Fields: fields, + }, + } + return msg +} + +func getVisibilityMessageForCloseExecution(domainID string, execution workflow.WorkflowExecution, workflowTypeName string, + startTimeUnixNano int64, endTimeUnixNano int64, closeStatus workflow.WorkflowExecutionCloseStatus, + historyLength int64, nextEventID int64) *indexer.Message { + + msgType := indexer.MessageTypeIndex + fields := map[string]*indexer.Field{ + es.WorkflowType: {Type: &es.FieldTypeString, StringData: common.StringPtr(workflowTypeName)}, + es.StartTime: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(startTimeUnixNano)}, + es.CloseTime: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(endTimeUnixNano)}, + es.CloseStatus: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(int64(closeStatus))}, + es.HistoryLength: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(historyLength)}, + } + + msg := &indexer.Message{ + MessageType: &msgType, + DomainID: common.StringPtr(domainID), + WorkflowID: common.StringPtr(execution.GetWorkflowId()), + RunID: common.StringPtr(execution.GetRunId()), + Version: common.Int64Ptr(nextEventID), + IndexAttributes: &indexer.IndexAttributes{ + Fields: fields, + }, + } + return msg +} diff --git a/service/history/transferQueueStandbyProcessor.go b/service/history/transferQueueStandbyProcessor.go index 2261969cc9a..a805dfaac1f 100644 --- a/service/history/transferQueueStandbyProcessor.go +++ b/service/history/transferQueueStandbyProcessor.go @@ -214,7 +214,7 @@ func (t *transferQueueStandbyProcessorImpl) processDecisionTask(transferTask *pe if !isPending { if 
markWorkflowAsOpen { - err := t.recordWorkflowStarted(transferTask.DomainID, execution, wfTypeName, startTimestamp.UnixNano(), workflowTimeout) + err := t.recordWorkflowStarted(transferTask.DomainID, execution, wfTypeName, startTimestamp.UnixNano(), workflowTimeout, executionInfo.NextEventID) if err != nil { return err } @@ -230,7 +230,7 @@ func (t *transferQueueStandbyProcessorImpl) processDecisionTask(transferTask *pe } if markWorkflowAsOpen { - err = t.recordWorkflowStarted(transferTask.DomainID, execution, wfTypeName, startTimestamp.UnixNano(), workflowTimeout) + err = t.recordWorkflowStarted(transferTask.DomainID, execution, wfTypeName, startTimestamp.UnixNano(), workflowTimeout, executionInfo.NextEventID) } now := t.shard.GetCurrentTime(t.clusterName) @@ -290,7 +290,7 @@ func (t *transferQueueStandbyProcessorImpl) processCloseExecution(transferTask * // since event replication should be done by active cluster return t.recordWorkflowClosed( - transferTask.DomainID, execution, workflowTypeName, workflowStartTimestamp, workflowCloseTimestamp, workflowCloseStatus, workflowHistoryLength, + transferTask.DomainID, execution, workflowTypeName, workflowStartTimestamp, workflowCloseTimestamp, workflowCloseStatus, workflowHistoryLength, executionInfo.NextEventID, ) }, standbyTaskPostActionNoOp) // no op post action, since the entire workflow is finished } diff --git a/service/history/transferQueueStandbyProcessor_test.go b/service/history/transferQueueStandbyProcessor_test.go index e167de3f5a9..9109f0dba20 100644 --- a/service/history/transferQueueStandbyProcessor_test.go +++ b/service/history/transferQueueStandbyProcessor_test.go @@ -379,7 +379,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessDecisionTask_Pending() { persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) 
s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Return(nil) - s.mockProducer.On("Publish", mock.Anything).Return(nil) + s.mockProducer.On("Publish", mock.Anything).Return(nil).Once() _, err := s.transferQueueStandbyProcessor.process(transferTask) s.Equal(ErrTaskRetry, err) } @@ -428,7 +428,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessDecisionTask_Pending_Pus s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Return(nil) s.mockMatchingClient.On("AddDecisionTask", mock.Anything, mock.Anything).Return(nil).Once() - s.mockProducer.On("Publish", mock.Anything).Return(nil) + s.mockProducer.On("Publish", mock.Anything).Return(nil).Once() _, err := s.transferQueueStandbyProcessor.process(transferTask) s.Nil(nil, err) @@ -489,7 +489,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessDecisionTask_Success_Fir StartTimestamp: executionInfo.StartTimestamp.UnixNano(), WorkflowTimeout: int64(executionInfo.WorkflowTimeout), }).Return(nil).Once() - s.mockProducer.On("Publish", mock.Anything).Return(nil) + s.mockProducer.On("Publish", mock.Anything).Return(nil).Once() _, err := s.transferQueueStandbyProcessor.process(transferTask) s.Nil(err) @@ -597,6 +597,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessCloseExecution() { persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockVisibilityMgr.On("RecordWorkflowExecutionClosed", mock.Anything).Return(nil).Once() + s.mockProducer.On("Publish", mock.Anything).Return(nil).Once() _, err := s.transferQueueStandbyProcessor.process(transferTask) s.Nil(err) diff --git a/service/worker/indexer/esProcessor.go b/service/worker/indexer/esProcessor.go new file mode 
100644 index 00000000000..e749ec51c39 --- /dev/null +++ b/service/worker/indexer/esProcessor.go @@ -0,0 +1,249 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package indexer + +import ( + "context" + "encoding/json" + "github.com/olivere/elastic" + "github.com/uber-common/bark" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/collection" + es "github.com/uber/cadence/common/elasticsearch" + "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" + "time" +) + +type ( + // ESProcessor is interface for elastic search bulk processor + ESProcessor interface { + // Stop processor and clean up + Stop() + // Add request to bulk, and record kafka message in map with provided key + // This call will be blocked when downstream has issues + Add(request elastic.BulkableRequest, key string, kafkaMsg messaging.Message) + } + + // ElasticBulkProcessor is interface for elastic.BulkProcessor + // (elastic package doesn't provide such interface that tests can mock) + ElasticBulkProcessor interface { + Start(ctx context.Context) error + Stop() error + Close() error + Stats() elastic.BulkProcessorStats + Add(request elastic.BulkableRequest) + Flush() error + } + + // esProcessorImpl implements ESProcessor, it's an agent of elastic.BulkProcessor + esProcessorImpl struct { + processor ElasticBulkProcessor + mapToKafkaMsg collection.ConcurrentTxMap // used to map ES request to kafka message + config *Config + logger bark.Logger + metricsClient metrics.Client + } +) + +var _ ESProcessor = (*esProcessorImpl)(nil) + +const ( + // retry configs for es bulk processor + esProcessorInitialRetryInterval = 200 * time.Millisecond + esProcessorMaxRetryInterval = 20 * time.Second +) + +// NewESProcessorAndStart create new ESProcessor and start +func NewESProcessorAndStart(config *Config, client *elastic.Client, processorName string, + logger bark.Logger, metricsClient metrics.Client) (ESProcessor, error) { + p := &esProcessorImpl{ + config: config, + logger: logger.WithFields(bark.Fields{ + logging.TagWorkflowComponent: 
logging.TagValueIndexerESProcessorComponent, + }), + metricsClient: metricsClient, + } + + processor, err := client.BulkProcessor(). + Name(processorName). + Workers(config.ESProcessorNumOfWorkers()). + BulkActions(config.ESProcessorBulkActions()). + BulkSize(config.ESProcessorBulkSize()). + FlushInterval(config.ESProcessorFlushInterval()). + Backoff(elastic.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval)). + After(p.bulkAfterAction). + Do(context.Background()) + if err != nil { + return nil, err + } + + p.processor = processor + p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn) + return p, nil +} + +func (p *esProcessorImpl) Stop() { + p.processor.Stop() + p.mapToKafkaMsg = nil +} + +// Add an ES request, and an map item for kafka message +func (p *esProcessorImpl) Add(request elastic.BulkableRequest, key string, kafkaMsg messaging.Message) { + p.mapToKafkaMsg.Put(key, kafkaMsg) + p.processor.Add(request) +} + +// bulkAfterAction is triggered after bulk processor commit +func (p *esProcessorImpl) bulkAfterAction(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { + if err != nil { + // This happens after configured retry, which means something bad happens on cluster or index + // When cluster back to live, processor will re-commit those failure requests + p.logger.WithFields(bark.Fields{ + logging.TagErr: err, + }).Error("Error commit bulk request.") + + for _, request := range requests { + p.logger.WithFields(bark.Fields{ + logging.TagESRequest: request.String(), + }).Error("ES request failed.") + + p.metricsClient.IncCounter(metrics.ESProcessorScope, metrics.ESProcessorFailures) + } + return + } + + responseItems := response.Items + for i := 0; i < len(requests); i++ { + key := p.getKeyForKafkaMsg(requests[i]) + if key == "" { + continue + } + responseItem := responseItems[i] + for _, resp := range responseItem { + switch { + case isResponseSuccess(resp.Status): + 
p.ackKafkaMsg(key) + case !isResponseRetriable(resp.Status): + p.nackKafkaMsg(key) + default: + // do nothing, bulk processor will retry + } + } + } +} + +func (p *esProcessorImpl) ackKafkaMsg(key string) { + p.ackKafkaMsgHelper(key, false) +} + +func (p *esProcessorImpl) nackKafkaMsg(key string) { + p.ackKafkaMsgHelper(key, true) +} + +func (p *esProcessorImpl) ackKafkaMsgHelper(key string, nack bool) { + msg, ok := p.mapToKafkaMsg.Get(key) + if !ok { + return // duplicate kafka message + } + kafkaMsg, ok := msg.(messaging.Message) + if !ok { // must be bug in code and bad deployment + p.logger.WithFields(bark.Fields{ + logging.TagESKey: key, + }).Fatal("Message is not kafka message.") + } + + if nack { + kafkaMsg.Nack() + } else { + kafkaMsg.Ack() + } + p.mapToKafkaMsg.Remove(key) +} + +func (p *esProcessorImpl) hashFn(key interface{}) uint32 { + id, ok := key.(string) + if !ok { + return 0 + } + numOfShards := p.config.IndexerConcurrency() + return uint32(common.WorkflowIDToHistoryShard(id, numOfShards)) +} + +func (p *esProcessorImpl) getKeyForKafkaMsg(request elastic.BulkableRequest) string { + req, err := request.Source() + if err != nil { + p.logger.WithFields(bark.Fields{ + logging.TagErr: err, + logging.TagESRequest: request.String(), + }).Error("Get request source err.") + p.metricsClient.IncCounter(metrics.ESProcessorScope, metrics.ESProcessorCorruptedData) + return "" + } + + var key string + if len(req) == 2 { // index or update requests + var body map[string]interface{} + if err := json.Unmarshal([]byte(req[1]), &body); err != nil { + p.logger.WithFields(bark.Fields{ + logging.TagErr: err, + }).Error("Unmarshal request body err.") + p.metricsClient.IncCounter(metrics.ESProcessorScope, metrics.ESProcessorCorruptedData) + return "" + } + + k, ok := body[es.KafkaKey] + if !ok { + // must be bug in code and bad deployment, check processor that add es requests + panic("KafkaKey not found") + } + key, ok = k.(string) + if !ok { + // must be bug in code and 
bad deployment, check processor that adds es requests + panic("KafkaKey is not string") + } + } + return key +} + +// 409 - Version Conflict +// 404 - Not Found +func isResponseSuccess(status int) bool { + if status >= 200 && status < 300 || status == 409 || status == 404 { + return true + } + return false +} + +// isResponseRetriable is compliant with elastic.BulkProcessorService.RetryItemStatusCodes +// responses with these status will be kept in queue and retried until success +// 408 - Request Timeout +// 429 - Too Many Requests +// 503 - Service Unavailable +// 507 - Insufficient Storage +func isResponseRetriable(status int) bool { + switch status { + case 408, 429, 503, 507: + return true + } + return false +} diff --git a/service/worker/indexer/esProcessor_test.go b/service/worker/indexer/esProcessor_test.go new file mode 100644 index 00000000000..271dc09ea6a --- /dev/null +++ b/service/worker/indexer/esProcessor_test.go @@ -0,0 +1,300 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package indexer + +import ( + "errors" + "github.com/olivere/elastic" + "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" + "github.com/uber/cadence/common/collection" + es "github.com/uber/cadence/common/elasticsearch" + msgMocks "github.com/uber/cadence/common/messaging/mocks" + "github.com/uber/cadence/common/metrics" + mmocks "github.com/uber/cadence/common/metrics/mocks" + "github.com/uber/cadence/common/service/dynamicconfig" + "github.com/uber/cadence/service/worker/indexer/mocks" + "log" + "os" + "testing" + "time" +) + +type esProcessorSuite struct { + suite.Suite + esProcessor *esProcessorImpl + mockBulkProcessor *mocks.ElasticBulkProcessor + mockMetricClient *mmocks.Client +} + +var ( + testIndex = "test-index" + testType = esDocType + testID = "test-doc-id" +) + +func TestESProcessorSuite(t *testing.T) { + s := new(esProcessorSuite) + suite.Run(t, s) +} + +func (s *esProcessorSuite) SetupSuite() { + if testing.Verbose() { + log.SetOutput(os.Stdout) + } +} + +func (s *esProcessorSuite) SetupTest() { + config := &Config{ + IndexerConcurrency: dynamicconfig.GetIntPropertyFn(32), + ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1), + ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10), + ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20), + ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute), + } + s.mockMetricClient = &mmocks.Client{} + s.mockBulkProcessor = &mocks.ElasticBulkProcessor{} + p := &esProcessorImpl{ + config: config, + logger: bark.NewNopLogger(), + metricsClient: s.mockMetricClient, + } + p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn) + p.processor = s.mockBulkProcessor + + 
s.esProcessor = p +} + +func (s *esProcessorSuite) TearDownTest() { + s.mockBulkProcessor.AssertExpectations(s.T()) + s.mockMetricClient.AssertExpectations(s.T()) +} + +func (s *esProcessorSuite) TestNewESProcessorAndStart() { + config := &Config{ + ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1), + ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10), + ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20), + ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute), + } + + esClient := &elastic.Client{} + p, err := NewESProcessorAndStart(config, esClient, "test-processor", bark.NewNopLogger(), &mmocks.Client{}) + s.NoError(err) + s.NotNil(p) + + processor, ok := p.(*esProcessorImpl) + s.True(ok) + s.NotNil(processor.mapToKafkaMsg) + + p.Stop() +} + +func (s *esProcessorSuite) TestStop() { + s.mockBulkProcessor.On("Stop").Return(nil).Once() + s.esProcessor.Stop() + s.Nil(s.esProcessor.mapToKafkaMsg) +} + +func (s *esProcessorSuite) TestAdd() { + request := elastic.NewBulkIndexRequest() + mockKafkaMsg := &msgMocks.Message{} + key := "test-key" + s.Equal(0, s.esProcessor.mapToKafkaMsg.Size()) + + s.mockBulkProcessor.On("Add", request).Return().Once() + s.esProcessor.Add(request, key, mockKafkaMsg) + s.Equal(1, s.esProcessor.mapToKafkaMsg.Size()) + mockKafkaMsg.AssertExpectations(s.T()) +} + +func (s *esProcessorSuite) TestBulkAfterAction() { + version := int64(3) + testKey := "testKey" + request := elastic.NewBulkIndexRequest(). + Index(testIndex). + Type(testType). + Id(testID). + VersionType(versionTypeExternal). + Version(version). 
+ Doc(map[string]interface{}{es.KafkaKey: testKey}) + requests := []elastic.BulkableRequest{request} + + mSuccess := map[string]*elastic.BulkResponseItem{ + "index": { + Index: testIndex, + Type: testType, + Id: testID, + Version: version, + Status: 200, + }, + } + response := &elastic.BulkResponse{ + Took: 3, + Errors: false, + Items: []map[string]*elastic.BulkResponseItem{mSuccess}, + } + + mockKafkaMsg := &msgMocks.Message{} + s.esProcessor.mapToKafkaMsg.Put(testKey, mockKafkaMsg) + mockKafkaMsg.On("Ack").Return(nil).Once() + s.esProcessor.bulkAfterAction(0, requests, response, nil) + mockKafkaMsg.AssertExpectations(s.T()) +} + +func (s *esProcessorSuite) TestBulkAfterAction_Nack() { + version := int64(3) + testKey := "testKey" + request := elastic.NewBulkIndexRequest(). + Index(testIndex). + Type(testType). + Id(testID). + VersionType(versionTypeExternal). + Version(version). + Doc(map[string]interface{}{es.KafkaKey: testKey}) + requests := []elastic.BulkableRequest{request} + + mFailed := map[string]*elastic.BulkResponseItem{ + "index": { + Index: testIndex, + Type: testType, + Id: testID, + Version: version, + Status: 400, + }, + } + response := &elastic.BulkResponse{ + Took: 3, + Errors: false, + Items: []map[string]*elastic.BulkResponseItem{mFailed}, + } + + mockKafkaMsg := &msgMocks.Message{} + s.esProcessor.mapToKafkaMsg.Put(testKey, mockKafkaMsg) + mockKafkaMsg.On("Nack").Return(nil).Once() + s.esProcessor.bulkAfterAction(0, requests, response, nil) + mockKafkaMsg.AssertExpectations(s.T()) +} + +func (s *esProcessorSuite) TestBulkAfterAction_Error() { + version := int64(3) + request := elastic.NewBulkIndexRequest(). + Index(testIndex). + Type(testType). + Id(testID). + VersionType(versionTypeExternal). 
+ Version(version) + requests := []elastic.BulkableRequest{request} + + mFailed := map[string]*elastic.BulkResponseItem{ + "index": { + Index: testIndex, + Type: testType, + Id: testID, + Version: version, + Status: 400, + }, + } + response := &elastic.BulkResponse{ + Took: 3, + Errors: true, + Items: []map[string]*elastic.BulkResponseItem{mFailed}, + } + + s.mockMetricClient.On("IncCounter", metrics.ESProcessorScope, metrics.ESProcessorFailures).Once() + s.esProcessor.bulkAfterAction(0, requests, response, errors.New("some error")) +} + +func (s *esProcessorSuite) TestAckKafkaMsg() { + key := "test-key" + // no msg in map, nothing called + s.esProcessor.ackKafkaMsg(key) + + request := elastic.NewBulkIndexRequest() + mockKafkaMsg := &msgMocks.Message{} + s.mockBulkProcessor.On("Add", request).Return().Once() + s.esProcessor.Add(request, key, mockKafkaMsg) + s.Equal(1, s.esProcessor.mapToKafkaMsg.Size()) + + mockKafkaMsg.On("Ack").Return(nil).Once() + s.esProcessor.ackKafkaMsg(key) + mockKafkaMsg.AssertExpectations(s.T()) + s.Equal(0, s.esProcessor.mapToKafkaMsg.Size()) +} + +func (s *esProcessorSuite) TestNackKafkaMsg() { + key := "test-key-nack" + // no msg in map, nothing called + s.esProcessor.nackKafkaMsg(key) + + request := elastic.NewBulkIndexRequest() + mockKafkaMsg := &msgMocks.Message{} + s.mockBulkProcessor.On("Add", request).Return().Once() + s.esProcessor.Add(request, key, mockKafkaMsg) + s.Equal(1, s.esProcessor.mapToKafkaMsg.Size()) + + mockKafkaMsg.On("Nack").Return(nil).Once() + s.esProcessor.nackKafkaMsg(key) + mockKafkaMsg.AssertExpectations(s.T()) + s.Equal(0, s.esProcessor.mapToKafkaMsg.Size()) +} + +func (s *esProcessorSuite) TestHashFn() { + s.Equal(uint32(0), s.esProcessor.hashFn(0)) + s.NotEqual(uint32(0), s.esProcessor.hashFn("test")) +} + +func (s *esProcessorSuite) TestGetKeyForKafkaMsg() { + request := elastic.NewBulkIndexRequest() + s.PanicsWithValue("KafkaKey not found", func() { s.esProcessor.getKeyForKafkaMsg(request) }) + + m := 
map[string]interface{}{ + es.KafkaKey: 1, + } + request.Doc(m) + s.PanicsWithValue("KafkaKey is not string", func() { s.esProcessor.getKeyForKafkaMsg(request) }) + + testKey := "test-key" + m[es.KafkaKey] = testKey + request.Doc(m) + s.Equal(testKey, s.esProcessor.getKeyForKafkaMsg(request)) +} + +func (s *esProcessorSuite) TestIsResponseSuccess() { + for i := 200; i < 300; i++ { + s.True(isResponseSuccess(i)) + } + status := []int{409, 404} + for _, code := range status { + s.True(isResponseSuccess(code)) + } + status = []int{100, 199, 300, 400, 500, 408, 429, 503, 507} + for _, code := range status { + s.False(isResponseSuccess(code)) + } +} + +func (s *esProcessorSuite) TestIsResponseRetriable() { + status := []int{408, 429, 503, 507} + for _, code := range status { + s.True(isResponseRetriable(code)) + } +} diff --git a/service/worker/indexer/indexer.go b/service/worker/indexer/indexer.go new file mode 100644 index 00000000000..b910acaaca2 --- /dev/null +++ b/service/worker/indexer/indexer.go @@ -0,0 +1,94 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package indexer + +import ( + "fmt" + "github.com/olivere/elastic" + "github.com/uber-common/bark" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/elasticsearch" + "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/service/dynamicconfig" +) + +type ( + // Indexer used to consume data from kafka then send to ElasticSearch + Indexer struct { + config *Config + kafkaClient messaging.Client + esClient *elastic.Client + logger bark.Logger + metricsClient metrics.Client + visibilityProcessor *indexProcessor + visibilityIndexName string + } + + // Config contains all configs for indexer + Config struct { + IndexerConcurrency dynamicconfig.IntPropertyFn + ESProcessorNumOfWorkers dynamicconfig.IntPropertyFn + ESProcessorBulkActions dynamicconfig.IntPropertyFn // max number of requests in bulk + ESProcessorBulkSize dynamicconfig.IntPropertyFn // max total size of bytes in bulk + ESProcessorFlushInterval dynamicconfig.DurationPropertyFn + } +) + +const ( + visibilityProcessorName = "visibility-processor" +) + +// NewIndexer creates a new Indexer +func NewIndexer(config *Config, client messaging.Client, esClient *elastic.Client, esConfig *elasticsearch.Config, + logger bark.Logger, metricsClient metrics.Client) *Indexer { + logger = logger.WithFields(bark.Fields{ + logging.TagWorkflowComponent: logging.TagValueIndexerComponent, + }) + + return &Indexer{ + config: config, + kafkaClient: client, + esClient: esClient, + logger: logger, + metricsClient: metricsClient, + visibilityIndexName: esConfig.Indices[common.VisibilityAppName], + } +} + +// Start indexer +func (x Indexer) 
Start() error { + visibilityApp := common.VisibilityAppName + visConsumerName := getConsumerName(visibilityApp) + x.visibilityProcessor = newIndexProcessor(visibilityApp, visConsumerName, x.kafkaClient, x.esClient, + visibilityProcessorName, x.visibilityIndexName, x.config, x.logger, x.metricsClient) + return x.visibilityProcessor.Start() +} + +// Stop indexer +func (x Indexer) Stop() { + x.visibilityProcessor.Stop() +} + +func getConsumerName(topic string) string { + return fmt.Sprintf("%s-consumer", topic) +} diff --git a/service/worker/indexer/mocks/ElasticBulkProcessor.go b/service/worker/indexer/mocks/ElasticBulkProcessor.go new file mode 100644 index 00000000000..3d20b8e2627 --- /dev/null +++ b/service/worker/indexer/mocks/ElasticBulkProcessor.go @@ -0,0 +1,106 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package mocks + +import context "context" +import elastic "github.com/olivere/elastic" + +import mock "github.com/stretchr/testify/mock" + +// ElasticBulkProcessor is an autogenerated mock type for the ElasticBulkProcessor type +type ElasticBulkProcessor struct { + mock.Mock +} + +// Add provides a mock function with given fields: request +func (_m *ElasticBulkProcessor) Add(request elastic.BulkableRequest) { + _m.Called(request) +} + +// Close provides a mock function with given fields: +func (_m *ElasticBulkProcessor) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Flush provides a mock function with given fields: +func (_m *ElasticBulkProcessor) Flush() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Start provides a mock function with given fields: ctx +func (_m *ElasticBulkProcessor) Start(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stats provides a mock function with given fields: +func (_m *ElasticBulkProcessor) Stats() elastic.BulkProcessorStats { + ret := _m.Called() + + var r0 elastic.BulkProcessorStats + if rf, ok := ret.Get(0).(func() elastic.BulkProcessorStats); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(elastic.BulkProcessorStats) + } + + return r0 +} + +// Stop provides a mock function with given fields: +func (_m *ElasticBulkProcessor) Stop() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/service/worker/indexer/processor.go b/service/worker/indexer/processor.go new file mode 100644 index 00000000000..0727ffa7620 --- /dev/null +++ 
b/service/worker/indexer/processor.go @@ -0,0 +1,267 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package indexer + +import ( + "fmt" + "github.com/olivere/elastic" + "github.com/uber-common/bark" + "github.com/uber/cadence/.gen/go/indexer" + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/codec" + es "github.com/uber/cadence/common/elasticsearch" + "github.com/uber/cadence/common/logging" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" + "sync" + "sync/atomic" + "time" +) + +type indexProcessor struct { + appName string + consumerName string + kafkaClient messaging.Client + consumer messaging.Consumer + esClient *elastic.Client + esProcessor ESProcessor + esProcessorName string + esIndexName string + config *Config + logger bark.Logger + metricsClient metrics.Client + isStarted int32 + isStopped int32 + shutdownWG sync.WaitGroup + shutdownCh chan struct{} + msgEncoder codec.BinaryEncoder +} + +const ( + esDocIDDelimiter = "~" + esDocType = "_doc" + + versionTypeExternal = "external" +) + +var ( + errUnknownMessageType = &shared.BadRequestError{Message: "unknown message type"} +) + +func newIndexProcessor(appName, consumerName string, kafkaClient messaging.Client, esClient *elastic.Client, + esProcessorName, esIndexName string, config *Config, logger bark.Logger, metricsClient metrics.Client) *indexProcessor { + return &indexProcessor{ + appName: appName, + consumerName: consumerName, + kafkaClient: kafkaClient, + esClient: esClient, + esProcessorName: esProcessorName, + esIndexName: esIndexName, + config: config, + logger: logger.WithFields(bark.Fields{ + logging.TagWorkflowComponent: logging.TagValueIndexerProcessorComponent, + }), + metricsClient: metricsClient, + shutdownCh: make(chan struct{}), + msgEncoder: codec.NewThriftRWEncoder(), + } +} + +func (p *indexProcessor) Start() error { + if !atomic.CompareAndSwapInt32(&p.isStarted, 0, 1) { + return nil + } + + logging.LogIndexProcessorStartingEvent(p.logger) + consumer, err := 
p.kafkaClient.NewConsumer(p.appName, p.consumerName, p.config.IndexerConcurrency()) + if err != nil { + logging.LogIndexProcessorStartFailedEvent(p.logger, err) + return err + } + + if err := consumer.Start(); err != nil { + logging.LogIndexProcessorStartFailedEvent(p.logger, err) + return err + } + + esProcessor, err := NewESProcessorAndStart(p.config, p.esClient, p.esProcessorName, p.logger, p.metricsClient) + if err != nil { + logging.LogIndexProcessorStartFailedEvent(p.logger, err) + return err + } + + p.consumer = consumer + p.esProcessor = esProcessor + p.shutdownWG.Add(1) + go p.processorPump() + + logging.LogIndexProcessorStartedEvent(p.logger) + return nil +} + +func (p *indexProcessor) Stop() { + if !atomic.CompareAndSwapInt32(&p.isStopped, 0, 1) { + return + } + + logging.LogIndexProcessorShuttingDownEvent(p.logger) + defer logging.LogIndexProcessorShutDownEvent(p.logger) + + if atomic.LoadInt32(&p.isStarted) == 1 { + close(p.shutdownCh) + } + + if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success { + logging.LogIndexProcessorShutDownTimedoutEvent(p.logger) + } +} + +func (p *indexProcessor) processorPump() { + defer p.shutdownWG.Done() + + var workerWG sync.WaitGroup + for workerID := 0; workerID < p.config.IndexerConcurrency(); workerID++ { + workerWG.Add(1) + go p.messageProcessLoop(&workerWG, workerID) + } + + select { + case <-p.shutdownCh: + // Processor is shutting down, close the underlying consumer and esProcessor + p.consumer.Stop() + p.esProcessor.Stop() + } + + p.logger.Info("Index processor pump shutting down.") + if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success { + p.logger.Warn("Index processor timed out on worker shutdown.") + } +} + +func (p *indexProcessor) messageProcessLoop(workerWG *sync.WaitGroup, workerID int) { + defer workerWG.Done() + + for { + select { + case msg, ok := <-p.consumer.Messages(): + if !ok { + p.logger.Info("Worker for index processor shutting down.") + return // 
channel closed + } + err := p.process(msg) + if err != nil { + msg.Nack() + } + } + } +} + +func (p *indexProcessor) process(kafkaMsg messaging.Message) error { + logger := p.logger.WithFields(bark.Fields{ + logging.TagPartitionKey: kafkaMsg.Partition(), + logging.TagOffset: kafkaMsg.Offset(), + logging.TagAttemptStart: time.Now(), + }) + + indexMsg, err := p.deserialize(kafkaMsg.Value()) + if err != nil { + logger.WithFields(bark.Fields{ + logging.TagErr: err, + }).Error("Failed to deserialize index messages.") + p.metricsClient.IncCounter(metrics.IndexProcessorScope, metrics.IndexProcessorCorruptedData) + return err + } + + switch indexMsg.GetMessageType() { + case indexer.MessageTypeIndex: + p.addMessageToESProcessor(kafkaMsg, indexMsg) + case indexer.MessageTypeDelete: + // TODO + default: + logger.Error("Unknown message type") + p.metricsClient.IncCounter(metrics.IndexProcessorScope, metrics.IndexProcessorCorruptedData) + err = errUnknownMessageType + } + + return err +} + +func (p *indexProcessor) deserialize(payload []byte) (*indexer.Message, error) { + var msg indexer.Message + if err := p.msgEncoder.Decode(payload, &msg); err != nil { + return nil, err + } + return &msg, nil +} + +func (p *indexProcessor) addMessageToESProcessor(kafkaMsg messaging.Message, indexMsg *indexer.Message) { + keyToKafkaMsg := fmt.Sprintf("%v-%v", kafkaMsg.Partition(), kafkaMsg.Offset()) + docID := indexMsg.GetWorkflowID() + esDocIDDelimiter + indexMsg.GetRunID() + doc := p.generateESDoc(indexMsg, keyToKafkaMsg) + req := elastic.NewBulkIndexRequest(). + Index(p.esIndexName). + Type(esDocType). + Id(docID). + VersionType(versionTypeExternal). + Version(indexMsg.GetVersion()). 
+ Doc(doc) + p.esProcessor.Add(req, keyToKafkaMsg, kafkaMsg) +} + +func (p *indexProcessor) generateESDoc(msg *indexer.Message, keyToKafkaMsg string) map[string]interface{} { + doc := p.dumpFieldsToMap(msg.IndexAttributes.Fields) + fulfillDoc(doc, msg, keyToKafkaMsg) + return doc +} + +func (p *indexProcessor) dumpFieldsToMap(fields map[string]*indexer.Field) map[string]interface{} { + doc := make(map[string]interface{}) + for k, v := range fields { + if !es.IsFieldNameValid(k) { + p.logger.WithFields(bark.Fields{ + logging.TagESField: k, + }).Error("Unregistered field.") + p.metricsClient.IncCounter(metrics.IndexProcessorScope, metrics.IndexProcessorCorruptedData) + continue + } + + switch v.GetType() { + case indexer.FieldTypeString: + doc[k] = v.GetStringData() + case indexer.FieldTypeInt: + doc[k] = v.GetIntData() + case indexer.FieldTypeBool: + doc[k] = v.GetBoolData() + default: + // must be bug in code and bad deployment, check data sent from producer + p.logger.Fatalf("Unknown field type") + } + } + return doc +} + +func fulfillDoc(doc map[string]interface{}, msg *indexer.Message, keyToKafkaMsg string) { + doc[es.DomainID] = msg.GetDomainID() + doc[es.WorkflowID] = msg.GetWorkflowID() + doc[es.RunID] = msg.GetRunID() + doc[es.KafkaKey] = keyToKafkaMsg +} diff --git a/service/worker/replicator/processor.go b/service/worker/replicator/processor.go index 02bac773e70..daf3dcc4f6c 100644 --- a/service/worker/replicator/processor.go +++ b/service/worker/replicator/processor.go @@ -134,7 +134,7 @@ func (p *replicationTaskProcessor) Start() error { } logging.LogReplicationTaskProcessorStartingEvent(p.logger) - consumer, err := p.client.NewConsumer(p.currentCluster, p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency()) + consumer, err := p.client.NewConsumerWithClusterName(p.currentCluster, p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency()) if err != nil { logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err) return err diff 
--git a/service/worker/service.go b/service/worker/service.go index 277efbcf040..631896e9036 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -35,6 +35,7 @@ import ( persistencefactory "github.com/uber/cadence/common/persistence/persistence-factory" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/service/dynamicconfig" + "github.com/uber/cadence/service/worker/indexer" "github.com/uber/cadence/service/worker/replicator" "github.com/uber/cadence/service/worker/sysworkflow" "go.uber.org/cadence/.gen/go/shared" @@ -65,6 +66,7 @@ type ( Config struct { ReplicationCfg *replicator.Config SysWorkflowCfg *sysworkflow.Config + IndexerCfg *indexer.Config } ) @@ -89,6 +91,13 @@ func NewConfig(dc *dynamicconfig.Collection) *Config { ReplicationTaskMaxRetry: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetry, 50), }, SysWorkflowCfg: &sysworkflow.Config{}, + IndexerCfg: &indexer.Config{ + IndexerConcurrency: dc.GetIntProperty(dynamicconfig.WorkerIndexerConcurrency, 1000), + ESProcessorNumOfWorkers: dc.GetIntProperty(dynamicconfig.WorkerESProcessorNumOfWorkers, 1), + ESProcessorBulkActions: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkActions, 1000), + ESProcessorBulkSize: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkSize, 2<<24), // 16MB + ESProcessorFlushInterval: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorFlushInterval, 10*time.Second), + }, } } @@ -122,6 +131,10 @@ func (s *Service) Start() { s.startSysWorker(base, log, params.MetricScope) } + if s.params.ESConfig.Enable { + s.startIndexer(params, base, log) + } + log.Infof("%v started", common.WorkerServiceName) <-s.stopC base.Stop() @@ -142,7 +155,15 @@ func (s *Service) startReplicator(params *service.BootstrapParams, base service. 
s.config.ReplicationCfg, params.MessagingClient, log, s.metricsClient) if err := replicator.Start(); err != nil { replicator.Stop() - log.Fatalf("Fail to start replicator: %v", err) + log.Fatalf("fail to start replicator: %v", err) + } +} + +func (s *Service) startIndexer(params *service.BootstrapParams, base service.Service, log bark.Logger) { + indexer := indexer.NewIndexer(s.config.IndexerCfg, params.MessagingClient, params.ESClient, params.ESConfig, log, s.metricsClient) + if err := indexer.Start(); err != nil { + indexer.Stop() + log.Fatalf("fail to start indexer: %v", err) } }