Skip to content

Commit a770f71

Browse files
authored
Merge pull request #11697 from RaduBerinde/trivial-router
distsql: add passthrough router type
2 parents f91562d + 9a05ab2 commit a770f71

File tree

9 files changed

+113
-95
lines changed

9 files changed

+113
-95
lines changed

pkg/sql/distsql/cluster_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func TestClusterFlow(t *testing.T) {
122122
Processors: []ProcessorSpec{{
123123
Core: ProcessorCoreUnion{TableReader: &tr1},
124124
Output: []OutputRouterSpec{{
125-
Type: OutputRouterSpec_MIRROR,
125+
Type: OutputRouterSpec_PASS_THROUGH,
126126
Streams: []StreamEndpointSpec{
127127
{Type: StreamEndpointSpec_REMOTE, StreamID: 0, TargetAddr: tc.Server(2).ServingAddr()},
128128
},
@@ -136,7 +136,7 @@ func TestClusterFlow(t *testing.T) {
136136
Processors: []ProcessorSpec{{
137137
Core: ProcessorCoreUnion{TableReader: &tr2},
138138
Output: []OutputRouterSpec{{
139-
Type: OutputRouterSpec_MIRROR,
139+
Type: OutputRouterSpec_PASS_THROUGH,
140140
Streams: []StreamEndpointSpec{
141141
{Type: StreamEndpointSpec_REMOTE, StreamID: 1, TargetAddr: tc.Server(2).ServingAddr()},
142142
},
@@ -151,7 +151,7 @@ func TestClusterFlow(t *testing.T) {
151151
{
152152
Core: ProcessorCoreUnion{TableReader: &tr3},
153153
Output: []OutputRouterSpec{{
154-
Type: OutputRouterSpec_MIRROR,
154+
Type: OutputRouterSpec_PASS_THROUGH,
155155
Streams: []StreamEndpointSpec{
156156
{Type: StreamEndpointSpec_LOCAL, StreamID: 2},
157157
},
@@ -169,7 +169,7 @@ func TestClusterFlow(t *testing.T) {
169169
}},
170170
Core: ProcessorCoreUnion{JoinReader: &jr},
171171
Output: []OutputRouterSpec{{
172-
Type: OutputRouterSpec_MIRROR,
172+
Type: OutputRouterSpec_PASS_THROUGH,
173173
Streams: []StreamEndpointSpec{{Type: StreamEndpointSpec_SYNC_RESPONSE}},
174174
}},
175175
},

pkg/sql/distsql/data.pb.go

Lines changed: 73 additions & 69 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sql/distsql/data.proto

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,17 @@ message InputSyncSpec {
104104
// it decides how to send results to multiple output streams.
105105
message OutputRouterSpec {
106106
enum Type {
107-
// Each row is sent to all output streams (also used if there is a
108-
// single output stream).
109-
MIRROR = 0;
107+
// Single output stream.
108+
PASS_THROUGH = 0;
109+
// Each row is sent to all output streams.
110+
MIRROR = 1;
110111
// Each row is sent to one stream, chosen by hashing certain columns of
111112
// the row (specified by the hash_columns field).
112-
BY_HASH = 1;
113+
BY_HASH = 2;
113114
// Each row is sent to one stream, chosen according to preset boundaries
114115
// for the values of certain columns of the row. TODO(radu): an extra
115116
// optional structure below for the range details.
116-
BY_RANGE = 2;
117+
BY_RANGE = 3;
117118
}
118119
optional Type type = 1 [(gogoproto.nullable) = false];
119120
repeated StreamEndpointSpec streams = 2 [(gogoproto.nullable) = false];

pkg/sql/distsql/flow_diagram.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ func (is *InputSyncSpec) summary() (string, []string) {
121121

122122
func (r *OutputRouterSpec) summary() (string, []string) {
123123
switch r.Type {
124+
case OutputRouterSpec_PASS_THROUGH:
125+
return "", []string{}
124126
case OutputRouterSpec_MIRROR:
125127
return "mirror", []string{}
126128
case OutputRouterSpec_BY_HASH:

pkg/sql/distsql/flow_diagram_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestPlanDiagramIndexJoin(t *testing.T) {
6767
Processors: []ProcessorSpec{{
6868
Core: ProcessorCoreUnion{TableReader: &tr},
6969
Output: []OutputRouterSpec{{
70-
Type: OutputRouterSpec_MIRROR,
70+
Type: OutputRouterSpec_PASS_THROUGH,
7171
Streams: []StreamEndpointSpec{
7272
{StreamID: 0},
7373
},
@@ -79,7 +79,7 @@ func TestPlanDiagramIndexJoin(t *testing.T) {
7979
Processors: []ProcessorSpec{{
8080
Core: ProcessorCoreUnion{TableReader: &tr},
8181
Output: []OutputRouterSpec{{
82-
Type: OutputRouterSpec_MIRROR,
82+
Type: OutputRouterSpec_PASS_THROUGH,
8383
Streams: []StreamEndpointSpec{
8484
{StreamID: 1},
8585
},
@@ -92,7 +92,7 @@ func TestPlanDiagramIndexJoin(t *testing.T) {
9292
{
9393
Core: ProcessorCoreUnion{TableReader: &tr},
9494
Output: []OutputRouterSpec{{
95-
Type: OutputRouterSpec_MIRROR,
95+
Type: OutputRouterSpec_PASS_THROUGH,
9696
Streams: []StreamEndpointSpec{
9797
{StreamID: 2},
9898
},
@@ -110,7 +110,7 @@ func TestPlanDiagramIndexJoin(t *testing.T) {
110110
}},
111111
Core: ProcessorCoreUnion{JoinReader: &jr},
112112
Output: []OutputRouterSpec{{
113-
Type: OutputRouterSpec_MIRROR,
113+
Type: OutputRouterSpec_PASS_THROUGH,
114114
Streams: []StreamEndpointSpec{{Type: StreamEndpointSpec_SYNC_RESPONSE}},
115115
}}},
116116
},
@@ -216,7 +216,7 @@ func TestPlanDiagramJoin(t *testing.T) {
216216
},
217217
Core: ProcessorCoreUnion{HashJoiner: &hj},
218218
Output: []OutputRouterSpec{{
219-
Type: OutputRouterSpec_MIRROR,
219+
Type: OutputRouterSpec_PASS_THROUGH,
220220
Streams: []StreamEndpointSpec{
221221
{StreamID: 101},
222222
},
@@ -232,7 +232,7 @@ func TestPlanDiagramJoin(t *testing.T) {
232232
}},
233233
Core: ProcessorCoreUnion{Noop: &NoopCoreSpec{}},
234234
Output: []OutputRouterSpec{{
235-
Type: OutputRouterSpec_MIRROR,
235+
Type: OutputRouterSpec_PASS_THROUGH,
236236
Streams: []StreamEndpointSpec{{Type: StreamEndpointSpec_SYNC_RESPONSE}},
237237
}}},
238238
},
@@ -282,7 +282,7 @@ func TestPlanDiagramJoin(t *testing.T) {
282282
},
283283
Core: ProcessorCoreUnion{HashJoiner: &hj},
284284
Output: []OutputRouterSpec{{
285-
Type: OutputRouterSpec_MIRROR,
285+
Type: OutputRouterSpec_PASS_THROUGH,
286286
Streams: []StreamEndpointSpec{
287287
{StreamID: 101},
288288
},

pkg/sql/distsql/routers.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,24 @@ import (
2828
)
2929

3030
func makeRouter(spec *OutputRouterSpec, streams []RowReceiver) (RowReceiver, error) {
31-
switch len(streams) {
32-
case 0:
31+
if len(streams) == 0 {
3332
return nil, errors.Errorf("no streams in router")
34-
case 1:
35-
// Special passthrough case - no router.
36-
return streams[0], nil
3733
}
3834

3935
switch spec.Type {
36+
case OutputRouterSpec_PASS_THROUGH:
37+
if len(streams) != 1 {
38+
return nil, errors.Errorf("expected one stream for passthrough router")
39+
}
40+
// No router.
41+
return streams[0], nil
42+
4043
case OutputRouterSpec_BY_HASH:
4144
return makeHashRouter(spec.HashColumns, streams)
45+
4246
case OutputRouterSpec_MIRROR:
4347
return makeMirrorRouter(streams)
48+
4449
default:
4550
return nil, errors.Errorf("router type %s not supported", spec.Type)
4651
}
@@ -69,12 +74,18 @@ var _ RowReceiver = &mirrorRouter{}
6974
var crc32Table = crc32.MakeTable(crc32.Castagnoli)
7075

7176
func makeMirrorRouter(streams []RowReceiver) (*mirrorRouter, error) {
77+
if len(streams) < 2 {
78+
return nil, errors.Errorf("need at least two streams for mirror router")
79+
}
7280
return &mirrorRouter{
7381
routerBase: routerBase{streams: streams},
7482
}, nil
7583
}
7684

7785
func makeHashRouter(hashCols []uint32, streams []RowReceiver) (*hashRouter, error) {
86+
if len(streams) < 2 {
87+
return nil, errors.Errorf("need at least two streams for hash router")
88+
}
7889
if len(hashCols) == 0 {
7990
return nil, errors.Errorf("no hash columns for BY_HASH router")
8091
}

pkg/sql/distsql/routers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func TestMirrorRouter(t *testing.T) {
119119

120120
vals := sqlbase.RandEncDatumSlices(rng, numCols, numRows)
121121

122-
for numBuckets := 1; numBuckets <= 3; numBuckets++ {
122+
for numBuckets := 2; numBuckets <= 4; numBuckets++ {
123123
bufs := make([]*RowBuffer, numBuckets)
124124
recvs := make([]RowReceiver, numBuckets)
125125
for i := 0; i < numBuckets; i++ {

pkg/sql/distsql/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func TestServer(t *testing.T) {
6464
Processors: []ProcessorSpec{{
6565
Core: ProcessorCoreUnion{TableReader: &ts},
6666
Output: []OutputRouterSpec{{
67-
Type: OutputRouterSpec_MIRROR,
67+
Type: OutputRouterSpec_PASS_THROUGH,
6868
Streams: []StreamEndpointSpec{{Type: StreamEndpointSpec_SYNC_RESPONSE}},
6969
}},
7070
}},

0 commit comments

Comments
 (0)