Skip to content

Refactoring: Create a common ring configuration #2391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions cmd/pyroscope/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,19 @@ Usage of ./pyroscope:
-distributor.ring.etcd.username string
Etcd username.
-distributor.ring.heartbeat-period duration
Period at which to heartbeat to the ring. 0 = disabled. (default 5s)
Period at which to heartbeat to the ring. 0 = disabled. (default 15s)
-distributor.ring.heartbeat-timeout duration
The heartbeat timeout after which distributors are considered unhealthy within the ring. 0 = never (timeout disabled). (default 1m0s)
-distributor.ring.instance-addr string
IP address to advertise in the ring.
IP address to advertise in the ring. Default is auto-detected.
-distributor.ring.instance-enable-ipv6
Enable using a IPv6 instance address. (default false)
-distributor.ring.instance-id string
Instance ID to register in the ring. (default "<hostname>")
-distributor.ring.instance-interface-names string
Name of network interface to read address from. (default [<private network interfaces>])
List of network interface names to look up when finding the instance IP address. (default [<private network interfaces>])
-distributor.ring.instance-port int
Port to advertise in the ring (defaults to server.http-listen-port). (default 4040)
Port to advertise in the ring (defaults to -server.http-listen-port). (default 4040)
-distributor.ring.multi.mirror-enabled
Mirror writes to secondary store.
-distributor.ring.multi.mirror-timeout duration
Expand Down Expand Up @@ -313,6 +315,8 @@ Usage of ./pyroscope:
The heartbeat timeout after which overrides-exporters are considered unhealthy within the ring. 0 = never (timeout disabled). (default 1m0s)
-overrides-exporter.ring.instance-addr string
IP address to advertise in the ring. Default is auto-detected.
-overrides-exporter.ring.instance-enable-ipv6
Enable using a IPv6 instance address. (default false)
-overrides-exporter.ring.instance-id string
Instance ID to register in the ring. (default "<hostname>")
-overrides-exporter.ring.instance-interface-names string
Expand Down Expand Up @@ -548,15 +552,17 @@ Usage of ./pyroscope:
-query-scheduler.ring.heartbeat-period duration
Period at which to heartbeat to the ring. 0 = disabled. (default 15s)
-query-scheduler.ring.heartbeat-timeout duration
The heartbeat timeout after which query-schedulers are considered unhealthy within the ring. When query-scheduler ring-based service discovery is enabled, this option needs be set on query-schedulers, query-frontends and queriers. (default 1m0s)
The heartbeat timeout after which query-scheduler are considered unhealthy within the ring. 0 = never (timeout disabled). (default 1m0s)
-query-scheduler.ring.instance-addr string
IP address to advertise in the ring. Default is auto-detected.
-query-scheduler.ring.instance-enable-ipv6
Enable using a IPv6 instance address. (default false)
-query-scheduler.ring.instance-id string
Instance ID to register in the ring. (default "<hostname>")
-query-scheduler.ring.instance-interface-names string
List of network interface names to look up when finding the instance IP address. (default [<private network interfaces>])
-query-scheduler.ring.instance-port int
Port to advertise in the ring (defaults to -server.grpc-listen-port).
Port to advertise in the ring (defaults to -server.http-listen-port). (default 4040)
-query-scheduler.ring.multi.mirror-enabled
Mirror writes to secondary store.
-query-scheduler.ring.multi.mirror-timeout duration
Expand Down Expand Up @@ -832,7 +838,7 @@ Usage of ./pyroscope:
-store-gateway.sharding-ring.heartbeat-period duration
Period at which to heartbeat to the ring. 0 = disabled. (default 15s)
-store-gateway.sharding-ring.heartbeat-timeout duration
The heartbeat timeout after which store gateways are considered unhealthy within the ring. 0 = never (timeout disabled). This option needs be set both on the store-gateway, querier and ruler when running in microservices mode. (default 1m0s)
The heartbeat timeout after which store-gateways are considered unhealthy within the ring. 0 = never (timeout disabled). (default 1m0s)
-store-gateway.sharding-ring.instance-addr string
IP address to advertise in the ring. Default is auto-detected.
-store-gateway.sharding-ring.instance-availability-zone string
Expand All @@ -844,7 +850,7 @@ Usage of ./pyroscope:
-store-gateway.sharding-ring.instance-interface-names string
List of network interface names to look up when finding the instance IP address. (default [<private network interfaces>])
-store-gateway.sharding-ring.instance-port int
Port to advertise in the ring (defaults to -server.grpc-listen-port).
Port to advertise in the ring (defaults to -server.http-listen-port).
-store-gateway.sharding-ring.multi.mirror-enabled
Mirror writes to secondary store.
-store-gateway.sharding-ring.multi.mirror-timeout duration
Expand Down
22 changes: 1 addition & 21 deletions cmd/pyroscope/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,8 @@ Usage of ./pyroscope:
Etcd password.
-distributor.ring.etcd.username string
Etcd username.
-distributor.ring.heartbeat-period duration
Period at which to heartbeat to the ring. 0 = disabled. (default 5s)
-distributor.ring.heartbeat-timeout duration
The heartbeat timeout after which distributors are considered unhealthy within the ring. 0 = never (timeout disabled). (default 1m0s)
-distributor.ring.instance-addr string
IP address to advertise in the ring.
-distributor.ring.instance-id string
Instance ID to register in the ring. (default "<hostname>")
-distributor.ring.instance-interface-names string
Name of network interface to read address from. (default [<private network interfaces>])
-distributor.ring.instance-port int
Port to advertise in the ring (defaults to server.http-listen-port). (default 4040)
Comment on lines -38 to -49
Copy link
Collaborator

@kolesnikovae kolesnikovae Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to double check: we're hiding these config options from the short output, not removing them, right?

Copy link
Contributor Author

@cyriltovena cyriltovena Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct those are removed from the normal output (all has it).

List of network interface names to look up when finding the instance IP address. (default [<private network interfaces>])
-distributor.ring.store string
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
-distributor.zone-awareness-enabled
Expand Down Expand Up @@ -99,18 +89,8 @@ Usage of ./pyroscope:
Etcd password.
-overrides-exporter.ring.etcd.username string
Etcd username.
-overrides-exporter.ring.heartbeat-period duration
Period at which to heartbeat to the ring. 0 = disabled. (default 15s)
-overrides-exporter.ring.heartbeat-timeout duration
The heartbeat timeout after which overrides-exporters are considered unhealthy within the ring. 0 = never (timeout disabled). (default 1m0s)
-overrides-exporter.ring.instance-addr string
IP address to advertise in the ring. Default is auto-detected.
-overrides-exporter.ring.instance-id string
Instance ID to register in the ring. (default "<hostname>")
-overrides-exporter.ring.instance-interface-names string
List of network interface names to look up when finding the instance IP address. (default [<private network interfaces>])
-overrides-exporter.ring.instance-port int
Port to advertise in the ring (defaults to -server.http-listen-port). (default 4040)
-overrides-exporter.ring.store string
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
-pyroscopedb.data-path string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,6 @@ store_gateway:
# The hash ring configuration.
sharding_ring:
# The key-value store used to share the hash ring across multiple instances.
# This option needs be set both on the store-gateway, querier and ruler when
# running in microservices mode.
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
Expand Down Expand Up @@ -341,13 +339,32 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]

# The heartbeat timeout after which store gateways are considered unhealthy
# within the ring. 0 = never (timeout disabled). This option needs be set
# both on the store-gateway, querier and ruler when running in microservices
# mode.
# The heartbeat timeout after which store-gateways are considered unhealthy
# within the ring. 0 = never (timeout disabled).
# CLI flag: -store-gateway.sharding-ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Instance ID to register in the ring.
# CLI flag: -store-gateway.sharding-ring.instance-id
[instance_id: <string> | default = "<hostname>"]

# List of network interface names to look up when finding the instance IP
# address.
# CLI flag: -store-gateway.sharding-ring.instance-interface-names
[instance_interface_names: <list of strings> | default = [<private network interfaces>]]

# Port to advertise in the ring (defaults to -server.http-listen-port).
# CLI flag: -store-gateway.sharding-ring.instance-port
[instance_port: <int> | default = 0]

# IP address to advertise in the ring. Default is auto-detected.
# CLI flag: -store-gateway.sharding-ring.instance-addr
[instance_addr: <string> | default = ""]

# Enable using a IPv6 instance address. (default false)
# CLI flag: -store-gateway.sharding-ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]

# The replication factor to use when sharding blocks. This option needs be
# set both on the store-gateway, querier and ruler when running in
# microservices mode.
Expand Down Expand Up @@ -376,27 +393,6 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.wait-stability-max-duration
[wait_stability_max_duration: <duration> | default = 5m]

# Instance ID to register in the ring.
# CLI flag: -store-gateway.sharding-ring.instance-id
[instance_id: <string> | default = "<hostname>"]

# List of network interface names to look up when finding the instance IP
# address.
# CLI flag: -store-gateway.sharding-ring.instance-interface-names
[instance_interface_names: <list of strings> | default = [<private network interfaces>]]

# Port to advertise in the ring (defaults to -server.grpc-listen-port).
# CLI flag: -store-gateway.sharding-ring.instance-port
[instance_port: <int> | default = 0]

# IP address to advertise in the ring. Default is auto-detected.
# CLI flag: -store-gateway.sharding-ring.instance-addr
[instance_addr: <string> | default = ""]

# Enable using a IPv6 instance address. (default false)
# CLI flag: -store-gateway.sharding-ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]

# The availability zone where this instance is running. Required if
# zone-awareness is enabled.
# CLI flag: -store-gateway.sharding-ring.instance-availability-zone
Expand Down
14 changes: 7 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ type Config struct {
PoolConfig clientpool.PoolConfig `yaml:"pool_config,omitempty"`

// Distributors ring
DistributorRing RingConfig `yaml:"ring" doc:"hidden"`
DistributorRing util.CommonRingConfig `yaml:"ring" doc:"hidden"`
}

// RegisterFlags registers distributor-related flags.
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
func (cfg *Config) RegisterFlags(fs *flag.FlagSet, logger log.Logger) {
cfg.PoolConfig.RegisterFlagsWithPrefix("distributor", fs)
fs.DurationVar(&cfg.PushTimeout, "distributor.push.timeout", 5*time.Second, "Timeout when pushing data to ingester.")
cfg.DistributorRing.RegisterFlags(fs)
cfg.DistributorRing.RegisterFlags("distributor.ring.", "collectors/", "distributors", fs, logger)
}

// Distributor coordinates replicates and distribution of log streams.
Expand Down Expand Up @@ -206,7 +206,7 @@ func (d *Distributor) Push(ctx context.Context, grpcReq *connect.Request[pushv1.
}

func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.PushRequest) (*connect.Response[pushv1.PushResponse], error) {
//todo defer close all profiles in case of error
// todo defer close all profiles in case of error
tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeUnauthenticated, err)
Expand Down Expand Up @@ -264,7 +264,7 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push
totalPushUncompressedBytes += int64(decompressedSize)

if err := validation.ValidateProfile(d.limits, tenantID, p.Profile, decompressedSize, series.Labels); err != nil {
//todo this actually discards more if multiple Samples in a Series request
// todo this actually discards more if multiple Samples in a Series request
validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalProfiles))
validation.DiscardedBytes.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalPushUncompressedBytes))
p.Close()
Expand Down Expand Up @@ -511,14 +511,14 @@ func TokenFor(tenantID, labels string) uint32 {
}

// newRingAndLifecycler creates a new distributor ring and lifecycler with all required lifecycler delegates
func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
func newRingAndLifecycler(cfg util.CommonRingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
reg = prometheus.WrapRegistererWithPrefix("pyroscope_", reg)
kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' KV store")
}

lifecyclerCfg, err := cfg.ToBasicLifecyclerConfig(logger)
lifecyclerCfg, err := toBasicLifecyclerConfig(cfg, logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to build distributors' lifecycler config")
}
Expand Down
59 changes: 1 addition & 58 deletions pkg/distributor/distributor_ring.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
package distributor

import (
"flag"
"fmt"
"os"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/ring"

"github.com/grafana/pyroscope/pkg/util"
Expand All @@ -23,48 +16,7 @@ const (
ringNumTokens = 1
)

// RingConfig masks the ring lifecycler config which contains
// many options not really required by the distributors ring. This config
// is used to strip down the config to the minimum, and avoid confusion
// to the user.
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

// Instance details
InstanceID string `yaml:"instance_id" doc:"default=<hostname>"`
InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
InstancePort int `yaml:"instance_port" doc:"hidden"`
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`

// Injected internally
ListenPort int `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
hostname, err := os.Hostname()
if err != nil {
level.Error(util.Logger).Log("msg", "failed to get hostname", "err", err)
os.Exit(1)
}

cfg.KVStore.Store = "memberlist" // Override default value.
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("distributor.ring.", "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "distributor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Instance flags
cfg.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util.Logger)
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "distributor.ring.instance-interface-names", "Name of network interface to read address from.")
f.StringVar(&cfg.InstanceAddr, "distributor.ring.instance-addr", "", "IP address to advertise in the ring.")
f.IntVar(&cfg.InstancePort, "distributor.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.http-listen-port).")
f.StringVar(&cfg.InstanceID, "distributor.ring.instance-id", hostname, "Instance ID to register in the ring.")
}

func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error) {
func toBasicLifecyclerConfig(cfg util.CommonRingConfig, logger log.Logger) (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger, false)
if err != nil {
return ring.BasicLifecyclerConfig{}, err
Expand All @@ -82,12 +34,3 @@ func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLif
KeepInstanceInTheRingOnShutdown: false,
}, nil
}

func (cfg *RingConfig) ToRingConfig() ring.Config {
rc := ring.Config{}
rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ReplicationFactor = 1

return rc
}
3 changes: 2 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"

phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/util"

pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
Expand All @@ -35,7 +36,7 @@ import (
"github.com/grafana/pyroscope/pkg/validation"
)

var ringConfig = RingConfig{
var ringConfig = util.CommonRingConfig{
KVStore: kv.Config{Store: "inmemory"},
InstanceID: "foo",
InstancePort: 8080,
Expand Down
6 changes: 3 additions & 3 deletions pkg/phlare/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ func (f *Phlare) initMemberlistKV() (services.Service, error) {
f.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = f.MemberlistKV.GetMemberlistKV
f.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = f.MemberlistKV.GetMemberlistKV
f.Cfg.QueryScheduler.ServiceDiscovery.SchedulerRing.KVStore.MemberlistKV = f.MemberlistKV.GetMemberlistKV
f.Cfg.OverridesExporter.Ring.KVStore.MemberlistKV = f.MemberlistKV.GetMemberlistKV
f.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = f.MemberlistKV.GetMemberlistKV
f.Cfg.OverridesExporter.Ring.Ring.KVStore.MemberlistKV = f.MemberlistKV.GetMemberlistKV
f.Cfg.StoreGateway.ShardingRing.Ring.KVStore.MemberlistKV = f.MemberlistKV.GetMemberlistKV

f.Cfg.Frontend.QuerySchedulerDiscovery = f.Cfg.QueryScheduler.ServiceDiscovery
f.Cfg.Worker.QuerySchedulerDiscovery = f.Cfg.QueryScheduler.ServiceDiscovery
Expand Down Expand Up @@ -345,7 +345,7 @@ func (f *Phlare) initIngester() (_ services.Service, err error) {
}

func (f *Phlare) initStoreGateway() (serv services.Service, err error) {
f.Cfg.StoreGateway.ShardingRing.ListenPort = f.Cfg.Server.HTTPListenPort
f.Cfg.StoreGateway.ShardingRing.Ring.ListenPort = f.Cfg.Server.HTTPListenPort
if f.storageBucket == nil {
return nil, nil
}
Expand Down
Loading