From 515f82ee1ed2d80e7ca17dc64a2d95ae1b98570b Mon Sep 17 00:00:00 2001 From: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> Date: Tue, 20 Apr 2021 08:52:18 +0100 Subject: [PATCH] Add interceptor override and make ingester and cfg public (#3618) * Add interceptor override and make ingester and cfg public Signed-off-by: Michel Hollands * Remove extraneous comment Signed-off-by: Michel Hollands --- pkg/ingester/client/client.go | 37 +++--- pkg/loki/loki.go | 34 +++--- pkg/loki/modules.go | 212 +++++++++++++++++----------------- 3 files changed, 146 insertions(+), 137 deletions(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 9ff7d3ffb611f..b24f056ff2cdc 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -42,9 +42,11 @@ type ClosableHealthAndIngesterClient struct { // Config for an ingester client. type Config struct { - PoolConfig distributor.PoolConfig `yaml:"pool_config,omitempty"` - RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"` - GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + PoolConfig distributor.PoolConfig `yaml:"pool_config,omitempty"` + RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"` + GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"` } // RegisterFlags registers flags. @@ -63,7 +65,7 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) { grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...), } - dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation()) + dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation(&cfg)) if err != nil { return nil, err } @@ -82,14 +84,21 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) { }, nil } -func instrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { - return []grpc.UnaryClientInterceptor{ - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor, - cortex_middleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration), - }, []grpc.StreamClientInterceptor{ - otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), - middleware.StreamClientUserHeaderInterceptor, - cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration), - } +func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { + var unaryInterceptors []grpc.UnaryClientInterceptor + unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...) + unaryInterceptors = append(unaryInterceptors, + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + cortex_middleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration), + ) + var streamInterceptors []grpc.StreamClientInterceptor + streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...) + streamInterceptors = append(streamInterceptors, + otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), + middleware.StreamClientUserHeaderInterceptor, + cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration), + ) + + return unaryInterceptors, streamInterceptors } diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 7a01dce165f9d..f4eacb9d2659a 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -143,7 +143,7 @@ func (c *Config) Validate() error { // Loki is the root datastructure for Loki. type Loki struct { - cfg Config + Cfg Config // set during initialization ModuleManager *modules.Manager @@ -154,7 +154,7 @@ type Loki struct { overrides *validation.Overrides tenantConfigs *runtime.TenantConfigs distributor *distributor.Distributor - ingester *ingester.Ingester + Ingester *ingester.Ingester Querier *querier.Querier ingesterQuerier *querier.IngesterQuerier Store storage.Store @@ -175,28 +175,28 @@ type Loki struct { // New makes a new Loki. func New(cfg Config) (*Loki, error) { loki := &Loki{ - cfg: cfg, + Cfg: cfg, } loki.setupAuthMiddleware() if err := loki.setupModuleManager(); err != nil { return nil, err } - storage.RegisterCustomIndexClients(&loki.cfg.StorageConfig, prometheus.DefaultRegisterer) + storage.RegisterCustomIndexClients(&loki.Cfg.StorageConfig, prometheus.DefaultRegisterer) return loki, nil } func (t *Loki) setupAuthMiddleware() { - t.cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{serverutil.RecoveryGRPCUnaryInterceptor} - t.cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{serverutil.RecoveryGRPCStreamInterceptor} - if t.cfg.AuthEnabled { - t.cfg.Server.GRPCMiddleware = append(t.cfg.Server.GRPCMiddleware, middleware.ServerUserHeaderInterceptor) - t.cfg.Server.GRPCStreamMiddleware = append(t.cfg.Server.GRPCStreamMiddleware, GRPCStreamAuthInterceptor) + t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{serverutil.RecoveryGRPCUnaryInterceptor} + t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{serverutil.RecoveryGRPCStreamInterceptor} + if t.Cfg.AuthEnabled { + t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, middleware.ServerUserHeaderInterceptor) + t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, GRPCStreamAuthInterceptor) t.HTTPAuthMiddleware = middleware.AuthenticateUser } else { - t.cfg.Server.GRPCMiddleware = append(t.cfg.Server.GRPCMiddleware, fakeGRPCAuthUnaryMiddleware) - t.cfg.Server.GRPCStreamMiddleware = append(t.cfg.Server.GRPCStreamMiddleware, fakeGRPCAuthStreamMiddleware) + t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, fakeGRPCAuthUnaryMiddleware) + t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, fakeGRPCAuthStreamMiddleware) t.HTTPAuthMiddleware = fakeHTTPAuthMiddleware } } @@ -224,7 +224,7 @@ func newDefaultConfig() *Config { // Run starts Loki running, and blocks until a Loki stops. func (t *Loki) Run() error { - serviceMap, err := t.ModuleManager.InitModuleServices(t.cfg.Target) + serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target) if err != nil { return err } @@ -247,7 +247,7 @@ func (t *Loki) Run() error { t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm)) // This adds a way to see the config and the changes compared to the defaults - t.Server.HTTP.Path("/config").HandlerFunc(configHandler(t.cfg, newDefaultConfig())) + t.Server.HTTP.Path("/config").HandlerFunc(configHandler(t.Cfg, newDefaultConfig())) t.Server.HTTP.Path("/debug/fgprof").Handler(fgprof.Handler()) @@ -325,8 +325,8 @@ func (t *Loki) readyHandler(sm *services.Manager) http.HandlerFunc { // Ingester has a special check that makes sure that it was able to register into the ring, // and that all other ring entries are OK too. - if t.ingester != nil { - if err := t.ingester.CheckReady(r.Context()); err != nil { + if t.Ingester != nil { + if err := t.Ingester.CheckReady(r.Context()); err != nil { http.Error(w, "Ingester not ready: "+err.Error(), http.StatusServiceUnavailable) return } @@ -384,13 +384,13 @@ func (t *Loki) setupModuleManager() error { } // Add IngesterQuerier as a dependency for store when target is either ingester or querier. - if t.cfg.Target == Querier || t.cfg.Target == Ruler { + if t.Cfg.Target == Querier || t.Cfg.Target == Ruler { deps[Store] = append(deps[Store], IngesterQuerier) } // If we are running Loki with boltdb-shipper as a single binary, without clustered mode(which should always be the case when using inmemory ring), // we should start compactor as well for better user experience. - if storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) && t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" { + if storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" { deps[All] = append(deps[All], Compactor) } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index e52169264ec57..0bfe498220439 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -80,8 +80,8 @@ const ( func (t *Loki) initServer() (services.Service, error) { // Loki handles signals on its own. - cortex.DisableSignalHandling(&t.cfg.Server) - serv, err := server.New(t.cfg.Server) + cortex.DisableSignalHandling(&t.Cfg.Server) + serv, err := server.New(t.Cfg.Server) if err != nil { return nil, err } @@ -105,9 +105,9 @@ func (t *Loki) initServer() (services.Service, error) { } func (t *Loki) initRing() (_ services.Service, err error) { - t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - t.ring, err = ring.New(t.cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer) + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV + t.ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer) if err != nil { return } @@ -117,28 +117,28 @@ func (t *Loki) initRing() (_ services.Service, err error) { } func (t *Loki) initRuntimeConfig() (services.Service, error) { - if t.cfg.RuntimeConfig.LoadPath == "" { - t.cfg.RuntimeConfig.LoadPath = t.cfg.LimitsConfig.PerTenantOverrideConfig - t.cfg.RuntimeConfig.ReloadPeriod = t.cfg.LimitsConfig.PerTenantOverridePeriod + if t.Cfg.RuntimeConfig.LoadPath == "" { + t.Cfg.RuntimeConfig.LoadPath = t.Cfg.LimitsConfig.PerTenantOverrideConfig + t.Cfg.RuntimeConfig.ReloadPeriod = t.Cfg.LimitsConfig.PerTenantOverridePeriod } - if t.cfg.RuntimeConfig.LoadPath == "" { + if t.Cfg.RuntimeConfig.LoadPath == "" { // no need to initialize module if load path is empty return nil, nil } - t.cfg.RuntimeConfig.Loader = loadRuntimeConfig + t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig // make sure to set default limits before we start loading configuration into memory - validation.SetDefaultLimitsForYAMLUnmarshalling(t.cfg.LimitsConfig) + validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig) var err error - t.runtimeConfig, err = runtimeconfig.NewRuntimeConfigManager(t.cfg.RuntimeConfig, prometheus.DefaultRegisterer) + t.runtimeConfig, err = runtimeconfig.NewRuntimeConfigManager(t.Cfg.RuntimeConfig, prometheus.DefaultRegisterer) return t.runtimeConfig, err } func (t *Loki) initOverrides() (_ services.Service, err error) { - t.overrides, err = validation.NewOverrides(t.cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.runtimeConfig)) + t.overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.runtimeConfig)) // overrides are not a service, since they don't have any operational state. return nil, err } @@ -150,15 +150,15 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { } func (t *Loki) initDistributor() (services.Service, error) { - t.cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV + t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV var err error - t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer) + t.distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer) if err != nil { return nil, err } - if t.cfg.Target != All { + if t.Cfg.Target != All { logproto.RegisterPusherServer(t.Server.GRPC, t.distributor) } @@ -179,21 +179,21 @@ func (t *Loki) initQuerier() (services.Service, error) { ) // NewQuerierWorker now expects Frontend (or Scheduler) address to be set. Loki only supports Frontend for now. - if t.cfg.Worker.FrontendAddress != "" { + if t.Cfg.Worker.FrontendAddress != "" { // In case someone set scheduler address, we ignore it. - t.cfg.Worker.SchedulerAddress = "" - t.cfg.Worker.MaxConcurrentRequests = t.cfg.Querier.MaxConcurrent - level.Debug(util_log.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.cfg.Worker)) - worker, err = cortex_querier_worker.NewQuerierWorker(t.cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util_log.Logger, prometheus.DefaultRegisterer) + t.Cfg.Worker.SchedulerAddress = "" + t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent + level.Debug(util_log.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.Cfg.Worker)) + worker, err = cortex_querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } } - if t.cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 { - t.cfg.Querier.IngesterQueryStoreMaxLookback = t.cfg.Ingester.QueryStoreMaxLookBackPeriod + if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 { + t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod } - t.Querier, err = querier.New(t.cfg.Querier, t.Store, t.ingesterQuerier, t.overrides) + t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides) if err != nil { return nil, err } @@ -223,57 +223,57 @@ func (t *Loki) initQuerier() (services.Service, error) { } func (t *Loki) initIngester() (_ services.Service, err error) { - t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV + t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort - t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.Store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer) if err != nil { return } - logproto.RegisterPusherServer(t.Server.GRPC, t.ingester) - logproto.RegisterQuerierServer(t.Server.GRPC, t.ingester) - logproto.RegisterIngesterServer(t.Server.GRPC, t.ingester) - grpc_health_v1.RegisterHealthServer(t.Server.GRPC, t.ingester) - t.Server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler)) - t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler)) - return t.ingester, nil + logproto.RegisterPusherServer(t.Server.GRPC, t.Ingester) + logproto.RegisterQuerierServer(t.Server.GRPC, t.Ingester) + logproto.RegisterIngesterServer(t.Server.GRPC, t.Ingester) + grpc_health_v1.RegisterHealthServer(t.Server.GRPC, t.Ingester) + t.Server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.Ingester.FlushHandler)) + t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(http.HandlerFunc(t.Ingester.ShutdownHandler)) + return t.Ingester, nil } func (t *Loki) initTableManager() (services.Service, error) { - err := t.cfg.SchemaConfig.Load() + err := t.Cfg.SchemaConfig.Load() if err != nil { return nil, err } // Assume the newest config is the one to use - lastConfig := &t.cfg.SchemaConfig.Configs[len(t.cfg.SchemaConfig.Configs)-1] - - if (t.cfg.TableManager.ChunkTables.WriteScale.Enabled || - t.cfg.TableManager.IndexTables.WriteScale.Enabled || - t.cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || - t.cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || - t.cfg.TableManager.ChunkTables.ReadScale.Enabled || - t.cfg.TableManager.IndexTables.ReadScale.Enabled || - t.cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || - t.cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && - t.cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "" { + lastConfig := &t.Cfg.SchemaConfig.Configs[len(t.Cfg.SchemaConfig.Configs)-1] + + if (t.Cfg.TableManager.ChunkTables.WriteScale.Enabled || + t.Cfg.TableManager.IndexTables.WriteScale.Enabled || + t.Cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || + t.Cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || + t.Cfg.TableManager.ChunkTables.ReadScale.Enabled || + t.Cfg.TableManager.IndexTables.ReadScale.Enabled || + t.Cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || + t.Cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && + t.Cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "" { level.Error(util_log.Logger).Log("msg", "WriteScale is enabled but no Metrics URL has been provided") os.Exit(1) } reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer) - tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig.Config, reg) + tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.StorageConfig.Config, reg) if err != nil { return nil, err } - bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig.Config) + bucketClient, err := storage.NewBucketClient(t.Cfg.StorageConfig.Config) util_log.CheckFatal("initializing bucket client", err) - t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer) + t.tableManager, err = chunk.NewTableManager(t.Cfg.TableManager, t.Cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -284,19 +284,19 @@ func (t *Loki) initTableManager() (services.Service, error) { func (t *Loki) initStore() (_ services.Service, err error) { // If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache. // This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data. - if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) { - t.cfg.ChunkStoreConfig.DisableIndexDeduplication = true - t.cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{} + if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { + t.Cfg.ChunkStoreConfig.DisableIndexDeduplication = true + t.Cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{} } - if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) { - t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID - switch t.cfg.Target { + if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { + t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID + switch t.Cfg.Target { case Ingester: // We do not want ingester to unnecessarily keep downloading files - t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly + t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly // Use fifo cache for caching index in memory. - t.cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{ + t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{ EnableFifoCache: true, Fifocache: cache.FifoCacheConfig{ MaxSizeBytes: "200 MB", @@ -304,52 +304,52 @@ func (t *Loki) initStore() (_ services.Service, err error) { // This is usually set in StorageConfig.IndexCacheValidity but since this is exclusively used for caching the index entries, // I(Sandeep) am setting it here which also helps reduce some CPU cycles and allocations required for // unmarshalling the cached data to check the expiry. - Validity: t.cfg.StorageConfig.IndexCacheValidity - 1*time.Minute, + Validity: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute, }, } - t.cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.cfg) + 2*time.Minute + t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute case Querier, Ruler: // We do not want query to do any updates to index - t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly + t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly default: - t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite - t.cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.cfg) + 2*time.Minute + t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite + t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute } } - chunkStore, err := cortex_storage.NewStore(t.cfg.StorageConfig.Config, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger) + chunkStore, err := cortex_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger) if err != nil { return } - if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) { - boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.cfg) - switch t.cfg.Target { + if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { + boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg) + switch t.Cfg.Target { case Querier, Ruler: // Use AsyncStore to query both ingesters local store and chunk store for store queries. // Only queriers should use the AsyncStore, it should never be used in ingesters. chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier, - calculateAsyncStoreQueryIngestersWithin(t.cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration), + calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration), ) case All: // We want ingester to also query the store when using boltdb-shipper but only when running with target All. // We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store. // ToDo: See if we can avoid doing this when not running loki in clustered mode. - t.cfg.Ingester.QueryStore = true - boltdbShipperConfigIdx := loki_storage.ActivePeriodConfig(t.cfg.SchemaConfig.Configs) - if t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != shipper.BoltDBShipperType { + t.Cfg.Ingester.QueryStore = true + boltdbShipperConfigIdx := loki_storage.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs) + if t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != shipper.BoltDBShipperType { boltdbShipperConfigIdx++ } - mlb, err := calculateMaxLookBack(t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.cfg.Ingester.QueryStoreMaxLookBackPeriod, + mlb, err := calculateMaxLookBack(t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.Cfg.Ingester.QueryStoreMaxLookBackPeriod, boltdbShipperMinIngesterQueryStoreDuration) if err != nil { return nil, err } - t.cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb + t.Cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb } } - t.Store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.SchemaConfig, chunkStore, prometheus.DefaultRegisterer) + t.Store, err = loki_storage.NewStore(t.Cfg.StorageConfig, t.Cfg.SchemaConfig, chunkStore, prometheus.DefaultRegisterer) if err != nil { return } @@ -361,7 +361,7 @@ func (t *Loki) initStore() (_ services.Service, err error) { } func (t *Loki) initIngesterQuerier() (_ services.Service, err error) { - t.ingesterQuerier, err = querier.NewIngesterQuerier(t.cfg.IngesterClient, t.ring, t.cfg.Querier.ExtraQueryDelay) + t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay) if err != nil { return nil, err } @@ -375,15 +375,15 @@ type disabledShuffleShardingLimits struct{} func (disabledShuffleShardingLimits) MaxQueriersPerUser(userID string) int { return 0 } func (t *Loki) initQueryFrontend() (_ services.Service, err error) { - level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.cfg.Frontend)) + level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend)) roundTripper, frontendV1, _, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{ // Don't set FrontendV2 field to make sure that only frontendV1 can be initialized. - Handler: t.cfg.Frontend.Handler, - FrontendV1: t.cfg.Frontend.FrontendV1, - CompressResponses: t.cfg.Frontend.CompressResponses, - DownstreamURL: t.cfg.Frontend.DownstreamURL, - }, disabledShuffleShardingLimits{}, t.cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer) + Handler: t.Cfg.Frontend.Handler, + FrontendV1: t.Cfg.Frontend.FrontendV1, + CompressResponses: t.Cfg.Frontend.CompressResponses, + DownstreamURL: t.Cfg.Frontend.DownstreamURL, + }, disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -393,15 +393,15 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { } level.Debug(util_log.Logger).Log("msg", "initializing query range tripperware", - "config", fmt.Sprintf("%+v", t.cfg.QueryRange), - "limits", fmt.Sprintf("%+v", t.cfg.LimitsConfig), + "config", fmt.Sprintf("%+v", t.Cfg.QueryRange), + "limits", fmt.Sprintf("%+v", t.Cfg.LimitsConfig), ) tripperware, stopper, err := queryrange.NewTripperware( - t.cfg.QueryRange, + t.Cfg.QueryRange, util_log.Logger, t.overrides, - t.cfg.SchemaConfig.SchemaConfig, - t.cfg.Querier.QueryIngestersWithin, + t.Cfg.SchemaConfig.SchemaConfig, + t.Cfg.Querier.QueryIngestersWithin, prometheus.DefaultRegisterer, ) if err != nil { @@ -414,8 +414,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { roundTripper = t.QueryFrontEndTripperware(roundTripper) } - frontendHandler := transport.NewHandler(t.cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer) - if t.cfg.Frontend.CompressResponses { + frontendHandler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer) + if t.Cfg.Frontend.CompressResponses { frontendHandler = gziphandler.GzipHandler(frontendHandler) } @@ -428,12 +428,12 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { ).Wrap(frontendHandler) var defaultHandler http.Handler - if t.cfg.Frontend.TailProxyURL != "" { + if t.Cfg.Frontend.TailProxyURL != "" { httpMiddleware := middleware.Merge( t.HTTPAuthMiddleware, queryrange.StatsHTTPMiddleware, ) - tailURL, err := url.Parse(t.cfg.Frontend.TailProxyURL) + tailURL, err := url.Parse(t.Cfg.Frontend.TailProxyURL) if err != nil { return nil, err } @@ -485,26 +485,26 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) { // unfortunately there is no way to generate a "default" config and compare default against actual // to determine if it's unconfigured. the following check, however, correctly tests this. // Single binary integration tests will break if this ever drifts - if t.cfg.Target == All && t.cfg.Ruler.StoreConfig.IsDefaults() { + if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() { level.Info(util_log.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.") return } // Loki doesn't support the configdb backend, but without excessive mangling/refactoring // it's hard to enforce this at validation time. Therefore detect this and fail early. - if t.cfg.Ruler.StoreConfig.Type == "configdb" { + if t.Cfg.Ruler.StoreConfig.Type == "configdb" { return nil, errors.New("configdb is not supported as a Loki rules backend type") } // Make sure storage directory exists if using filesystem store - if t.cfg.Ruler.StoreConfig.Type == "local" && t.cfg.Ruler.StoreConfig.Local.Directory != "" { - err := chunk_util.EnsureDirectory(t.cfg.Ruler.StoreConfig.Local.Directory) + if t.Cfg.Ruler.StoreConfig.Type == "local" && t.Cfg.Ruler.StoreConfig.Local.Directory != "" { + err := chunk_util.EnsureDirectory(t.Cfg.Ruler.StoreConfig.Local.Directory) if err != nil { return nil, err } } - t.RulerStorage, err = cortex_ruler.NewLegacyRuleStore(t.cfg.Ruler.StoreConfig, manager.GroupLoader{}, util_log.Logger) + t.RulerStorage, err = cortex_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, manager.GroupLoader{}, util_log.Logger) return } @@ -515,17 +515,17 @@ func (t *Loki) initRuler() (_ services.Service, err error) { return nil, nil } - t.cfg.Ruler.Ring.ListenPort = t.cfg.Server.GRPCListenPort - t.cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - q, err := querier.New(t.cfg.Querier, t.Store, t.ingesterQuerier, t.overrides) + t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV + q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides) if err != nil { return nil, err } - engine := logql.NewEngine(t.cfg.Querier.Engine, q, t.overrides) + engine := logql.NewEngine(t.Cfg.Querier.Engine, q, t.overrides) t.ruler, err = ruler.NewRuler( - t.cfg.Ruler, + t.Cfg.Ruler, engine, prometheus.DefaultRegisterer, util_log.Logger, @@ -540,7 +540,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) { t.rulerAPI = cortex_ruler.NewAPI(t.ruler, t.RulerStorage, util_log.Logger) // Expose HTTP endpoints. - if t.cfg.Ruler.EnableAPI { + if t.Cfg.Ruler.EnableAPI { t.Server.HTTP.Handle("/ruler/ring", t.ruler) cortex_ruler.RegisterRulerServer(t.Server.GRPC, t.ruler) @@ -570,18 +570,18 @@ func (t *Loki) initRuler() (_ services.Service, err error) { } func (t *Loki) initMemberlistKV() (services.Service, error) { - t.cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer - t.cfg.MemberlistKV.Codecs = []codec.Codec{ + t.Cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer + t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), } - t.memberlistKV = memberlist.NewKVInitService(&t.cfg.MemberlistKV, util_log.Logger) + t.memberlistKV = memberlist.NewKVInitService(&t.Cfg.MemberlistKV, util_log.Logger) return t.memberlistKV, nil } func (t *Loki) initCompactor() (services.Service, error) { var err error - t.compactor, err = compactor.NewCompactor(t.cfg.CompactorConfig, t.cfg.StorageConfig.Config, prometheus.DefaultRegisterer) + t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, prometheus.DefaultRegisterer) if err != nil { return nil, err }