Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc bug fixes. Descriptions in commits. #225

Open
wants to merge 43 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a341444
Added small debug log change to test code fork
gamolina Sep 7, 2018
5ea65f0
Fix sort logic, need to check inverse condition explicitly.
gamolina Sep 10, 2018
2f00868
Append to value list here results in duplicate insert rows.
gamolina Sep 22, 2018
04c19a7
update statements with functions in the updateList will fail due to p…
gamolina Sep 22, 2018
977adc4
Expressions in valueList are evaluated but results not saved in Value…
gamolina Sep 22, 2018
f405b75
Added Sink interface to allow for pluggable SELECT ... INTO ... FROM ...
gamolina Oct 7, 2018
f61aa5f
Implement factory pattern for sinks.
gamolina Oct 7, 2018
e90d74e
Should panic if sink not registered.
gamolina Oct 7, 2018
7eb5694
Missed a few checkins.
gamolina Oct 7, 2018
19123fa
Pass original destination
gamolina Oct 7, 2018
955e038
Opened up support for SELECT *
gamolina Oct 7, 2018
aed82b5
In aggregate projections use col.As as col.Name can be empty.
gamolina Oct 9, 2018
ee5f347
Added factory for join implementaton and added between to join sql re…
gamolina Sep 10, 2019
5119193
Fix task processing issue with multi-table joins
gamolina Sep 12, 2019
bbf2166
Added pluggable GroupBy and Where tasks.
gamolina Sep 14, 2019
7d4106e
Added ability to override projection exec task implementions
gamolina Sep 16, 2019
e3c940a
Use original SQL projection list for driver results output.
gamolina Oct 20, 2020
5f00b0b
Use original sql projection for INTO.
gamolina Oct 23, 2020
189bac4
Revert changes to use projection from orig Sql. Gonna rewrite the fi…
gamolina Oct 29, 2020
7fbbcb4
Revert changes to use projection from orig Sql. Gonna rewrite the fi…
gamolina Oct 29, 2020
d96dc54
Pass affected row count to exec results.
gamolina Nov 22, 2020
e3ad6c8
Allow for aliasing columns.
gamolina Jan 16, 2021
b0a2b89
Removed dependency on siphash library.
gamolina Feb 24, 2021
b0c0b77
Added Ctx.Session to sql/driver connection state.
guymolinari Apr 27, 2021
cbe1dac
Merge branch 'into-feature' of https://github.com/guymolinari/qlbridg…
guymolinari Apr 27, 2021
0aff1ef
Changle log level for expression parse scenario.
gamolina Dec 12, 2021
2d30eaa
Merge branch 'into-feature' of https://github.com/guymolinari/qlbridg…
gamolina Dec 12, 2021
dcc5394
Merge pull request #1 from guymolinari/into-feature
guymolinari May 2, 2022
73bb043
Add subquery support.
gamolina May 10, 2022
e6863f1
If LIKE expression doesn't start with '%' then default it.
gamolina May 25, 2022
c781725
Merge pull request #2 from guymolinari/subquery-feature
guymolinari May 25, 2022
18d19f0
Implemented prepared statements, optimized inserts.
gamolina Sep 6, 2022
ba4aea9
Merge pull request #3 from guymolinari/prepared-stmts
guymolinari Sep 6, 2022
39888b9
queryArgsConvert returning empty query
gamolina Sep 7, 2022
8e22581
Dont modify the in-place state of a prepared query statement
gamolina Sep 8, 2022
7d69902
Improve error processing in task processing. Add cleanup processing …
gamolina Nov 20, 2022
8f60c61
Error handling improvements.
gamolina Nov 27, 2022
c39f38b
Fix error handling for exec jobs.
gamolina Nov 28, 2022
bd77fa9
Add Delete function to ContextSimpleNative.
gamolina Jan 15, 2023
376aef7
Fix NPE for nil colunns in insert list.
gamolina Feb 16, 2023
956f31a
Ability to parse @rownum as aliased identity.
gamolina May 3, 2023
a5ddf62
Added a crude implementation of functions in rewritten where clauses
gamolina May 9, 2023
7e2ea95
SQL insert should return newly added @rownum
gamolina May 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datasource/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func (m *ContextSimple) Commit(rowInfo []expr.SchemaInfo, row expr.RowWriter) er
return nil
}
func (m *ContextSimple) Delete(row map[string]value.Value) error {
for k, _ := range row {
delete(m.Data, k)
}
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions datasource/files/filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"time"

u "github.com/araddon/gou"
"github.com/dchest/siphash"

hash "github.com/aviddiviner/go-murmur"
"github.com/lytics/cloudstorage"
"golang.org/x/net/context"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -46,7 +47,7 @@ type FileReaderIterator interface {
type Partitioner func(uint64, *FileInfo) int

func SipPartitioner(partitionCt uint64, fi *FileInfo) int {
hashU64 := siphash.Hash(0, 1, []byte(fi.Name))
hashU64 := hash.MurmurHash64A([]byte(fi.Name), 1)
return int(hashU64 % partitionCt)
}

Expand Down
6 changes: 3 additions & 3 deletions datasource/membtree/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"fmt"

u "github.com/araddon/gou"
"github.com/dchest/siphash"
hash "github.com/aviddiviner/go-murmur"
"github.com/google/btree"
"golang.org/x/net/context"

Expand Down Expand Up @@ -78,14 +78,14 @@ func makeId(dv driver.Value) uint64 {
case int64:
return uint64(vt)
case []byte:
return siphash.Hash(0, 1, vt)
return hash.MurmurHash64A(vt, 1)
// iv, err := strconv.ParseUint(string(vt), 10, 64)
// if err != nil {
// u.Warnf("could not create id: %v for %v", err, dv)
// }
// return iv
case string:
return siphash.Hash(0, 1, []byte(vt))
return hash.MurmurHash64A([]byte(vt), 1)
// iv, err := strconv.ParseUint(vt, 10, 64)
// if err != nil {
// u.Warnf("could not create id: %v for %v", err, dv)
Expand Down
6 changes: 3 additions & 3 deletions datasource/memdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"

u "github.com/araddon/gou"
"github.com/dchest/siphash"
hash "github.com/aviddiviner/go-murmur"
"github.com/hashicorp/go-memdb"

"github.com/araddon/qlbridge/datasource"
Expand All @@ -25,9 +25,9 @@ func makeId(dv driver.Value) uint64 {
case int64:
return uint64(vt)
case []byte:
return siphash.Hash(456729, 1111581582, vt)
return hash.MurmurHash64A(vt, 1111581582)
case string:
return siphash.Hash(456729, 1111581582, []byte(vt))
return hash.MurmurHash64A([]byte(vt), 1111581582)
//by := append(make([]byte,0,8), byte(r), byte(r>>8), byte(r>>16), byte(r>>24), byte(r>>32), byte(r>>40), byte(r>>48), byte(r>>56))
case datasource.KeyCol:
return makeId(vt.Val)
Expand Down
6 changes: 3 additions & 3 deletions datasource/sqlite/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"

u "github.com/araddon/gou"
"github.com/dchest/siphash"
"github.com/aviddiviner/go-murmur"
"github.com/google/btree"
"golang.org/x/net/context"
// Import driver for sqlite
Expand Down Expand Up @@ -323,9 +323,9 @@ func MakeId(dv driver.Value) uint64 {
case int64:
return uint64(vt)
case []byte:
return siphash.Hash(456729, 1111581582, vt)
return go-murmur.MurmurHash64A(vt, 1111581582)
case string:
return siphash.Hash(456729, 1111581582, []byte(vt))
return go-murmur.MurmurHash64A([]byte(vt), 1111581582)
//by := append(make([]byte,0,8), byte(r), byte(r>>8), byte(r>>16), byte(r>>24), byte(r>>32), byte(r>>40), byte(r>>48), byte(r>>56))
case datasource.KeyCol:
return MakeId(vt.Val)
Expand Down
24 changes: 23 additions & 1 deletion exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@ package exec

import (
"fmt"

"database/sql/driver"
"github.com/araddon/qlbridge/plan"
"github.com/araddon/qlbridge/schema"
)

const (
JOINMERGE_MAKER = "UseJoinMerge"
WHERE_MAKER = "UseWhere"
GROUPBY_MAKER = "UseGroupBy"
PROJECTION_MAKER = "UseProjection"
)

var (
// ErrShuttingDown already shutting down error
ErrShuttingDown = fmt.Errorf("Received Shutdown Signal")
Expand Down Expand Up @@ -105,6 +112,7 @@ type (
WalkHaving(p *plan.Having) (Task, error)
WalkGroupBy(p *plan.GroupBy) (Task, error)
WalkOrder(p *plan.Order) (Task, error)
WalkInto(p *plan.Into) (Task, error)
WalkProjection(p *plan.Projection) (Task, error)
// Other Statements
WalkCommand(p *plan.Command) (Task, error)
Expand All @@ -122,4 +130,18 @@ type (
// WalkExecSource given our plan, turn that into a Task.
WalkExecSource(p *plan.Source) (Task, error)
}

// SinkMaker Sink Factory
SinkMaker func(ctx *plan.Context, dest string, params map[string]interface{}) (Sink, error)

// Sinks are execution tasks used to direct query result set output to a destination.
Sink interface {
Open(ctx *plan.Context, destination string, params map[string]interface{}) error
Next(dest []driver.Value, colIndex map[string]int) error
Cleanup() error
Close() error
}

// JoinMergeMaker Factory
JoinMergeMaker func(ctx *plan.Context, l, r TaskRunner, p *plan.JoinMerge) TaskRunner
)
102 changes: 86 additions & 16 deletions exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,21 @@ func BuildSqlJobPlanned(planner plan.Planner, executor Executor, ctx *plan.Conte
if ctx.Raw == "" {
return nil, fmt.Errorf("no sql provided")
}
stmt, err := rel.ParseSql(ctx.Raw)
if err != nil {
u.Debugf("could not parse sql : %v", err)
return nil, err
}
if stmt == nil {
return nil, fmt.Errorf("Not statement for parse? %v", ctx.Raw)
var err error
var pln plan.Task
var stmt rel.SqlStatement
if ctx.Stmt == nil { // Prepared statement
stmt, err = rel.ParseSql(ctx.Raw)
if err != nil {
u.Debugf("could not parse sql : %v", err)
return nil, err
}
if stmt == nil {
return nil, fmt.Errorf("Not statement for parse? %v", ctx.Raw)
}
ctx.Stmt = stmt
}
ctx.Stmt = stmt

pln, err := plan.WalkStmt(ctx, stmt, planner)
pln, err = plan.WalkStmt(ctx, ctx.Stmt, planner)

if err != nil {
return nil, err
Expand Down Expand Up @@ -197,6 +201,7 @@ func (m *JobExecutor) WalkSource(p *plan.Source) (Task, error) {
}
return NewSource(m.Ctx, p)
}

func (m *JobExecutor) WalkSourceExec(p *plan.Source) (Task, error) {

if p.Conn == nil {
Expand All @@ -219,22 +224,71 @@ func (m *JobExecutor) WalkSourceExec(p *plan.Source) (Task, error) {
return nil, fmt.Errorf("%T Must Implement Scanner for %q", p.Conn, p.Stmt.String())
}
func (m *JobExecutor) WalkWhere(p *plan.Where) (Task, error) {
return NewWhere(m.Ctx, p), nil

var tr TaskRunner
tr = NewWhere(m.Ctx, p)
if m.Ctx.Session != nil {
if v, ok := m.Ctx.Session.Get(WHERE_MAKER); ok {
//if factory, ok2 := v.Value().(JoinMergeMaker); !ok2 {
if factory, ok2 := v.Value().(func(ctx *plan.Context, p *plan.Where) TaskRunner); !ok2 {
return nil, fmt.Errorf("Cannot cast [%T] to WhereMaker factory.", v.Value)
} else {
tr = factory(m.Ctx, p)
}
}
}
return tr, nil
}
func (m *JobExecutor) WalkHaving(p *plan.Having) (Task, error) {
return NewHaving(m.Ctx, p), nil
}
func (m *JobExecutor) WalkGroupBy(p *plan.GroupBy) (Task, error) {
return NewGroupBy(m.Ctx, p), nil

var tr TaskRunner
tr = NewGroupBy(m.Ctx, p)
if m.Ctx.Session != nil {
if v, ok := m.Ctx.Session.Get(GROUPBY_MAKER); ok {
//if factory, ok2 := v.Value().(JoinMergeMaker); !ok2 {
if factory, ok2 := v.Value().(func(ctx *plan.Context, p *plan.GroupBy) TaskRunner); !ok2 {
return nil, fmt.Errorf("Cannot cast [%T] to GroupByMaker factory.", v.Value)
} else {
tr = factory(m.Ctx, p)
}
}
}
return tr, nil
}
func (m *JobExecutor) WalkOrder(p *plan.Order) (Task, error) {
return NewOrder(m.Ctx, p), nil
}
func (m *JobExecutor) WalkInto(p *plan.Into) (Task, error) {
return NewInto(m.Ctx, p), nil
}
func (m *JobExecutor) WalkProjection(p *plan.Projection) (Task, error) {
return NewProjection(m.Ctx, p), nil
var tr TaskRunner
tr = NewProjection(m.Ctx, p)
if m.Ctx.Session != nil {
if v, ok := m.Ctx.Session.Get(PROJECTION_MAKER); ok {
//if factory, ok2 := v.Value().(JoinMergeMaker); !ok2 {
if factory, ok2 := v.Value().(func(ctx *plan.Context, p *plan.Projection) TaskRunner); !ok2 {
return nil, fmt.Errorf("Cannot cast [%T] to ProjectionMaker factory.", v.Value)
} else {
tr = factory(m.Ctx, p)
}
}
}
return tr, nil
}
func (m *JobExecutor) WalkJoin(p *plan.JoinMerge) (Task, error) {
execTask := NewTaskParallel(m.Ctx)

// If the left task is already parallelized then must be a multi table join.
// No need to parallelize subsequent join tasks.
var execTask TaskRunner
if p.Left.IsParallel() {
execTask = NewTaskSequential(m.Ctx)
} else {
execTask = NewTaskParallel(m.Ctx)
}
//u.Debugf("join.Left: %#v \nright:%#v", p.Left, p.Right)
l, err := m.WalkPlanAll(p.Left)
if err != nil {
Expand All @@ -255,7 +309,21 @@ func (m *JobExecutor) WalkJoin(p *plan.JoinMerge) (Task, error) {
return nil, err
}

jm := NewJoinNaiveMerge(m.Ctx, l.(TaskRunner), r.(TaskRunner), p)

var jm TaskRunner
jm = NewJoinNaiveMerge(m.Ctx, l.(TaskRunner), r.(TaskRunner), p)
if m.Ctx.Session != nil {
if v, ok := m.Ctx.Session.Get(JOINMERGE_MAKER); ok {
//if factory, ok2 := v.Value().(JoinMergeMaker); !ok2 {
if factory, ok2 := v.Value().(func(ctx *plan.Context, l, r TaskRunner,
p *plan.JoinMerge) TaskRunner); !ok2 {
return nil, fmt.Errorf("Cannot cast [%T] to JoinMergeMaker factory.", v.Value)
} else {
jm = factory(m.Ctx, l.(TaskRunner), r.(TaskRunner), p)
}
}
}

err = execTask.Add(jm)
if err != nil {
return nil, err
Expand All @@ -273,7 +341,7 @@ func (m *JobExecutor) WalkPlanAll(p plan.Task) (Task, error) {
}
if len(p.Children()) > 0 {
dagRoot := m.NewTask(p)
//u.Debugf("sequential?%v parallel?%v", p.IsSequential(), p.IsParallel())
//u.Debugf("%p sequential?%v parallel?%v", p, p.IsSequential(), p.IsParallel())
err = dagRoot.Add(root)
if err != nil {
u.Errorf("Could not add root: %v", err)
Expand All @@ -296,6 +364,8 @@ func (m *JobExecutor) WalkPlanTask(p plan.Task) (Task, error) {
return m.Executor.WalkGroupBy(p)
case *plan.Order:
return m.Executor.WalkOrder(p)
case *plan.Into:
return m.Executor.WalkInto(p)
case *plan.Projection:
return m.Executor.WalkProjection(p)
case *plan.JoinMerge:
Expand Down
Loading