Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 committed Mar 19, 2019
1 parent 9bd8b65 commit 4e7c4bd
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 45 deletions.
65 changes: 36 additions & 29 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri

uploadCompacted := cmd.Flag("shipper.upload-compacted", "[Experimental] If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus.").Default("false").Hidden().Bool()

validateProm := cmd.Flag("sidecar.validate-prom", "[Experimental]If true sidecar will check Prometheus' flags to ensure disabled compaction and 2h block-time.").Default("true").Hidden().Bool()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
Expand Down Expand Up @@ -83,6 +85,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri
peer,
rl,
*uploadCompacted,
*validateProm,
)
}
}
Expand All @@ -103,6 +106,7 @@ func runSidecar(
peer cluster.Peer,
reloader *reloader.Reloader,
uploadCompacted bool,
validateProm bool,
) error {
var m = &promMetadata{
promURL: promURL,
Expand All @@ -127,40 +131,43 @@ func runSidecar(

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Retry infinitely until we get Prometheus version.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
err := m.FetchPromVersion(ctx, promURL, logger)
if validateProm {
// Retry infinitely until we get Prometheus version.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
err := m.FetchPromVersion(promURL, logger)
if err != nil {
level.Warn(logger).Log(
"msg", "failed to get Prometheus version. Is Prometheus running? Retrying",
"err", err,
)
return errors.Wrapf(err, "fetch Prometheus version")
}
return nil
})
if err != nil {
level.Warn(logger).Log(
"msg", "failed to get Prometheus version. Is Prometheus running? Retrying",
"err", err,
)
return errors.Wrapf(err, "get Prometheus version")
return err
}
return nil
})
if err != nil {
return err
}

if m.version == nil {
level.Warn(logger).Log("msg", "Fetched version is nil, skip validation")
} else {
// Check if Prometheus has /status/flags endpoint.
if m.version.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint, skip validation", "version", m.version.Original())
return nil
}
// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, promURL, dataDir); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
if m.version == nil {
level.Warn(logger).Log("msg", "can't fetch version, skip validation")
} else {
// Check if Prometheus has /status/flags endpoint.
if m.version.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint, skip validation", "version", m.version.Original())
return nil
}

// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, promURL, dataDir); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx, logger); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
Expand Down Expand Up @@ -403,7 +410,7 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) {
return s.mint, s.maxt
}

func (s *promMetadata) FetchPromVersion(ctx context.Context, promURL *url.URL, logger log.Logger) (err error) {
s.version , err = promclient.GetPromVersion(ctx, logger, promURL)
return err
func (s *promMetadata) FetchPromVersion(promURL *url.URL, logger log.Logger) (err error) {
s.version, err = promclient.GetPromVersion(logger, promURL)
return err
}
33 changes: 17 additions & 16 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,19 +343,14 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q
}

// GetPromVersion will return the version of Prometheus by querying /version Prometheus endpoint.
func GetPromVersion(ctx context.Context, logger log.Logger, base *url.URL) (*version.Version, error) {
func GetPromVersion(logger log.Logger, base *url.URL) (*version.Version, error) {
if logger == nil {
logger = log.NewNopLogger()
}

u := *base
u.Path = path.Join(u.Path, "/version")
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "create request")
}

resp, err := http.DefaultClient.Do(req.WithContext(ctx))
resp, err := http.Get(u.String())
if err != nil {
return nil, errors.Wrapf(err, "request version against %s", u.String())
}
Expand All @@ -375,16 +370,8 @@ func GetPromVersion(ctx context.Context, logger log.Logger, base *url.URL) (*ver
if err := json.Unmarshal(b, &m); err != nil {
return nil, errors.Wrapf(err, "unmarshal response: %v", string(b))
}
if m.Version == "" {
return nil, nil
}

ver, err := version.NewVersion(m.Version)
if err != nil {
return nil, errors.Wrapf(err, "failed to init version %s", m.Version)
}

return ver, nil
return parseVersion(m.Version)
}

// Scalar response consists of array with mixed types so it needs to be
Expand Down Expand Up @@ -414,3 +401,17 @@ func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector,
Value: resultValue,
Timestamp: resultTime}}, nil
}

// parseVersion converts string to version.Version.
func parseVersion(str string) (*version.Version, error) {
if strings.TrimSpace(str) == "" {
return nil, nil
}

ver, err := version.NewVersion(str)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse version %s", str)
}

return ver, nil
}
16 changes: 16 additions & 0 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package promclient
import (
"context"
"fmt"
"github.com/hashicorp/go-version"
"io/ioutil"
"net/url"
"os"
Expand Down Expand Up @@ -142,3 +143,18 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) {
vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult)
testutil.NotOk(t, err)
}

func TestParseVersion(t *testing.T) {
vers := []string{"2.4.3", "2.5.0", ""}

expects := make([]*version.Version, len(vers))
for i, v := range vers {
expects[i], _ = version.NewVersion(v)
}

for i, v := range vers {
ver, err := parseVersion(v)
testutil.Ok(t, err)
testutil.Equals(t, ver, expects[i])
}
}

0 comments on commit 4e7c4bd

Please sign in to comment.