Skip to content

Commit

Permalink
make SplitQuery support VARBINARY column.
Browse files Browse the repository at this point in the history
1. SplitQuery will split a VARBINARY column as a hex string. That is, it assumes
   the key range is [0x00000000, 0xFFFFFFFF] and then divides the range into
   intervals based on split count.
2. Introduce splitBoundariesStringColumn to handle string column case.
3. Refactor existing SplitQuery in sqlquery.go: create two helper functions
   getColumnType and getColumnMinMax.
  • Loading branch information
yaoshengzhe committed Aug 17, 2015
1 parent 4ad5043 commit 09bff85
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 74 deletions.
12 changes: 12 additions & 0 deletions go/sqltypes/sqltypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ func (v Value) ParseUint64() (val uint64, err error) {
return strconv.ParseUint(string(n.raw()), 10, 64)
}

// ParseFloat64 parses a Fractional value into a float64. It returns an error
// when the value is null, when it does not hold a Fractional, or when
// strconv.ParseFloat rejects the raw bytes.
func (v Value) ParseFloat64() (val float64, err error) {
	switch inner := v.Inner.(type) {
	case nil:
		return 0, fmt.Errorf("value is null")
	case Fractional:
		return strconv.ParseFloat(string(inner.raw()), 64)
	default:
		return 0, fmt.Errorf("value is not Fractional")
	}
}

// EncodeSql encodes the value into an SQL statement. Can be binary.
func (v Value) EncodeSql(b BinWriter) {
if v.Inner == nil {
Expand Down
115 changes: 82 additions & 33 deletions go/vt/tabletserver/query_splitter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tabletserver

import (
"encoding/binary"
"fmt"
"strconv"

Expand All @@ -26,6 +27,11 @@ type QuerySplitter struct {
rowCount int64
}

// Names under which getWhereClause stores the generated bounds of the split
// column: the start name carries the inclusive lower bound (splitColumn >=
// start) and the end name carries the exclusive upper bound (splitColumn <
// end).
//
// NOTE(review): the identical string, leading colon included, is used both as
// the ValArg placeholder in the generated SQL and as the key in the bind
// variable map. Confirm that the bind-variable lookup expects the colon to be
// part of the key.
const (
startBindVarName = ":_splitquery_start"
endBindVarName = ":_splitquery_end"
)

// NewQuerySplitter creates a new QuerySplitter. query is the original query
// to split and splitCount is the desired number of splits. splitCount must
// be a positive int, if not it will be set to 1.
Expand Down Expand Up @@ -95,8 +101,8 @@ func (qs *QuerySplitter) validateQuery() error {

// split splits the query into multiple queries. validateQuery() must return
// nil error before split() is called.
func (qs *QuerySplitter) split(pkMinMax *mproto.QueryResult) ([]proto.QuerySplit, error) {
boundaries, err := qs.splitBoundaries(pkMinMax)
func (qs *QuerySplitter) split(columnType int64, pkMinMax *mproto.QueryResult) ([]proto.QuerySplit, error) {
boundaries, err := qs.splitBoundaries(columnType, pkMinMax)
if err != nil {
return nil, err
}
Expand All @@ -108,59 +114,76 @@ func (qs *QuerySplitter) split(pkMinMax *mproto.QueryResult) ([]proto.QuerySplit
}
splits = append(splits, *split)
} else {
boundaries = append(boundaries, sqltypes.Value{})
whereClause := qs.sel.Where
// Loop through the boundaries and generated modified where clauses
start := sqltypes.Value{}
clauses := []*sqlparser.Where{}
for _, end := range boundaries {
clauses = append(clauses, qs.getWhereClause(start, end))
start.Inner = end.Inner
}
clauses = append(clauses, qs.getWhereClause(start, sqltypes.Value{}))
// Generate one split per clause
for _, clause := range clauses {
sel := qs.sel
sel.Where = clause
bindVars := make(map[string]interface{}, len(qs.query.BindVariables))
for k, v := range qs.query.BindVariables {
bindVars[k] = v
}
qs.sel.Where = qs.getWhereClause(whereClause, bindVars, start, end)
q := &proto.BoundQuery{
Sql: sqlparser.String(sel),
BindVariables: qs.query.BindVariables,
Sql: sqlparser.String(qs.sel),
BindVariables: bindVars,
}
split := &proto.QuerySplit{
Query: *q,
RowCount: qs.rowCount,
}
splits = append(splits, *split)
start.Inner = end.Inner
}
qs.sel.Where = whereClause // reset where clause
}
return splits, err
}

// getWhereClause returns a whereClause based on desired upper and lower
// bounds for primary key.
func (qs *QuerySplitter) getWhereClause(start, end sqltypes.Value) *sqlparser.Where {
func (qs *QuerySplitter) getWhereClause(whereClause *sqlparser.Where, bindVars map[string]interface{}, start, end sqltypes.Value) *sqlparser.Where {
var startClause *sqlparser.ComparisonExpr
var endClause *sqlparser.ComparisonExpr
var clauses sqlparser.BoolExpr
// No upper or lower bound, just return the where clause of original query
if start.IsNull() && end.IsNull() {
return qs.sel.Where
return whereClause
}
pk := &sqlparser.ColName{
Name: sqlparser.SQLName(qs.splitColumn),
}
// splitColumn >= start
if !start.IsNull() {
startClause = &sqlparser.ComparisonExpr{
Operator: sqlparser.AST_GE,
Left: pk,
Right: sqlparser.NumVal((start).Raw()),
Right: sqlparser.ValArg([]byte(startBindVarName)),
}
if start.IsNumeric() {
v, _ := start.ParseInt64()
bindVars[startBindVarName] = v
} else if start.IsString() {
bindVars[startBindVarName] = start.Raw()
} else if start.IsFractional() {
v, _ := start.ParseFloat64()
bindVars[startBindVarName] = v
}
}
// splitColumn < end
if !end.IsNull() {
endClause = &sqlparser.ComparisonExpr{
Operator: sqlparser.AST_LT,
Left: pk,
Right: sqlparser.NumVal((end).Raw()),
Right: sqlparser.ValArg([]byte(endBindVarName)),
}
if end.IsNumeric() {
v, _ := end.ParseInt64()
bindVars[endBindVarName] = v
} else if end.IsString() {
bindVars[endBindVarName] = end.Raw()
} else if end.IsFractional() {
v, _ := end.ParseFloat64()
bindVars[endBindVarName] = v
}
}
if startClause == nil {
Expand All @@ -176,9 +199,9 @@ func (qs *QuerySplitter) getWhereClause(start, end sqltypes.Value) *sqlparser.Wh
}
}
}
if qs.sel.Where != nil {
if whereClause != nil {
clauses = &sqlparser.AndExpr{
Left: qs.sel.Where.Expr,
Left: whereClause.Expr,
Right: clauses,
}
}
Expand All @@ -188,24 +211,23 @@ func (qs *QuerySplitter) getWhereClause(start, end sqltypes.Value) *sqlparser.Wh
}
}

func (qs *QuerySplitter) splitBoundaries(pkMinMax *mproto.QueryResult) ([]sqltypes.Value, error) {
boundaries := []sqltypes.Value{}
var err error
// If no min or max values were found, return empty list of boundaries
if len(pkMinMax.Rows) != 1 || pkMinMax.Rows[0][0].IsNull() || pkMinMax.Rows[0][1].IsNull() {
return boundaries, err
}
switch pkMinMax.Fields[0].Type {
func (qs *QuerySplitter) splitBoundaries(columnType int64, pkMinMax *mproto.QueryResult) ([]sqltypes.Value, error) {
switch columnType {
case mproto.VT_TINY, mproto.VT_SHORT, mproto.VT_LONG, mproto.VT_LONGLONG, mproto.VT_INT24:
boundaries, err = qs.parseInt(pkMinMax)
return qs.splitBoundariesIntColumn(pkMinMax)
case mproto.VT_FLOAT, mproto.VT_DOUBLE:
boundaries, err = qs.parseFloat(pkMinMax)
return qs.splitBoundariesFloatColumn(pkMinMax)
case mproto.VT_VARCHAR, mproto.VT_BIT, mproto.VT_VAR_STRING, mproto.VT_STRING:
return qs.splitBoundariesStringColumn()
}
return boundaries, err
return []sqltypes.Value{}, nil
}

func (qs *QuerySplitter) parseInt(pkMinMax *mproto.QueryResult) ([]sqltypes.Value, error) {
func (qs *QuerySplitter) splitBoundariesIntColumn(pkMinMax *mproto.QueryResult) ([]sqltypes.Value, error) {
boundaries := []sqltypes.Value{}
if pkMinMax == nil || len(pkMinMax.Rows) != 1 || pkMinMax.Rows[0][0].IsNull() || pkMinMax.Rows[0][1].IsNull() {
return boundaries, nil
}
minNumeric := sqltypes.MakeNumeric(pkMinMax.Rows[0][0].Raw())
maxNumeric := sqltypes.MakeNumeric(pkMinMax.Rows[0][1].Raw())
if pkMinMax.Rows[0][0].Raw()[0] == '-' {
Expand Down Expand Up @@ -256,8 +278,11 @@ func (qs *QuerySplitter) parseInt(pkMinMax *mproto.QueryResult) ([]sqltypes.Valu
return boundaries, nil
}

func (qs *QuerySplitter) parseFloat(pkMinMax *mproto.QueryResult) ([]sqltypes.Value, error) {
func (qs *QuerySplitter) splitBoundariesFloatColumn(pkMinMax *mproto.QueryResult) ([]sqltypes.Value, error) {
boundaries := []sqltypes.Value{}
if pkMinMax == nil || len(pkMinMax.Rows) != 1 || pkMinMax.Rows[0][0].IsNull() || pkMinMax.Rows[0][1].IsNull() {
return boundaries, nil
}
min, err := strconv.ParseFloat(pkMinMax.Rows[0][0].String(), 64)
if err != nil {
return nil, err
Expand All @@ -281,3 +306,27 @@ func (qs *QuerySplitter) parseFloat(pkMinMax *mproto.QueryResult) ([]sqltypes.Va
}
return boundaries, nil
}

// splitBoundariesStringColumn computes split points for a string-typed split
// column without looking at the column's actual values. It treats the key
// space as the fixed range [0x00000000, 0xFFFFFFFF], divides it into
// qs.splitCount equal-width intervals, and returns the interior cut points
// (one fewer than the number of intervals). Each cut point is handed to
// sqltypes.BuildValue as its 4-byte big-endian encoding. As a side effect,
// qs.rowCount is set to the interval width.
//
// TODO(shengzhe): support split based on min, max from the string column.
func (qs *QuerySplitter) splitBoundariesStringColumn() ([]sqltypes.Value, error) {
	const (
		keyMin = int64(0x0)
		keyMax = int64(0xFFFFFFFF)
	)
	width := (keyMax - keyMin + 1) / int64(qs.splitCount)
	qs.rowCount = width
	var cuts []sqltypes.Value
	for n := 1; n < qs.splitCount; n++ {
		point := keyMin + width*int64(n)
		// Every cut point is below 2^32, so four bytes hold it exactly.
		// Big-endian order puts the most significant byte first, which is
		// the order a byte-wise comparison in mysql examines them.
		encoded := make([]byte, 4)
		binary.BigEndian.PutUint32(encoded, uint32(point))
		cut, err := sqltypes.BuildValue(encoded)
		if err != nil {
			return nil, err
		}
		cuts = append(cuts, cut)
	}
	return cuts, nil
}
Loading

0 comments on commit 09bff85

Please sign in to comment.