Skip to content

Commit

Permalink
*: remove Next function for RecordSet (pingcap#6040)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored and zz-jason committed Mar 13, 2018
1 parent f59c5be commit 631141b
Show file tree
Hide file tree
Showing 23 changed files with 228 additions and 158 deletions.
3 changes: 0 additions & 3 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ type RecordSet interface {
// Fields gets result fields.
Fields() []*ResultField

// Next returns the next row, nil row means there is no more to return.
Next(ctx context.Context) (row types.Row, err error)

// NextChunk reads records into chunk.
NextChunk(ctx context.Context, chk *chunk.Chunk) error

Expand Down
7 changes: 4 additions & 3 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,16 @@ func prepareJoinBenchData(se Session, colType string, valueFormat string, valueC
}

func readResult(ctx context.Context, rs ast.RecordSet, count int) {
chk := rs.NewChunk()
for count > 0 {
x, err := rs.Next(ctx)
err := rs.NextChunk(ctx, chk)
if err != nil {
log.Fatal(err)
}
if x == nil {
if chk.NumRows() == 0 {
log.Fatal(count)
}
count--
count -= chk.NumRows()
}
rs.Close()
}
Expand Down
33 changes: 20 additions & 13 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/chunk"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -270,10 +271,12 @@ func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) {
}
r := rs[0]
defer terror.Call(r.Close)
row, err := r.Next(ctx)
if err != nil || row == nil {
chk := r.NewChunk()
err = r.NextChunk(ctx, chk)
if err != nil || chk.NumRows() == 0 {
return "", true, errors.Trace(err)
}
row := chk.GetRow(0)
if row.IsNull(0) {
return "", true, nil
}
Expand Down Expand Up @@ -473,17 +476,21 @@ func upgradeToVer12(s Session) {
r := rs[0]
sqls := make([]string, 0, 1)
defer terror.Call(r.Close)
row, err := r.Next(ctx)
for err == nil && row != nil {
user := row.GetString(0)
host := row.GetString(1)
pass := row.GetString(2)
var newPass string
newPass, err = oldPasswordUpgrade(pass)
terror.MustNil(err)
updateSQL := fmt.Sprintf(`UPDATE mysql.user set password = "%s" where user="%s" and host="%s"`, newPass, user, host)
sqls = append(sqls, updateSQL)
row, err = r.Next(ctx)
chk := r.NewChunk()
it := chunk.NewIterator4Chunk(chk)
err = r.NextChunk(ctx, chk)
for err == nil && chk.NumRows() != 0 {
for row := it.Begin(); row != it.End(); row = it.Next() {
user := row.GetString(0)
host := row.GetString(1)
pass := row.GetString(2)
var newPass string
newPass, err = oldPasswordUpgrade(pass)
terror.MustNil(err)
updateSQL := fmt.Sprintf(`UPDATE mysql.user set password = "%s" where user="%s" and host="%s"`, newPass, user, host)
sqls = append(sqls, updateSQL)
}
err = r.NextChunk(ctx, chk)
}
terror.MustNil(err)

Expand Down
51 changes: 33 additions & 18 deletions bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ func (s *testBootstrapSuite) TestBootstrap(c *C) {
r := mustExecSQL(c, se, `select * from user;`)
c.Assert(r, NotNil)
ctx := context.Background()
row, err := r.Next(ctx)
chk := r.NewChunk()
err := r.NextChunk(ctx, chk)
c.Assert(err, IsNil)
c.Assert(row, NotNil)
datums := ast.RowToDatums(row, r.Fields())
c.Assert(chk.NumRows() == 0, IsFalse)
datums := ast.RowToDatums(chk.GetRow(0), r.Fields())
match(c, datums, []byte("%"), []byte("root"), []byte(""), "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y")

c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "anyhost"}, []byte(""), []byte("")), IsTrue)
Expand All @@ -65,9 +66,10 @@ func (s *testBootstrapSuite) TestBootstrap(c *C) {
// Check privilege tables.
r = mustExecSQL(c, se, "SELECT COUNT(*) from mysql.global_variables;")
c.Assert(r, NotNil)
v, err := r.Next(ctx)
chk = r.NewChunk()
err = r.NextChunk(ctx, chk)
c.Assert(err, IsNil)
c.Assert(v.GetInt64(0), Equals, globalVarsCount())
c.Assert(chk.GetRow(0).GetInt64(0), Equals, globalVarsCount())

// Check a storage operations are default autocommit after the second start.
mustExecSQL(c, se, "USE test;")
Expand All @@ -84,9 +86,11 @@ func (s *testBootstrapSuite) TestBootstrap(c *C) {
mustExecSQL(c, se, "USE test;")
r = mustExecSQL(c, se, "select * from t")
c.Assert(r, NotNil)
v, err = r.Next(ctx)

chk = r.NewChunk()
err = r.NextChunk(ctx, chk)
c.Assert(err, IsNil)
datums = ast.RowToDatums(v, r.Fields())
datums = ast.RowToDatums(chk.GetRow(0), r.Fields())
match(c, datums, 3)
mustExecSQL(c, se, "drop table if exists t")
se.Close()
Expand Down Expand Up @@ -143,9 +147,11 @@ func (s *testBootstrapSuite) testBootstrapWithError(c *C) {
se := newSession(c, store, s.dbNameBootstrap)
mustExecSQL(c, se, "USE mysql;")
r := mustExecSQL(c, se, `select * from user;`)
row, err := r.Next(ctx)
chk := r.NewChunk()
err := r.NextChunk(ctx, chk)
c.Assert(err, IsNil)
c.Assert(row, NotNil)
c.Assert(chk.NumRows() == 0, IsFalse)
row := chk.GetRow(0)
datums := ast.RowToDatums(row, r.Fields())
match(c, datums, []byte("%"), []byte("root"), []byte(""), "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y", "Y")
mustExecSQL(c, se, "USE test;")
Expand All @@ -155,14 +161,18 @@ func (s *testBootstrapSuite) testBootstrapWithError(c *C) {
mustExecSQL(c, se, "SELECT * from mysql.columns_priv;")
// Check global variables.
r = mustExecSQL(c, se, "SELECT COUNT(*) from mysql.global_variables;")
v, err := r.Next(ctx)
chk = r.NewChunk()
err = r.NextChunk(ctx, chk)
c.Assert(err, IsNil)
v := chk.GetRow(0)
c.Assert(v.GetInt64(0), Equals, globalVarsCount())

r = mustExecSQL(c, se, `SELECT VARIABLE_VALUE from mysql.TiDB where VARIABLE_NAME="bootstrapped";`)
row, err = r.Next(ctx)
chk = r.NewChunk()
err = r.NextChunk(ctx, chk)
c.Assert(err, IsNil)
c.Assert(row, NotNil)
c.Assert(chk.NumRows() == 0, IsFalse)
row = chk.GetRow(0)
c.Assert(row.Len(), Equals, 1)
c.Assert(row.GetBytes(0), BytesEquals, []byte("True"))

Expand All @@ -181,9 +191,11 @@ func (s *testBootstrapSuite) TestUpgrade(c *C) {

// bootstrap with currentBootstrapVersion
r := mustExecSQL(c, se, `SELECT VARIABLE_VALUE from mysql.TiDB where VARIABLE_NAME="tidb_server_version";`)
row, err := r.Next(ctx)
chk := r.NewChunk()
err := r.NextChunk(ctx, chk)
row := chk.GetRow(0)
c.Assert(err, IsNil)
c.Assert(row, NotNil)
c.Assert(chk.NumRows() == 0, IsFalse)
c.Assert(row.Len(), Equals, 1)
c.Assert(row.GetBytes(0), BytesEquals, []byte(fmt.Sprintf("%d", currentBootstrapVersion)))

Expand All @@ -208,9 +220,10 @@ func (s *testBootstrapSuite) TestUpgrade(c *C) {
delete(storeBootstrapped, store.UUID())
// Make sure the version is downgraded.
r = mustExecSQL(c, se1, `SELECT VARIABLE_VALUE from mysql.TiDB where VARIABLE_NAME="tidb_server_version";`)
row, err = r.Next(ctx)
chk = r.NewChunk()
err = r.NextChunk(ctx, chk)
c.Assert(err, IsNil)
c.Assert(row, IsNil)
c.Assert(chk.NumRows() == 0, IsTrue)

ver, err = getBootstrapVersion(se1)
c.Assert(err, IsNil)
Expand All @@ -222,9 +235,11 @@ func (s *testBootstrapSuite) TestUpgrade(c *C) {
defer dom1.Close()
se2 := newSession(c, store, s.dbName)
r = mustExecSQL(c, se2, `SELECT VARIABLE_VALUE from mysql.TiDB where VARIABLE_NAME="tidb_server_version";`)
row, err = r.Next(ctx)
chk = r.NewChunk()
err = r.NextChunk(ctx, chk)
c.Assert(err, IsNil)
c.Assert(row, NotNil)
c.Assert(chk.NumRows() == 0, IsFalse)
row = chk.GetRow(0)
c.Assert(row.Len(), Equals, 1)
c.Assert(row.GetBytes(0), BytesEquals, []byte(fmt.Sprintf("%d", currentBootstrapVersion)))

Expand Down
9 changes: 5 additions & 4 deletions cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ func (ut *benchDB) mustExec(sql string) {
if len(rss) > 0 {
ctx := context.Background()
rs := rss[0]
chk := rs.NewChunk()
for {
row, err1 := rs.Next(ctx)
if err1 != nil {
log.Fatal(err1)
err := rs.NextChunk(ctx, chk)
if err != nil {
log.Fatal(err)
}
if row == nil {
if chk.NumRows() == 0 {
break
}
}
Expand Down
36 changes: 21 additions & 15 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -54,28 +55,33 @@ func LoadDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRan
}

rs := rss[0]
chk := rs.NewChunk()
it := chunk.NewIterator4Chunk(chk)
for {
row, err := rs.Next(context.TODO())
err = rs.NextChunk(context.TODO(), chk)
if err != nil {
return nil, errors.Trace(err)
}
if row == nil {
if chk.NumRows() == 0 {
break
}
startKey, err := hex.DecodeString(row.GetString(2))
if err != nil {
return nil, errors.Trace(err)
}
endKey, err := hex.DecodeString(row.GetString(3))
if err != nil {
return nil, errors.Trace(err)

for row := it.Begin(); row != it.End(); row = it.Next() {
startKey, err := hex.DecodeString(row.GetString(2))
if err != nil {
return nil, errors.Trace(err)
}
endKey, err := hex.DecodeString(row.GetString(3))
if err != nil {
return nil, errors.Trace(err)
}
ranges = append(ranges, DelRangeTask{
JobID: row.GetInt64(0),
ElementID: row.GetInt64(1),
StartKey: startKey,
EndKey: endKey,
})
}
ranges = append(ranges, DelRangeTask{
JobID: row.GetInt64(0),
ElementID: row.GetInt64(1),
StartKey: startKey,
EndKey: endKey,
})
}
return ranges, nil
}
Expand Down
25 changes: 18 additions & 7 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/testkit"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -64,23 +65,31 @@ func (s *testSuite) TestCreateTable(c *C) {
rs, err := tk.Exec(`desc issue312_1`)
c.Assert(err, IsNil)
ctx := context.Background()
chk := rs.NewChunk()
it := chunk.NewIterator4Chunk(chk)
for {
row, err1 := rs.Next(ctx)
err1 := rs.NextChunk(ctx, chk)
c.Assert(err1, IsNil)
if row == nil {
if chk.NumRows() == 0 {
break
}
c.Assert(row.GetString(1), Equals, "float")
for row := it.Begin(); row != it.End(); row = it.Next() {
c.Assert(row.GetString(1), Equals, "float")
}
}
rs, err = tk.Exec(`desc issue312_2`)
c.Assert(err, IsNil)
chk = rs.NewChunk()
it = chunk.NewIterator4Chunk(chk)
for {
row, err1 := rs.Next(ctx)
err1 := rs.NextChunk(ctx, chk)
c.Assert(err1, IsNil)
if row == nil {
if chk.NumRows() == 0 {
break
}
c.Assert(row.GetString(1), Equals, "double")
for row := it.Begin(); row != it.End(); row = it.Next() {
c.Assert(chk.GetRow(0).GetString(1), Equals, "double")
}
}

// table option is auto-increment
Expand Down Expand Up @@ -146,8 +155,10 @@ func (s *testSuite) TestAlterTableAddColumn(c *C) {
now := time.Now().Add(-time.Duration(1 * time.Second)).Format(types.TimeFormat)
r, err := tk.Exec("select c2 from alter_test")
c.Assert(err, IsNil)
row, err := r.Next(context.Background())
chk := r.NewChunk()
err = r.NextChunk(context.Background(), chk)
c.Assert(err, IsNil)
row := chk.GetRow(0)
c.Assert(row.Len(), Equals, 1)
c.Assert(now, GreaterEqual, row.GetTime(0).String())
tk.MustExec("alter table alter_test add column c3 varchar(50) default 'CURRENT_TIMESTAMP'")
Expand Down
17 changes: 11 additions & 6 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func (s *testSuite) TestIndexDoubleReadClose(c *C) {

rs, err := tk.Exec("select * from dist where c_idx between 0 and 100")
c.Assert(err, IsNil)
_, err = rs.Next(context.Background())
chk := rs.NewChunk()
err = rs.NextChunk(context.Background(), chk)
c.Assert(err, IsNil)
c.Assert(err, IsNil)
keyword := "pickAndExecTask"
rs.Close()
Expand Down Expand Up @@ -103,9 +105,10 @@ func (s *testSuite) TestCopClientSend(c *C) {
rs, err := tk.Exec("select sum(id) from copclient")
c.Assert(err, IsNil)
defer rs.Close()
row, err := rs.Next(ctx)
chk := rs.NewChunk()
err = rs.NextChunk(ctx, chk)
c.Assert(err, IsNil)
c.Assert(row.GetMyDecimal(0).String(), Equals, "499500")
c.Assert(chk.GetRow(0).GetMyDecimal(0).String(), Equals, "499500")

// Split one region.
key := tablecodec.EncodeRowKeyWithHandle(tblID, 500)
Expand All @@ -116,15 +119,17 @@ func (s *testSuite) TestCopClientSend(c *C) {
// Check again.
rs, err = tk.Exec("select sum(id) from copclient")
c.Assert(err, IsNil)
row, err = rs.Next(ctx)
chk = rs.NewChunk()
err = rs.NextChunk(ctx, chk)
c.Assert(err, IsNil)
c.Assert(row.GetMyDecimal(0).String(), Equals, "499500")
c.Assert(chk.GetRow(0).GetMyDecimal(0).String(), Equals, "499500")
rs.Close()

// Check there is no goroutine leak.
rs, err = tk.Exec("select * from copclient order by id")
c.Assert(err, IsNil)
_, err = rs.Next(ctx)
chk = rs.NewChunk()
err = rs.NextChunk(ctx, chk)
c.Assert(err, IsNil)
rs.Close()
keyword := "(*copIterator).work"
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (e *CancelDDLJobsExec) NextChunk(ctx context.Context, chk *chunk.Chunk) err
}
numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobIDs)-e.cursor)
for i := e.cursor; i < e.cursor+numCurBatch; i++ {
chk.AppendInt64(0, e.jobIDs[i])
chk.AppendString(0, fmt.Sprintf("%d", e.jobIDs[i]))
if e.errs[i] != nil {
chk.AppendString(1, fmt.Sprintf("error: %v", e.errs[i]))
} else {
Expand Down
Loading

0 comments on commit 631141b

Please sign in to comment.