Skip to content

Commit

Permalink
Port changes in v3 over to v5
Browse files Browse the repository at this point in the history
* Add Nodes Stats API
* Add `AliasAction`, `AliasAddAction`, and `AliasRemoveAction`
* Use `context.Context` everywhere with `Do` and `PerformRequest`
  • Loading branch information
olivere committed Sep 1, 2016
1 parent 3a37793 commit f00a5dd
Show file tree
Hide file tree
Showing 116 changed files with 2,042 additions and 613 deletions.
6 changes: 4 additions & 2 deletions bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"fmt"
"net/url"

"golang.org/x/net/context"

"gopkg.in/olivere/elastic.v5/uritemplates"
)

Expand Down Expand Up @@ -146,7 +148,7 @@ func (s *BulkService) bodyAsString() (string, error) {
// Do sends the batched requests to Elasticsearch. Note that, when successful,
// you can reuse the BulkService for the next batch as the list of bulk
// requests is cleared on success.
func (s *BulkService) Do() (*BulkResponse, error) {
func (s *BulkService) Do(ctx context.Context) (*BulkResponse, error) {
// No actions?
if s.NumberOfActions() == 0 {
return nil, errors.New("elastic: No bulk actions to commit")
Expand Down Expand Up @@ -193,7 +195,7 @@ func (s *BulkService) Do() (*BulkResponse, error) {
}

// Get response
res, err := s.client.PerformRequest("POST", path, params, body)
res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/net/context"

"gopkg.in/olivere/elastic.v5/backoff"
)

Expand Down Expand Up @@ -464,7 +466,7 @@ func (w *bulkWorker) commit() error {
// via exponential backoff
commitFunc := func() error {
var err error
res, err = w.service.Do()
res, err = w.service.Do(context.Background())
return err
}
// notifyFunc will be called if retry fails
Expand Down
18 changes: 10 additions & 8 deletions bulk_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync/atomic"
"testing"
"time"

"golang.org/x/net/context"
)

func TestBulkProcessorDefaults(t *testing.T) {
Expand Down Expand Up @@ -157,11 +159,11 @@ func TestBulkProcessorBasedOnFlushInterval(t *testing.T) {
}

// Check number of documents that were bulk indexed
_, err = p.c.Flush(testIndexName).Do()
_, err = p.c.Flush(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
count, err := p.c.Count(testIndexName).Do()
count, err := p.c.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -241,11 +243,11 @@ func TestBulkProcessorClose(t *testing.T) {
}

// Check number of documents that were bulk indexed
_, err = p.c.Flush(testIndexName).Do()
_, err = p.c.Flush(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
count, err := p.c.Count(testIndexName).Do()
count, err := p.c.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -314,11 +316,11 @@ func TestBulkProcessorFlush(t *testing.T) {
}

// Check number of documents that were bulk indexed
_, err = p.c.Flush(testIndexName).Do()
_, err = p.c.Flush(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
count, err := p.c.Count(testIndexName).Do()
count, err := p.c.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -407,11 +409,11 @@ func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) {
}

// Check number of documents that were bulk indexed
_, err = p.c.Flush(testIndexName).Do()
_, err = p.c.Flush(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
count, err := p.c.Count(testIndexName).Do()
count, err := p.c.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down
24 changes: 13 additions & 11 deletions bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package elastic
import (
"encoding/json"
"testing"

"golang.org/x/net/context"
)

func TestBulk(t *testing.T) {
Expand All @@ -29,7 +31,7 @@ func TestBulk(t *testing.T) {
t.Errorf("expected bulkRequest.NumberOfActions %d; got %d", 3, bulkRequest.NumberOfActions())
}

bulkResponse, err := bulkRequest.Do()
bulkResponse, err := bulkRequest.Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -42,7 +44,7 @@ func TestBulk(t *testing.T) {
}

// Document with Id="1" should not exist
exists, err := client.Exists().Index(testIndexName).Type("tweet").Id("1").Do()
exists, err := client.Exists().Index(testIndexName).Type("tweet").Id("1").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -51,7 +53,7 @@ func TestBulk(t *testing.T) {
}

// Document with Id="2" should exist
exists, err = client.Exists().Index(testIndexName).Type("tweet").Id("2").Do()
exists, err = client.Exists().Index(testIndexName).Type("tweet").Id("2").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,7 +75,7 @@ func TestBulk(t *testing.T) {
t.Errorf("expected bulkRequest.NumberOfActions %d; got %d", 1, bulkRequest.NumberOfActions())
}

bulkResponse, err = bulkRequest.Do()
bulkResponse, err = bulkRequest.Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -86,7 +88,7 @@ func TestBulk(t *testing.T) {
}

// Document with Id="1" should have a retweets count of 42
doc, err := client.Get().Index(testIndexName).Type("tweet").Id("2").Do()
doc, err := client.Get().Index(testIndexName).Type("tweet").Id("2").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -117,7 +119,7 @@ func TestBulk(t *testing.T) {
if bulkRequest.NumberOfActions() != 1 {
t.Errorf("expected bulkRequest.NumberOfActions %d; got %d", 1, bulkRequest.NumberOfActions())
}
bulkResponse, err = bulkRequest.Refresh("wait_for").Do()
bulkResponse, err = bulkRequest.Refresh("wait_for").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -130,7 +132,7 @@ func TestBulk(t *testing.T) {
}

// Document with Id="1" should have a retweets count of 43
doc, err = client.Get().Index(testIndexName).Type("tweet").Id("2").Do()
doc, err = client.Get().Index(testIndexName).Type("tweet").Id("2").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -171,7 +173,7 @@ func TestBulkWithIndexSetOnClient(t *testing.T) {
t.Errorf("expected bulkRequest.NumberOfActions %d; got %d", 3, bulkRequest.NumberOfActions())
}

bulkResponse, err := bulkRequest.Do()
bulkResponse, err := bulkRequest.Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -180,7 +182,7 @@ func TestBulkWithIndexSetOnClient(t *testing.T) {
}

// Document with Id="1" should not exist
exists, err := client.Exists().Index(testIndexName).Type("tweet").Id("1").Do()
exists, err := client.Exists().Index(testIndexName).Type("tweet").Id("1").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -189,7 +191,7 @@ func TestBulkWithIndexSetOnClient(t *testing.T) {
}

// Document with Id="2" should exist
exists, err = client.Exists().Index(testIndexName).Type("tweet").Id("2").Do()
exists, err = client.Exists().Index(testIndexName).Type("tweet").Id("2").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -241,7 +243,7 @@ func TestBulkRequestsSerialization(t *testing.T) {
}

// Run the bulk request
bulkResponse, err := bulkRequest.Do()
bulkResponse, err := bulkRequest.Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 4 additions & 2 deletions clear_scroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"net/url"
"strings"

"golang.org/x/net/context"
)

// ClearScrollService clears one or more scroll contexts by their ids.
Expand Down Expand Up @@ -67,7 +69,7 @@ func (s *ClearScrollService) Validate() error {
}

// Do executes the operation.
func (s *ClearScrollService) Do() (*ClearScrollResponse, error) {
func (s *ClearScrollService) Do(ctx context.Context) (*ClearScrollResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
Expand All @@ -83,7 +85,7 @@ func (s *ClearScrollService) Do() (*ClearScrollResponse, error) {
body := strings.Join(s.scrollId, ",")

// Get HTTP response
res, err := s.client.PerformRequest("DELETE", path, params, body)
res, err := s.client.PerformRequest(ctx, "DELETE", path, params, body)
if err != nil {
return nil, err
}
Expand Down
20 changes: 11 additions & 9 deletions clear_scroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package elastic
import (
_ "net/http"
"testing"

"golang.org/x/net/context"
)

func TestClearScroll(t *testing.T) {
Expand All @@ -17,28 +19,28 @@ func TestClearScroll(t *testing.T) {
tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."}

// Add all documents
_, err := client.Index().Index(testIndexName).Type("tweet").Id("1").BodyJson(&tweet1).Do()
_, err := client.Index().Index(testIndexName).Type("tweet").Id("1").BodyJson(&tweet1).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Index().Index(testIndexName).Type("tweet").Id("2").BodyJson(&tweet2).Do()
_, err = client.Index().Index(testIndexName).Type("tweet").Id("2").BodyJson(&tweet2).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Index().Index(testIndexName).Type("tweet").Id("3").BodyJson(&tweet3).Do()
_, err = client.Index().Index(testIndexName).Type("tweet").Id("3").BodyJson(&tweet3).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Flush().Index(testIndexName).Do()
_, err = client.Flush().Index(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

// Match all should return all documents
res, err := client.Scroll(testIndexName).Size(1).Do()
res, err := client.Scroll(testIndexName).Size(1).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -50,13 +52,13 @@ func TestClearScroll(t *testing.T) {
}

// Search should succeed
_, err = client.Scroll(testIndexName).Size(1).ScrollId(res.ScrollId).Do()
_, err = client.Scroll(testIndexName).Size(1).ScrollId(res.ScrollId).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

// Clear scroll id
clearScrollRes, err := client.ClearScroll().ScrollId(res.ScrollId).Do()
clearScrollRes, err := client.ClearScroll().ScrollId(res.ScrollId).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand All @@ -65,7 +67,7 @@ func TestClearScroll(t *testing.T) {
}

// Search result should fail
_, err = client.Scroll(testIndexName).Size(1).ScrollId(res.ScrollId).Do()
_, err = client.Scroll(testIndexName).Size(1).ScrollId(res.ScrollId).Do(context.TODO())
if err == nil {
t.Fatalf("expected scroll to fail")
}
Expand All @@ -75,7 +77,7 @@ func TestClearScrollValidate(t *testing.T) {
client := setupTestClient(t)

// No scroll id -> fail with error
res, err := NewClearScrollService(client).Do()
res, err := NewClearScrollService(client).Do(context.TODO())
if err == nil {
t.Fatalf("expected ClearScroll to fail without scroll ids")
}
Expand Down
18 changes: 13 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"strings"
"sync"
"time"

"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
)

const (
Expand Down Expand Up @@ -1029,7 +1032,7 @@ func (c *Client) mustActiveConn() error {
// Optionally, a list of HTTP error codes to ignore can be passed.
// This is necessary for services that expect e.g. HTTP status 404 as a
// valid outcome (Exists, IndicesExists, IndicesTypeExists).
func (c *Client) PerformRequest(method, path string, params url.Values, body interface{}, ignoreErrors ...int) (*Response, error) {
func (c *Client) PerformRequest(ctx context.Context, method, path string, params url.Values, body interface{}, ignoreErrors ...int) (*Response, error) {
start := time.Now().UTC()

c.mu.RLock()
Expand Down Expand Up @@ -1107,7 +1110,7 @@ func (c *Client) PerformRequest(method, path string, params url.Values, body int
c.dumpRequest((*http.Request)(req))

// Get response
res, err := c.c.Do((*http.Request)(req))
res, err := ctxhttp.Do(ctx, c.c, (*http.Request)(req))
if err != nil {
retries--
if retries <= 0 {
Expand Down Expand Up @@ -1459,6 +1462,11 @@ func (c *Client) NodesInfo() *NodesInfoService {
return NewNodesInfoService(c)
}

// NodesStats retrieves one or more or all of the cluster nodes statistics.
func (c *Client) NodesStats() *NodesStatsService {
return NewNodesStatsService(c)
}

// TasksCancel cancels tasks running on the specified nodes.
func (c *Client) TasksCancel() *TasksCancelService {
return NewTasksCancelService(c)
Expand Down Expand Up @@ -1492,7 +1500,7 @@ func (c *Client) TasksList() *TasksListService {
// ElasticsearchVersion returns the version number of Elasticsearch
// running on the given URL.
func (c *Client) ElasticsearchVersion(url string) (string, error) {
res, _, err := c.Ping(url).Do()
res, _, err := c.Ping(url).Do(context.Background())
if err != nil {
return "", err
}
Expand All @@ -1501,7 +1509,7 @@ func (c *Client) ElasticsearchVersion(url string) (string, error) {

// IndexNames returns the names of all indices in the cluster.
func (c *Client) IndexNames() ([]string, error) {
res, err := c.IndexGetSettings().Index("_all").Do()
res, err := c.IndexGetSettings().Index("_all").Do(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -1528,7 +1536,7 @@ func (c *Client) Ping(url string) *PingService {
// If the cluster will have the given state within the timeout, nil is returned.
// If the request timed out, ErrTimeout is returned.
func (c *Client) WaitForStatus(status string, timeout string) error {
health, err := c.ClusterHealth().WaitForStatus(status).Timeout(timeout).Do()
health, err := c.ClusterHealth().WaitForStatus(status).Timeout(timeout).Do(context.Background())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit f00a5dd

Please sign in to comment.