Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enforce bounds on query-related CLI/config parameters to avoid startup panics #20149

Merged
merged 5 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ want to use the default.
1. [20110](https://github.com/influxdata/influxdb/pull/20110): Use V2 directory for default V2 config path in `influxd upgrade`.
1. [20137](https://github.com/influxdata/influxdb/pull/20137): Fix panic when writing a point with 100 tags. Thanks @foobar!
1. [20151](https://github.com/influxdata/influxdb/pull/20151): Don't log bodies of V1 write requests.
1. [20097](https://github.com/influxdata/influxdb/pull/20097): Ensure Index.Walk fetches matching foreign keys only.
1. [20149](https://github.com/influxdata/influxdb/pull/20149): Enforce max value of 2147483647 on query concurrency to avoid startup panic.
1. [20149](https://github.com/influxdata/influxdb/pull/20149): Enforce max value of 2147483647 on query queue size to avoid startup panic.

## v2.0.2 [2020-11-19]

Expand All @@ -34,7 +37,6 @@ want to use the default.
1. [20053](https://github.com/influxdata/influxdb/pull/20053): Upgrade Flux to v0.95.0.
1. [20058](https://github.com/influxdata/influxdb/pull/20058): UI: Upgrade flux-lsp-browser to v0.5.23.
1. [20067](https://github.com/influxdata/influxdb/pull/20067): Add DBRP cli commands as `influxd v1 dbrp`.
1. [20097](https://github.com/influxdata/influxdb/pull/20097): Ensure Index.Walk fetches matching foreign keys only.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidentally filed under the wrong release


### Bug Fixes

Expand Down
16 changes: 8 additions & 8 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,11 @@ type Launcher struct {
flagger feature.Flagger

// Query options.
concurrencyQuota int
initialMemoryBytesQuotaPerQuery int
memoryBytesQuotaPerQuery int
maxMemoryBytes int
queueSize int
concurrencyQuota int32
initialMemoryBytesQuotaPerQuery int64
memoryBytesQuotaPerQuery int64
maxMemoryBytes int64
queueSize int32

boltClient *bolt.Client
kvStore kv.SchemaStore
Expand Down Expand Up @@ -900,9 +900,9 @@ func (m *Launcher) run(ctx context.Context) (err error) {

m.queryController, err = control.New(control.Config{
ConcurrencyQuota: m.concurrencyQuota,
InitialMemoryBytesQuotaPerQuery: int64(m.initialMemoryBytesQuotaPerQuery),
MemoryBytesQuotaPerQuery: int64(m.memoryBytesQuotaPerQuery),
MaxMemoryBytes: int64(m.maxMemoryBytes),
InitialMemoryBytesQuotaPerQuery: m.initialMemoryBytesQuotaPerQuery,
MemoryBytesQuotaPerQuery: m.memoryBytesQuotaPerQuery,
MaxMemoryBytes: m.maxMemoryBytes,
QueueSize: m.queueSize,
Logger: m.log.With(zap.String("service", "storage-reads")),
ExecutorDependencies: []flux.Dependency{deps},
Expand Down
52 changes: 52 additions & 0 deletions kit/cli/viper.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,58 @@ func BindOptions(v *viper.Viper, cmd *cobra.Command, opts []Opt) {
}
mustBindPFlag(v, o.Flag, flagset)
*destP = v.GetInt(envVar)
case *int32:
var d int32
if o.Default != nil {
// N.B. since our CLI kit types default values as interface{} and
// literal numbers get typed as int by default, it's very easy to
// create an int32 CLI flag with an int default value.
//
// The compiler doesn't know to complain in that case, so you end up
// with a runtime panic when trying to bind the CLI options.
//
// To avoid that headache, we support both int32 and int defaults
// for int32 fields. This introduces a new runtime bomb if somebody
// specifies an int default > math.MaxInt32, but that's hopefully
// less likely.
var ok bool
d, ok = o.Default.(int32)
if !ok {
d = int32(o.Default.(int))
}
}
if hasShort {
flagset.Int32VarP(destP, o.Flag, string(o.Short), d, o.Desc)
} else {
flagset.Int32Var(destP, o.Flag, d, o.Desc)
}
mustBindPFlag(v, o.Flag, flagset)
*destP = v.GetInt32(envVar)
case *int64:
var d int64
if o.Default != nil {
// N.B. since our CLI kit types default values as interface{} and
// literal numbers get typed as int by default, it's very easy to
// create an int64 CLI flag with an int default value.
//
// The compiler doesn't know to complain in that case, so you end up
// with a runtime panic when trying to bind the CLI options.
//
// To avoid that headache, we support both int64 and int defaults
// for int64 fields.
var ok bool
d, ok = o.Default.(int64)
if !ok {
d = int64(o.Default.(int))
}
}
if hasShort {
flagset.Int64VarP(destP, o.Flag, string(o.Short), d, o.Desc)
} else {
flagset.Int64Var(destP, o.Flag, d, o.Desc)
}
mustBindPFlag(v, o.Flag, flagset)
*destP = v.GetInt64(envVar)
case *bool:
var d bool
if o.Default != nil {
Expand Down
35 changes: 33 additions & 2 deletions kit/cli/viper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"os"
"path"
"testing"
Expand Down Expand Up @@ -40,6 +41,8 @@ func (c *customFlag) Type() string {
func ExampleNewCommand() {
var monitorHost string
var number int
var smallerNumber int32
var longerNumber int64
var sleep bool
var duration time.Duration
var stringSlice []string
Expand All @@ -50,6 +53,7 @@ func ExampleNewCommand() {
for i := 0; i < number; i++ {
fmt.Printf("%d\n", i)
}
fmt.Println(longerNumber - int64(smallerNumber))
fmt.Println(sleep)
fmt.Println(duration)
fmt.Println(stringSlice)
Expand All @@ -70,6 +74,18 @@ func ExampleNewCommand() {
Default: 2,
Desc: "number of times to loop",
},
{
DestP: &smallerNumber,
Flag: "smaller-number",
Default: math.MaxInt32,
Desc: "limited size number",
},
{
DestP: &longerNumber,
Flag: "longer-number",
Default: math.MaxInt64,
Desc: "explicitly expanded-size number",
},
{
DestP: &sleep,
Flag: "sleep",
Expand Down Expand Up @@ -104,6 +120,7 @@ func ExampleNewCommand() {
// http://localhost:8086
// 0
// 1
// 9223372034707292160
// true
// 1m0s
// [foo bar]
Expand All @@ -113,8 +130,10 @@ func ExampleNewCommand() {
func Test_NewProgram(t *testing.T) {
testFilePath, cleanup := newConfigFile(t, map[string]string{
// config values should be same as flags
"foo": "bar",
"shoe-fly": "yadon",
"foo": "bar",
"shoe-fly": "yadon",
"number": "2147483647",
"long-number": "9223372036854775807",
})
defer cleanup()
defer setEnvVar("TEST_CONFIG_PATH", testFilePath)()
Expand Down Expand Up @@ -155,6 +174,8 @@ func Test_NewProgram(t *testing.T) {

var testVar string
var testFly string
var testNumber int32
var testLongNumber int64
program := &Program{
Name: "test",
Opts: []Opt{
Expand All @@ -167,6 +188,14 @@ func Test_NewProgram(t *testing.T) {
DestP: &testFly,
Flag: "shoe-fly",
},
{
DestP: &testNumber,
Flag: "number",
},
{
DestP: &testLongNumber,
Flag: "long-number",
},
},
Run: func() error { return nil },
}
Expand All @@ -177,6 +206,8 @@ func Test_NewProgram(t *testing.T) {

require.Equal(t, tt.expected, testVar)
assert.Equal(t, "yadon", testFly)
assert.Equal(t, int32(math.MaxInt32), testNumber)
assert.Equal(t, int64(math.MaxInt64), testLongNumber)
}

t.Run(tt.name, fn)
Expand Down
33 changes: 24 additions & 9 deletions query/control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ type Controller struct {

type Config struct {
// ConcurrencyQuota is the number of queries that are allowed to execute concurrently.
ConcurrencyQuota int
//
// This value is limited to an int32 because it's used to set the initial delta on the
// controller's WaitGroup, and WG deltas have an effective limit of math.MaxInt32.
// See: https://github.com/golang/go/issues/20687
ConcurrencyQuota int32

// InitialMemoryBytesQuotaPerQuery is the initial number of bytes allocated for a query
// when it is started. If this is unset, then the MemoryBytesQuotaPerQuery will be used.
Expand All @@ -91,10 +95,20 @@ type Config struct {
// This number may be less than the ConcurrencyQuota * MemoryBytesQuotaPerQuery.
MaxMemoryBytes int64

// QueueSize is the number of queries that are allowed to be awaiting execution before new queries are
// rejected.
QueueSize int
Logger *zap.Logger
// QueueSize is the number of queries that are allowed to be awaiting execution before new queries are rejected.
//
// This value is limited to an int32 because it's used to make(chan *Query, QueueSize) on controller startup.
// Through trial-and-error I found that make(chan *Query, N) starts to panic for N > 1<<45 - 12, so not all
// ints or int64s are safe to pass here. Using that max value still immediately crashes the program with an OOM,
// because it tries to allocate TBs of memory for the channel.
// I was able to boot influxd locally using math.MaxInt32 for this parameter.
//
// Less-scientifically, this was the only Config parameter other than ConcurrencyQuota to be typed as an int
// instead of an explicit int64. When ConcurrencyQuota changed to an int32, it felt like a decent idea for
// this to follow suit.
QueueSize int32

Logger *zap.Logger
// MetricLabelKeys is a list of labels to add to the metrics produced by the controller.
// The value for a given key will be read off the context.
// The context value must be a string or an implementation of the Stringer interface.
Expand Down Expand Up @@ -159,11 +173,11 @@ func New(config Config) (*Controller, error) {
logger = zap.NewNop()
}
logger.Info("Starting query controller",
zap.Int("concurrency_quota", c.ConcurrencyQuota),
zap.Int32("concurrency_quota", c.ConcurrencyQuota),
zap.Int64("initial_memory_bytes_quota_per_query", c.InitialMemoryBytesQuotaPerQuery),
zap.Int64("memory_bytes_quota_per_query", c.MemoryBytesQuotaPerQuery),
zap.Int64("max_memory_bytes", c.MaxMemoryBytes),
zap.Int("queue_size", c.QueueSize))
zap.Int32("queue_size", c.QueueSize))

mm := &memoryManager{
initialBytesQuotaPerQuery: c.InitialMemoryBytesQuotaPerQuery,
Expand All @@ -186,8 +200,9 @@ func New(config Config) (*Controller, error) {
labelKeys: c.MetricLabelKeys,
dependencies: c.ExecutorDependencies,
}
ctrl.wg.Add(c.ConcurrencyQuota)
for i := 0; i < c.ConcurrencyQuota; i++ {
quota := int(c.ConcurrencyQuota)
ctrl.wg.Add(quota)
for i := 0; i < quota; i++ {
go func() {
defer ctrl.wg.Done()
ctrl.processQueryQueue()
Expand Down