Skip to content

support for cassandra in integration tests #2275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ jobs:
docker pull consul:0.9
docker pull quay.io/cortexproject/cortex:v0.6.0
docker pull shopify/bigtable-emulator:0.1.0
docker pull rinscy/cassandra:3.11.0
- run:
name: Integration Tests
command: |
Expand Down
2 changes: 1 addition & 1 deletion integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flagsForOldImage, "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flagsForOldImage, "")
// Old ring didn't have /ready probe, use /ring instead.
distributor.SetReadinessProbe(e2e.NewReadinessProbe(distributor.HTTPPort(), "/ring", 200))
distributor.SetReadinessProbe(e2e.NewHTTPReadinessProbe(distributor.HTTPPort(), "/ring", 200))
require.NoError(t, s.StartAndWaitReady(distributor, ingester1))

// Wait until the distributor has updated the ring.
Expand Down
18 changes: 12 additions & 6 deletions integration/chunks_storage_backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
// Start dependencies.
dynamo := e2edb.NewDynamoDB()
bigtable := e2edb.NewBigtable()
cassandra := e2edb.NewCassandra()

stores := []string{"aws-dynamo", "bigtable"}
stores := []string{"aws-dynamo", "bigtable", "cassandra"}
perStoreDuration := 14 * 24 * time.Hour

consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(dynamo, bigtable, consul))
require.NoError(t, s.StartAndWaitReady(cassandra, dynamo, bigtable, consul))

// lets build config for each type of Index Store.
now := time.Now()
Expand All @@ -39,6 +40,11 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
storeConfigs[i] = storeConfig{From: oldestStoreStartTime.Add(time.Duration(i) * perStoreDuration).Format("2006-01-02"), IndexStore: store}
}

storageFlags := mergeFlags(ChunksStorageFlags, map[string]string{
"-cassandra.addresses": cassandra.NetworkHTTPEndpoint(),
"-cassandra.keyspace": "tests", // keyspace gets created on startup if it does not exist
})

// bigtable client needs to set an environment variable when connecting to an emulator
bigtableFlag := map[string]string{"BIGTABLE_EMULATOR_HOST": bigtable.NetworkHTTPEndpoint()}

Expand All @@ -47,7 +53,7 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
for i := range storeConfigs {
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(buildSchemaConfigWith(storeConfigs[i:i+1]))))

tableManager := e2ecortex.NewTableManager("table-manager", mergeFlags(ChunksStorageFlags, map[string]string{
tableManager := e2ecortex.NewTableManager("table-manager", mergeFlags(storageFlags, map[string]string{
"-table-manager.retention-period": "2520h", // setting retention high enough
}), "")
tableManager.HTTPService.SetEnvVars(bigtableFlag)
Expand All @@ -62,13 +68,13 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
// Start rest of the Cortex components.
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(buildSchemaConfigWith(storeConfigs))))

ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), mergeFlags(storageFlags, map[string]string{
"-ingester.retain-period": "0s", // we want to make ingester not retain any chunks in memory after they are flushed so that queries get data only from the store
}), "")
ingester.HTTPService.SetEnvVars(bigtableFlag)

distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), storageFlags, "")
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), storageFlags, "")
querier.HTTPService.SetEnvVars(bigtableFlag)

require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
Expand Down
19 changes: 16 additions & 3 deletions integration/e2e/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewMinio(port int, bktName string) *e2e.HTTPService {
"minio/minio:RELEASE.2019-12-30T05-45-39Z",
// Create the "cortex" bucket before starting minio
e2e.NewCommandWithoutEntrypoint("sh", "-c", fmt.Sprintf("mkdir -p /data/%s && minio server --address :%v --quiet /data", bktName, port)),
e2e.NewReadinessProbe(port, "/minio/health/ready", 200),
e2e.NewHTTPReadinessProbe(port, "/minio/health/ready", 200),
port,
)
m.SetEnvVars(map[string]string{
Expand Down Expand Up @@ -78,7 +78,7 @@ func NewDynamoDB() *e2e.HTTPService {
"amazon/dynamodb-local:1.11.477",
e2e.NewCommand("-jar", "DynamoDBLocal.jar", "-inMemory", "-sharedDb"),
// DynamoDB doesn't have a readiness probe, so we check if the / works even if returns 400
e2e.NewReadinessProbe(8000, "/", 400),
e2e.NewHTTPReadinessProbe(8000, "/", 400),
8000,
)
}
Expand All @@ -90,8 +90,21 @@ func NewBigtable() *e2e.HTTPService {
// If you change the image tag, remember to update it in the preloading done
// by CircleCI too (see .circleci/config.yml).
"shopify/bigtable-emulator:0.1.0",
e2e.NewCommand(""),
nil,
nil,
9035,
)
}

func NewCassandra() *e2e.HTTPService {
return e2e.NewHTTPService(
"cassandra",
// If you change the image tag, remember to update it in the preloading done
// by CircleCI too (see .circleci/config.yml).
"rinscy/cassandra:3.11.0",
nil,
// readiness probe inspired from https://github.com/kubernetes/examples/blob/b86c9d50be45eaf5ce74dee7159ce38b0e149d38/cassandra/image/files/ready-probe.sh
e2e.NewCmdReadinessProbe(e2e.NewCommand("bash", "-c", "nodetool status | grep UN")),
9042,
)
}
80 changes: 61 additions & 19 deletions integration/e2e/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -37,7 +38,7 @@ type ConcreteService struct {
env map[string]string
user string
command *Command
readiness *ReadinessProbe
readiness ReadinessProbe

// Maps container ports to dynamically binded local ports.
networkPortsContainerToLocal map[int]int
Expand All @@ -54,7 +55,7 @@ func NewConcreteService(
name string,
image string,
command *Command,
readiness *ReadinessProbe,
readiness ReadinessProbe,
networkPorts ...int,
) *ConcreteService {
return &ConcreteService{
Expand Down Expand Up @@ -218,7 +219,7 @@ func (s *ConcreteService) NetworkEndpointFor(networkName string, port int) strin
return fmt.Sprintf("%s:%d", containerName(networkName, s.name), port)
}

func (s *ConcreteService) SetReadinessProbe(probe *ReadinessProbe) {
func (s *ConcreteService) SetReadinessProbe(probe ReadinessProbe) {
s.readiness = probe
}

Expand All @@ -232,13 +233,7 @@ func (s *ConcreteService) Ready() error {
return nil
}

// Map the container port to the local port
localPort, ok := s.networkPortsContainerToLocal[s.readiness.port]
if !ok {
return fmt.Errorf("unknown port %d configured in the readiness probe", s.readiness.port)
}

return s.readiness.Ready(localPort)
return s.readiness.Ready(s)
}

func containerName(netName string, name string) string {
Expand Down Expand Up @@ -303,16 +298,37 @@ func (s *ConcreteService) buildDockerRunArgs(networkName, sharedDir string) []st
}

// Disable entrypoint if required
if s.command.entrypointDisabled {
if s.command != nil && s.command.entrypointDisabled {
args = append(args, "--entrypoint", "")
}

args = append(args, s.image)
args = append(args, s.command.cmd)
args = append(args, s.command.args...)

if s.command != nil {
args = append(args, s.command.cmd)
args = append(args, s.command.args...)
}

return args
}

func (s *ConcreteService) Exec(command *Command) (string, error) {
args := []string{"exec", s.containerName()}
args = append(args, command.cmd)
args = append(args, command.args...)

cmd := exec.Command("docker", args...)
var stdout bytes.Buffer
cmd.Stdout = &stdout

err := cmd.Run()
if err != nil {
return "", err
}

return stdout.String(), nil
}

type Command struct {
cmd string
args []string
Expand All @@ -334,22 +350,34 @@ func NewCommandWithoutEntrypoint(cmd string, args ...string) *Command {
}
}

type ReadinessProbe struct {
type ReadinessProbe interface {
Ready(service *ConcreteService) (err error)
}

// HTTPReadinessProbe checks readiness by making HTTP call and checking for expected HTTP status code
type HTTPReadinessProbe struct {
port int
path string
expectedStatus int
}

func NewReadinessProbe(port int, path string, expectedStatus int) *ReadinessProbe {
return &ReadinessProbe{
func NewHTTPReadinessProbe(port int, path string, expectedStatus int) *HTTPReadinessProbe {
return &HTTPReadinessProbe{
port: port,
path: path,
expectedStatus: expectedStatus,
}
}

func (p *ReadinessProbe) Ready(localPort int) (err error) {
res, err := GetRequest(fmt.Sprintf("http://localhost:%d%s", localPort, p.path))
func (p *HTTPReadinessProbe) Ready(service *ConcreteService) (err error) {
endpoint := service.Endpoint(p.port)
if endpoint == "" {
return fmt.Errorf("cannot get service endpoint for port %d", p.port)
} else if endpoint == "stopped" {
return errors.New("service has stopped")
}

res, err := GetRequest("http://" + endpoint + p.path)
if err != nil {
return err
}
Expand All @@ -363,6 +391,20 @@ func (p *ReadinessProbe) Ready(localPort int) (err error) {
return fmt.Errorf("got no expected status code: %v, expected: %v", res.StatusCode, p.expectedStatus)
}

// CmdReadinessProbe checks readiness by `Exec`ing a command (within container) which returns 0 to consider status being ready
type CmdReadinessProbe struct {
cmd *Command
}

func NewCmdReadinessProbe(cmd *Command) *CmdReadinessProbe {
return &CmdReadinessProbe{cmd: cmd}
}

func (p *CmdReadinessProbe) Ready(service *ConcreteService) error {
_, err := service.Exec(p.cmd)
return err
}

type LinePrefixWriter struct {
prefix string
wrapped io.Writer
Expand Down Expand Up @@ -398,7 +440,7 @@ func NewHTTPService(
name string,
image string,
command *Command,
readiness *ReadinessProbe,
readiness ReadinessProbe,
httpPort int,
otherPorts ...int,
) *HTTPService {
Expand Down
2 changes: 1 addition & 1 deletion integration/e2ecortex/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func NewCortexService(
name string,
image string,
command *e2e.Command,
readiness *e2e.ReadinessProbe,
readiness e2e.ReadinessProbe,
httpPort int,
grpcPort int,
otherPorts ...int,
Expand Down
14 changes: 7 additions & 7 deletions integration/e2ecortex/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewDistributorWithConfigFile(name, consulAddress, configFile string, flags
"-ring.store": "consul",
"-consul.hostname": consulAddress,
}, flags))...),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
Expand Down Expand Up @@ -83,7 +83,7 @@ func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[
"-querier.frontend-client.backoff-retries": "1",
"-querier.worker-parallelism": "1",
}, flags))...),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
Expand Down Expand Up @@ -117,7 +117,7 @@ func NewIngesterWithConfigFile(name, consulAddress, configFile string, flags map
"-ring.store": "consul",
"-consul.hostname": consulAddress,
}, flags))...),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
Expand All @@ -143,7 +143,7 @@ func NewTableManagerWithConfigFile(name, configFile string, flags map[string]str
"-target": "table-manager",
"-log.level": "warn",
}, flags))...),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
Expand All @@ -169,7 +169,7 @@ func NewQueryFrontendWithConfigFile(name, configFile string, flags map[string]st
"-target": "query-frontend",
"-log.level": "warn",
}, flags))...),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
Expand All @@ -186,7 +186,7 @@ func NewSingleBinary(name string, flags map[string]string, image string, httpPor
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
"-log.level": "warn",
}, flags))...),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
otherPorts...,
Expand All @@ -205,7 +205,7 @@ func NewAlertmanager(name string, flags map[string]string, image string) *Cortex
"-target": "alertmanager",
"-log.level": "warn",
}, flags))...),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
Expand Down