Skip to content

Commit

Permalink
Create ElasticSearch client via factory (uber#4660)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Nov 30, 2021
1 parent c7727c0 commit 75a992a
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 66 deletions.
12 changes: 0 additions & 12 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,6 @@ func newAdminElasticSearchCommands() []cli.Command {
Name: FlagURL,
Usage: "URL of ElasticSearch cluster",
},
cli.StringFlag{
Name: FlagMuttleyDestinationWithAlias,
Usage: "Optional muttely destination to ElasticSearch cluster",
},
},
Action: func(c *cli.Context) {
AdminCatIndices(c)
Expand All @@ -552,10 +548,6 @@ func newAdminElasticSearchCommands() []cli.Command {
Name: FlagURL,
Usage: "URL of ElasticSearch cluster",
},
cli.StringFlag{
Name: FlagMuttleyDestinationWithAlias,
Usage: "Optional muttely destination to ElasticSearch cluster",
},
cli.StringFlag{
Name: FlagIndex,
Usage: "ElasticSearch target index",
Expand Down Expand Up @@ -583,10 +575,6 @@ func newAdminElasticSearchCommands() []cli.Command {
Name: FlagURL,
Usage: "URL of ElasticSearch cluster",
},
cli.StringFlag{
Name: FlagMuttleyDestinationWithAlias,
Usage: "Optional muttely destination to ElasticSearch cluster",
},
cli.StringFlag{
Name: FlagIndex,
Usage: "ElasticSearch target index",
Expand Down
55 changes: 3 additions & 52 deletions tools/cli/adminElasticSearchCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"encoding/json"
"fmt"
"math"
"net/http"
"os"
"strconv"
"strings"
Expand All @@ -51,19 +50,6 @@ const (
versionTypeExternal = "external"
)

const (
headerSource = "rpc-caller"
headerDestination = "rpc-service"
)

// muttleyTransport wraps around default http.Transport to add muttley specific headers to all requests
type muttleyTransport struct {
http.Transport

source string
destination string
}

var timeKeys = map[string]bool{
"StartTime": true,
"CloseTime": true,
Expand All @@ -87,44 +73,9 @@ func timeValProcess(timeStr string) (string, error) {
return fmt.Sprintf("%v", parsedTime.UnixNano()), nil
}

func (t *muttleyTransport) RoundTrip(r *http.Request) (*http.Response, error) {
r.Header.Set(headerSource, t.source)
r.Header.Set(headerDestination, t.destination)
return t.Transport.RoundTrip(r)
}

func getESClient(c *cli.Context) *elastic.Client {
url := getRequiredOption(c, FlagURL)
var client *elastic.Client
var err error
retrier := elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(128*time.Millisecond, 513*time.Millisecond))
if c.IsSet(FlagMuttleyDestination) {
httpClient := &http.Client{
Transport: &muttleyTransport{
source: "cadence-cli",
destination: c.String(FlagMuttleyDestination),
},
}
client, err = elastic.NewClient(
elastic.SetHttpClient(httpClient),
elastic.SetURL(url),
elastic.SetRetrier(retrier),
)
} else {
client, err = elastic.NewClient(
elastic.SetURL(url),
elastic.SetRetrier(retrier),
)
}
if err != nil {
ErrorAndExit("Unable to create ElasticSearch client", err)
}
return client
}

// AdminCatIndices cat indices for ES cluster
func AdminCatIndices(c *cli.Context) {
esClient := getESClient(c)
esClient := cFactory.ElasticSearchClient(c)

ctx := context.Background()
resp, err := esClient.CatIndices().Do(ctx)
Expand Down Expand Up @@ -153,7 +104,7 @@ func AdminCatIndices(c *cli.Context) {

// AdminIndex used to bulk insert message from kafka parse
func AdminIndex(c *cli.Context) {
esClient := getESClient(c)
esClient := cFactory.ElasticSearchClient(c)
indexName := getRequiredOption(c, FlagIndex)
inputFileName := getRequiredOption(c, FlagInputFile)
batchSize := c.Int(FlagBatchSize)
Expand Down Expand Up @@ -209,7 +160,7 @@ func AdminIndex(c *cli.Context) {

// AdminDelete used to delete documents from ElasticSearch with input of list result
func AdminDelete(c *cli.Context) {
esClient := getESClient(c)
esClient := cFactory.ElasticSearchClient(c)
indexName := getRequiredOption(c, FlagIndex)
inputFileName := getRequiredOption(c, FlagInputFile)
batchSize := c.Int(FlagBatchSize)
Expand Down
5 changes: 5 additions & 0 deletions tools/cli/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/olekukonko/tablewriter"
"github.com/olivere/elastic"
"github.com/pborman/uuid"
"github.com/stretchr/testify/suite"
"github.com/urfave/cli"
Expand Down Expand Up @@ -70,6 +71,10 @@ func (m *clientFactoryMock) ServerAdminClient(c *cli.Context) admin.Client {
return m.serverAdminClient
}

func (m *clientFactoryMock) ElasticSearchClient(c *cli.Context) *elastic.Client {
panic("not implemented")
}

// this is the mock for yarpcCallOptions, make sure length are the same
var callOptions = []interface{}{gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()}

Expand Down
20 changes: 20 additions & 0 deletions tools/cli/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ package cli

import (
"context"
"time"

"github.com/olivere/elastic"
"github.com/urfave/cli"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
Expand Down Expand Up @@ -57,6 +59,8 @@ type ClientFactory interface {
ClientFrontendClient(c *cli.Context) clientFrontend.Interface
ServerFrontendClient(c *cli.Context) frontend.Client
ServerAdminClient(c *cli.Context) admin.Client

ElasticSearchClient(c *cli.Context) *elastic.Client
}

type clientFactory struct {
Expand Down Expand Up @@ -95,6 +99,22 @@ func (b *clientFactory) ServerAdminClient(c *cli.Context) admin.Client {
return admin.NewThriftClient(serverAdmin.New(b.dispatcher.ClientConfig(cadenceFrontendService)))
}

// ElasticSearchClient builds an ElasticSearch client
func (b *clientFactory) ElasticSearchClient(c *cli.Context) *elastic.Client {
url := getRequiredOption(c, FlagURL)
retrier := elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(128*time.Millisecond, 513*time.Millisecond))

client, err := elastic.NewClient(
elastic.SetURL(url),
elastic.SetRetrier(retrier),
)
if err != nil {
b.logger.Fatal("Unable to create ElasticSearch client", zap.Error(err))
}

return client
}

func (b *clientFactory) ensureDispatcher(c *cli.Context) {
if b.dispatcher != nil {
return
Expand Down
2 changes: 0 additions & 2 deletions tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ const (
FlagMessageType = "message_type"
FlagMessageTypeWithAlias = FlagMessageType + ", mt"
FlagURL = "url"
FlagMuttleyDestination = "muttely_destination"
FlagMuttleyDestinationWithAlias = FlagMuttleyDestination + ", muttley"
FlagIndex = "index"
FlagBatchSize = "batch_size"
FlagBatchSizeWithAlias = FlagBatchSize + ", bs"
Expand Down

0 comments on commit 75a992a

Please sign in to comment.