From 038a0b22c5f326682c65ab38b2023c90830e3597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 3 May 2024 13:06:53 +0300 Subject: [PATCH] e2e/compact: add repro for issue #6775 (#7333) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adding a minimal test case for issue #6775 - reproduces the panic in the compactor. Signed-off-by: Giedrius Statkevičius --- test/e2e/compact_test.go | 77 ++++++++++++++++++++++++++++++++++ test/e2e/e2ethanos/services.go | 30 +++++++++++++ 2 files changed, 107 insertions(+) diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index daa2fa615e..94a3f344fe 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -924,3 +924,80 @@ func TestCompactorDownsampleIgnoresMarked(t *testing.T) { testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics())) } + +// TestCompactorIssue6775 tests that the compactor does not crash when +// compacting 5m downsampled blocks with some overlap. +func TestCompactorIssue6775(t *testing.T) { + const minTime = 1710374400014 + const maxTime = 1711584000000 + + logger := log.NewNopLogger() + e, err := e2e.NewDockerEnvironment("c-issue6775") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + + const bucket = "compact-test" + m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(m)) + + bkt, err := s3.NewBucketWithConfig(logger, + e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed") + testutil.Ok(t, err) + + baseBlockDesc := blockDesc{ + series: []labels.Labels{ + labels.FromStrings("z", "1", "b", "2"), + labels.FromStrings("z", "1", "b", "5"), + }, + extLset: labels.FromStrings("case", "downsampled-block-with-overlap"), + mint: minTime, + maxt: maxTime, + } + + for i := 0; i < 2; i++ { + rawBlockID, err := baseBlockDesc.Create(context.Background(), dir, 0, metadata.NoneFunc, 1200+i) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, rawBlockID.String()), rawBlockID.String())) + } + + // Downsample them first. + bds := e2ethanos.NewToolsBucketDownsample(e, "downsample", client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), + }) + testutil.Ok(t, bds.Start()) + + // NOTE(GiedriusS): can't use WaitSumMetrics here because the e2e library doesn't + // work well with histograms. + testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stderr), 1*time.Second, make(<-chan struct{}), func() (rerr error) { + resp, err := http.Get(fmt.Sprintf("http://%s/metrics", bds.Endpoint("http"))) + if err != nil { + return fmt.Errorf("getting metrics: %w", err) + } + defer runutil.CloseWithErrCapture(&rerr, resp.Body, "close body") + + b, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("reading metrics: %w", err) + } + + if !bytes.Contains(b, []byte(`thanos_compact_downsample_duration_seconds_count{resolution="0"} 2`)) { + return fmt.Errorf("failed to find the right downsampling metric") + } + + return nil + })) + + testutil.Ok(t, bds.Stop()) + + // Run the compactor. + c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()), + }, nil, "--compact.enable-vertical-compaction") + testutil.NotOk(t, e2e.StartAndWaitReady(c)) + testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics())) +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 1afd247a45..ce77cad5d2 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -1236,3 +1236,33 @@ func NewRedis(e e2e.Environment, name string) e2e.Runnable { }, ) } + +func NewToolsBucketDownsample(e e2e.Environment, name string, bucketConfig client.BucketConfig) *e2eobs.Observable { + f := e.Runnable(fmt.Sprintf("downsampler-%s", name)). + WithPorts(map[string]int{"http": 8080}). + Future() + + bktConfigBytes, err := yaml.Marshal(bucketConfig) + if err != nil { + return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrapf(err, "generate store config file: %v", bucketConfig))} + } + + args := []string{"bucket", "downsample"} + + args = append(args, e2e.BuildArgs(map[string]string{ + "--http-address": ":8080", + "--log.level": "debug", + "--objstore.config": string(bktConfigBytes), + "--data-dir": f.InternalDir(), + })...) + + return e2eobs.AsObservable(f.Init( + e2e.StartOptions{ + Image: DefaultImage(), + Command: e2e.NewCommand("tools", args...), + User: strconv.Itoa(os.Getuid()), + Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), + WaitReadyBackoff: &defaultBackoffConfig, + }, + ), "http") +}