Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORAGE-4262: Automatically Recover From Failed PRS Operation #5

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
stream_query_plan_cache.patch
Signed-off-by: Adam Saponara <as@php.net>
  • Loading branch information
adsr committed Jun 16, 2020
commit 16fa4591934fd074af4b45b3b20c96cc5d8de16b
9 changes: 2 additions & 7 deletions go/vt/vttablet/tabletserver/planbuilder/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,8 @@ func Build(statement sqlparser.Statement, tables map[string]*schema.Table) (*Pla
}

// BuildStreaming builds a streaming plan based on the schema.
func BuildStreaming(sql string, tables map[string]*schema.Table) (*Plan, error) {
statement, err := sqlparser.Parse(sql)
if err != nil {
return nil, err
}

err = checkForPoolingUnsafeConstructs(statement)
func BuildStreaming(statement sqlparser.Statement, tables map[string]*schema.Table) (*Plan, error) {
err := checkForPoolingUnsafeConstructs(statement)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ func TestCustom(t *testing.T) {
func TestStreamPlan(t *testing.T) {
testSchema := loadSchema("schema_test.json")
for tcase := range iterateExecFile("stream_cases.txt") {
plan, err := BuildStreaming(tcase.input, testSchema)
var plan *Plan
var err error
statement, err := sqlparser.Parse(tcase.input)
if err == nil {
plan, err = BuildStreaming(statement, testSchema)
}
var out string
if err != nil {
out = err.Error()
Expand Down
34 changes: 31 additions & 3 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type QueryEngine struct {
mu sync.RWMutex
tables map[string]*schema.Table
plans *cache.LRUCache
streamPlans *cache.LRUCache
queryRuleSources *rules.Map

queryStatsMu sync.RWMutex
Expand Down Expand Up @@ -182,6 +183,7 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab
se: se,
tables: make(map[string]*schema.Table),
plans: cache.NewLRUCache(int64(config.QueryPlanCacheSize)),
streamPlans: cache.NewLRUCache(int64(config.QueryPlanCacheSize)),
queryRuleSources: rules.NewMap(),
queryPoolWaiterCap: sync2.NewAtomicInt64(int64(config.QueryPoolWaiterCap)),
queryStats: make(map[string]*QueryStats),
Expand Down Expand Up @@ -256,6 +258,15 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab
stats.Publish("QueryCacheOldest", stats.StringFunc(func() string {
return fmt.Sprintf("%v", qe.plans.Oldest())
}))

stats.NewGaugeFunc("StreamQueryCacheLength", "Streaming query engine query cache length", qe.streamPlans.Length)
stats.NewGaugeFunc("StreamQueryCacheSize", "Streaming query engine query cache size", qe.streamPlans.Size)
stats.NewGaugeFunc("StreamQueryCacheCapacity", "Streaming query engine query cache capacity", qe.streamPlans.Capacity)
stats.NewCounterFunc("StreamQueryCacheEvictions", "Streaming query engine query cache evictions", qe.streamPlans.Evictions)
stats.Publish("StreamQueryCacheOldest", stats.StringFunc(func() string {
return fmt.Sprintf("%v", qe.streamPlans.Oldest())
}))

_ = stats.NewCountersFuncWithMultiLabels("QueryCounts", "query counts", []string{"Table", "Plan"}, qe.getQueryCount)
_ = stats.NewCountersFuncWithMultiLabels("QueryTimesNs", "query times in ns", []string{"Table", "Plan"}, qe.getQueryTime)
_ = stats.NewCountersFuncWithMultiLabels("QueryRowCounts", "query row counts", []string{"Table", "Plan"}, qe.getQueryRowCount)
Expand Down Expand Up @@ -312,6 +323,7 @@ func (qe *QueryEngine) Close() {
// Close in reverse order of Open.
qe.se.UnregisterNotifier("qe")
qe.plans.Clear()
qe.streamPlans.Clear()
qe.tables = make(map[string]*schema.Table)
qe.streamConns.Close()
qe.conns.Close()
Expand Down Expand Up @@ -394,18 +406,31 @@ func (qe *QueryEngine) getQueryConn(ctx context.Context) (*connpool.DBConn, erro
return qe.conns.Get(ctx)
}

// GetStreamPlan is similar to GetPlan, but doesn't use the cache
// and doesn't enforce a limit. It just returns the parsed query.
// GetStreamPlan returns the TabletPlan that for the query. Plans are cached in
// a separate cache.LRUCache.
func (qe *QueryEngine) GetStreamPlan(sql string) (*TabletPlan, error) {
if cacheResult, ok := qe.streamPlans.Get(sql); ok {
return cacheResult.(*TabletPlan), nil
}

qe.mu.RLock()
defer qe.mu.RUnlock()
splan, err := planbuilder.BuildStreaming(sql, qe.tables)
statement, err := sqlparser.Parse(sql)
if err != nil {
return nil, err
}
splan, err := planbuilder.BuildStreaming(statement, qe.tables)
if err != nil {
return nil, err
}
plan := &TabletPlan{Plan: splan}
plan.Rules = qe.queryRuleSources.FilterByPlan(sql, plan.PlanID, plan.TableName().String())
plan.buildAuthorized()

if !sqlparser.SkipQueryPlanCacheDirective(statement) {
qe.streamPlans.Set(sql, plan)
}

return plan, nil
}

Expand All @@ -426,6 +451,7 @@ func (qe *QueryEngine) GetMessageStreamPlan(name string) (*TabletPlan, error) {
// ClearQueryPlanCache should be called if query plan cache is potentially obsolete
func (qe *QueryEngine) ClearQueryPlanCache() {
qe.plans.Clear()
qe.streamPlans.Clear()
}

// IsMySQLReachable returns true if we can connect to MySQL.
Expand All @@ -448,6 +474,7 @@ func (qe *QueryEngine) schemaChanged(tables map[string]*schema.Table, created, a
qe.tables = tables
if len(altered) != 0 || len(dropped) != 0 {
qe.plans.Clear()
qe.streamPlans.Clear()
}
}

Expand All @@ -473,6 +500,7 @@ func (qe *QueryEngine) SetQueryPlanCacheCap(size int) {
size = 1
}
qe.plans.SetCapacity(int64(size))
qe.streamPlans.SetCapacity(int64(size))
}

// QueryPlanCacheCap returns the capacity of the query cache.
Expand Down