Skip to content

Commit

Permalink
*: remove useless code in executor. (pingcap#4982)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored and winoros committed Nov 2, 2017
1 parent e34fc44 commit 2339a70
Show file tree
Hide file tree
Showing 11 changed files with 487 additions and 1,786 deletions.
181 changes: 80 additions & 101 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 PingCAP, Inc.
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
Expand All @@ -32,16 +32,14 @@ var (
)

var (
_ SelectResult = &selectResult{}
_ PartialResult = &partialResult{}
_ NewSelectResult = &newSelectResult{}
_ NewPartialResult = &newPartialResult{}
)

var selectResultGP = gp.New(2 * time.Minute)

// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// NewSelectResult is an iterator of coprocessor partial results.
type NewSelectResult interface {
// Next gets the next partial result.
Next() (PartialResult, error)
Next() (NewPartialResult, error)
// NextRaw gets the next raw result.
NextRaw() ([]byte, error)
// Close closes the iterator.
Expand All @@ -51,37 +49,36 @@ type SelectResult interface {
Fetch(ctx goctx.Context)
}

// PartialResult is the result from a single region server.
type PartialResult interface {
// NewPartialResult is the result from a single region server.
type NewPartialResult interface {
// Next returns the next rowData of the sub result.
// If no more row to return, rowData would be nil.
Next() (handle int64, rowData []byte, err error)
Next() (rowData []types.Datum, err error)
// Close closes the partial result.
Close() error
}

// SelectResult is used to get response rows from SelectRequest.
type selectResult struct {
type newSelectResult struct {
label string
aggregate bool
resp kv.Response

results chan resultWithErr
results chan newResultWithErr
closed chan struct{}

rowLen int
}

type resultWithErr struct {
type newResultWithErr struct {
result []byte
err error
}

func (r *selectResult) Fetch(ctx goctx.Context) {
selectResultGP.Go(func() {
r.fetch(ctx)
})
func (r *newSelectResult) Fetch(ctx goctx.Context) {
go r.fetch(ctx)
}

func (r *selectResult) fetch(ctx goctx.Context) {
func (r *newSelectResult) fetch(ctx goctx.Context) {
startTime := time.Now()
defer func() {
close(r.results)
Expand All @@ -91,17 +88,17 @@ func (r *selectResult) fetch(ctx goctx.Context) {
for {
resultSubset, err := r.resp.Next()
if err != nil {
r.results <- resultWithErr{err: errors.Trace(err)}
r.results <- newResultWithErr{err: errors.Trace(err)}
return
}
if resultSubset == nil {
return
}

select {
case r.results <- resultWithErr{result: resultSubset}:
case r.results <- newResultWithErr{result: resultSubset}:
case <-r.closed:
// if selectResult called Close() already, make fetch goroutine exit
// If selectResult called Close() already, make fetch goroutine exit.
return
case <-ctx.Done():
return
Expand All @@ -110,41 +107,40 @@ func (r *selectResult) fetch(ctx goctx.Context) {
}

// Next returns the next row.
func (r *selectResult) Next() (PartialResult, error) {
func (r *newSelectResult) Next() (NewPartialResult, error) {
re := <-r.results
if re.err != nil {
return nil, errors.Trace(re.err)
}
if re.result == nil {
return nil, nil
}
pr := &partialResult{}
pr := &newPartialResult{}
pr.rowLen = r.rowLen
err := pr.unmarshal(re.result)
return pr, errors.Trace(err)
}

// NextRaw returns the next raw partial result.
func (r *selectResult) NextRaw() ([]byte, error) {
func (r *newSelectResult) NextRaw() ([]byte, error) {
re := <-r.results
return re.result, errors.Trace(re.err)
}

// Close closes SelectResult.
func (r *selectResult) Close() error {
// close this channel tell fetch goroutine to exit
func (r *newSelectResult) Close() error {
// Close this channel tell fetch goroutine to exit.
close(r.closed)
return r.resp.Close()
}

// partialResult represents a subset of select result.
type partialResult struct {
resp *tipb.SelectResponse
chunkIdx int
cursor int
dataOffset int64
type newPartialResult struct {
resp *tipb.SelectResponse
chunkIdx int
rowLen int
}

func (pr *partialResult) unmarshal(resultSubset []byte) error {
func (pr *newPartialResult) unmarshal(resultSubset []byte) error {
pr.resp = new(tipb.SelectResponse)
err := pr.resp.Unmarshal(resultSubset)
if err != nil {
Expand All @@ -158,121 +154,104 @@ func (pr *partialResult) unmarshal(resultSubset []byte) error {
return nil
}

var zeroLenData = make([]byte, 0)

// Next returns the next row of the sub result.
// If no more row to return, data would be nil.
func (pr *partialResult) Next() (handle int64, data []byte, err error) {
func (pr *newPartialResult) Next() (data []types.Datum, err error) {
chunk := pr.getChunk()
if chunk == nil {
return 0, nil, nil
return nil, nil
}
rowMeta := chunk.RowsMeta[pr.cursor]
data = chunk.RowsData[pr.dataOffset : pr.dataOffset+rowMeta.Length]
if data == nil {
// The caller checks if data is nil to determine finished.
data = zeroLenData
data = make([]types.Datum, pr.rowLen)
for i := 0; i < pr.rowLen; i++ {
var l []byte
l, chunk.RowsData, err = codec.CutOne(chunk.RowsData)
if err != nil {
return nil, errors.Trace(err)
}
data[i].SetRaw(l)
}
pr.dataOffset += rowMeta.Length
handle = rowMeta.Handle
pr.cursor++
return
}

func (pr *partialResult) getChunk() *tipb.Chunk {
func (pr *newPartialResult) getChunk() *tipb.Chunk {
for {
if pr.chunkIdx >= len(pr.resp.Chunks) {
return nil
}
chunk := &pr.resp.Chunks[pr.chunkIdx]
if pr.cursor < len(chunk.RowsMeta) {
if len(chunk.RowsData) > 0 {
return chunk
}
pr.cursor = 0
pr.dataOffset = 0
pr.chunkIdx++
}
}

// Close closes the sub result.
func (pr *partialResult) Close() error {
func (pr *newPartialResult) Close() error {
return nil
}

// Select do a select request, returns SelectResult.
// concurrency: The max concurrency for underlying coprocessor request.
// keepOrder: If the result should returned in key order. For example if we need keep data in order by
// scan index, we should set keepOrder to true.
func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, isolationLevel kv.IsoLevel, priority int) (SelectResult, error) {
// NewSelectDAG sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func NewSelectDAG(ctx goctx.Context, client kv.Client, kvReq *kv.Request, colLen int) (NewSelectResult, error) {
var err error
defer func() {
// Add metrics
// Add metrics.
if err != nil {
queryCounter.WithLabelValues(queryFailed).Inc()
} else {
queryCounter.WithLabelValues(querySucc).Inc()
}
}()

// Convert tipb.*Request to kv.Request.
kvReq, err1 := composeRequest(req, keyRanges, concurrency, keepOrder, isolationLevel, priority)
if err1 != nil {
err = errors.Trace(err1)
return nil, err
}

resp := client.Send(ctx, kvReq)
if resp == nil {
return nil, errors.New("client returns nil response")
err = errors.New("client returns nil response")
return nil, errors.Trace(err)
}
result := &selectResult{
result := &newSelectResult{
label: "dag",
resp: resp,
results: make(chan resultWithErr, 5),
results: make(chan newResultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
rowLen: colLen,
}
// If Aggregates is not nil, we should set result fields latter.
if len(req.Aggregates) == 0 && len(req.GroupBy) == 0 {
if req.TableInfo != nil {
result.label = "table"
return result, nil
}

// NewAnalyze do a analyze request.
func NewAnalyze(ctx goctx.Context, client kv.Client, kvReq *kv.Request) (NewSelectResult, error) {
var err error
defer func() {
// Add metrics.
if err != nil {
queryCounter.WithLabelValues(queryFailed).Inc()
} else {
result.label = "index"
queryCounter.WithLabelValues(querySucc).Inc()
}
} else {
result.label = "aggregate"
}()

resp := client.Send(ctx, kvReq)
if resp == nil {
return nil, errors.New("client returns nil response")
}
result := &newSelectResult{
label: "analyze",
resp: resp,
results: make(chan newResultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
}
return result, nil
}

// Convert tipb.Request to kv.Request.
func composeRequest(req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, isolationLevel kv.IsoLevel, priority int) (*kv.Request, error) {
kvReq := &kv.Request{
StartTs: req.StartTs,
Concurrency: concurrency,
KeepOrder: keepOrder,
KeyRanges: keyRanges,
IsolationLevel: isolationLevel,
Priority: priority,
}
if req.IndexInfo != nil {
kvReq.Tp = kv.ReqTypeIndex
} else {
kvReq.Tp = kv.ReqTypeSelect
}
if req.OrderBy != nil {
kvReq.Desc = req.OrderBy[0].Desc
}
var err error
kvReq.Data, err = req.Marshal()
if err != nil {
return nil, errors.Trace(err)
}
return kvReq, nil
type resultWithErr struct {
result []byte
err error
}

// XAPI error codes.
const (
codeInvalidResp = 1
codeNilResp = 2
)

// FieldTypeFromPBColumn creates a types.FieldType from tipb.ColumnInfo.
Expand Down
39 changes: 0 additions & 39 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@ package distsql

import (
"errors"
"runtime"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -134,42 +131,6 @@ func (s *testDistsqlSuite) TestIndexToProto(c *C) {
c.Assert(pIdx.Unique, Equals, true)
}

// For issue 1791
func (s *testDistsqlSuite) TestGoroutineLeak(c *C) {
var sr SelectResult
countBefore := runtime.NumGoroutine()

sr = &selectResult{
resp: &mockResponse{},
results: make(chan resultWithErr, 5),
closed: make(chan struct{}),
}
go sr.Fetch(goctx.TODO())
for {
// mock test will generate some partial result then return error
_, err := sr.Next()
if err != nil {
// close selectResult on error, partialResult's fetch goroutine may leak
sr.Close()
break
}
}

tick := 10 * time.Millisecond
totalSleep := time.Duration(0)
for totalSleep < 3*time.Second {
time.Sleep(tick)
totalSleep += tick
countAfter := runtime.NumGoroutine()

if countAfter-countBefore < 5 {
return
}
}

c.Error("distsql goroutine leak!")
}

type mockResponse struct {
count int
}
Expand Down
Loading

0 comments on commit 2339a70

Please sign in to comment.