Skip to content

Fix healthcheck interceptors #6257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples whose resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224

## 1.18.0 2024-09-03
Expand Down
47 changes: 28 additions & 19 deletions pkg/util/grpcclient/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -55,15 +56,17 @@ type HealthCheckInterceptors struct {
activeInstances map[string]*healthCheckEntry

instanceGcTimeout time.Duration
healthClientFactory func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient
healthClientFactory func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer)
}

func NewHealthCheckInterceptors(logger log.Logger) *HealthCheckInterceptors {
h := &HealthCheckInterceptors{
logger: logger,
instanceGcTimeout: 2 * time.Minute,
healthClientFactory: grpc_health_v1.NewHealthClient,
activeInstances: make(map[string]*healthCheckEntry),
logger: logger,
instanceGcTimeout: 2 * time.Minute,
healthClientFactory: func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
return grpc_health_v1.NewHealthClient(cc), cc
},
activeInstances: make(map[string]*healthCheckEntry),
}

h.Service = services.
Expand Down Expand Up @@ -107,16 +110,6 @@ func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry {
func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
level.Debug(h.logger).Log("msg", "Performing health check", "registeredInstances", len(h.registeredInstances()))
for _, instance := range h.registeredInstances() {
dialOpts, err := instance.clientConfig.Config.DialOption(nil, nil)
if err != nil {
return err
}
conn, err := grpc.NewClient(instance.address, dialOpts...)
c := h.healthClientFactory(conn)
if err != nil {
return err
}

if time.Since(instance.lastTickTime.Load()) >= h.instanceGcTimeout {
h.Lock()
delete(h.activeInstances, instance.address)
Expand All @@ -131,11 +124,27 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
instance.lastCheckTime.Store(time.Now())

go func(i *healthCheckEntry) {
if err := i.recordHealth(healthCheck(c, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
dialOpts, err := i.clientConfig.Config.DialOption(nil, nil)
if err != nil {
level.Error(h.logger).Log("msg", "error creating dialOpts to perform healthcheck", "address", i.address, "err", err)
return
}
conn, err := grpc.NewClient(i.address, dialOpts...)
if err != nil {
level.Error(h.logger).Log("msg", "error creating client to perform healthcheck", "address", i.address, "err", err)
return
}
if err := conn.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)

client, closer := h.healthClientFactory(conn)

defer func() {
if err := closer.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)
}
}()

if err := i.recordHealth(healthCheck(client, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
}
}(instance)
}
Expand Down
23 changes: 18 additions & 5 deletions pkg/util/grpcclient/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"testing"
"time"

Expand All @@ -20,7 +21,13 @@ import (

type healthClientMock struct {
grpc_health_v1.HealthClient
err atomic.Error
err atomic.Error
open atomic.Bool
}

// Close records that the mock has been closed by setting its open flag to
// false. It always returns nil. The tests set open to true from their
// healthClientFactory and assert it is false after a health check has run.
func (h *healthClientMock) Close() error {
h.open.Store(false)
return nil
}

func (h *healthClientMock) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
Expand All @@ -36,8 +43,9 @@ func TestNewHealthCheckService(t *testing.T) {
i.instanceGcTimeout = time.Second * 5

hMock := &healthClientMock{}
i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient {
return hMock
i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
hMock.open.Store(true)
return hMock, hMock
}

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -79,6 +87,8 @@ func TestNewHealthCheckService(t *testing.T) {
cortex_testutil.Poll(t, i.instanceGcTimeout*2, 0, func() interface{} {
return len(i.registeredInstances())
})

require.False(t, hMock.open.Load())
}

func TestNewHealthCheckInterceptors(t *testing.T) {
Expand All @@ -92,8 +102,9 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
Timeout: time.Second,
},
}
i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient {
return hMock
i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
hMock.open.Store(true)
return hMock, hMock
}

ui := i.UnaryHealthCheckInterceptor(&cfg)
Expand All @@ -113,6 +124,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {

// first health check
require.NoError(t, i.iteration(context.Background()))
require.False(t, hMock.open.Load())

//Should second call even with error
require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker))
Expand All @@ -121,6 +133,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {

// Second Healthcheck -> should mark as unhealthy
require.NoError(t, i.iteration(context.Background()))
require.False(t, hMock.open.Load())

cortex_testutil.Poll(t, time.Second, true, func() interface{} {
return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr)
Expand Down
Loading