Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of watch: support -filter for consul watch: checks, services, nodes, service into release/1.15.x #17986

Prev Previous commit
Next Next commit
backport of commit 55d927d
  • Loading branch information
huikang committed Jun 16, 2023
commit 61ab6b2c43f6078226f28ffa9baf1e167bccd272
12 changes: 12 additions & 0 deletions agent/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ func (s *HTTPHandlers) HealthServiceChecks(resp http.ResponseWriter, req *http.R
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"}
}

// build tag filter
params := req.URL.Query()
if tags, ok := params["tag"]; ok {
for i, tag := range tags {
expr := fmt.Sprintf(`%s in ServiceTags`, tag)
if i < len(tags)-1 {
expr += " and "
}
args.Filter += expr
}
}

// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
Expand Down
10 changes: 10 additions & 0 deletions api/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,17 @@ func (h *Health) Node(node string, q *QueryOptions) (HealthChecks, *QueryMeta, e

// Checks is used to return the checks associated with a service
func (h *Health) Checks(service string, q *QueryOptions) (HealthChecks, *QueryMeta, error) {
return h.ChecksTags(service, nil, q)
}

// ChecksTags is used to return the checks associated with a service filtered by tags
func (h *Health) ChecksTags(service string, tags []string, q *QueryOptions) (HealthChecks, *QueryMeta, error) {
r := h.c.newRequest("GET", "/v1/health/checks/"+service)
if len(tags) > 0 {
for _, tag := range tags {
r.params.Add("tag", tag)
}
}
r.setQueryOptions(q)
rtt, resp, err := h.c.doRequest(r)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion api/watch/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
if state != "" {
checks, meta, err = health.StateTags(state, tags, &opts)
} else {
checks, meta, err = health.Checks(service, &opts)
checks, meta, err = health.ChecksTags(service, tags, &opts)
}
if err != nil {
return nil, nil, err
Expand Down
106 changes: 105 additions & 1 deletion api/watch/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,111 @@ func TestChecksWatch_Service(t *testing.T) {
}
}

func TestChecksWatch_Service_Tags(t *testing.T) {
func TestChecksWatch_Service_Tag(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

s.WaitForSerfCheck(t)

var (
wakeups [][]*api.HealthCheck
notifyCh = make(chan struct{})
)

plan := mustParse(t, `{"type":"checks", "service":"foobar", "tag":["b", "a"]}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.HealthCheck)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()

// Wait for first wakeup.
<-notifyCh
{
catalog := c.Catalog()

// we want to find this one
reg := &api.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foobar",
Service: "foobar",
Tags: []string{"a", "b"},
},
Check: &api.AgentCheck{
Node: "foobar",
CheckID: "foobar",
Name: "foobar",
Status: api.HealthPassing,
ServiceID: "foobar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}

// we don't want to find this one
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foobar",
Service: "foobar",
Tags: []string{"a"},
},
Check: &api.AgentCheck{
Node: "bar",
CheckID: "foobar",
Name: "foobar",
Status: api.HealthPassing,
ServiceID: "foobar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}

// Wait for second wakeup.
<-notifyCh

plan.Stop()
wg.Wait()

require.Len(t, wakeups, 2)

{
v := wakeups[0]
require.Len(t, v, 0)
}
{
v := wakeups[1]
require.Len(t, v, 1)
require.Equal(t, "foobar", v[0].CheckID)
}
}

func TestChecksWatch_Tag(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
Expand Down