-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: encode the returned data using the Chunk format in mocktikv #12023
Changes from 39 commits
4470f2c
4f3c090
b57c1c7
6d34738
0fddb75
8547dea
818f590
d841cc6
17389b3
3c0505e
c42e3ba
ae73a0c
41d2abb
464b816
6808fc2
4ea9b85
32e4583
2758e02
b96379b
8eb59bd
64a88e0
324ad76
d6d5362
2892c8a
64f2bd3
4b0bc99
56e85b7
2deb7c5
be50349
9b2d50e
81a5c13
a181642
08a512d
59d8619
f057d09
0f9662b
aa8dc9d
a86f1c6
feb2cd0
07e9804
a32153d
96a7951
392ec3c
388fed5
7f67e84
26b704b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,6 +73,7 @@ type selectResult struct { | |
feedback *statistics.QueryFeedback | ||
partialCount int64 // number of partial results. | ||
sqlType string | ||
encodeType tipb.EncodeType | ||
|
||
// copPlanIDs contains all copTasks' planIDs, | ||
// which help to collect copTasks' runtime stats. | ||
|
@@ -145,8 +146,30 @@ func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) { | |
// Next reads data to the chunk. | ||
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { | ||
chk.Reset() | ||
// Check the returned data is default/arrow format. | ||
XuHuaiyu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if r.selectResp == nil || (len(r.selectResp.RowBatchData) == 0 && r.respChkIdx == len(r.selectResp.Chunks)) { | ||
err := r.getSelectResp() | ||
if err != nil || r.selectResp == nil { | ||
return err | ||
} | ||
// TODO(Shenghui Wu): add metrics | ||
if len(r.selectResp.RowBatchData) == 0 { | ||
r.encodeType = tipb.EncodeType_TypeDefault | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO, we only need to do this check when the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now the check is temporary ad-hoc. The next pr will add a field |
||
} | ||
} | ||
|
||
switch r.encodeType { | ||
case tipb.EncodeType_TypeDefault: | ||
return r.readFromDefault(ctx, chk) | ||
case tipb.EncodeType_TypeArrow: | ||
return r.readFromArrow(ctx, chk) | ||
} | ||
return errors.Errorf("unsupported encode type:%v", r.encodeType) | ||
} | ||
|
||
func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) error { | ||
for !chk.IsFull() { | ||
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) { | ||
if r.respChkIdx == len(r.selectResp.Chunks) { | ||
err := r.getSelectResp() | ||
if err != nil || r.selectResp == nil { | ||
return err | ||
|
@@ -163,6 +186,14 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { | |
return nil | ||
} | ||
|
||
func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error { | ||
rowBatchData := r.selectResp.RowBatchData | ||
codec := chunk.NewCodec(r.fieldTypes) | ||
XuHuaiyu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
remained := codec.DecodeToChunk(rowBatchData, chk) | ||
zz-jason marked this conversation as resolved.
Show resolved
Hide resolved
|
||
r.selectResp.RowBatchData = remained | ||
return nil | ||
} | ||
|
||
func (r *selectResult) getSelectResp() error { | ||
r.respChkIdx = 0 | ||
for { | ||
|
@@ -196,7 +227,7 @@ func (r *selectResult) getSelectResp() error { | |
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts) | ||
r.partialCount++ | ||
sc.MergeExecDetails(re.result.GetExecDetails(), nil) | ||
if len(r.selectResp.Chunks) == 0 { | ||
if len(r.selectResp.Chunks) == 0 && len(r.selectResp.RowBatchData) == 0 { | ||
continue | ||
} | ||
return nil | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2018,7 +2018,7 @@ func (s *testSchemaSuite) TestTableReaderChunk(c *C) { | |
} | ||
c.Assert(count, Equals, 100) | ||
// FIXME: revert this result to new group value after distsql can handle initChunkSize. | ||
c.Assert(numChunks, Equals, 1) | ||
c.Assert(numChunks, Equals, 10) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why it's changed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the test case, it |
||
rs.Close() | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -352,6 +352,9 @@ type SessionVars struct { | |
// TODO: remove this after tidb-server configuration "enable-streaming' removed. | ||
EnableStreaming bool | ||
|
||
// EnableArrow indicates whether the coprocessor request can use arrow API. | ||
EnableArrow bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can make this config item be able to be reloaded without restarting the tidb-server like other hot-reloadable config items. It's not urgent, we can do this in another PR. |
||
|
||
writeStmtBufs WriteStmtBufs | ||
|
||
// L2CacheSize indicates the size of CPU L2 cache, using byte as unit. | ||
|
@@ -525,6 +528,14 @@ func NewSessionVars() *SessionVars { | |
enableStreaming = "0" | ||
} | ||
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming)) | ||
|
||
var enableArrow string | ||
if config.GetGlobalConfig().TiKVClient.EnableArrow { | ||
enableArrow = "1" | ||
} else { | ||
enableArrow = "0" | ||
} | ||
terror.Log(vars.SetSystemVar(TiDBEnableArrow, enableArrow)) | ||
return vars | ||
} | ||
|
||
|
@@ -848,6 +859,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { | |
s.DisableTxnAutoRetry = TiDBOptOn(val) | ||
case TiDBEnableStreaming: | ||
s.EnableStreaming = TiDBOptOn(val) | ||
case TiDBEnableArrow: | ||
s.EnableArrow = TiDBOptOn(val) | ||
case TiDBEnableCascadesPlanner: | ||
s.EnableCascadesPlanner = TiDBOptOn(val) | ||
case TiDBOptimizerSelectivityLevel: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/ useChunkIPC/ enableTypeArrow