diff --git a/common/elasticsearch/client_v6.go b/common/elasticsearch/client_v6.go index 25a6335c239..a6d05649b17 100644 --- a/common/elasticsearch/client_v6.go +++ b/common/elasticsearch/client_v6.go @@ -30,11 +30,7 @@ import ( "strconv" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" "github.com/olivere/elastic" - esaws "github.com/olivere/elastic/aws/v4" "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" @@ -46,14 +42,12 @@ import ( ) var _ GenericClient = (*elasticV6)(nil) -var _ GenericBulkProcessor = (*v6BulkProcessor)(nil) type ( // elasticV6 implements Client elasticV6 struct { - client *elastic.Client - logger log.Logger - serializer p.PayloadSerializer + client *elastic.Client + logger log.Logger } // searchParametersV6 holds all required and optional parameters for executing a search @@ -65,27 +59,13 @@ type ( Sorter []elastic.Sorter SearchAfter []interface{} } - - // bulkProcessorParametersV6 holds all required and optional parameters for executing bulk service - bulkProcessorParametersV6 struct { - Name string - NumOfWorkers int - BulkActions int - BulkSize int - FlushInterval time.Duration - Backoff elastic.Backoff - BeforeFunc elastic.BulkBeforeFunc - AfterFunc elastic.BulkAfterFunc - } ) -func (c *elasticV6) IsNotFoundError(err error) bool { - return elastic.IsNotFound(err) -} - // NewV6Client returns a new implementation of GenericClient func NewV6Client( connectConfig *config.ElasticSearchConfig, + tlsClient *http.Client, + awsSigningClient *http.Client, logger log.Logger, clientOptFuncs ...elastic.ClientOptionFunc, ) (GenericClient, error) { @@ -100,29 +80,12 @@ func NewV6Client( if connectConfig.DisableHealthCheck { clientOptFuncs = append(clientOptFuncs, elastic.SetHealthcheck(false)) } - if connectConfig.AWSSigning.Enable { - if err := config.CheckAWSSigningConfig(connectConfig.AWSSigning); err != nil { - return nil, err - } - var signingClient *http.Client - var err error - if connectConfig.AWSSigning.EnvironmentCredential != nil { - signingClient, err = buildSigningHTTPClientFromEnvironmentCredentialV6(*connectConfig.AWSSigning.EnvironmentCredential) - } else { - signingClient, err = buildSigningHTTPClientFromStaticCredentialV6(*connectConfig.AWSSigning.StaticCredential) - } - if err != nil { - return nil, err - } - clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(signingClient)) + + if awsSigningClient != nil { + clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(awsSigningClient)) } - if connectConfig.TLS.Enabled { - var tlsClient *http.Client - var err error - tlsClient, err = buildTLSHTTPClient(connectConfig.TLS) - if err != nil { - return nil, err - } + + if tlsClient != nil { clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(tlsClient)) } @@ -132,30 +95,13 @@ func NewV6Client( } return &elasticV6{ - client: client, - logger: logger, - serializer: p.NewPayloadSerializer(), + client: client, + logger: logger, }, nil } -// Refer to https://github.com/olivere/elastic/blob/release-branch.v6/recipes/aws-connect-v4/main.go -func buildSigningHTTPClientFromStaticCredentialV6(credentialConfig config.AWSStaticCredential) (*http.Client, error) { - awsCredentials := credentials.NewStaticCredentials( - credentialConfig.AccessKey, - credentialConfig.SecretKey, - credentialConfig.SessionToken, - ) - return esaws.NewV4SigningClient(awsCredentials, credentialConfig.Region), nil -} - -func buildSigningHTTPClientFromEnvironmentCredentialV6(credentialConfig config.AWSEnvironmentCredential) (*http.Client, error) { - sess, err := session.NewSession(&aws.Config{ - Region: aws.String(credentialConfig.Region)}, - ) - if err != nil { - return nil, err - } - return esaws.NewV4SigningClient(sess.Config.Credentials, credentialConfig.Region), nil +func (c *elasticV6) IsNotFoundError(err error) bool { + return elastic.IsNotFound(err) } // root is for nested object like Attr property for search attributes. @@ -175,17 +121,16 @@ func (c *elasticV6) CountByQuery(ctx context.Context, index, query string) (int6 } func (c *elasticV6) Search(ctx context.Context, request *SearchRequest) (*p.InternalListWorkflowExecutionsResponse, error) { + token, err := GetNextPageToken(request.ListRequest.NextPageToken) + if err != nil { + return nil, err + } var matchQuery *elastic.MatchQuery if request.MatchQuery != nil { matchQuery = elastic.NewMatchQuery(request.MatchQuery.Name, request.MatchQuery.Text) } - token, err := GetNextPageToken(request.ListRequest.NextPageToken) - if err != nil { - return nil, err - } - searchResult, err := c.getSearchResult( ctx, request.Index, @@ -203,12 +148,11 @@ func (c *elasticV6) Search(ctx context.Context, request *SearchRequest) (*p.Inte } func (c *elasticV6) SearchByQuery(ctx context.Context, request *SearchByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) { - searchResult, err := c.searchWithDSL(ctx, request.Index, request.Query) + token, err := GetNextPageToken(request.NextPageToken) if err != nil { return nil, err } - - token, err := GetNextPageToken(request.NextPageToken) + searchResult, err := c.client.Search(request.Index).Source(request.Query).Do(ctx) if err != nil { return nil, err } @@ -222,73 +166,46 @@ func (c *elasticV6) ScanByQuery(ctx context.Context, request *ScanByQueryRequest if err != nil { return nil, err } - - var searchResult *elastic.SearchResult - var scrollService *elastic.ScrollService - - if len(token.ScrollID) == 0 { // first call - searchResult, scrollService, err = c.scrollFirstPage(ctx, request.Index, request.Query) - } else { - searchResult, scrollService, err = c.scroll(ctx, token.ScrollID) - } + searchResult, err := c.scroll(ctx, request.Index, request.Query, token.ScrollID) isLastPage := false if err == io.EOF { // no more result isLastPage = true - if scrollService != nil { - err := scrollService.Clear(ctx) - if err != nil { - c.logger.Warn("scrollService Clear fail", tag.Error(err)) - } + if err := c.clearScroll(ctx, searchResult.ScrollId); err != nil { + c.logger.Warn("scroll clear failed", tag.Error(err)) } } else if err != nil { return nil, &types.InternalServiceError{ Message: fmt.Sprintf("ScanByQuery failed. Error: %v", err), } } + response := &p.InternalListWorkflowExecutionsResponse{} + actualHits := searchResult.Hits.Hits + numOfActualHits := len(actualHits) + response.Executions = c.esHitsToExecutions(searchResult.Hits, nil /* no filter */) - return c.getScanWorkflowExecutionsResponse(searchResult.Hits, request.PageSize, searchResult.ScrollId, isLastPage) -} + if numOfActualHits == request.PageSize && !isLastPage { + nextPageToken, err := SerializePageToken(&ElasticVisibilityPageToken{ScrollID: searchResult.ScrollId}) + if err != nil { + return nil, err + } + response.NextPageToken = make([]byte, len(nextPageToken)) + copy(response.NextPageToken, nextPageToken) + } -func (c *elasticV6) RunBulkProcessor(ctx context.Context, parameters *BulkProcessorParameters) (GenericBulkProcessor, error) { - beforeFunc := func(executionId int64, requests []elastic.BulkableRequest) { - parameters.BeforeFunc(executionId, fromV6ToGenericBulkableRequests(requests)) - } - - afterFunc := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { - gerr := convertV6ErrorToGenericError(err) - parameters.AfterFunc( - executionId, - fromV6ToGenericBulkableRequests(requests), - fromV6toGenericBulkResponse(response), - gerr) - } - - return c.runBulkProcessor(ctx, &bulkProcessorParametersV6{ - Name: parameters.Name, - NumOfWorkers: parameters.NumOfWorkers, - BulkActions: parameters.BulkActions, - BulkSize: parameters.BulkSize, - FlushInterval: parameters.FlushInterval, - Backoff: parameters.Backoff, - BeforeFunc: beforeFunc, - AfterFunc: afterFunc, - }) + return response, nil } -func convertV6ErrorToGenericError(err error) *GenericError { - if err == nil { - return nil - } - status := unknownStatusCode - switch e := err.(type) { - case *elastic.Error: - status = e.Status - } - return &GenericError{ - Status: status, - Details: err, +func (c *elasticV6) scroll(ctx context.Context, index, query, scrollID string) (*elastic.SearchResult, error) { + scrollService := elastic.NewScrollService(c.client) + if len(scrollID) == 0 { + return scrollService.Index(index).Body(query).Do(ctx) } + return scrollService.ScrollId(scrollID).Do(ctx) +} + +func (c *elasticV6) clearScroll(ctx context.Context, scrollID string) error { + return elastic.NewScrollService(c.client).ScrollId(scrollID).Clear(ctx) } func (c *elasticV6) SearchForOneClosedExecution( @@ -328,58 +245,6 @@ func (c *elasticV6) SearchForOneClosedExecution( return response, nil } -func fromV6toGenericBulkResponse(response *elastic.BulkResponse) *GenericBulkResponse { - if response == nil { - return &GenericBulkResponse{} - } - return &GenericBulkResponse{ - Took: response.Took, - Errors: response.Errors, - Items: fromV6ToGenericBulkResponseItemMaps(response.Items), - } -} - -func fromV6ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*GenericBulkResponseItem { - var gitems []map[string]*GenericBulkResponseItem - for _, it := range items { - gitems = append(gitems, fromV6ToGenericBulkResponseItemMap(it)) - } - return gitems -} - -func fromV6ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*GenericBulkResponseItem { - if m == nil { - return nil - } - gm := make(map[string]*GenericBulkResponseItem, len(m)) - for k, v := range m { - gm[k] = fromV6ToGenericBulkResponseItem(v) - } - return gm -} - -func fromV6ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *GenericBulkResponseItem { - return &GenericBulkResponseItem{ - Index: v.Index, - Type: v.Type, - ID: v.Id, - Version: v.Version, - Result: v.Result, - SeqNo: v.SeqNo, - PrimaryTerm: v.PrimaryTerm, - Status: v.Status, - ForcedRefresh: v.ForcedRefresh, - } -} - -func fromV6ToGenericBulkableRequests(requests []elastic.BulkableRequest) []GenericBulkableRequest { - var v6Reqs []GenericBulkableRequest - for _, req := range requests { - v6Reqs = append(v6Reqs, req) - } - return v6Reqs -} - func (c *elasticV6) search(ctx context.Context, p *searchParametersV6) (*elastic.SearchResult, error) { searchService := c.client.Search(p.Index). Query(p.Query). @@ -417,19 +282,12 @@ func (c *elasticV6) SearchRaw(ctx context.Context, index string, query string) ( result := RawResponse{ TookInMillis: esResult.TookInMillis, + Hits: SearchHits{ + TotalHits: esResult.TotalHits(), + Hits: c.esHitsToExecutions(esResult.Hits, nil /* no filter */), + }, } - - result.Hits = SearchHits{ - TotalHits: esResult.TotalHits(), - } - if esResult.Hits != nil && len(esResult.Hits.Hits) > 0 { - result.Hits.Hits = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0, len(esResult.Hits.Hits)) - for _, hit := range esResult.Hits.Hits { - workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(hit) - result.Hits.Hits = append(result.Hits.Hits, workflowExecutionInfo) - } - } - + // V6 specific action if len(esResult.Aggregations) > 0 { result.Aggregations = make(map[string]json.RawMessage, len(esResult.Aggregations)) for key, agg := range esResult.Aggregations { @@ -440,95 +298,17 @@ func (c *elasticV6) SearchRaw(ctx context.Context, index string, query string) ( return &result, nil } -func (c *elasticV6) searchWithDSL(ctx context.Context, index, query string) (*elastic.SearchResult, error) { - return c.client.Search(index).Source(query).Do(ctx) -} - -func (c *elasticV6) scroll(ctx context.Context, scrollID string) ( - *elastic.SearchResult, *elastic.ScrollService, error) { - - scrollService := elastic.NewScrollService(c.client) - result, err := scrollService.ScrollId(scrollID).Do(ctx) - return result, scrollService, err -} - -func (c *elasticV6) scrollFirstPage(ctx context.Context, index, query string) ( - *elastic.SearchResult, *elastic.ScrollService, error) { - - scrollService := elastic.NewScrollService(c.client) - result, err := scrollService.Index(index).Body(query).Do(ctx) - return result, scrollService, err -} - -type v6BulkProcessor struct { - processor *elastic.BulkProcessor -} - -func (v *v6BulkProcessor) Start(ctx context.Context) error { - return v.processor.Start(ctx) -} - -func (v *v6BulkProcessor) Stop() error { - return v.processor.Stop() -} - -func (v *v6BulkProcessor) Close() error { - return v.processor.Close() -} - -func (v *v6BulkProcessor) Add(request *GenericBulkableAddRequest) { - var req elastic.BulkableRequest - switch request.RequestType { - case BulkableDeleteRequest: - req = elastic.NewBulkDeleteRequest(). - Index(request.Index). - Type(request.Type). - Id(request.ID). - VersionType(request.VersionType). - Version(request.Version) - case BulkableIndexRequest: - req = elastic.NewBulkIndexRequest(). - Index(request.Index). - Type(request.Type). - Id(request.ID). - VersionType(request.VersionType). - Version(request.Version). - Doc(request.Doc) - case BulkableCreateRequest: - //for bulk create request still calls the bulk index method - //with providing operation type - req = elastic.NewBulkIndexRequest(). - OpType("create"). - Index(request.Index). - Type(request.Type). - Id(request.ID). - VersionType("internal"). - Doc(request.Doc) - } - v.processor.Add(req) -} - -func (v *v6BulkProcessor) Flush() error { - return v.processor.Flush() -} - -func (c *elasticV6) runBulkProcessor(ctx context.Context, p *bulkProcessorParametersV6) (*v6BulkProcessor, error) { - processor, err := c.client.BulkProcessor(). - Name(p.Name). - Workers(p.NumOfWorkers). - BulkActions(p.BulkActions). - BulkSize(p.BulkSize). - FlushInterval(p.FlushInterval). - Backoff(p.Backoff). - Before(p.BeforeFunc). - After(p.AfterFunc). - Do(ctx) - if err != nil { - return nil, err +func (c *elasticV6) esHitsToExecutions(eshits *elastic.SearchHits, filter IsRecordValidFilter) []*p.InternalVisibilityWorkflowExecutionInfo { + var hits = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) + if eshits != nil && len(eshits.Hits) > 0 { + for _, hit := range eshits.Hits { + workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(hit) + if filter == nil || filter(workflowExecutionInfo) { + hits = append(hits, workflowExecutionInfo) + } + } } - return &v6BulkProcessor{ - processor: processor, - }, nil + return hits } func buildPutMappingBodyV6(root, key, valueType string) map[string]interface{} { @@ -564,16 +344,7 @@ func (c *elasticV6) getListWorkflowExecutionsResponse( response := &p.InternalListWorkflowExecutionsResponse{} actualHits := searchHits.Hits numOfActualHits := len(actualHits) - - response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - for i := 0; i < numOfActualHits; i++ { - workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(actualHits[i]) - if isRecordValid == nil || isRecordValid(workflowExecutionInfo) { - // for old APIs like ListOpenWorkflowExecutions, we added 1 ms to range query to overcome ES limitation - // (see getSearchResult function), but manually dropped records beyond request range here. - response.Executions = append(response.Executions, workflowExecutionInfo) - } - } + response.Executions = c.esHitsToExecutions(searchHits, isRecordValid) if numOfActualHits == pageSize { // this means the response is not the last page var nextPageToken []byte @@ -637,34 +408,6 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit return record } -func (c *elasticV6) getScanWorkflowExecutionsResponse( - searchHits *elastic.SearchHits, - pageSize int, scrollID string, - isLastPage bool, -) (*p.InternalListWorkflowExecutionsResponse, error) { - - response := &p.InternalListWorkflowExecutionsResponse{} - actualHits := searchHits.Hits - numOfActualHits := len(actualHits) - - response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - for i := 0; i < numOfActualHits; i++ { - workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(actualHits[i]) - response.Executions = append(response.Executions, workflowExecutionInfo) - } - - if numOfActualHits == pageSize && !isLastPage { - nextPageToken, err := SerializePageToken(&ElasticVisibilityPageToken{ScrollID: scrollID}) - if err != nil { - return nil, err - } - response.NextPageToken = make([]byte, len(nextPageToken)) - copy(response.NextPageToken, nextPageToken) - } - - return response, nil -} - func (c *elasticV6) getSearchResult( ctx context.Context, index string, @@ -674,13 +417,11 @@ func (c *elasticV6) getSearchResult( token *ElasticVisibilityPageToken, ) (*elastic.SearchResult, error) { - matchDomainQuery := elastic.NewMatchQuery(DomainID, request.DomainUUID) - existClosedStatusQuery := elastic.NewExistsQuery(CloseStatus) - var rangeQuery *elastic.RangeQuery - if isOpen { - rangeQuery = elastic.NewRangeQuery(StartTime) - } else { - rangeQuery = elastic.NewRangeQuery(CloseTime) + // always match domain id + boolQuery := elastic.NewBoolQuery().Must(elastic.NewMatchQuery(DomainID, request.DomainUUID)) + + if matchQuery != nil { + boolQuery = boolQuery.Must(matchQuery) } // ElasticSearch v6 is unable to precisely compare time, have to manually add resolution 1ms to time range. // Also has to use string instead of int64 to avoid data conversion issue, @@ -691,33 +432,30 @@ func (c *elasticV6) getSearchResult( if request.EarliestTime.UnixNano() < math.MinInt64+oneMicroSecondInNano { // prevent earliestTime overflow request.EarliestTime = time.Unix(0, math.MinInt64+oneMicroSecondInNano) } - earliestTimeStr := strconv.FormatInt(request.EarliestTime.UnixNano()-oneMicroSecondInNano, 10) - latestTimeStr := strconv.FormatInt(request.LatestTime.UnixNano()+oneMicroSecondInNano, 10) - rangeQuery = rangeQuery. - Gte(earliestTimeStr). - Lte(latestTimeStr) - boolQuery := elastic.NewBoolQuery().Must(matchDomainQuery).Filter(rangeQuery) - if matchQuery != nil { - boolQuery = boolQuery.Must(matchQuery) - } + var rangeQueryField string + existClosedStatusQuery := elastic.NewExistsQuery(CloseStatus) if isOpen { + rangeQueryField = StartTime boolQuery = boolQuery.MustNot(existClosedStatusQuery) } else { boolQuery = boolQuery.Must(existClosedStatusQuery) + rangeQueryField = CloseTime } + earliestTimeStr := strconv.FormatInt(request.EarliestTime.UnixNano()-oneMicroSecondInNano, 10) + latestTimeStr := strconv.FormatInt(request.LatestTime.UnixNano()+oneMicroSecondInNano, 10) + rangeQuery := elastic.NewRangeQuery(rangeQueryField).Gte(earliestTimeStr).Lte(latestTimeStr) + boolQuery = boolQuery.Filter(rangeQuery) + params := &searchParametersV6{ Index: index, Query: boolQuery, From: token.From, PageSize: request.PageSize, } - if isOpen { - params.Sorter = append(params.Sorter, elastic.NewFieldSort(StartTime).Desc()) - } else { - params.Sorter = append(params.Sorter, elastic.NewFieldSort(CloseTime).Desc()) - } + + params.Sorter = append(params.Sorter, elastic.NewFieldSort(rangeQueryField).Desc()) params.Sorter = append(params.Sorter, elastic.NewFieldSort(RunID).Desc()) if ShouldSearchAfter(token) { diff --git a/common/elasticsearch/client_v6_bulk.go b/common/elasticsearch/client_v6_bulk.go new file mode 100644 index 00000000000..dcdeab3dd34 --- /dev/null +++ b/common/elasticsearch/client_v6_bulk.go @@ -0,0 +1,183 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package elasticsearch + +import ( + "context" + + "github.com/olivere/elastic" +) + +var _ GenericBulkProcessor = (*v6BulkProcessor)(nil) + +type v6BulkProcessor struct { + processor *elastic.BulkProcessor +} + +func (c *elasticV6) RunBulkProcessor(ctx context.Context, parameters *BulkProcessorParameters) (GenericBulkProcessor, error) { + beforeFunc := func(executionId int64, requests []elastic.BulkableRequest) { + parameters.BeforeFunc(executionId, fromV6ToGenericBulkableRequests(requests)) + } + + afterFunc := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { + gerr := convertV6ErrorToGenericError(err) + parameters.AfterFunc( + executionId, + fromV6ToGenericBulkableRequests(requests), + fromV6toGenericBulkResponse(response), + gerr) + } + + processor, err := c.client.BulkProcessor(). + Name(parameters.Name). + Workers(parameters.NumOfWorkers). + BulkActions(parameters.BulkActions). + BulkSize(parameters.BulkSize). + FlushInterval(parameters.FlushInterval). + Backoff(parameters.Backoff). + Before(beforeFunc). + After(afterFunc). + Do(ctx) + if err != nil { + return nil, err + } + + return &v6BulkProcessor{ + processor: processor, + }, nil +} + +func (v *v6BulkProcessor) Start(ctx context.Context) error { + return v.processor.Start(ctx) +} + +func (v *v6BulkProcessor) Stop() error { + return v.processor.Stop() +} + +func (v *v6BulkProcessor) Close() error { + return v.processor.Close() +} + +func (v *v6BulkProcessor) Add(request *GenericBulkableAddRequest) { + var req elastic.BulkableRequest + switch request.RequestType { + case BulkableDeleteRequest: + req = elastic.NewBulkDeleteRequest(). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType(request.VersionType). + Version(request.Version) + case BulkableIndexRequest: + req = elastic.NewBulkIndexRequest(). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType(request.VersionType). + Version(request.Version). + Doc(request.Doc) + case BulkableCreateRequest: + //for bulk create request still calls the bulk index method + //with providing operation type + req = elastic.NewBulkIndexRequest(). + OpType("create"). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType("internal"). + Doc(request.Doc) + } + v.processor.Add(req) +} + +func (v *v6BulkProcessor) Flush() error { + return v.processor.Flush() +} + +func convertV6ErrorToGenericError(err error) *GenericError { + if err == nil { + return nil + } + status := unknownStatusCode + switch e := err.(type) { + case *elastic.Error: + status = e.Status + } + return &GenericError{ + Status: status, + Details: err, + } +} + +func fromV6toGenericBulkResponse(response *elastic.BulkResponse) *GenericBulkResponse { + if response == nil { + return &GenericBulkResponse{} + } + return &GenericBulkResponse{ + Took: response.Took, + Errors: response.Errors, + Items: fromV6ToGenericBulkResponseItemMaps(response.Items), + } +} + +func fromV6ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*GenericBulkResponseItem { + var gitems []map[string]*GenericBulkResponseItem + for _, it := range items { + gitems = append(gitems, fromV6ToGenericBulkResponseItemMap(it)) + } + return gitems +} + +func fromV6ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*GenericBulkResponseItem { + if m == nil { + return nil + } + gm := make(map[string]*GenericBulkResponseItem, len(m)) + for k, v := range m { + gm[k] = fromV6ToGenericBulkResponseItem(v) + } + return gm +} + +func fromV6ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *GenericBulkResponseItem { + return &GenericBulkResponseItem{ + Index: v.Index, + Type: v.Type, + ID: v.Id, + Version: v.Version, + Result: v.Result, + SeqNo: v.SeqNo, + PrimaryTerm: v.PrimaryTerm, + Status: v.Status, + ForcedRefresh: v.ForcedRefresh, + } +} + +func fromV6ToGenericBulkableRequests(requests []elastic.BulkableRequest) []GenericBulkableRequest { + var v6Reqs []GenericBulkableRequest + for _, req := range requests { + v6Reqs = append(v6Reqs, req) + } + return v6Reqs +} diff --git a/common/elasticsearch/client_v7.go b/common/elasticsearch/client_v7.go index cb7b18dbcc1..3e06c4e4d0a 100644 --- a/common/elasticsearch/client_v7.go +++ b/common/elasticsearch/client_v7.go @@ -30,11 +30,7 @@ import ( "strconv" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" "github.com/olivere/elastic/v7" - esaws "github.com/olivere/elastic/v7/aws/v4" "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" @@ -46,14 +42,12 @@ import ( ) var _ GenericClient = (*elasticV7)(nil) -var _ GenericBulkProcessor = (*v7BulkProcessor)(nil) type ( // elasticV7 implements Client elasticV7 struct { - client *elastic.Client - logger log.Logger - serializer p.PayloadSerializer + client *elastic.Client + logger log.Logger } // searchParametersV7 holds all required and optional parameters for executing a search @@ -65,23 +59,13 @@ type ( Sorter []elastic.Sorter SearchAfter []interface{} } - - // bulkProcessorParametersV7 holds all required and optional parameters for executing bulk service - bulkProcessorParametersV7 struct { - Name string - NumOfWorkers int - BulkActions int - BulkSize int - FlushInterval time.Duration - Backoff elastic.Backoff - BeforeFunc elastic.BulkBeforeFunc - AfterFunc elastic.BulkAfterFunc - } ) // NewV7Client returns a new implementation of GenericClient func NewV7Client( connectConfig *config.ElasticSearchConfig, + tlsClient *http.Client, + awsSigningClient *http.Client, logger log.Logger, clientOptFuncs ...elastic.ClientOptionFunc, ) (GenericClient, error) { @@ -96,63 +80,26 @@ func NewV7Client( if connectConfig.DisableHealthCheck { clientOptFuncs = append(clientOptFuncs, elastic.SetHealthcheck(false)) } - if connectConfig.AWSSigning.Enable { - if err := config.CheckAWSSigningConfig(connectConfig.AWSSigning); err != nil { - return nil, err - } - var signingClient *http.Client - var err error - if connectConfig.AWSSigning.EnvironmentCredential != nil { - signingClient, err = buildSigningHTTPClientFromEnvironmentCredentialV7(*connectConfig.AWSSigning.EnvironmentCredential) - } else { - signingClient, err = buildSigningHTTPClientFromStaticCredentialV7(*connectConfig.AWSSigning.StaticCredential) - } - if err != nil { - return nil, err - } - clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(signingClient)) + + if awsSigningClient != nil { + clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(awsSigningClient)) } - if connectConfig.TLS.Enabled { - var tlsClient *http.Client - var err error - tlsClient, err = buildTLSHTTPClient(connectConfig.TLS) - if err != nil { - return nil, err - } + + if tlsClient != nil { clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(tlsClient)) } + client, err := elastic.NewClient(clientOptFuncs...) if err != nil { return nil, err } return &elasticV7{ - client: client, - logger: logger, - serializer: p.NewPayloadSerializer(), + client: client, + logger: logger, }, nil } -// refer to https://github.com/olivere/elastic/blob/release-branch.v7/recipes/aws-connect-v4/main.go -func buildSigningHTTPClientFromStaticCredentialV7(credentialConfig config.AWSStaticCredential) (*http.Client, error) { - awsCredentials := credentials.NewStaticCredentials( - credentialConfig.AccessKey, - credentialConfig.SecretKey, - credentialConfig.SessionToken, - ) - return esaws.NewV4SigningClient(awsCredentials, credentialConfig.Region), nil -} - -func buildSigningHTTPClientFromEnvironmentCredentialV7(credentialConfig config.AWSEnvironmentCredential) (*http.Client, error) { - sess, err := session.NewSession(&aws.Config{ - Region: aws.String(credentialConfig.Region)}, - ) - if err != nil { - return nil, err - } - return esaws.NewV4SigningClient(sess.Config.Credentials, credentialConfig.Region), nil -} - func (c *elasticV7) IsNotFoundError(err error) bool { return elastic.IsNotFound(err) } @@ -174,17 +121,16 @@ func (c *elasticV7) CountByQuery(ctx context.Context, index, query string) (int6 } func (c *elasticV7) Search(ctx context.Context, request *SearchRequest) (*p.InternalListWorkflowExecutionsResponse, error) { + token, err := GetNextPageToken(request.ListRequest.NextPageToken) + if err != nil { + return nil, err + } var matchQuery *elastic.MatchQuery if request.MatchQuery != nil { matchQuery = elastic.NewMatchQuery(request.MatchQuery.Name, request.MatchQuery.Text) } - token, err := GetNextPageToken(request.ListRequest.NextPageToken) - if err != nil { - return nil, err - } - searchResult, err := c.getSearchResult( ctx, request.Index, @@ -202,12 +148,11 @@ func (c *elasticV7) Search(ctx context.Context, request *SearchRequest) (*p.Inte } func (c *elasticV7) SearchByQuery(ctx context.Context, request *SearchByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) { - searchResult, err := c.searchWithDSL(ctx, request.Index, request.Query) + token, err := GetNextPageToken(request.NextPageToken) if err != nil { return nil, err } - - token, err := GetNextPageToken(request.NextPageToken) + searchResult, err := c.client.Search(request.Index).Source(request.Query).Do(ctx) if err != nil { return nil, err } @@ -221,73 +166,46 @@ func (c *elasticV7) ScanByQuery(ctx context.Context, request *ScanByQueryRequest if err != nil { return nil, err } - - var searchResult *elastic.SearchResult - var scrollService *elastic.ScrollService - - if len(token.ScrollID) == 0 { // first call - searchResult, scrollService, err = c.scrollFirstPage(ctx, request.Index, request.Query) - } else { - searchResult, scrollService, err = c.scroll(ctx, token.ScrollID) - } + searchResult, err := c.scroll(ctx, request.Index, request.Query, token.ScrollID) isLastPage := false if err == io.EOF { // no more result isLastPage = true - if scrollService != nil { - err := scrollService.Clear(ctx) - if err != nil { - c.logger.Warn("scrollService Clear fail", tag.Error(err)) - } + if err := c.clearScroll(ctx, searchResult.ScrollId); err != nil { + c.logger.Warn("scroll clear failed", tag.Error(err)) } } else if err != nil { return nil, &types.InternalServiceError{ Message: fmt.Sprintf("ScanByQuery failed. Error: %v", err), } } + response := &p.InternalListWorkflowExecutionsResponse{} + actualHits := searchResult.Hits.Hits + numOfActualHits := len(actualHits) + response.Executions = c.esHitsToExecutions(searchResult.Hits, nil /* no filter */) - return c.getScanWorkflowExecutionsResponse(searchResult.Hits, request.PageSize, searchResult.ScrollId, isLastPage) -} + if numOfActualHits == request.PageSize && !isLastPage { + nextPageToken, err := SerializePageToken(&ElasticVisibilityPageToken{ScrollID: searchResult.ScrollId}) + if err != nil { + return nil, err + } + response.NextPageToken = make([]byte, len(nextPageToken)) + copy(response.NextPageToken, nextPageToken) + } -func (c *elasticV7) RunBulkProcessor(ctx context.Context, parameters *BulkProcessorParameters) (GenericBulkProcessor, error) { - beforeFunc := func(executionId int64, requests []elastic.BulkableRequest) { - parameters.BeforeFunc(executionId, fromV7ToGenericBulkableRequests(requests)) - } - - afterFunc := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { - gerr := convertV7ErrorToGenericError(err) - parameters.AfterFunc( - executionId, - fromV7ToGenericBulkableRequests(requests), - fromV7toGenericBulkResponse(response), - gerr) - } - - return c.runBulkProcessor(ctx, &bulkProcessorParametersV7{ - Name: parameters.Name, - NumOfWorkers: parameters.NumOfWorkers, - BulkActions: parameters.BulkActions, - BulkSize: parameters.BulkSize, - FlushInterval: parameters.FlushInterval, - Backoff: parameters.Backoff, - BeforeFunc: beforeFunc, - AfterFunc: afterFunc, - }) + return response, nil } -func convertV7ErrorToGenericError(err error) *GenericError { - if err == nil { - return nil - } - status := unknownStatusCode - switch e := err.(type) { - case *elastic.Error: - status = e.Status - } - return &GenericError{ - Status: status, - Details: err, +func (c *elasticV7) scroll(ctx context.Context, index, query, scrollID string) (*elastic.SearchResult, error) { + scrollService := elastic.NewScrollService(c.client) + if len(scrollID) == 0 { + return scrollService.Index(index).Body(query).Do(ctx) } + return scrollService.ScrollId(scrollID).Do(ctx) +} + +func (c *elasticV7) clearScroll(ctx context.Context, scrollID string) error { + return elastic.NewScrollService(c.client).ScrollId(scrollID).Clear(ctx) } func (c *elasticV7) SearchForOneClosedExecution( @@ -327,58 +245,6 @@ func (c *elasticV7) SearchForOneClosedExecution( return response, nil } -func fromV7toGenericBulkResponse(response *elastic.BulkResponse) *GenericBulkResponse { - if response == nil { - return &GenericBulkResponse{} - } - return &GenericBulkResponse{ - Took: response.Took, - Errors: response.Errors, - Items: fromV7ToGenericBulkResponseItemMaps(response.Items), - } -} - -func fromV7ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*GenericBulkResponseItem { - var gitems []map[string]*GenericBulkResponseItem - for _, it := range items { - gitems = append(gitems, fromV7ToGenericBulkResponseItemMap(it)) - } - return gitems -} - -func fromV7ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*GenericBulkResponseItem { - if m == nil { - return nil - } - gm := make(map[string]*GenericBulkResponseItem, len(m)) - for k, v := range m { - gm[k] = fromV7ToGenericBulkResponseItem(v) - } - return gm -} - -func fromV7ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *GenericBulkResponseItem { - return &GenericBulkResponseItem{ - Index: v.Index, - Type: v.Type, - ID: v.Id, - Version: v.Version, - Result: v.Result, - SeqNo: v.SeqNo, - PrimaryTerm: v.PrimaryTerm, - Status: v.Status, - ForcedRefresh: v.ForcedRefresh, - } -} - -func fromV7ToGenericBulkableRequests(requests []elastic.BulkableRequest) []GenericBulkableRequest { - var v7Reqs []GenericBulkableRequest - for _, req := range requests { - v7Reqs = append(v7Reqs, req) - } - return v7Reqs -} - func (c *elasticV7) search(ctx context.Context, p *searchParametersV7) (*elastic.SearchResult, error) { searchService := c.client.Search(p.Index). Query(p.Query). @@ -416,109 +282,27 @@ func (c *elasticV7) SearchRaw(ctx context.Context, index string, query string) ( result := RawResponse{ TookInMillis: esResult.TookInMillis, + Hits: SearchHits{ + TotalHits: esResult.TotalHits(), + Hits: c.esHitsToExecutions(esResult.Hits, nil /*no filter*/), + }, Aggregations: esResult.Aggregations, } - result.Hits = SearchHits{ - TotalHits: esResult.TotalHits(), - } - if esResult.Hits != nil && len(esResult.Hits.Hits) > 0 { - result.Hits.Hits = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0, len(esResult.Hits.Hits)) - for _, hit := range esResult.Hits.Hits { - workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(hit) - result.Hits.Hits = append(result.Hits.Hits, workflowExecutionInfo) - } - } - return &result, nil } -func (c *elasticV7) searchWithDSL(ctx context.Context, index, query string) (*elastic.SearchResult, error) { - return c.client.Search(index).Source(query).Do(ctx) -} - -func (c *elasticV7) scroll(ctx context.Context, scrollID string) ( - *elastic.SearchResult, *elastic.ScrollService, error) { - - scrollService := elastic.NewScrollService(c.client) - result, err := scrollService.ScrollId(scrollID).Do(ctx) - return result, scrollService, err -} - -func (c *elasticV7) scrollFirstPage(ctx context.Context, index, query string) ( - *elastic.SearchResult, *elastic.ScrollService, error) { - - scrollService := elastic.NewScrollService(c.client) - result, err := scrollService.Index(index).Body(query).Do(ctx) - return result, scrollService, err -} - -type v7BulkProcessor struct { - processor *elastic.BulkProcessor -} - -func (v *v7BulkProcessor) Start(ctx context.Context) error { - return v.processor.Start(ctx) -} - -func (v *v7BulkProcessor) Stop() error { - return v.processor.Stop() -} - -func (v *v7BulkProcessor) Close() error { - return v.processor.Close() -} - -func (v *v7BulkProcessor) Add(request *GenericBulkableAddRequest) { - var req elastic.BulkableRequest - switch request.RequestType { - case BulkableDeleteRequest: - req = elastic.NewBulkDeleteRequest(). - Index(request.Index). - Id(request.ID). - VersionType(request.VersionType). - Version(request.Version) - case BulkableIndexRequest: - req = elastic.NewBulkIndexRequest(). - Index(request.Index). - Id(request.ID). - VersionType(request.VersionType). - Version(request.Version). - Doc(request.Doc) - case BulkableCreateRequest: - //for bulk create request still calls the bulk index method - //with providing operation type - req = elastic.NewBulkIndexRequest(). - OpType("create"). - Index(request.Index). - Id(request.ID). - VersionType("internal"). - Doc(request.Doc) - } - v.processor.Add(req) -} - -func (v *v7BulkProcessor) Flush() error { - return v.processor.Flush() -} - -func (c *elasticV7) runBulkProcessor(ctx context.Context, p *bulkProcessorParametersV7) (*v7BulkProcessor, error) { - processor, err := c.client.BulkProcessor(). - Name(p.Name). - Workers(p.NumOfWorkers). - BulkActions(p.BulkActions). - BulkSize(p.BulkSize). - FlushInterval(p.FlushInterval). - Backoff(p.Backoff). - Before(p.BeforeFunc). - After(p.AfterFunc). - Do(ctx) - if err != nil { - return nil, err +func (c *elasticV7) esHitsToExecutions(eshits *elastic.SearchHits, filter IsRecordValidFilter) []*p.InternalVisibilityWorkflowExecutionInfo { + var hits = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) + if eshits != nil && len(eshits.Hits) > 0 { + for _, hit := range eshits.Hits { + workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(hit) + if filter == nil || filter(workflowExecutionInfo) { + hits = append(hits, workflowExecutionInfo) + } + } } - return &v7BulkProcessor{ - processor: processor, - }, nil + return hits } func buildPutMappingBodyV7(root, key, valueType string) map[string]interface{} { @@ -627,34 +411,6 @@ func (c *elasticV7) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit return record } -func (c *elasticV7) getScanWorkflowExecutionsResponse( - searchHits *elastic.SearchHits, - pageSize int, scrollID string, - isLastPage bool, -) (*p.InternalListWorkflowExecutionsResponse, error) { - - response := &p.InternalListWorkflowExecutionsResponse{} - actualHits := searchHits.Hits - numOfActualHits := len(actualHits) - - response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - for i := 0; i < numOfActualHits; i++ { - workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(actualHits[i]) - response.Executions = append(response.Executions, workflowExecutionInfo) - } - - if numOfActualHits == pageSize && !isLastPage { - nextPageToken, err := SerializePageToken(&ElasticVisibilityPageToken{ScrollID: scrollID}) - if err != nil { - return nil, err - } - response.NextPageToken = make([]byte, len(nextPageToken)) - copy(response.NextPageToken, nextPageToken) - } - - return response, nil -} - func (c *elasticV7) getSearchResult( ctx context.Context, index string, @@ -664,15 +420,13 @@ func (c *elasticV7) getSearchResult( token *ElasticVisibilityPageToken, ) (*elastic.SearchResult, error) { - matchDomainQuery := elastic.NewMatchQuery(DomainID, request.DomainUUID) - existClosedStatusQuery := elastic.NewExistsQuery(CloseStatus) - var rangeQuery *elastic.RangeQuery - if isOpen { - rangeQuery = elastic.NewRangeQuery(StartTime) - } else { - rangeQuery = elastic.NewRangeQuery(CloseTime) + // always match domain id + boolQuery := elastic.NewBoolQuery().Must(elastic.NewMatchQuery(DomainID, request.DomainUUID)) + + if matchQuery != nil { + boolQuery = boolQuery.Must(matchQuery) } - // ElasticSearch v7 is unable to precisely compare time, have to manually add resolution 1ms to time range. + // ElasticSearch v6 is unable to precisely compare time, have to manually add resolution 1ms to time range. // Also has to use string instead of int64 to avoid data conversion issue, // 9223372036854775807 to 9223372036854776000 (long overflow) if request.LatestTime.UnixNano() > math.MaxInt64-oneMicroSecondInNano { // prevent latestTime overflow @@ -681,38 +435,34 @@ func (c *elasticV7) getSearchResult( if request.EarliestTime.UnixNano() < math.MinInt64+oneMicroSecondInNano { // prevent earliestTime overflow request.EarliestTime = time.Unix(0, math.MinInt64+oneMicroSecondInNano) } - earliestTimeStr := strconv.FormatInt(request.EarliestTime.UnixNano()-oneMicroSecondInNano, 10) - latestTimeStr := strconv.FormatInt(request.LatestTime.UnixNano()+oneMicroSecondInNano, 10) - rangeQuery = rangeQuery. - Gte(earliestTimeStr). - Lte(latestTimeStr) - boolQuery := elastic.NewBoolQuery().Must(matchDomainQuery).Filter(rangeQuery) - if matchQuery != nil { - boolQuery = boolQuery.Must(matchQuery) - } + var rangeQueryField string + existClosedStatusQuery := elastic.NewExistsQuery(CloseStatus) if isOpen { + rangeQueryField = StartTime boolQuery = boolQuery.MustNot(existClosedStatusQuery) } else { boolQuery = boolQuery.Must(existClosedStatusQuery) + rangeQueryField = CloseTime } + earliestTimeStr := strconv.FormatInt(request.EarliestTime.UnixNano()-oneMicroSecondInNano, 10) + latestTimeStr := strconv.FormatInt(request.LatestTime.UnixNano()+oneMicroSecondInNano, 10) + rangeQuery := elastic.NewRangeQuery(rangeQueryField).Gte(earliestTimeStr).Lte(latestTimeStr) + boolQuery = boolQuery.Filter(rangeQuery) + params := &searchParametersV7{ Index: index, Query: boolQuery, From: token.From, PageSize: request.PageSize, } - if isOpen { - params.Sorter = append(params.Sorter, elastic.NewFieldSort(StartTime).Desc()) - } else { - params.Sorter = append(params.Sorter, elastic.NewFieldSort(CloseTime).Desc()) - } + + params.Sorter = append(params.Sorter, elastic.NewFieldSort(rangeQueryField).Desc()) params.Sorter = append(params.Sorter, elastic.NewFieldSort(RunID).Desc()) if ShouldSearchAfter(token) { params.SearchAfter = []interface{}{token.SortValue, token.TieBreaker} } - return c.search(ctx, params) } diff --git a/common/elasticsearch/client_v7_bulk.go b/common/elasticsearch/client_v7_bulk.go new file mode 100644 index 00000000000..298aa005473 --- /dev/null +++ b/common/elasticsearch/client_v7_bulk.go @@ -0,0 +1,180 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package elasticsearch + +import ( + "context" + + "github.com/olivere/elastic/v7" +) + +var _ GenericBulkProcessor = (*v7BulkProcessor)(nil) + +type v7BulkProcessor struct { + processor *elastic.BulkProcessor +} + +func (c *elasticV7) RunBulkProcessor(ctx context.Context, parameters *BulkProcessorParameters) (GenericBulkProcessor, error) { + beforeFunc := func(executionId int64, requests []elastic.BulkableRequest) { + parameters.BeforeFunc(executionId, fromV7ToGenericBulkableRequests(requests)) + } + + afterFunc := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { + gerr := convertV7ErrorToGenericError(err) + parameters.AfterFunc( + executionId, + fromV7ToGenericBulkableRequests(requests), + fromV7toGenericBulkResponse(response), + gerr) + } + + processor, err := c.client.BulkProcessor(). + Name(parameters.Name). + Workers(parameters.NumOfWorkers). + BulkActions(parameters.BulkActions). + BulkSize(parameters.BulkSize). + FlushInterval(parameters.FlushInterval). + Backoff(parameters.Backoff). + Before(beforeFunc). + After(afterFunc). + Do(ctx) + if err != nil { + return nil, err + } + + return &v7BulkProcessor{ + processor: processor, + }, nil +} + +func (v *v7BulkProcessor) Flush() error { + return v.processor.Flush() +} + +func (v *v7BulkProcessor) Start(ctx context.Context) error { + return v.processor.Start(ctx) +} + +func (v *v7BulkProcessor) Stop() error { + return v.processor.Stop() +} + +func (v *v7BulkProcessor) Close() error { + return v.processor.Close() +} + +func (v *v7BulkProcessor) Add(request *GenericBulkableAddRequest) { + var req elastic.BulkableRequest + switch request.RequestType { + case BulkableDeleteRequest: + req = elastic.NewBulkDeleteRequest(). + Index(request.Index). + Id(request.ID). + VersionType(request.VersionType). + Version(request.Version) + case BulkableIndexRequest: + req = elastic.NewBulkIndexRequest(). + Index(request.Index). + Id(request.ID). + VersionType(request.VersionType). + Version(request.Version). + Doc(request.Doc) + case BulkableCreateRequest: + //for bulk create request still calls the bulk index method + //with providing operation type + req = elastic.NewBulkIndexRequest(). + OpType("create"). + Index(request.Index). + Id(request.ID). + VersionType("internal"). + Doc(request.Doc) + } + v.processor.Add(req) +} + +func convertV7ErrorToGenericError(err error) *GenericError { + if err == nil { + return nil + } + status := unknownStatusCode + switch e := err.(type) { + case *elastic.Error: + status = e.Status + } + return &GenericError{ + Status: status, + Details: err, + } +} + +func fromV7toGenericBulkResponse(response *elastic.BulkResponse) *GenericBulkResponse { + if response == nil { + return &GenericBulkResponse{} + } + return &GenericBulkResponse{ + Took: response.Took, + Errors: response.Errors, + Items: fromV7ToGenericBulkResponseItemMaps(response.Items), + } +} + +func fromV7ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*GenericBulkResponseItem { + var gitems []map[string]*GenericBulkResponseItem + for _, it := range items { + gitems = append(gitems, fromV7ToGenericBulkResponseItemMap(it)) + } + return gitems +} + +func fromV7ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*GenericBulkResponseItem { + if m == nil { + return nil + } + gm := make(map[string]*GenericBulkResponseItem, len(m)) + for k, v := range m { + gm[k] = fromV7ToGenericBulkResponseItem(v) + } + return gm +} + +func fromV7ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *GenericBulkResponseItem { + return &GenericBulkResponseItem{ + Index: v.Index, + Type: v.Type, + ID: v.Id, + Version: v.Version, + Result: v.Result, + SeqNo: v.SeqNo, + PrimaryTerm: v.PrimaryTerm, + Status: v.Status, + ForcedRefresh: v.ForcedRefresh, + } +} + +func fromV7ToGenericBulkableRequests(requests []elastic.BulkableRequest) []GenericBulkableRequest { + var v7Reqs []GenericBulkableRequest + for _, req := range requests { + v7Reqs = append(v7Reqs, req) + } + return v7Reqs +} diff --git a/common/elasticsearch/common.go b/common/elasticsearch/common.go index b44c3fed74f..6d8e366f082 100644 --- a/common/elasticsearch/common.go +++ b/common/elasticsearch/common.go @@ -24,6 +24,11 @@ import ( "net/http" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + esaws "github.com/olivere/elastic/aws/v4" + "github.com/uber/cadence/common/config" ) @@ -52,6 +57,18 @@ func buildTLSHTTPClient(config config.TLS) (*http.Client, error) { return tlsClient, nil } +func buildAWSSigningClient(awsconfig config.AWSSigning) (*http.Client, error) { + if err := config.CheckAWSSigningConfig(awsconfig); err != nil { + return nil, err + } + + if awsconfig.EnvironmentCredential != nil { + return signingClientFromEnv(*awsconfig.EnvironmentCredential) + } + + return signingClientFromStatic(*awsconfig.StaticCredential) +} + func GetESDocIDSizeLimit() int { return esDocIDSizeLimit } @@ -67,3 +84,23 @@ func GetESDocDelimiter() string { func GenerateDocID(wid, rid string) string { return wid + esDocIDDelimiter + rid } + +// refer to https://github.com/olivere/elastic/blob/release-branch.v7/recipes/aws-connect-v4/main.go +func signingClientFromStatic(credentialConfig config.AWSStaticCredential) (*http.Client, error) { + awsCredentials := credentials.NewStaticCredentials( + credentialConfig.AccessKey, + credentialConfig.SecretKey, + credentialConfig.SessionToken, + ) + return esaws.NewV4SigningClient(awsCredentials, credentialConfig.Region), nil +} + +func signingClientFromEnv(credentialConfig config.AWSEnvironmentCredential) (*http.Client, error) { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(credentialConfig.Region)}, + ) + if err != nil { + return nil, err + } + return esaws.NewV4SigningClient(sess.Config.Credentials, credentialConfig.Region), nil +} diff --git a/common/elasticsearch/interfaces.go b/common/elasticsearch/interfaces.go index da6ed5f950f..5c0fa48f5da 100644 --- a/common/elasticsearch/interfaces.go +++ b/common/elasticsearch/interfaces.go @@ -24,6 +24,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "time" workflow "github.com/uber/cadence/.gen/go/shared" @@ -40,11 +41,30 @@ func NewGenericClient( if connectConfig.Version == "" { connectConfig.Version = "v6" } + var tlsClient *http.Client + var signingAWSClient *http.Client + + if connectConfig.AWSSigning.Enable { + var err error + signingAWSClient, err = buildAWSSigningClient(connectConfig.AWSSigning) + if err != nil { + return nil, err + } + } + + if connectConfig.TLS.Enabled { + var err error + tlsClient, err = buildTLSHTTPClient(connectConfig.TLS) + if err != nil { + return nil, err + } + } + switch connectConfig.Version { case "v6": - return NewV6Client(connectConfig, logger) + return NewV6Client(connectConfig, tlsClient, signingAWSClient, logger) case "v7": - return NewV7Client(connectConfig, logger) + return NewV7Client(connectConfig, tlsClient, signingAWSClient, logger) default: return nil, fmt.Errorf("not supported ElasticSearch version: %v", connectConfig.Version) }