Skip to content

RFC: Add integration test for etcd #4113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))

// Start other Cortex components (ingester running on previous version).
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags(), "")
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
distributor := e2ecortex.NewDistributor("distributor", "consul", consul.NetworkHTTPEndpoint(), ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1))

// Wait until the distributor has updated the ring.
Expand All @@ -133,7 +133,7 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags(), map[string]string{
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags(), map[string]string{
"-ingester.join-after": "10s",
}), "")
// Start ingester-2 on new version, to ensure the transfer is backward compatible.
Expand Down Expand Up @@ -181,10 +181,10 @@ func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previo
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))

// Start other Cortex components (ingester running on previous version).
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flagsForNewImage, "")
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
distributor := e2ecortex.NewDistributor("distributor", "consul", consul.NetworkHTTPEndpoint(), flagsForNewImage, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))

// Wait until the distributor has updated the ring.
Expand Down Expand Up @@ -252,7 +252,7 @@ func checkQueries(
}()

// Start querier.
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), e2e.MergeFlagsWithoutRemovingEmpty(c.querierFlags, map[string]string{
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), e2e.MergeFlagsWithoutRemovingEmpty(c.querierFlags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), c.querierImage)

Expand Down
78 changes: 78 additions & 0 deletions integration/blocks_storage_backends_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// +build requires_docker

package integration

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestBlocksStorageWithEtcd(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": "1h",
"-blocks-storage.tsdb.head-compaction-interval": "1m",
"-store-gateway.sharding-enabled": "true",
"-querier.ingester-streaming": "true",
})

// Start dependencies.
etcd := e2edb.NewETCD()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(etcd, minio))

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreEtcd, etcd.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreEtcd, etcd.NetworkHTTPEndpoint(), flags, "")
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreEtcd, etcd.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway))

// Wait until the distributor and the store-gateway have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Sharding is disabled, pass gateway address.
querierFlags := mergeFlags(flags, map[string]string{
// "-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
"-blocks-storage.bucket-store.sync-interval": "1m",
// "-distributor.shard-by-all-labels": "true",
})
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreEtcd, etcd.NetworkHTTPEndpoint(), querierFlags, "")
require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the distributor, the querier and the store-gateway have updated the ring.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))

// Push a series for each user to Cortex.
now := time.Now()

distClient, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

series, _ := generateSeries("series_1", now)

res, err := distClient.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

queClient, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

_, err = queClient.Query("series_1", now)
require.NoError(t, err)

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
assertServiceMetricsPrefixes(t, Ingester, ingester)
assertServiceMetricsPrefixes(t, Querier, querier)
}
87 changes: 62 additions & 25 deletions integration/e2ecortex/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ const (
GossipPort = 9094
)

type RingStore string

const (
RingStoreConsul RingStore = "consul"
RingStoreEtcd RingStore = "etcd"
)

// GetDefaultImage returns the Docker image to use to run Cortex.
func GetDefaultImage() string {
// Get the cortex image from the CORTEX_IMAGE env variable,
Expand All @@ -24,15 +31,23 @@ func GetDefaultImage() string {
return "quay.io/cortexproject/cortex:latest"
}

func NewDistributor(name string, consulAddress string, flags map[string]string, image string) *CortexService {
return NewDistributorWithConfigFile(name, consulAddress, "", flags, image)
func NewDistributor(name string, store RingStore, address string, flags map[string]string, image string) *CortexService {
return NewDistributorWithConfigFile(name, store, address, "", flags, image)
}

func NewDistributorWithConfigFile(name, consulAddress, configFile string, flags map[string]string, image string) *CortexService {
func NewDistributorWithConfigFile(name string, store RingStore, address, configFile string, flags map[string]string, image string) *CortexService {
if configFile != "" {
flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
}

// Configure the ingesters ring backend
flags["-ring.store"] = string(store)
if store == RingStoreConsul {
flags["-consul.hostname"] = address
} else if store == RingStoreEtcd {
flags["-etcd.endpoints"] = address
}

if image == "" {
image = GetDefaultImage()
}
Expand All @@ -45,25 +60,39 @@ func NewDistributorWithConfigFile(name, consulAddress, configFile string, flags
"-log.level": "warn",
"-auth.enabled": "true",
"-distributor.replication-factor": "1",
// Configure the ingesters ring backend
"-ring.store": "consul",
"-consul.hostname": consulAddress,
}, flags))...),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
httpPort,
grpcPort,
)
}

func NewQuerier(name string, consulAddress string, flags map[string]string, image string) *CortexService {
return NewQuerierWithConfigFile(name, consulAddress, "", flags, image)
func NewQuerier(name string, store RingStore, address string, flags map[string]string, image string) *CortexService {
return NewQuerierWithConfigFile(name, store, address, "", flags, image)
}

func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[string]string, image string) *CortexService {
func NewQuerierWithConfigFile(name string, store RingStore, address, configFile string, flags map[string]string, image string) *CortexService {
if configFile != "" {
flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
}

// Configure the ingesters ring backend and the store-gateway ring backend.
ringBackendFlags := map[string]string{
"-ring.store": string(store),
"-store-gateway.sharding-ring.store": string(store),
}

if store == RingStoreConsul {
ringBackendFlags["-consul.hostname"] = address
ringBackendFlags["-store-gateway.sharding-ring.consul.hostname"] = address
} else if store == RingStoreEtcd {
ringBackendFlags["-etcd.endpoints"] = address
ringBackendFlags["-store-gateway.sharding-ring.etcd.endpoints"] = address
}

// For backward compatibility
flags = e2e.MergeFlagsWithoutRemovingEmpty(ringBackendFlags, flags)

if image == "" {
image = GetDefaultImage()
}
Expand All @@ -75,9 +104,6 @@ func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[
"-target": "querier",
"-log.level": "warn",
"-distributor.replication-factor": "1",
// Ingesters ring backend.
"-ring.store": "consul",
"-consul.hostname": consulAddress,
// Query-frontend worker.
"-querier.frontend-client.backoff-min-period": "100ms",
"-querier.frontend-client.backoff-max-period": "100ms",
Expand All @@ -87,8 +113,6 @@ func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[
"-querier.dns-lookup-period": "1s",
// Store-gateway ring backend.
"-store-gateway.sharding-enabled": "true",
"-store-gateway.sharding-ring.store": "consul",
"-store-gateway.sharding-ring.consul.hostname": consulAddress,
"-store-gateway.sharding-ring.replication-factor": "1",
}, flags))...),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
Expand All @@ -97,15 +121,23 @@ func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[
)
}

func NewStoreGateway(name string, consulAddress string, flags map[string]string, image string) *CortexService {
return NewStoreGatewayWithConfigFile(name, consulAddress, "", flags, image)
func NewStoreGateway(name string, store RingStore, address string, flags map[string]string, image string) *CortexService {
return NewStoreGatewayWithConfigFile(name, store, address, "", flags, image)
}

func NewStoreGatewayWithConfigFile(name, consulAddress, configFile string, flags map[string]string, image string) *CortexService {
func NewStoreGatewayWithConfigFile(name string, store RingStore, address string, configFile string, flags map[string]string, image string) *CortexService {
if configFile != "" {
flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
}

if store == RingStoreConsul {
flags["-consul.hostname"] = address
flags["-store-gateway.sharding-ring.consul.hostname"] = address
} else if store == RingStoreEtcd {
flags["-etcd.endpoints"] = address
flags["-store-gateway.sharding-ring.etcd.endpoints"] = address
}

if image == "" {
image = GetDefaultImage()
}
Expand All @@ -118,8 +150,7 @@ func NewStoreGatewayWithConfigFile(name, consulAddress, configFile string, flags
"-log.level": "warn",
// Store-gateway ring backend.
"-store-gateway.sharding-enabled": "true",
"-store-gateway.sharding-ring.store": "consul",
"-store-gateway.sharding-ring.consul.hostname": consulAddress,
"-store-gateway.sharding-ring.store": string(store),
"-store-gateway.sharding-ring.replication-factor": "1",
// Startup quickly.
"-store-gateway.sharding-ring.wait-stability-min-duration": "0",
Expand All @@ -131,14 +162,23 @@ func NewStoreGatewayWithConfigFile(name, consulAddress, configFile string, flags
)
}

func NewIngester(name string, consulAddress string, flags map[string]string, image string) *CortexService {
return NewIngesterWithConfigFile(name, consulAddress, "", flags, image)
func NewIngester(name string, store RingStore, address string, flags map[string]string, image string) *CortexService {
return NewIngesterWithConfigFile(name, store, address, "", flags, image)
}

func NewIngesterWithConfigFile(name, consulAddress, configFile string, flags map[string]string, image string) *CortexService {
func NewIngesterWithConfigFile(name string, store RingStore, address, configFile string, flags map[string]string, image string) *CortexService {
if configFile != "" {
flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
}

// Configure the ingesters ring backend
flags["-ring.store"] = string(store)
if store == RingStoreConsul {
flags["-consul.hostname"] = address
} else if store == RingStoreEtcd {
flags["-etcd.endpoints"] = address
}

if image == "" {
image = GetDefaultImage()
}
Expand All @@ -155,9 +195,6 @@ func NewIngesterWithConfigFile(name, consulAddress, configFile string, flags map
"-ingester.concurrent-flushes": "10",
"-ingester.max-transfer-retries": "10",
"-ingester.num-tokens": "512",
// Configure the ingesters ring backend
"-ring.store": "consul",
"-consul.hostname": consulAddress,
}, flags))...),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
httpPort,
Expand Down
8 changes: 4 additions & 4 deletions integration/ingester_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func TestIngesterGlobalLimits(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components.
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags, "")
ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags, "")
ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))

// Wait until distributor has updated the ring.
Expand Down
10 changes: 5 additions & 5 deletions integration/ingester_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func TestIngesterSharding(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components.
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags, "")
ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags, "")
ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingesters := e2ecortex.NewCompositeCortexService(ingester1, ingester2, ingester3)
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))

// Wait until distributor and queriers have updated the ring.
Expand Down
6 changes: 3 additions & 3 deletions integration/querier_remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func TestQuerierRemoteRead(t *testing.T) {
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))

// Start Cortex components for the write path.
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester))

// Wait until the distributor has updated the ring.
Expand All @@ -63,7 +63,7 @@ func TestQuerierRemoteRead(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags(), "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the querier has updated the ring.
Expand Down
8 changes: 4 additions & 4 deletions integration/querier_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
}

// Start all other services.
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
querier1 := e2ecortex.NewQuerier("querier-1", consul.NetworkHTTPEndpoint(), flags, "")
querier2 := e2ecortex.NewQuerier("querier-2", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier1 := e2ecortex.NewQuerier("querier-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier2 := e2ecortex.NewQuerier("querier-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

require.NoError(t, s.StartAndWaitReady(querier1, querier2, ingester, distributor))
require.NoError(t, s.WaitReady(queryFrontend))
Expand Down
Loading