Skip to content

Commit

Permalink
Initial support for Elasticsearch 7
Browse files Browse the repository at this point in the history
  • Loading branch information
telendt committed Mar 29, 2019
1 parent 352af50 commit a32902f
Show file tree
Hide file tree
Showing 249 changed files with 1,033 additions and 1,191 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG-7.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Changes from 6.0 to 7.0

See [breaking changes](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/breaking-changes-7.0.html).

## ???
4 changes: 2 additions & 2 deletions bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// reuse BulkService to send many batches. You do not have to create a new
// BulkService for each batch.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-bulk.html
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-bulk.html
// for more details.
type BulkService struct {
client *Client
Expand Down Expand Up @@ -94,7 +94,7 @@ func (s *BulkService) Timeout(timeout string) *BulkService {
// changes to be made visible by a refresh before reying), or "false"
// (no refresh related actions). The default value is "false".
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-refresh.html
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-refresh.html
// for details.
func (s *BulkService) Refresh(refresh string) *BulkService {
s.refresh = refresh
Expand Down
4 changes: 2 additions & 2 deletions bulk_delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// BulkDeleteRequest is a request to remove a document from Elasticsearch.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-bulk.html
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-bulk.html
// for details.
type BulkDeleteRequest struct {
BulkableRequest
Expand Down Expand Up @@ -128,7 +128,7 @@ func (r *BulkDeleteRequest) String() string {

// Source returns the on-wire representation of the delete request,
// split into an action-and-meta-data line and an (optional) source line.
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-bulk.html
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-bulk.html
// for details.
func (r *BulkDeleteRequest) Source() ([]string, error) {
if r.source != nil {
Expand Down
16 changes: 8 additions & 8 deletions bulk_delete_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ func TestBulkDeleteRequestSerialization(t *testing.T) {
}{
// #0
{
Request: NewBulkDeleteRequest().Index("index1").Type("doc").Id("1"),
Request: NewBulkDeleteRequest().Index("index1").Id("1"),
Expected: []string{
`{"delete":{"_index":"index1","_type":"doc","_id":"1"}}`,
`{"delete":{"_index":"index1","_id":"1"}}`,
},
},
// #1
{
Request: NewBulkDeleteRequest().Index("index1").Type("doc").Id("1").Parent("2"),
Request: NewBulkDeleteRequest().Index("index1").Id("1").Parent("2"),
Expected: []string{
`{"delete":{"_index":"index1","_type":"doc","_id":"1","parent":"2"}}`,
`{"delete":{"_index":"index1","_id":"1","parent":"2"}}`,
},
},
// #2
{
Request: NewBulkDeleteRequest().Index("index1").Type("doc").Id("1").Routing("3"),
Request: NewBulkDeleteRequest().Index("index1").Id("1").Routing("3"),
Expected: []string{
`{"delete":{"_index":"index1","_type":"doc","_id":"1","routing":"3"}}`,
`{"delete":{"_index":"index1","_id":"1","routing":"3"}}`,
},
},
}
Expand Down Expand Up @@ -59,11 +59,11 @@ var bulkDeleteRequestSerializationResult string

func BenchmarkBulkDeleteRequestSerialization(b *testing.B) {
b.Run("stdlib", func(b *testing.B) {
r := NewBulkDeleteRequest().Index(testIndexName).Type("doc").Id("1")
r := NewBulkDeleteRequest().Index(testIndexName).Id("1")
benchmarkBulkDeleteRequestSerialization(b, r.UseEasyJSON(false))
})
b.Run("easyjson", func(b *testing.B) {
r := NewBulkDeleteRequest().Index(testIndexName).Type("doc").Id("1")
r := NewBulkDeleteRequest().Index(testIndexName).Id("1")
benchmarkBulkDeleteRequestSerialization(b, r.UseEasyJSON(true))
})
}
Expand Down
8 changes: 4 additions & 4 deletions bulk_index_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// BulkIndexRequest is a request to add a document to Elasticsearch.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-bulk.html
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-bulk.html
// for details.
type BulkIndexRequest struct {
BulkableRequest
Expand Down Expand Up @@ -95,7 +95,7 @@ func (r *BulkIndexRequest) Id(id string) *BulkIndexRequest {

// OpType specifies if this request should follow create-only or upsert
// behavior. This follows the OpType of the standard document index API.
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-index_.html#operation-type
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-index_.html#operation-type
// for details.
func (r *BulkIndexRequest) OpType(opType string) *BulkIndexRequest {
r.opType = opType
Expand Down Expand Up @@ -129,7 +129,7 @@ func (r *BulkIndexRequest) Version(version int64) *BulkIndexRequest {
// VersionType specifies how versions are created. It can be e.g. internal,
// external, external_gte, or force.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-index_.html#index-versioning
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-index_.html#index-versioning
// for details.
func (r *BulkIndexRequest) VersionType(versionType string) *BulkIndexRequest {
r.versionType = versionType
Expand Down Expand Up @@ -170,7 +170,7 @@ func (r *BulkIndexRequest) String() string {

// Source returns the on-wire representation of the index request,
// split into an action-and-meta-data line and an (optional) source line.
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-bulk.html
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-bulk.html
// for details.
func (r *BulkIndexRequest) Source() ([]string, error) {
// { "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
Expand Down
28 changes: 14 additions & 14 deletions bulk_index_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,56 +16,56 @@ func TestBulkIndexRequestSerialization(t *testing.T) {
}{
// #0
{
Request: NewBulkIndexRequest().Index("index1").Type("doc").Id("1").
Request: NewBulkIndexRequest().Index("index1").Id("1").
Doc(tweet{User: "olivere", Created: time.Date(2014, 1, 18, 23, 59, 58, 0, time.UTC)}),
Expected: []string{
`{"index":{"_index":"index1","_id":"1","_type":"doc"}}`,
`{"index":{"_index":"index1","_id":"1"}}`,
`{"user":"olivere","message":"","retweets":0,"created":"2014-01-18T23:59:58Z"}`,
},
},
// #1
{
Request: NewBulkIndexRequest().OpType("create").Index("index1").Type("doc").Id("1").
Request: NewBulkIndexRequest().OpType("create").Index("index1").Id("1").
Doc(tweet{User: "olivere", Created: time.Date(2014, 1, 18, 23, 59, 58, 0, time.UTC)}),
Expected: []string{
`{"create":{"_index":"index1","_id":"1","_type":"doc"}}`,
`{"create":{"_index":"index1","_id":"1"}}`,
`{"user":"olivere","message":"","retweets":0,"created":"2014-01-18T23:59:58Z"}`,
},
},
// #2
{
Request: NewBulkIndexRequest().OpType("index").Index("index1").Type("doc").Id("1").
Request: NewBulkIndexRequest().OpType("index").Index("index1").Id("1").
Doc(tweet{User: "olivere", Created: time.Date(2014, 1, 18, 23, 59, 58, 0, time.UTC)}),
Expected: []string{
`{"index":{"_index":"index1","_id":"1","_type":"doc"}}`,
`{"index":{"_index":"index1","_id":"1"}}`,
`{"user":"olivere","message":"","retweets":0,"created":"2014-01-18T23:59:58Z"}`,
},
},
// #3
{
Request: NewBulkIndexRequest().OpType("index").Index("index1").Type("doc").Id("1").RetryOnConflict(42).
Request: NewBulkIndexRequest().OpType("index").Index("index1").Id("1").RetryOnConflict(42).
Doc(tweet{User: "olivere", Created: time.Date(2014, 1, 18, 23, 59, 58, 0, time.UTC)}),
Expected: []string{
`{"index":{"_index":"index1","_id":"1","_type":"doc","retry_on_conflict":42}}`,
`{"index":{"_index":"index1","_id":"1","retry_on_conflict":42}}`,
`{"user":"olivere","message":"","retweets":0,"created":"2014-01-18T23:59:58Z"}`,
},
},
// #4
{
Request: NewBulkIndexRequest().OpType("index").Index("index1").Type("doc").Id("1").Pipeline("my_pipeline").
Request: NewBulkIndexRequest().OpType("index").Index("index1").Id("1").Pipeline("my_pipeline").
Doc(tweet{User: "olivere", Created: time.Date(2014, 1, 18, 23, 59, 58, 0, time.UTC)}),
Expected: []string{
`{"index":{"_index":"index1","_id":"1","_type":"doc","pipeline":"my_pipeline"}}`,
`{"index":{"_index":"index1","_id":"1","pipeline":"my_pipeline"}}`,
`{"user":"olivere","message":"","retweets":0,"created":"2014-01-18T23:59:58Z"}`,
},
},
// #5
{
Request: NewBulkIndexRequest().OpType("index").Index("index1").Type("doc").Id("1").
Request: NewBulkIndexRequest().OpType("index").Index("index1").Id("1").
Routing("123").
Doc(tweet{User: "olivere", Created: time.Date(2014, 1, 18, 23, 59, 58, 0, time.UTC)}),
Expected: []string{
`{"index":{"_index":"index1","_id":"1","_type":"doc","routing":"123"}}`,
`{"index":{"_index":"index1","_id":"1","routing":"123"}}`,
`{"user":"olivere","message":"","retweets":0,"created":"2014-01-18T23:59:58Z"}`,
},
},
Expand Down Expand Up @@ -105,12 +105,12 @@ var bulkIndexRequestSerializationResult string

func BenchmarkBulkIndexRequestSerialization(b *testing.B) {
b.Run("stdlib", func(b *testing.B) {
r := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id("1").
r := NewBulkIndexRequest().Index(testIndexName).Id("1").
Doc(tweet{User: "olivere", Created: time.Date(2014, 1, 18, 23, 59, 58, 0, time.UTC)})
benchmarkBulkIndexRequestSerialization(b, r.UseEasyJSON(false))
})
b.Run("easyjson", func(b *testing.B) {
r := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id("1").
r := NewBulkIndexRequest().Index(testIndexName).Id("1").
Doc(tweet{User: "olivere", Created: time.Date(2014, 1, 18, 23, 59, 58, 0, time.UTC)})
benchmarkBulkIndexRequestSerialization(b, r.UseEasyJSON(true))
})
Expand Down
16 changes: 8 additions & 8 deletions bulk_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestBulkProcessorBasedOnFlushInterval(t *testing.T) {

for i := 1; i <= numDocs; i++ {
tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet)
request := NewBulkIndexRequest().Index(testIndexName).Id(fmt.Sprintf("%d", i)).Doc(tweet)
p.Add(request)
}

Expand Down Expand Up @@ -165,7 +165,7 @@ func TestBulkProcessorBasedOnFlushInterval(t *testing.T) {
}

// Check number of documents that were bulk indexed
_, err = p.c.Flush(testIndexName).Do(context.TODO())
_, err = p.c.Refresh(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestBulkProcessorClose(t *testing.T) {

for i := 1; i <= numDocs; i++ {
tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet)
request := NewBulkIndexRequest().Index(testIndexName).Id(fmt.Sprintf("%d", i)).Doc(tweet)
p.Add(request)
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func TestBulkProcessorClose(t *testing.T) {
}

// Check number of documents that were bulk indexed
_, err = p.c.Flush(testIndexName).Do(context.TODO())
_, err = p.c.Refresh(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestBulkProcessorFlush(t *testing.T) {

for i := 1; i <= numDocs; i++ {
tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet)
request := NewBulkIndexRequest().Index(testIndexName).Id(fmt.Sprintf("%d", i)).Doc(tweet)
p.Add(request)
}

Expand Down Expand Up @@ -322,7 +322,7 @@ func TestBulkProcessorFlush(t *testing.T) {
}

// Check number of documents that were bulk indexed
_, err = p.c.Flush(testIndexName).Do(context.TODO())
_, err = p.c.Refresh(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -363,7 +363,7 @@ func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) {

for i := 1; i <= numDocs; i++ {
tweet := tweet{User: "olivere", Message: fmt.Sprintf("%07d. %s", i, randomString(1+rand.Intn(63)))}
request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet)
request := NewBulkIndexRequest().Index(testIndexName).Id(fmt.Sprintf("%d", i)).Doc(tweet)
p.Add(request)
}

Expand Down Expand Up @@ -415,7 +415,7 @@ func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) {
}

// Check number of documents that were bulk indexed
_, err = p.c.Flush(testIndexName).Do(context.TODO())
_, err = p.c.Refresh(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit a32902f

Please sign in to comment.