Skip to content

Commit

Permalink
update row_count as per mysql spec
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed May 13, 2020
1 parent 2de37ae commit 830f31d
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 64 deletions.
57 changes: 37 additions & 20 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type Executor struct {
// and use the legacy one in production until we are comfortable with the new code.
// it's temporary and should be removed once we can do everything using the new planning strategy
type executeMethod interface {
execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) (*sqltypes.Result, error)
execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) (sqlparser.StatementType, *sqltypes.Result, error)
}

var executorOnce sync.Once
Expand Down Expand Up @@ -161,11 +161,9 @@ func (e *Executor) Execute(ctx context.Context, method string, safeSession *Safe
defer span.Finish()

logStats := NewLogStats(ctx, method, sql, bindVars)
result, err = e.exec.execute(ctx, safeSession, sql, bindVars, logStats)
if err == nil {
safeSession.FoundRows = result.RowsAffected
}
stmtType, result, err := e.exec.execute(ctx, safeSession, sql, bindVars, logStats)
logStats.Error = err
saveSessionStats(safeSession, stmtType, result, err)
if result != nil && len(result.Rows) > *warnMemoryRows {
warnings.Add("ResultsExceeded", 1)
}
Expand All @@ -179,21 +177,38 @@ func (e *Executor) Execute(ctx context.Context, method string, safeSession *Safe
return result, err
}

func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) (*sqltypes.Result, error) {
func saveSessionStats(safeSession *SafeSession, stmtType sqlparser.StatementType, result *sqltypes.Result, err error) {
safeSession.RowCount = -1
if err != nil {
return
}
safeSession.FoundRows = result.RowsAffected
if result.InsertID > 0 {
safeSession.LastInsertId = result.InsertID
}
switch stmtType {
case sqlparser.StmtInsert, sqlparser.StmtReplace, sqlparser.StmtUpdate, sqlparser.StmtDelete:
safeSession.RowCount = int64(result.RowsAffected)
case sqlparser.StmtDDL, sqlparser.StmtSet, sqlparser.StmtBegin, sqlparser.StmtCommit, sqlparser.StmtRollback:
safeSession.RowCount = 0
}
}

func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) (sqlparser.StatementType, *sqltypes.Result, error) {
//Start an implicit transaction if necessary.
if !safeSession.Autocommit && !safeSession.InTransaction() {
if err := e.txConn.Begin(ctx, safeSession); err != nil {
return nil, err
return 0, nil, err
}
}

destKeyspace, destTabletType, dest, err := e.ParseDestinationTarget(safeSession.TargetString)
if err != nil {
return nil, err
return 0, nil, err
}

if safeSession.InTransaction() && destTabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", destTabletType)
return 0, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", destTabletType)
}
if bindVars == nil {
bindVars = make(map[string]*querypb.BindVariable)
Expand All @@ -216,15 +231,19 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st
switch stmtType {
case sqlparser.StmtSelect, sqlparser.StmtInsert, sqlparser.StmtReplace, sqlparser.StmtUpdate,
sqlparser.StmtDelete, sqlparser.StmtDDL, sqlparser.StmtUse, sqlparser.StmtExplain, sqlparser.StmtOther:
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: not reachable as handled with plan execute")
return 0, nil, vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: not reachable as handled with plan execute")
case sqlparser.StmtSet:
return e.handleSet(ctx, safeSession, sql, logStats)
qr, err := e.handleSet(ctx, safeSession, sql, logStats)
return sqlparser.StmtSet, qr, err
case sqlparser.StmtShow:
return e.handleShow(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats)
qr, err := e.handleShow(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats)
return sqlparser.StmtShow, qr, err
case sqlparser.StmtComment:
return e.handleComment(sql)
// Effectively should be done through new plan.
// There are some statements which are not planned for special comments.
return sqlparser.StmtComment, &sqltypes.Result{}, nil
}
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unrecognized statement: %s", sql)
return 0, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unrecognized statement: %s", sql)
}

// addNeededBindVars adds bind vars that are needed by the plan
Expand All @@ -248,6 +267,10 @@ func (e *Executor) addNeededBindVars(bindVarNeeds sqlparser.BindVarNeeds, bindVa
bindVars[sqlparser.FoundRowsName] = sqltypes.Uint64BindVariable(session.FoundRows)
}

if bindVarNeeds.NeedRowCount {
bindVars[sqlparser.RowCountName] = sqltypes.Int64BindVariable(session.RowCount)
}

return nil
}

Expand Down Expand Up @@ -1033,12 +1056,6 @@ func (e *Executor) handleOther(ctx context.Context, safeSession *SafeSession, sq
return result, err
}

func (e *Executor) handleComment(sql string) (*sqltypes.Result, error) {
_, _ = sqlparser.ExtractMysqlComment(sql)
// Not sure if this is a good idea.
return &sqltypes.Result{}, nil
}

// StreamExecute executes a streaming query.
func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, callback func(*sqltypes.Result) error) (err error) {
logStats := NewLogStats(ctx, method, sql, bindVars)
Expand Down
29 changes: 29 additions & 0 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,35 @@ func TestFoundRows(t *testing.T) {
utils.MustMatch(t, result, wantResult, "Mismatch")
}

func TestRowCount(t *testing.T) {
executor, _, _, _ := createExecutorEnv()
executor.normalize = true
logChan := QueryLogger.Subscribe("Test")
defer QueryLogger.Unsubscribe(logChan)

_, err := executorExec(executor, "select 42", map[string]*querypb.BindVariable{})
require.NoError(t, err)
testRowCount(t, executor, -1)

_, err = executorExec(executor, "update user set name = 'abc' where id in (42, 24)", map[string]*querypb.BindVariable{})
require.NoError(t, err)
testRowCount(t, executor, 2)
}

func testRowCount(t *testing.T, executor *Executor, wantRowCount int64) {
result, err := executorExec(executor, "select row_count()", map[string]*querypb.BindVariable{})
wantResult := &sqltypes.Result{
Fields: []*querypb.Field{
{Name: "row_count()", Type: sqltypes.Int64},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(wantRowCount),
}},
}
require.NoError(t, err)
utils.MustMatch(t, result, wantResult, "Mismatch")
}

func TestSelectLastInsertIdInUnion(t *testing.T) {
executor, sbc1, _, _ := createExecutorEnv()
executor.normalize = true
Expand Down
26 changes: 10 additions & 16 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/test/utils"

"vitess.io/vitess/go/vt/topo"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -325,12 +327,10 @@ func TestExecutorAutocommit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
wantSession := &vtgatepb.Session{TargetString: "@master", InTransaction: true, FoundRows: 1}
wantSession := &vtgatepb.Session{TargetString: "@master", InTransaction: true, FoundRows: 1, RowCount: -1}
testSession := *session.Session
testSession.ShardSessions = nil
if !proto.Equal(&testSession, wantSession) {
t.Errorf("autocommit=0: %v, want %v", testSession, wantSession)
}
utils.MustMatch(t, wantSession, &testSession, "session does not match for autocommit=0")

logStats := testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from main1", 1)
if logStats.CommitTime != 0 {
Expand Down Expand Up @@ -359,10 +359,8 @@ func TestExecutorAutocommit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
wantSession = &vtgatepb.Session{Autocommit: true, TargetString: "@master", FoundRows: 1}
if !proto.Equal(session.Session, wantSession) {
t.Errorf("autocommit=1: %v, want %v", session.Session, wantSession)
}
wantSession = &vtgatepb.Session{Autocommit: true, TargetString: "@master", FoundRows: 1, RowCount: 1}
utils.MustMatch(t, wantSession, session.Session, "session does not match for autocommit=1")
if got, want := sbclookup.AsTransactionCount.Get(), startCount+1; got != want {
t.Errorf("Commit count: %d, want %d", got, want)
}
Expand All @@ -388,12 +386,10 @@ func TestExecutorAutocommit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
wantSession = &vtgatepb.Session{InTransaction: true, Autocommit: true, TargetString: "@master", FoundRows: 1}
wantSession = &vtgatepb.Session{InTransaction: true, Autocommit: true, TargetString: "@master", FoundRows: 1, RowCount: 1}
testSession = *session.Session
testSession.ShardSessions = nil
if !proto.Equal(&testSession, wantSession) {
t.Errorf("autocommit=1: %v, want %v", &testSession, wantSession)
}
utils.MustMatch(t, wantSession, &testSession, "session does not match for autocommit=1")
if got, want := sbclookup.CommitCount.Get(), startCount; got != want {
t.Errorf("Commit count: %d, want %d", got, want)
}
Expand Down Expand Up @@ -1025,10 +1021,8 @@ func TestExecutorUse(t *testing.T) {
if err != nil {
t.Error(err)
}
wantSession := &vtgatepb.Session{Autocommit: true, TargetString: want[i]}
if !proto.Equal(session.Session, wantSession) {
t.Errorf("%s: %v, want %v", stmt, session.Session, wantSession)
}
wantSession := &vtgatepb.Session{Autocommit: true, TargetString: want[i], RowCount: -1}
utils.MustMatch(t, wantSession, session.Session, "session does not match")
}

_, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{}), "use 1", nil)
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vtgate/fallback_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package vtgate
import (
"context"

"vitess.io/vitess/go/vt/sqlparser"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vtgate/planbuilder"
Expand All @@ -36,10 +38,10 @@ This method implements the fall back logic.
If exA(plan execute) is not able to plan the statement then it fall backs to exB(execute) method.
There is no fallback for parsing errors.
*/
func (f *fallbackExecutor) execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*query.BindVariable, logStats *LogStats) (*sqltypes.Result, error) {
qr, err := f.exA.execute(ctx, safeSession, sql, bindVars, logStats)
func (f *fallbackExecutor) execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*query.BindVariable, logStats *LogStats) (sqlparser.StatementType, *sqltypes.Result, error) {
stmtType, qr, err := f.exA.execute(ctx, safeSession, sql, bindVars, logStats)
if err == planbuilder.ErrPlanNotSupported {
return f.exB.execute(ctx, safeSession, sql, bindVars, logStats)
}
return qr, err
return stmtType, qr, err
}
42 changes: 21 additions & 21 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ func NewTestExecutor(ctx context.Context, strat func(executor *Executor) execute
return e
}

func (e *planExecute) execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) (*sqltypes.Result, error) {
func (e *planExecute) execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) (sqlparser.StatementType, *sqltypes.Result, error) {
// 1: Prepare before planning and execution

// Start an implicit transaction if necessary.
err := e.startTxIfNecessary(ctx, safeSession)
if err != nil {
return nil, err
return 0, nil, err
}

if bindVars == nil {
Expand All @@ -97,7 +97,7 @@ func (e *planExecute) execute(ctx context.Context, safeSession *SafeSession, sql
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, err := newVCursorImpl(ctx, safeSession, comments, e.e, logStats, e.e.vm, e.e.resolver.resolver)
if err != nil {
return nil, err
return 0, nil, err
}

// 2: Create a plan for the query
Expand All @@ -110,13 +110,13 @@ func (e *planExecute) execute(ctx context.Context, safeSession *SafeSession, sql
logStats,
)
if err == planbuilder.ErrPlanNotSupported {
return nil, err
return 0, nil, err
}
execStart := e.logPlanningFinished(logStats, sql)

if err != nil {
safeSession.ClearWarnings()
return nil, err
return 0, nil, err
}

if plan.Type != sqlparser.StmtShow {
Expand All @@ -127,18 +127,21 @@ func (e *planExecute) execute(ctx context.Context, safeSession *SafeSession, sql
// will fall through and be handled through planning
switch plan.Type {
case sqlparser.StmtBegin:
return e.e.handleBegin(ctx, safeSession, vcursor.tabletType, logStats)
qr, err := e.e.handleBegin(ctx, safeSession, vcursor.tabletType, logStats)
return sqlparser.StmtBegin, qr, err
case sqlparser.StmtCommit:
return e.e.handleCommit(ctx, safeSession, logStats)
qr, err := e.e.handleCommit(ctx, safeSession, logStats)
return sqlparser.StmtCommit, qr, err
case sqlparser.StmtRollback:
return e.e.handleRollback(ctx, safeSession, logStats)
qr, err := e.e.handleRollback(ctx, safeSession, logStats)
return sqlparser.StmtRollback, qr, err
}

// 3: Prepare for execution
err = e.e.addNeededBindVars(plan.BindVarNeeds, bindVars, safeSession)
if err != nil {
logStats.Error = err
return nil, err
return 0, nil, err
}

if plan.Instructions.NeedsTransaction() {
Expand All @@ -158,12 +161,12 @@ func (e *planExecute) startTxIfNecessary(ctx context.Context, safeSession *SafeS
return nil
}

func (e *planExecute) insideTransaction(ctx context.Context, safeSession *SafeSession, logStats *LogStats, f currFunc) (*sqltypes.Result, error) {
func (e *planExecute) insideTransaction(ctx context.Context, safeSession *SafeSession, logStats *LogStats, f currFunc) (sqlparser.StatementType, *sqltypes.Result, error) {
mustCommit := false
if safeSession.Autocommit && !safeSession.InTransaction() {
mustCommit = true
if err := e.e.txConn.Begin(ctx, safeSession); err != nil {
return nil, err
return 0, nil, err
}
}

Expand All @@ -178,9 +181,9 @@ func (e *planExecute) insideTransaction(ctx context.Context, safeSession *SafeSe
safeSession.SetAutocommittable(mustCommit)

// Execute!
result, err := f(logStats, safeSession)
stmtType, result, err := f(logStats, safeSession)
if err != nil {
return nil, err
return 0, nil, err
}

if mustCommit {
Expand All @@ -190,22 +193,19 @@ func (e *planExecute) insideTransaction(ctx context.Context, safeSession *SafeSe

commitStart := time.Now()
if err := e.e.txConn.Commit(ctx, safeSession); err != nil {
return nil, err
return 0, nil, err
}
logStats.CommitTime = time.Since(commitStart)
}
return result, nil
return stmtType, result, nil
}

type currFunc func(*LogStats, *SafeSession) (*sqltypes.Result, error)
type currFunc func(*LogStats, *SafeSession) (sqlparser.StatementType, *sqltypes.Result, error)

func (e *planExecute) executePlan(ctx context.Context, plan *engine.Plan, vcursor *vcursorImpl, bindVars map[string]*querypb.BindVariable, execStart time.Time) currFunc {
return func(logStats *LogStats, safeSession *SafeSession) (*sqltypes.Result, error) {
return func(logStats *LogStats, safeSession *SafeSession) (sqlparser.StatementType, *sqltypes.Result, error) {
// 4: Execute!
qr, err := plan.Instructions.Execute(vcursor, bindVars, true)
if err == nil && qr != nil && qr.InsertID > 0 {
safeSession.LastInsertId = qr.InsertID
}

// 5: Log and add statistics
errCount := e.logExecutionEnd(logStats, execStart, plan, err, qr)
Expand All @@ -216,7 +216,7 @@ func (e *planExecute) executePlan(ctx context.Context, plan *engine.Plan, vcurso
_ = e.e.txConn.Rollback(ctx, safeSession)
err = vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction rolled back due to partial DML execution: %v", err)
}
return qr, err
return plan.Type, qr, err
}
}

Expand Down
6 changes: 2 additions & 4 deletions go/vt/vtgate/plan_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,10 +983,8 @@ func TestPlanExecutorUse(t *testing.T) {
if err != nil {
t.Error(err)
}
wantSession := &vtgatepb.Session{Autocommit: true, TargetString: want[i]}
if !proto.Equal(session.Session, wantSession) {
t.Errorf("%s: %v, want %v", stmt, session.Session, wantSession)
}
wantSession := &vtgatepb.Session{Autocommit: true, TargetString: want[i], RowCount: -1}
utils.MustMatch(t, wantSession, session.Session, "session does not match")
}

_, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{}), "use 1", nil)
Expand Down

0 comments on commit 830f31d

Please sign in to comment.