config: add validation for configuration (pingcap#5864)
jackysp authored Mar 6, 2018
1 parent 9a3463f commit 50e98f4
Showing 23 changed files with 122 additions and 97 deletions.
55 changes: 34 additions & 21 deletions config/config.go
@@ -25,18 +25,31 @@ import (
tracing "github.com/uber/jaeger-client-go/config"
)

// Config number limitations
const (
MaxLogFileSize = 4096 // MB
)

// Valid config maps
var (
ValidStorage = map[string]bool{
"mocktikv": true,
"tikv": true,
}
)

// Config contains configuration options.
type Config struct {
Host string `toml:"host" json:"host"`
Port int `toml:"port" json:"port"`
Port uint `toml:"port" json:"port"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit int `toml:"token-limit" json:"token-limit"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
EnableChunk bool `toml:"enable-chunk" json:"enable-chunk"`
OOMAction string `toml:"oom-action" json:"oom-action"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
@@ -65,9 +78,9 @@ type Log struct {
File logutil.FileLogConfig `toml:"file" json:"file"`

SlowQueryFile string `toml:"slow-query-file" json:"slow-query-file"`
SlowThreshold int `toml:"slow-threshold" json:"slow-threshold"`
ExpensiveThreshold int `toml:"expensive-threshold" json:"expensive-threshold"`
QueryLogMaxLen int `toml:"query-log-max-len" json:"query-log-max-len"`
SlowThreshold uint `toml:"slow-threshold" json:"slow-threshold"`
ExpensiveThreshold uint `toml:"expensive-threshold" json:"expensive-threshold"`
QueryLogMaxLen uint `toml:"query-log-max-len" json:"query-log-max-len"`
}

// Security is the security section of the config.
@@ -119,42 +132,42 @@ func (s *Security) ToTLSConfig() (*tls.Config, error) {
// Status is the status section of the config.
type Status struct {
ReportStatus bool `toml:"report-status" json:"report-status"`
StatusPort int `toml:"status-port" json:"status-port"`
StatusPort uint `toml:"status-port" json:"status-port"`
MetricsAddr string `toml:"metrics-addr" json:"metrics-addr"`
MetricsInterval int `toml:"metrics-interval" json:"metrics-interval"`
MetricsInterval uint `toml:"metrics-interval" json:"metrics-interval"`
}

// Performance is the performance section of the config.
type Performance struct {
MaxProcs int `toml:"max-procs" json:"max-procs"`
MaxProcs uint `toml:"max-procs" json:"max-procs"`
TCPKeepAlive bool `toml:"tcp-keep-alive" json:"tcp-keep-alive"`
RetryLimit int `toml:"retry-limit" json:"retry-limit"`
JoinConcurrency int `toml:"join-concurrency" json:"join-concurrency"`
RetryLimit uint `toml:"retry-limit" json:"retry-limit"`
JoinConcurrency uint `toml:"join-concurrency" json:"join-concurrency"`
CrossJoin bool `toml:"cross-join" json:"cross-join"`
StatsLease string `toml:"stats-lease" json:"stats-lease"`
RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"`
StmtCountLimit int `toml:"stmt-count-limit" json:"stmt-count-limit"`
StmtCountLimit uint `toml:"stmt-count-limit" json:"stmt-count-limit"`
}

// XProtocol is the XProtocol section of the config.
type XProtocol struct {
XServer bool `toml:"xserver" json:"xserver"`
XHost string `toml:"xhost" json:"xhost"`
XPort int `toml:"xport" json:"xport"`
XPort uint `toml:"xport" json:"xport"`
XSocket string `toml:"xsocket" json:"xsocket"`
}

// PlanCache is the PlanCache section of the config.
type PlanCache struct {
Enabled bool `toml:"enabled" json:"enabled"`
Capacity int64 `toml:"capacity" json:"capacity"`
Shards int64 `toml:"shards" json:"shards"`
Enabled bool `toml:"enabled" json:"enabled"`
Capacity uint `toml:"capacity" json:"capacity"`
Shards uint `toml:"shards" json:"shards"`
}

// PreparedPlanCache is the PreparedPlanCache section of the config.
type PreparedPlanCache struct {
Enabled bool `toml:"enabled" json:"enabled"`
Capacity int64 `toml:"capacity" json:"capacity"`
Enabled bool `toml:"enabled" json:"enabled"`
Capacity uint `toml:"capacity" json:"capacity"`
}

// OpenTracing is the opentracing section of the config.
@@ -191,14 +204,14 @@ type ProxyProtocol struct {
// * means all networks.
Networks string `toml:"networks" json:"networks"`
// PROXY protocol header read timeout, Unit is second.
HeaderTimeout int `toml:"header-timeout" json:"header-timeout"`
HeaderTimeout uint `toml:"header-timeout" json:"header-timeout"`
}

// TiKVClient is the config for tikv client.
type TiKVClient struct {
// GrpcConnectionCount is the max gRPC connections that will be established
// with each tikv-server.
GrpcConnectionCount int `toml:"grpc-connection-count" json:"grpc-connection-count"`
GrpcConnectionCount uint `toml:"grpc-connection-count" json:"grpc-connection-count"`
// CommitTimeout is the max time which command 'commit' will wait.
CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
}
@@ -240,8 +253,8 @@ var defaultConf = Config{
StmtCountLimit: 5000,
},
XProtocol: XProtocol{
XHost: "0.0.0.0",
XPort: 14000,
XHost: "",
XPort: 0,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
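
The hunk above introduces MaxLogFileSize and ValidStorage but does not show the validation routine that consumes them. A minimal sketch of what such a check could look like, assuming a hypothetical Valid() method on Config and a MaxSize field on logutil.FileLogConfig (names and messages are illustrative, not the code added by this commit):

package config

import "fmt"

// Valid is an illustrative sketch of a configuration check built on the new
// ValidStorage map and MaxLogFileSize constant; the method actually added by
// this commit is not visible in the hunks above.
func (c *Config) Valid() error {
    if !ValidStorage[c.Store] {
        names := make([]string, 0, len(ValidStorage))
        for name := range ValidStorage {
            names = append(names, name)
        }
        return fmt.Errorf("invalid store=%s, valid storages are %v", c.Store, names)
    }
    // Assumes logutil.FileLogConfig exposes a MaxSize field in MB.
    if c.Log.File.MaxSize > MaxLogFileSize {
        return fmt.Errorf("invalid max log file size=%v, it should not be larger than %v MB",
            c.Log.File.MaxSize, MaxLogFileSize)
    }
    return nil
}
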
15 changes: 1 addition & 14 deletions config/config.toml.example
@@ -42,7 +42,7 @@ oom-action = "log"
enable-streaming = false

[log]
# Log level: info, debug, warn, error, fatal.
# Log level: debug, info, warn, error, fatal.
level = "info"

# Log format, one of json, text, console.
@@ -136,19 +136,6 @@ stats-lease = "3s"
# Run auto analyze worker on this tidb-server.
run-auto-analyze = true

[xprotocol]
# Start TiDB x server.
xserver = false

# TiDB x protocol server host.
xhost = "0.0.0.0"

# TiDB x protocol server port.
xport = 14000

# The socket file to use for x protocol connection.
xsocket = ""

[proxy-protocol]
# PROXY protocol acceptable client networks.
# Empty string means disable PROXY protocol, * means all networks.
2 changes: 1 addition & 1 deletion config/config_test.go
@@ -45,7 +45,7 @@ func (s *testConfigSuite) TestConfig(c *C) {
c.Assert(conf.BinlogSocket, Equals, "/tmp/socket")

// Test that the value will be overwritten by the config file.
c.Assert(conf.Performance.RetryLimit, Equals, 10)
c.Assert(conf.Performance.RetryLimit, Equals, uint(10))

c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")

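
The expected value is now wrapped as uint(10): RetryLimit has become a uint, and gocheck's Equals checker compares the obtained and expected interface values with Go's ==, which reports false when the dynamic types differ. A standalone illustration of that rule, with hypothetical variable names:

package main

import "fmt"

func main() {
    var obtained interface{} = uint(10) // e.g. conf.Performance.RetryLimit after the type change
    var expected interface{} = 10       // an untyped constant stored in an interface defaults to int
    fmt.Println(obtained == expected)              // false: dynamic types uint and int differ
    fmt.Println(obtained == interface{}(uint(10))) // true once the expected value is a uint too
}
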
2 changes: 1 addition & 1 deletion executor/adapter.go
@@ -380,7 +380,7 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
return
}
sql := a.Text
if len(sql) > cfg.Log.QueryLogMaxLen {
if len(sql) > int(cfg.Log.QueryLogMaxLen) {
sql = fmt.Sprintf("%.*q(len:%d)", cfg.Log.QueryLogMaxLen, sql, len(a.Text))
}
connID := a.Ctx.GetSessionVars().ConnectionID
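
Since len returns an int, the comparison needs an explicit int conversion once QueryLogMaxLen becomes a uint. A self-contained sketch of the truncation pattern used here, with an illustrative limit value (fmt documents the '*' precision operand as an int, hence the second conversion):

package main

import "fmt"

func main() {
    var queryLogMaxLen uint = 12 // stands in for cfg.Log.QueryLogMaxLen
    sql := "SELECT * FROM t WHERE id = 1"
    if len(sql) > int(queryLogMaxLen) { // len returns int, so convert the uint limit
        // '%.*q' quotes the string truncated to the given precision.
        sql = fmt.Sprintf("%.*q(len:%d)", int(queryLogMaxLen), sql, len(sql))
    }
    fmt.Println(sql) // "SELECT * FRO"(len:28)
}
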
2 changes: 1 addition & 1 deletion executor/builder.go
@@ -680,7 +680,7 @@ func (b *executorBuilder) buildHashJoin(v *plan.PhysicalHashJoin) Executor {
}
}
e.resultGenerators = make([]joinResultGenerator, e.concurrency)
for i := 0; i < e.concurrency; i++ {
for i := uint(0); i < e.concurrency; i++ {
e.resultGenerators[i] = newJoinResultGenerator(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
v.OtherConditions, lhsTypes, rhsTypes)
}
32 changes: 17 additions & 15 deletions executor/join.go
@@ -48,7 +48,7 @@ type HashJoinExec struct {
innerKeys []*expression.Column

prepared bool
concurrency int // concurrency is number of concurrent channels and join workers.
concurrency uint // concurrency is number of concurrent channels and join workers.
hashTable *mvmap.MVMap
hashJoinBuffers []*hashJoinBuffer
outerBufferChs []chan *execResult
@@ -151,7 +151,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error {

e.hashTableValBufs = make([][][]byte, e.concurrency)
e.hashJoinBuffers = make([]*hashJoinBuffer, 0, e.concurrency)
for i := 0; i < e.concurrency; i++ {
for i := uint(0); i < e.concurrency; i++ {
buffer := &hashJoinBuffer{
data: make([]types.Datum, len(e.outerKeys)),
bytes: make([]byte, 0, 10000),
@@ -255,7 +255,7 @@ func (e *HashJoinExec) fetchOuterRows(ctx context.Context) {
}()

bufferCapacity, maxBufferCapacity := 1, 128
for i, noMoreData := 0, false; !noMoreData; i = (i + 1) % e.concurrency {
for i, noMoreData := uint(0), false; !noMoreData; i = (i + 1) % e.concurrency {
outerBuffer := &execResult{rows: make([]Row, 0, bufferCapacity)}

for !noMoreData && len(outerBuffer.rows) < bufferCapacity {
@@ -350,13 +350,13 @@ func (e *HashJoinExec) initializeForProbe() {
// e.outerResultChs is for transmitting the chunks which store the data of outerExec,
// it'll be written by outer worker goroutine, and read by join workers.
e.outerResultChs = make([]chan *chunk.Chunk, e.concurrency)
for i := 0; i < e.concurrency; i++ {
for i := uint(0); i < e.concurrency; i++ {
e.outerResultChs[i] = make(chan *chunk.Chunk, 1)
}

// e.outerChkResourceCh is for transmitting the used outerExec chunks from join workers to outerExec worker.
e.outerChkResourceCh = make(chan *outerChkResource, e.concurrency)
for i := 0; i < e.concurrency; i++ {
for i := uint(0); i < e.concurrency; i++ {
e.outerChkResourceCh <- &outerChkResource{
chk: e.outerExec.newChunk(),
dest: e.outerResultChs[i],
@@ -366,7 +366,7 @@ func (e *HashJoinExec) initializeForProbe() {
// e.joinChkResourceCh is for transmitting the reused join result chunks
// from the main thread to join worker goroutines.
e.joinChkResourceCh = make([]chan *chunk.Chunk, e.concurrency)
for i := 0; i < e.concurrency; i++ {
for i := uint(0); i < e.concurrency; i++ {
e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1)
e.joinChkResourceCh[i] <- e.newChunk()
}
@@ -389,7 +389,7 @@ func (e *HashJoinExec) fetchOuterAndProbeHashTable(ctx context.Context) {
go e.fetchOuterChunks(ctx)

// Start e.concurrency join workers to probe hash table and join inner and outer rows.
for i := 0; i < e.concurrency; i++ {
for i := uint(0); i < e.concurrency; i++ {
e.workerWaitGroup.Add(1)
go e.runJoinWorker4Chunk(i)
}
@@ -437,7 +437,7 @@ func (e *HashJoinExec) prepare4Row(ctx context.Context) error {
// and e.concurrency goroutines to concatenate the matched inner and outer rows and filter the result.
if !(e.hashTable.Len() == 0 && e.joinType == plan.InnerJoin) {
e.outerBufferChs = make([]chan *execResult, e.concurrency)
for i := 0; i < e.concurrency; i++ {
for i := uint(0); i < e.concurrency; i++ {
e.outerBufferChs[i] = make(chan *execResult, e.concurrency)
}

@@ -446,7 +446,7 @@ func (e *HashJoinExec) prepare4Row(ctx context.Context) error {
go e.fetchOuterRows(ctx)

// Start e.concurrency join workers to probe hash table and join inner and outer rows.
for i := 0; i < e.concurrency; i++ {
for i := uint(0); i < e.concurrency; i++ {
e.workerWaitGroup.Add(1)
go e.runJoinWorker(i)
}
@@ -499,7 +499,7 @@ func (e *HashJoinExec) filterOuters(outerBuffer *execResult, outerFilterResult [
}

// runJoinWorker does join job in one goroutine.
func (e *HashJoinExec) runJoinWorker(workerID int) {
func (e *HashJoinExec) runJoinWorker(workerID uint) {
bufferCapacity := 1024
resultBuffer := &execResult{rows: make([]Row, 0, bufferCapacity)}
outerFilterResult := make([]bool, 0, bufferCapacity)
@@ -556,7 +556,7 @@ func (e *HashJoinExec) runJoinWorker(workerID int) {
e.workerWaitGroup.Done()
}

func (e *HashJoinExec) runJoinWorker4Chunk(workerID int) {
func (e *HashJoinExec) runJoinWorker4Chunk(workerID uint) {
defer func() {
if r := recover(); r != nil {
e.joinResultCh <- &hashjoinWorkerResult{err: errors.Errorf("%v", r)}
@@ -606,7 +606,7 @@ func (e *HashJoinExec) runJoinWorker4Chunk(workerID int) {
// joinOuterRow creates result rows from a row in a big table and sends them to resultRows channel.
// Every matching row generates a result row.
// If there are no matching rows and it is outer join, a null filled result row is created.
func (e *HashJoinExec) joinOuterRow(workerID int, outerRow Row, resultBuffer *execResult) bool {
func (e *HashJoinExec) joinOuterRow(workerID uint, outerRow Row, resultBuffer *execResult) bool {
buffer := e.hashJoinBuffers[workerID]
hasNull, joinKey, err := getJoinKey(e.ctx.GetSessionVars().StmtCtx, e.outerKeys, outerRow, buffer.data, buffer.bytes[:0:cap(buffer.bytes)])
if err != nil {
@@ -647,7 +647,8 @@ func (e *HashJoinExec) joinOuterRow(workerID int, outerRow Row, resultBuffer *ex
return true
}

func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID int, outerRow chunk.Row, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.Row,
joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
buffer := e.hashJoinBuffers[workerID]
hasNull, joinKey, err := e.getJoinKeyFromChkRow(true, outerRow, buffer.bytes)
if err != nil {
@@ -695,7 +696,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID int, outerRow chunk.Ro
return true, joinResult
}

func (e *HashJoinExec) getNewJoinResult(workerID int) (bool, *hashjoinWorkerResult) {
func (e *HashJoinExec) getNewJoinResult(workerID uint) (bool, *hashjoinWorkerResult) {
joinResult := &hashjoinWorkerResult{
src: e.joinChkResourceCh[workerID],
}
@@ -708,7 +709,8 @@ func (e *HashJoinExec) getNewJoinResult(workerID int) (bool, *hashjoinWorkerResu
return ok, joinResult
}

func (e *HashJoinExec) join2Chunk(workerID int, outerChk *chunk.Chunk, joinResult *hashjoinWorkerResult, selected []bool) (ok bool, _ *hashjoinWorkerResult) {
func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResult *hashjoinWorkerResult,
selected []bool) (ok bool, _ *hashjoinWorkerResult) {
var err error
selected, err = expression.VectorizedFilter(e.ctx, e.outerFilter, chunk.NewIterator4Chunk(outerChk), selected)
if err != nil {
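
Changing concurrency to uint ripples through every loop index and workerID parameter in this file; since Go accepts any integer type as a slice index, expressions like e.hashJoinBuffers[workerID] keep working unchanged. A minimal standalone sketch of the uint-indexed worker pattern (all names here are hypothetical):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var concurrency uint = 4 // stands in for the config-driven join concurrency
    buffers := make([]int, concurrency)

    var wg sync.WaitGroup
    for i := uint(0); i < concurrency; i++ {
        wg.Add(1)
        go func(workerID uint) { // a uint value indexes the slice directly
            defer wg.Done()
            buffers[workerID] = int(workerID) * 10
        }(i)
    }
    wg.Wait()
    fmt.Println(buffers) // [0 10 20 30]
}
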
6 changes: 3 additions & 3 deletions kv/txn.go
@@ -31,7 +31,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e
originalTxnTS uint64
txn Transaction
)
for i := 0; i < maxRetryCnt; i++ {
for i := uint(0); i < maxRetryCnt; i++ {
txn, err = store.Begin()
if err != nil {
log.Errorf("[kv] RunInNewTxn error - %v", err)
@@ -73,7 +73,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e

var (
// Max retry count in RunInNewTxn
maxRetryCnt = 100
maxRetryCnt uint = 100
// retryBackOffBase is the initial duration, in microsecond, a failed transaction stays dormancy before it retries
retryBackOffBase = 1
// retryBackOffCap is the max amount of duration, in microsecond, a failed transaction stays dormancy before it retries
@@ -83,7 +83,7 @@ var (
// BackOff Implements exponential backoff with full jitter.
// Returns real back off time in microsecond.
// See http://www.awsarchitectureblog.com/2015/03/backoff.html.
func BackOff(attempts int) int {
func BackOff(attempts uint) int {
upper := int(math.Min(float64(retryBackOffCap), float64(retryBackOffBase)*math.Pow(2.0, float64(attempts))))
sleep := time.Duration(rand.Intn(upper)) * time.Millisecond
time.Sleep(sleep)
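
maxRetryCnt and the attempts parameter move to uint while BackOff still returns the slept time as an int. The formula it implements, exponential backoff with full jitter (sleep a uniformly random duration below min(cap, base*2^attempts)), can be sketched standalone as follows; the constant values here are illustrative, not the package's actual settings:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

// Illustrative values; the real retryBackOffBase/retryBackOffCap live in kv/txn.go.
const (
    backOffBase = 1
    backOffCap  = 100
)

// backOff returns a full-jitter backoff duration for the given attempt:
// a uniform random value in [0, min(cap, base*2^attempts)) milliseconds.
func backOff(attempts uint) time.Duration {
    upper := int(math.Min(float64(backOffCap), float64(backOffBase)*math.Pow(2.0, float64(attempts))))
    return time.Duration(rand.Intn(upper)) * time.Millisecond
}

func main() {
    for i := uint(0); i < 8; i++ {
        fmt.Printf("attempt %d: sleep %v\n", i, backOff(i))
    }
}

The full-jitter variant spreads retries uniformly below the exponential ceiling, which avoids the retry synchronization that a fixed exponential schedule would cause.
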
2 changes: 1 addition & 1 deletion kv/txn_test.go
@@ -39,7 +39,7 @@ func (s *testTxnSuite) TestBackOff(c *C) {
mustBackOff(c, 100000, 100)
}

func mustBackOff(c *C, cnt, sleep int) {
func mustBackOff(c *C, cnt uint, sleep int) {
c.Assert(BackOff(cnt), LessEqual, sleep*int(time.Millisecond))
}

6 changes: 3 additions & 3 deletions plan/cache.go
@@ -28,16 +28,16 @@ var (
// PlanCacheEnabled stores the global config "plan-cache-enabled".
PlanCacheEnabled bool
// PlanCacheShards stores the global config "plan-cache-shards".
PlanCacheShards int64
PlanCacheShards uint
// PlanCacheCapacity stores the global config "plan-cache-capacity".
PlanCacheCapacity int64
PlanCacheCapacity uint
// GlobalPlanCache stores the global plan cache for every session in a tidb-server.
GlobalPlanCache *kvcache.ShardedLRUCache

// PreparedPlanCacheEnabled stores the global config "prepared-plan-cache-enabled".
PreparedPlanCacheEnabled bool
// PreparedPlanCacheCapacity stores the global config "prepared-plan-cache-capacity".
PreparedPlanCacheCapacity int64
PreparedPlanCacheCapacity uint
)

type sqlCacheKey struct {
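
The cache-related globals now carry uint values taken directly from the PlanCache and PreparedPlanCache config sections shown earlier. The idea behind a capacity/shards pair is that a sharded LRU hashes each key to one of the shards so that each shard's lock guards only a fraction of the entries; the toy illustration below shows only the shard-selection and sizing step, not the kvcache.ShardedLRUCache implementation, and all names and values are hypothetical:

package main

import (
    "fmt"
    "hash/fnv"
)

// shardFor maps a cache key to one of `shards` sub-caches.
func shardFor(key string, shards uint) uint {
    h := fnv.New32a()
    h.Write([]byte(key))
    return uint(h.Sum32()) % shards
}

func main() {
    var (
        capacity uint = 1000 // e.g. PlanCacheCapacity
        shards   uint = 10   // e.g. PlanCacheShards
    )
    fmt.Printf("each shard holds up to %d entries\n", capacity/shards)
    for _, sql := range []string{"select 1", "select 2", "select 3"} {
        fmt.Printf("%q -> shard %d\n", sql, shardFor(sql, shards))
    }
}
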
2 changes: 1 addition & 1 deletion plan/physical_plan_builder.go
@@ -40,7 +40,7 @@ const (
)

// JoinConcurrency means the number of goroutines that participate in joining.
var JoinConcurrency = 5
var JoinConcurrency uint = 5

// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get
// these tasks one by one.
2 changes: 1 addition & 1 deletion plan/physical_plans.go
@@ -204,7 +204,7 @@ type PhysicalHashJoin struct {
// For inner join, the smaller one will be chosen.
// For outer join or semi join, it's exactly the inner one.
InnerChildIdx int
Concurrency int
Concurrency uint

DefaultValues []types.Datum
}
