diff --git a/pkg/pgclient/client.go b/pkg/pgclient/client.go index e6747082c0..acc3aff90f 100644 --- a/pkg/pgclient/client.go +++ b/pkg/pgclient/client.go @@ -202,15 +202,10 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri metricsCache := cache.NewMetricCache(cfg.CacheConfig) labelsCache := cache.NewLabelsCache(cfg.CacheConfig) seriesCache := cache.NewSeriesCache(cfg.CacheConfig, sigClose) - c := ingestor.Cfg{ + c := ingestor.Parameters{ NumCopiers: numCopiers, - IgnoreCompressedChunks: cfg.IgnoreCompressedChunks, - MetricsAsyncAcks: cfg.MetricsAsyncAcks, - TracesAsyncAcks: cfg.TracesAsyncAcks, InvertedLabelsCacheSize: cfg.CacheConfig.InvertedLabelsCacheSize, - TracesBatchTimeout: cfg.TracesBatchTimeout, - TracesMaxBatchSize: cfg.TracesMaxBatchSize, - TracesBatchWorkers: cfg.TracesBatchWorkers, + Config: &cfg.IngestorFlags, } var ( diff --git a/pkg/pgclient/config.go b/pkg/pgclient/config.go index 8d03f5be4a..fb9b4edf51 100644 --- a/pkg/pgclient/config.go +++ b/pkg/pgclient/config.go @@ -17,13 +17,14 @@ import ( "github.com/timescale/promscale/pkg/limits" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" - "github.com/timescale/promscale/pkg/pgmodel/ingestor/trace" + "github.com/timescale/promscale/pkg/pgmodel/ingestor" "github.com/timescale/promscale/pkg/version" ) // Config for the database. type Config struct { CacheConfig cache.Config + IngestorFlags ingestor.Config AppName string Host string Port int @@ -32,9 +33,6 @@ type Config struct { Database string SslMode string DbConnectionTimeout time.Duration - IgnoreCompressedChunks bool - MetricsAsyncAcks bool - TracesAsyncAcks bool WriteConnections int WriterPoolSize int WriterSynchronousCommit bool @@ -44,9 +42,6 @@ type Config struct { UsesHA bool DbUri string EnableStatementsCache bool - TracesBatchTimeout time.Duration - TracesMaxBatchSize int - TracesBatchWorkers int } const ( @@ -75,6 +70,7 @@ var ( // ParseFlags parses the configuration flags specific to PostgreSQL and TimescaleDB func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config { cache.ParseFlags(fs, &cfg.CacheConfig) + ingestor.ParseFlags(fs, &cfg.IngestorFlags) fs.StringVar(&cfg.AppName, "db.app", DefaultApp, "This sets the application_name in database connection string. "+ "This is helpful during debugging when looking at pg_stat_activity.") @@ -85,9 +81,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config { fs.StringVar(&cfg.Database, "db.name", defaultDBName, "Database name.") fs.StringVar(&cfg.SslMode, "db.ssl-mode", defaultSSLMode, "TimescaleDB connection ssl mode. If you do not want to use ssl, pass 'allow' as value.") fs.DurationVar(&cfg.DbConnectionTimeout, "db.connection-timeout", defaultConnectionTime, "Timeout for establishing the connection between Promscale and TimescaleDB.") - fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+ - "Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+ - "However, setting this to true will save your resources that may be required during decompression. ") + fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics/traces to database. "+ "By default, this will be set based on the number of CPUs available to the DB Promscale is connected to.") fs.IntVar(&cfg.WriterPoolSize, "db.connections.writer-pool.size", defaultPoolSize, "Maximum size of the writer pool of database connections. This defaults to 50% of max_connections "+ @@ -102,11 +96,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config { "Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`") fs.BoolVar(&cfg.EnableStatementsCache, "db.statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. "+ "Disable if using PgBouncer") - fs.BoolVar(&cfg.MetricsAsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.") - fs.BoolVar(&cfg.TracesAsyncAcks, "tracing.async-acks", true, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss.") - fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB") - fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created") - fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.") + return cfg } @@ -114,6 +104,9 @@ func Validate(cfg *Config, lcfg limits.Config) error { if err := cfg.validateConnectionSettings(); err != nil { return err } + if err := ingestor.Validate(&cfg.IngestorFlags); err != nil { + return err + } return cache.Validate(&cfg.CacheConfig, lcfg) } diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index f0541db6d7..337fa4fb97 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -52,8 +52,8 @@ type pgxDispatcher struct { var _ model.Dispatcher = &pgxDispatcher{} -func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) { - numCopiers := cfg.NumCopiers +func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, params *Parameters) (*pgxDispatcher, error) { + numCopiers := params.NumCopiers if numCopiers < 1 { log.Warn("msg", "num copiers less than 1, setting to 1") numCopiers = 1 @@ -66,7 +66,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh))) metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) }) - if cfg.IgnoreCompressedChunks { + if params.Config.IgnoreCompressedChunks { // Handle decompression to not decompress anything. handleDecompression = skipDecompression } @@ -76,7 +76,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac } labelArrayOID := model.GetCustomTypeOID(model.LabelArray) - labelsCache, err := cache.NewInvertedLabelsCache(cfg.InvertedLabelsCacheSize) + labelsCache, err := cache.NewInvertedLabelsCache(params.InvertedLabelsCacheSize) if err != nil { return nil, err } @@ -94,7 +94,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac invertedLabelsCache: labelsCache, exemplarKeyPosCache: eCache, completeMetricCreation: make(chan struct{}, 1), - asyncAcks: cfg.MetricsAsyncAcks, + asyncAcks: params.Config.MetricsAsyncAcks, copierReadRequestCh: copierReadRequestCh, // set to run at half our deletion interval seriesEpochRefresh: time.NewTicker(30 * time.Minute), @@ -112,7 +112,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac go inserter.runCompleteMetricCreationWorker() - if !cfg.DisableEpochSync { + if !params.Config.DisableEpochSync { inserter.doneWG.Add(1) go func() { defer inserter.doneWG.Done() diff --git a/pkg/pgmodel/ingestor/flags.go b/pkg/pgmodel/ingestor/flags.go new file mode 100644 index 0000000000..65df5baaae --- /dev/null +++ b/pkg/pgmodel/ingestor/flags.go @@ -0,0 +1,37 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package ingestor + +import ( + "flag" + "time" + + "github.com/timescale/promscale/pkg/pgmodel/ingestor/trace" +) + +type Config struct { + MetricsAsyncAcks bool + TracesAsyncAcks bool + DisableEpochSync bool + IgnoreCompressedChunks bool + TracesBatchTimeout time.Duration + TracesMaxBatchSize int + TracesBatchWorkers int +} + +func ParseFlags(fs *flag.FlagSet, cfg *Config) { + fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+ + "Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+ + "However, setting this to true will save your resources that may be required during decompression. ") + fs.BoolVar(&cfg.MetricsAsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.") + fs.BoolVar(&cfg.TracesAsyncAcks, "tracing.async-acks", true, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss.") + fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB") + fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created") + fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.") +} + +func Validate(cfg *Config) error { + return nil +} diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 262fd3f59f..002c862714 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -23,18 +23,6 @@ import ( "github.com/timescale/promscale/pkg/tracer" ) -type Cfg struct { - MetricsAsyncAcks bool - TracesAsyncAcks bool - NumCopiers int - DisableEpochSync bool - IgnoreCompressedChunks bool - InvertedLabelsCacheSize uint64 - TracesBatchTimeout time.Duration - TracesMaxBatchSize int - TracesBatchWorkers int -} - // DBIngestor ingest the TimeSeries data into Timescale database. type DBIngestor struct { sCache cache.SeriesCache @@ -43,33 +31,42 @@ type DBIngestor struct { closed *atomic.Bool } +type Parameters struct { + NumCopiers int + InvertedLabelsCacheSize uint64 + Config *Config +} + // NewPgxIngestor returns a new Ingestor that uses connection pool and a metrics cache // for caching metric table names. -func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*DBIngestor, error) { - dispatcher, err := newPgxDispatcher(conn, cache, sCache, eCache, cfg) +func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, params *Parameters) (*DBIngestor, error) { + dispatcher, err := newPgxDispatcher(conn, cache, sCache, eCache, params) if err != nil { return nil, err } batcherConfg := trace.BatcherConfig{ - MaxBatchSize: cfg.TracesMaxBatchSize, - BatchTimeout: cfg.TracesBatchTimeout, - Writers: cfg.NumCopiers, + MaxBatchSize: params.Config.TracesMaxBatchSize, + BatchTimeout: params.Config.TracesBatchTimeout, + Writers: params.NumCopiers, } traceWriter := trace.NewWriter(conn) return &DBIngestor{ sCache: sCache, dispatcher: dispatcher, - tWriter: trace.NewDispatcher(traceWriter, cfg.TracesAsyncAcks, batcherConfg), + tWriter: trace.NewDispatcher(traceWriter, params.Config.TracesAsyncAcks, batcherConfg), closed: atomic.NewBool(false), }, nil } // NewPgxIngestorForTests returns a new Ingestor that write to PostgreSQL using PGX // with an empty config, a new default size metrics cache and a non-ha-aware data parser -func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) { +func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Parameters) (*DBIngestor, error) { if cfg == nil { - cfg = &Cfg{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2} + cfg = &Parameters{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2} + } + if cfg.Config == nil { + cfg.Config = &Config{} } cacheConfig := cache.DefaultConfig c := cache.NewMetricCache(cacheConfig) @@ -101,8 +98,9 @@ func (ingestor *DBIngestor) IngestTraces(ctx context.Context, traces ptrace.Trac // IngestMetrics transforms and ingests the timeseries data into Timescale database. // input: -// req the WriteRequest backing tts. It will be added to our WriteRequest -// pool when it is no longer needed. +// +// req the WriteRequest backing tts. It will be added to our WriteRequest +// pool when it is no longer needed. func (ingestor *DBIngestor) IngestMetrics(ctx context.Context, r *prompb.WriteRequest) (numInsertablesIngested uint64, numMetadataIngested uint64, err error) { if ingestor.closed.Load() { return 0, 0, fmt.Errorf("ingestor is closed and can't ingest metrics") diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index cec19bff6f..37cab6f2f7 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -932,7 +932,7 @@ func TestPGXInserterInsertData(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true, InvertedLabelsCacheSize: 10, NumCopiers: 2}) + inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Parameters{Config: &Config{DisableEpochSync: true}, InvertedLabelsCacheSize: 10, NumCopiers: 2}) if err != nil { t.Fatal(err) } diff --git a/pkg/tests/end_to_end_tests/drop_test.go b/pkg/tests/end_to_end_tests/drop_test.go index bd9fdf8e3e..0a4f6db6be 100644 --- a/pkg/tests/end_to_end_tests/drop_test.go +++ b/pkg/tests/end_to_end_tests/drop_test.go @@ -459,8 +459,8 @@ func TestSQLDropMetricChunk(t *testing.T) { } c := cache.NewMetricCache(cache.DefaultConfig) - ingestor, err := ingstr.NewPgxIngestor(pgxconn.NewPgxConn(db), c, scache, nil, &ingstr.Cfg{ - DisableEpochSync: true, + ingestor, err := ingstr.NewPgxIngestor(pgxconn.NewPgxConn(db), c, scache, nil, &ingstr.Parameters{ + Config: &ingstr.Config{DisableEpochSync: true}, InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2, }) diff --git a/pkg/tests/end_to_end_tests/ha_single_promscale_test.go b/pkg/tests/end_to_end_tests/ha_single_promscale_test.go index 41af753276..4c14acf0c6 100644 --- a/pkg/tests/end_to_end_tests/ha_single_promscale_test.go +++ b/pkg/tests/end_to_end_tests/ha_single_promscale_test.go @@ -124,8 +124,8 @@ func prepareWriterWithHa(db *pgxpool.Pool, t testing.TB) (*util.ManualTicker, ht dataParser.AddPreprocessor(ha.NewFilter(haService)) mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)} - ing, err := ingestor.NewPgxIngestor(pgxconn.NewPgxConn(db), mCache, sCache, nil, &ingestor.Cfg{ - InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2, + ing, err := ingestor.NewPgxIngestor(pgxconn.NewPgxConn(db), mCache, sCache, nil, &ingestor.Parameters{ + InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2, Config: &ingestor.Config{}, }) if err != nil { t.Fatalf("could not create ingestor: %v", err) diff --git a/pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go b/pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go index 85a2ad080a..1ca08a30ef 100644 --- a/pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go +++ b/pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go @@ -64,8 +64,8 @@ func TestInsertInCompressedChunks(t *testing.T) { // With decompress chunks being false. withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { - ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Cfg{ - IgnoreCompressedChunks: true, + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Parameters{ + Config: &ingstr.Config{IgnoreCompressedChunks: true}, InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2, }) diff --git a/pkg/tests/end_to_end_tests/jaeger_store_integration_test.go b/pkg/tests/end_to_end_tests/jaeger_store_integration_test.go index e91ddc7c6f..38459d7512 100644 --- a/pkg/tests/end_to_end_tests/jaeger_store_integration_test.go +++ b/pkg/tests/end_to_end_tests/jaeger_store_integration_test.go @@ -60,10 +60,10 @@ func TestJaegerStorageIntegration(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { withDB(t, "jaeger_storage_integration_tests", func(db *pgxpool.Pool, t testing.TB) { - cfg := &ingestor.Cfg{ + cfg := &ingestor.Parameters{ InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: runtime.NumCPU() / 2, - TracesAsyncAcks: true, // To make GetLargeSpans happy, otherwise it takes quite a few time to ingest. + Config: &ingestor.Config{TracesAsyncAcks: true}, // To make GetLargeSpans happy, otherwise it takes quite a few time to ingest. } ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), cfg) require.NoError(t, err) diff --git a/pkg/tests/end_to_end_tests/metric_ingest_bench_test.go b/pkg/tests/end_to_end_tests/metric_ingest_bench_test.go index a586caa71e..b1b2027e5e 100644 --- a/pkg/tests/end_to_end_tests/metric_ingest_bench_test.go +++ b/pkg/tests/end_to_end_tests/metric_ingest_bench_test.go @@ -64,7 +64,7 @@ func BenchmarkMetricIngest(b *testing.B) { withDB(b, "bench_e2e_metric_ingest", func(db *pgxpool.Pool, t testing.TB) { b.StopTimer() - metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Cfg{ + metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Parameters{ NumCopiers: 8, InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, }) @@ -119,7 +119,7 @@ func BenchmarkNewSeriesIngestion(b *testing.B) { for i := 0; i < b.N; i++ { withDB(b, "bench_e2e_new_series_ingest", func(db *pgxpool.Pool, t testing.TB) { - metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Cfg{ + metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Parameters{ NumCopiers: 8, InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, }) diff --git a/pkg/tests/end_to_end_tests/trace_ingest_bench_test.go b/pkg/tests/end_to_end_tests/trace_ingest_bench_test.go index 06a6351202..5259ebba3e 100644 --- a/pkg/tests/end_to_end_tests/trace_ingest_bench_test.go +++ b/pkg/tests/end_to_end_tests/trace_ingest_bench_test.go @@ -33,10 +33,10 @@ func BenchmarkTracesIngest(b *testing.B) { for _, c := range cases { b.Run(c.name, func(b *testing.B) { withDB(b, "trace_ingest_bench", func(db *pgxpool.Pool, t testing.TB) { - cfg := &ingestor.Cfg{ + cfg := &ingestor.Parameters{ InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: runtime.NumCPU() / 2, - TracesAsyncAcks: c.async, + Config: &ingestor.Config{TracesAsyncAcks: c.async}, } ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), cfg) require.NoError(t, err)