diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5def8a0cbcb3..7654a1385e71 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -135,8 +135,6 @@ func (t *Loki) initServer() (services.Service, error) { } func (t *Loki) initRing() (_ services.Service, err error) { - t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ingester.RingKey, util_log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", prometheus.DefaultRegisterer)) if err != nil { return @@ -164,6 +162,19 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) { var err error t.runtimeConfig, err = runtimeconfig.New(t.Cfg.RuntimeConfig, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), util_log.Logger) t.TenantLimits = newtenantLimitsFromRuntimeConfig(t.runtimeConfig) + + // Update config fields using runtime config. Only if multiKV is used for given ring these returned functions will be + // called and register the listener. + // + // By doing the initialization here instead of per-module init function, we avoid the problem + // of projects based on Loki forgetting the wiring if they override module's init method (they also don't have access to private symbols). + t.Cfg.CompactorConfig.CompactorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + t.Cfg.IndexGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + t.Cfg.QueryScheduler.SchedulerRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + t.Cfg.Ruler.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + return t.runtimeConfig, err } @@ -194,8 +205,6 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { } func (t *Loki) initDistributor() (services.Service, error) { - t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV var err error t.distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer) if err != nil { @@ -315,8 +324,6 @@ func (t *Loki) initQuerier() (services.Service, error) { } func (t *Loki) initIngester() (_ services.Service, err error) { - t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer) @@ -735,7 +742,6 @@ func (t *Loki) initRuler() (_ services.Service, err error) { } t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV deleteStore, err := t.deleteRequestsStore() if err != nil { @@ -814,13 +820,20 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { dnsProvider := dns.NewProvider(util_log.Logger, dnsProviderReg, dns.GolangResolverType) t.MemberlistKV = memberlist.NewKVInitService(&t.Cfg.MemberlistKV, util_log.Logger, dnsProvider, reg) + + t.Cfg.CompactorConfig.CompactorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + return t.MemberlistKV, nil } func (t *Loki) initCompactor() (services.Service, error) { // Set some config sections from other config sections in the config struct t.Cfg.CompactorConfig.CompactorRing.ListenPort = t.Cfg.Server.GRPCListenPort - t.Cfg.CompactorConfig.CompactorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { level.Info(util_log.Logger).Log("msg", "Not using boltdb-shipper index, not starting compactor") @@ -853,7 +866,6 @@ func (t *Loki) initCompactor() (services.Service, error) { } func (t *Loki) initIndexGateway() (services.Service, error) { - t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant, prometheus.DefaultRegisterer) @@ -875,7 +887,6 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { } t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly - t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort managerMode := indexgateway.ClientMode @@ -897,7 +908,6 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { func (t *Loki) initQueryScheduler() (services.Service, error) { // Set some config sections from other config sections in the config struct t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort - t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index b29ab27ff4ac..c0c905cd71ab 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -1,10 +1,20 @@ package loki import ( + "path/filepath" "testing" "time" + "github.com/grafana/dskit/flagext" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/shipper" ) func Test_calculateMaxLookBack(t *testing.T) { @@ -81,3 +91,101 @@ func Test_calculateMaxLookBack(t *testing.T) { }) } } + +func prepareGlobalMetricsRegistry(t *testing.T) { + oldReg, oldGat := prometheus.DefaultRegisterer, prometheus.DefaultGatherer + + reg := prometheus.NewRegistry() + prometheus.DefaultRegisterer, prometheus.DefaultGatherer = reg, reg + + t.Cleanup(func() { + prometheus.DefaultRegisterer, prometheus.DefaultGatherer = oldReg, oldGat + }) +} + +func TestMultiKVSetup(t *testing.T) { + dir := t.TempDir() + + for target, checkFn := range map[string]func(t *testing.T, c Config){ + All: func(t *testing.T, c Config) { + require.NotNil(t, c.CompactorConfig.CompactorRing.KVStore.Multi.ConfigProvider) + require.NotNil(t, c.Distributor.DistributorRing.KVStore.Multi.ConfigProvider) + require.NotNil(t, c.IndexGateway.Ring.KVStore.Multi.ConfigProvider) + require.NotNil(t, c.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider) + require.NotNil(t, c.QueryScheduler.SchedulerRing.KVStore.Multi.ConfigProvider) + require.NotNil(t, c.Ruler.Ring.KVStore.Multi.ConfigProvider) + }, + + Compactor: func(t *testing.T, c Config) { + require.NotNil(t, c.CompactorConfig.CompactorRing.KVStore.Multi.ConfigProvider) + }, + + Distributor: func(t *testing.T, c Config) { + require.NotNil(t, c.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider) + }, + + IndexGateway: func(t *testing.T, c Config) { + require.NotNil(t, c.IndexGateway.Ring.KVStore.Multi.ConfigProvider) + }, + + Ingester: func(t *testing.T, c Config) { + require.NotNil(t, c.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider) + }, + + QueryScheduler: func(t *testing.T, c Config) { + require.NotNil(t, c.QueryScheduler.SchedulerRing.KVStore.Multi.ConfigProvider) + }, + + Ruler: func(t *testing.T, c Config) { + require.NotNil(t, c.Ruler.Ring.KVStore.Multi.ConfigProvider) + }, + } { + t.Run(target, func(t *testing.T) { + prepareGlobalMetricsRegistry(t) + + cfg := Config{} + cfg.SchemaConfig = config.SchemaConfig{ + Configs: []config.PeriodConfig{ + { + IndexType: config.StorageTypeInMemory, + ObjectType: config.StorageTypeFileSystem, + RowShards: 16, + Schema: "v11", + From: config.DayTime{ + Time: model.Now(), + }, + }, + }, + } + flagext.DefaultValues(&cfg) + // Set to 0 to find any free port. + cfg.Server.HTTPListenPort = 0 + cfg.Server.GRPCListenPort = 0 + cfg.Target = []string{target} + + // Must be set, otherwise MultiKV config provider will not be set. + cfg.RuntimeConfig.LoadPath = filepath.Join(dir, "config.yaml") + + // This would be overwritten by the default values setting. + cfg.StorageConfig = storage.Config{ + FSConfig: local.FSConfig{Directory: dir}, + BoltDBShipperConfig: shipper.Config{ + SharedStoreType: config.StorageTypeFileSystem, + ActiveIndexDirectory: dir, + CacheLocation: dir, + Mode: shipper.ModeWriteOnly}, + } + cfg.Ruler.Config.StoreConfig.Type = config.StorageTypeLocal + cfg.Ruler.Config.StoreConfig.Local.Directory = dir + + c, err := New(cfg) + require.NoError(t, err) + + _, err = c.ModuleManager.InitModuleServices(cfg.Target...) + require.NoError(t, err) + defer c.Server.Stop() + + checkFn(t, c.Cfg) + }) + } +} diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index d7752e9fd056..2e652e492847 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -36,6 +36,7 @@ const ( StorageTypeGCPColumnKey = "gcp-columnkey" StorageTypeGCS = "gcs" StorageTypeGrpc = "grpc-store" + StorageTypeLocal = "local" StorageTypeS3 = "s3" StorageTypeSwift = "swift" // BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage