Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

395 elastic parent #406

Merged
merged 19 commits into from
Aug 29, 2017
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
/pipeline.js
/transformer.js
.vscode
/vendor/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't want to exclude the vendor from changes

1 change: 1 addition & 0 deletions adaptor/elasticsearch/clients/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ type ClientOptions struct {
UserInfo *url.Userinfo
HTTPClient *http.Client
Index string
ParentId string
}
26 changes: 22 additions & 4 deletions adaptor/elasticsearch/clients/v5/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Writer struct {
confirmChan chan struct{}
logger log.Logger
writeErr error
parentId string
}

func init() {
Expand All @@ -51,8 +52,9 @@ func init() {
return nil, err
}
w := &Writer{
index: opts.Index,
logger: log.With("writer", "elasticsearch").With("version", 5),
index: opts.Index,
parentId: opts.ParentId,
logger: log.With("writer", "elasticsearch").With("version", 5),
}
p, err := esClient.BulkProcessor().
Name("TransporterWorker-1").
Expand Down Expand Up @@ -85,6 +87,11 @@ func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error
id = msg.ID()
msg.Data().Delete("_id")
}
var parent_id string
if _, ok := msg.Data()[w.parentId]; ok {
parent_id = msg.Data()[w.parentId].(string)
msg.Data().Delete(w.parentId)
}

var br elastic.BulkableRequest
switch msg.OP() {
Expand All @@ -94,10 +101,21 @@ func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error
w.bp.Flush()
br = elastic.NewBulkDeleteRequest().Index(w.index).Type(indexType).Id(id)
case ops.Insert:
br = elastic.NewBulkIndexRequest().Index(w.index).Type(indexType).Id(id).Doc(msg.Data())
indexReq := elastic.NewBulkIndexRequest().Index(w.index).Type(indexType).Id(id)
if parent_id != "" {
indexReq.Parent(parent_id)
}
indexReq.Doc(msg.Data())
br = indexReq
case ops.Update:
br = elastic.NewBulkUpdateRequest().Index(w.index).Type(indexType).Id(id).Doc(msg.Data())
indexReq := elastic.NewBulkUpdateRequest().Index(w.index).Type(indexType).Id(id)
if parent_id != "" {
indexReq.Parent(parent_id)
}
indexReq.Doc(msg.Data())
br = indexReq
}

w.bp.Add(br)
return msg, nil
}
Expand Down
9 changes: 7 additions & 2 deletions adaptor/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
// "timeout": "10s", // defaults to 30s
// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
// "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearchl
}`
)

Expand All @@ -43,6 +44,7 @@ type Elasticsearch struct {
adaptor.BaseConfig
AWSAccessKeyID string `json:"aws_access_key" doc:"credentials for use with AWS Elasticsearch service"`
AWSAccessSecret string `json:"aws_access_secret" doc:"credentials for use with AWS Elasticsearch service"`
ParentId string
}

// Description for the Elasticsearcb adaptor
Expand Down Expand Up @@ -109,12 +111,14 @@ func setupWriter(conf *Elasticsearch) (client.Writer, error) {
if err != nil {
return nil, err
}

v, err := version.NewVersion(stringVersion)
if err != nil {
return nil, client.VersionError{URI: conf.URI, V: stringVersion, Err: err.Error()}
}

var parentIdentifier = "elastic_parent"
if conf.ParentId != "" {
parentIdentifier = conf.ParentId
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this block is not necessary since the writer doesn't do a check for whether ParentID is an empty string, we can simplify by removing this bit of code and just call ParentID: conf.ParentID below.

for _, vc := range clients.Clients {
if vc.Constraint.Check(v) {
urls := make([]string, len(hostsAndPorts))
Expand All @@ -126,6 +130,7 @@ func setupWriter(conf *Elasticsearch) (client.Writer, error) {
UserInfo: uri.User,
HTTPClient: httpClient,
Index: uri.Path[1:],
ParentId: parentIdentifier,
}
versionedClient, _ := vc.Creator(opts)
return versionedClient, nil
Expand Down