Skip to content

Commit f3a82f0

Browse files
authored
Merge pull request src-d#154 from kuba--/opentracing
Opentracing upgraded to 2.13.0 It uses configuration based on env. vars.
2 parents a25d250 + a8cbce6 commit f3a82f0

File tree

15 files changed

+118
-119
lines changed

15 files changed

+118
-119
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,5 @@ benchmark/*tbl
2828
vendor
2929
Makefile.main
3030
.ci/
31+
_example/main
32+
_example/*.exe

Gopkg.lock

Lines changed: 7 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
[[constraint]]
3030
name = "github.com/uber/jaeger-client-go"
31-
version = "2.12.0"
31+
version = "2.13.0"
3232

3333
[[constraint]]
3434
name = "gopkg.in/src-d/go-errors.v1"

_example/main.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ func main() {
3232
Password: "password1",
3333
}}
3434

35-
s, err := server.NewDefaultServer("tcp", "localhost:5123", auth, driver)
35+
s, err := server.NewDefaultServer(server.Config{
36+
Protocol: "tcp",
37+
Address: "localhost:5123",
38+
Auth: auth,
39+
}, driver)
40+
3641
if err != nil {
3742
panic(err)
3843
}
@@ -41,12 +46,12 @@ func main() {
4146
}
4247

4348
func createTestDatabase() *mem.Database {
44-
db := mem.NewDatabase("test")
49+
db := mem.NewDatabase("test").(*mem.Database)
4550
table := mem.NewTable("mytable", sql.Schema{
46-
{Name: "name", Type: sql.Text},
47-
{Name: "email", Type: sql.Text},
48-
{Name: "phone_numbers", Type: sql.JSON},
49-
{Name: "created_at", Type: sql.Timestamp},
51+
{Name: "name", Type: sql.Text, Source: "mytable"},
52+
{Name: "email", Type: sql.Text, Source: "mytable"},
53+
{Name: "phone_numbers", Type: sql.JSON, Source: "mytable"},
54+
{Name: "created_at", Type: sql.Timestamp, Source: "mytable"},
5055
})
5156
db.AddTable("mytable", table)
5257
table.Insert(sql.NewRow("John Doe", "john@doe.com", []string{"555-555-555"}, time.Now()))

engine.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sqle
22

33
import (
4+
opentracing "github.com/opentracing/opentracing-go"
45
"gopkg.in/src-d/go-mysql-server.v0/sql"
56
"gopkg.in/src-d/go-mysql-server.v0/sql/analyzer"
67
"gopkg.in/src-d/go-mysql-server.v0/sql/expression/function"
@@ -27,6 +28,9 @@ func (e *Engine) Query(
2728
ctx *sql.Context,
2829
query string,
2930
) (sql.Schema, sql.RowIter, error) {
31+
span, ctx := ctx.Span("query", opentracing.Tag{Key: "query", Value: query})
32+
defer span.Finish()
33+
3034
parsed, err := parse.Parse(ctx, query)
3135
if err != nil {
3236
return nil, nil, err

engine_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -416,12 +416,12 @@ const expectedTree = `Offset(2)
416416
func TestPrintTree(t *testing.T) {
417417
require := require.New(t)
418418
node, err := parse.Parse(sql.NewEmptyContext(), `
419-
SELECT t.foo, bar.baz
420-
FROM tbl t
421-
INNER JOIN bar
422-
ON foo = baz
423-
WHERE foo > qux
424-
LIMIT 5
419+
SELECT t.foo, bar.baz
420+
FROM tbl t
421+
INNER JOIN bar
422+
ON foo = baz
423+
WHERE foo > qux
424+
LIMIT 5
425425
OFFSET 2`)
426426
require.NoError(err)
427427
require.Equal(expectedTree, node.String())
@@ -439,10 +439,10 @@ func TestTracing(t *testing.T) {
439439

440440
ctx := sql.NewContext(context.TODO(), sql.WithTracer(tracer))
441441

442-
_, iter, err := e.Query(ctx, `SELECT DISTINCT i
443-
FROM mytable
444-
WHERE s = 'first row'
445-
ORDER BY i DESC
442+
_, iter, err := e.Query(ctx, `SELECT DISTINCT i
443+
FROM mytable
444+
WHERE s = 'first row'
445+
ORDER BY i DESC
446446
LIMIT 1`)
447447
require.NoError(err)
448448

@@ -463,7 +463,7 @@ func TestTracing(t *testing.T) {
463463
"plan.Sort",
464464
}
465465

466-
require.Len(spans, 76)
466+
require.Len(spans, 77)
467467

468468
var spanOperations []string
469469
for _, s := range spans {

server/config.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package server
2+
3+
import (
4+
"io"
5+
6+
opentracing "github.com/opentracing/opentracing-go"
7+
jaeger "github.com/uber/jaeger-client-go"
8+
jaegercfg "github.com/uber/jaeger-client-go/config"
9+
"github.com/uber/jaeger-lib/metrics"
10+
"gopkg.in/src-d/go-vitess.v0/mysql"
11+
)
12+
13+
const (
14+
jaegerDefaultServiceName = "go-mysql-server"
15+
)
16+
17+
// Config for the mysql server.
18+
type Config struct {
19+
// Protocol for the connection.
20+
Protocol string
21+
// Address of the server.
22+
Address string
23+
// Auth of the server.
24+
Auth mysql.AuthServer
25+
}
26+
27+
// Tracer creates a new tracer for the current configuration. It also returns
28+
// an io.Closer to close the tracer and an error, if any.
29+
func (c Config) Tracer() (opentracing.Tracer, io.Closer, error) {
30+
cfg, err := jaegercfg.FromEnv()
31+
if err != nil {
32+
return nil, nil, err
33+
}
34+
35+
if cfg.ServiceName == "" {
36+
cfg.ServiceName = jaegerDefaultServiceName
37+
}
38+
39+
return cfg.NewTracer(
40+
jaegercfg.Logger(jaeger.StdLogger),
41+
jaegercfg.Metrics(metrics.NullFactory),
42+
)
43+
}

server/server.go

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -4,69 +4,11 @@ import (
44
"io"
55

66
opentracing "github.com/opentracing/opentracing-go"
7-
jaeger "github.com/uber/jaeger-client-go"
87
"gopkg.in/src-d/go-mysql-server.v0"
98

109
"gopkg.in/src-d/go-vitess.v0/mysql"
1110
)
1211

13-
// Config for the mysql server.
14-
type Config struct {
15-
// Protocol for the connection.
16-
Protocol string
17-
// Address of the server.
18-
Address string
19-
// Auth of the server.
20-
Auth mysql.AuthServer
21-
// EnableTracing will enable the tracing, if it's true.
22-
EnableTracing bool
23-
// TracingAddr is the address where tracing will be sent.
24-
// If this is empty, and tracing is enabled, it will be reported
25-
// to the logs via stdout.
26-
TracingAddr string
27-
// TracingMaxPacketSize is the max packet size for sending traces
28-
// to the remote endpoint.
29-
TracingMaxPacketSize uint64
30-
// TracingSamplingRate is the rate of traces we want to sample.
31-
// Only takes effect is TracingAddr is not empty.
32-
TracingSamplingRate float64
33-
}
34-
35-
type nopCloser struct{}
36-
37-
func (nopCloser) Close() error { return nil }
38-
39-
// Tracer creates a new tracer for the current configuration. It also returns
40-
// an io.Closer to close the tracer and an error, if any.
41-
func (c Config) Tracer() (opentracing.Tracer, io.Closer, error) {
42-
if !c.EnableTracing {
43-
return opentracing.NoopTracer{}, nopCloser{}, nil
44-
}
45-
46-
var reporter jaeger.Reporter
47-
var sampler jaeger.Sampler
48-
if c.TracingAddr == "" {
49-
reporter = jaeger.NewLoggingReporter(jaeger.StdLogger)
50-
sampler = jaeger.NewConstSampler(true)
51-
} else {
52-
transport, err := jaeger.NewUDPTransport(
53-
c.TracingAddr,
54-
int(c.TracingMaxPacketSize),
55-
)
56-
if err != nil {
57-
return nil, nil, err
58-
}
59-
reporter = jaeger.NewRemoteReporter(transport)
60-
sampler, err = jaeger.NewProbabilisticSampler(c.TracingSamplingRate)
61-
if err != nil {
62-
return nil, nil, err
63-
}
64-
}
65-
66-
tracer, closer := jaeger.NewTracer("go-mysql-server", sampler, reporter)
67-
return tracer, closer, nil
68-
}
69-
7012
// Server is a MySQL server for SQLe engines.
7113
type Server struct {
7214
Listener *mysql.Listener
@@ -85,6 +27,7 @@ func NewServer(cfg Config, e *sqle.Engine, sb SessionBuilder) (*Server, error) {
8527
if err != nil {
8628
return nil, err
8729
}
30+
opentracing.SetGlobalTracer(tracer)
8831

8932
handler := NewHandler(e, NewSessionManager(sb, tracer))
9033
l, err := mysql.NewListener(cfg.Protocol, cfg.Address, cfg.Auth, handler)

sql/analyzer/analyzer.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,23 @@ func (a *Analyzer) Log(msg string, args ...interface{}) {
8585
// Analyze the node and all its children.
8686
func (a *Analyzer) Analyze(ctx *sql.Context, n sql.Node) (sql.Node, error) {
8787
span, ctx := ctx.Span("analyze")
88-
defer span.Finish()
88+
span.LogKV("plan", n.String())
8989

9090
prev := n
91-
9291
a.Log("starting analysis of node of type: %T", n)
9392
cur, err := a.analyzeOnce(ctx, n)
93+
defer func() {
94+
if cur != nil {
95+
span.SetTag("IsResolved", cur.Resolved())
96+
}
97+
span.Finish()
98+
}()
99+
94100
if err != nil {
95101
return nil, err
96102
}
97103

98-
i := 0
99-
for !reflect.DeepEqual(prev, cur) {
104+
for i := 0; !reflect.DeepEqual(prev, cur); {
100105
a.Log("previous node does not match new node, analyzing again, iteration: %d", i)
101106
prev = cur
102107
cur, err = a.analyzeOnce(ctx, cur)
@@ -111,18 +116,17 @@ func (a *Analyzer) Analyze(ctx *sql.Context, n sql.Node) (sql.Node, error) {
111116
}
112117

113118
if errs := a.validate(ctx, cur); len(errs) != 0 {
114-
var err error
115119
for _, e := range errs {
116120
err = multierror.Append(err, e)
117121
}
118-
return cur, err
119122
}
120123

121-
return cur, nil
124+
return cur, err
122125
}
123126

124127
func (a *Analyzer) analyzeOnce(ctx *sql.Context, n sql.Node) (sql.Node, error) {
125128
span, ctx := ctx.Span("analyze_once")
129+
span.LogKV("plan", n.String())
126130
defer span.Finish()
127131

128132
result := n

sql/expression/function/aggregation/avg.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ func (a *Avg) IsNullable() bool {
3939

4040
// Eval implements AggregationExpression interface. (AggregationExpression[Expression]])
4141
func (a *Avg) Eval(ctx *sql.Context, buffer sql.Row) (interface{}, error) {
42+
span, ctx := ctx.Span("aggregation.Avg_Eval")
43+
defer span.Finish()
44+
4245
isNoNum := buffer[2].(bool)
4346
if isNoNum {
4447
return float64(0), nil
@@ -50,6 +53,7 @@ func (a *Avg) Eval(ctx *sql.Context, buffer sql.Row) (interface{}, error) {
5053
}
5154

5255
avg := buffer[0]
56+
span.LogKV("avg", avg)
5357
return avg, nil
5458
}
5559

@@ -75,9 +79,6 @@ func (a *Avg) NewBuffer() sql.Row {
7579

7680
// Update implements AggregationExpression interface. (AggregationExpression)
7781
func (a *Avg) Update(ctx *sql.Context, buffer, row sql.Row) error {
78-
span, ctx := ctx.Span("aggregation.Avg_Update")
79-
defer span.Finish()
80-
8182
v, err := a.Child.Eval(ctx, row)
8283
if err != nil {
8384
return err
@@ -112,9 +113,6 @@ func (a *Avg) Update(ctx *sql.Context, buffer, row sql.Row) error {
112113

113114
// Merge implements AggregationExpression interface. (AggregationExpression)
114115
func (a *Avg) Merge(ctx *sql.Context, buffer, partial sql.Row) error {
115-
span, _ := ctx.Span("aggregation.Avg_Merge")
116-
defer span.Finish()
117-
118116
bufferAvg := buffer[0].(float64)
119117
bufferRows := buffer[1].(float64)
120118

sql/expression/function/aggregation/count.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ func (c *Count) TransformUp(f sql.TransformExprFunc) (sql.Expression, error) {
5656

5757
// Update implements the Aggregation interface.
5858
func (c *Count) Update(ctx *sql.Context, buffer, row sql.Row) error {
59-
span, ctx := ctx.Span("aggregation.Count_Update")
60-
defer span.Finish()
61-
6259
var inc bool
6360
if _, ok := c.Child.(*expression.Star); ok {
6461
inc = true
@@ -82,14 +79,16 @@ func (c *Count) Update(ctx *sql.Context, buffer, row sql.Row) error {
8279

8380
// Merge implements the Aggregation interface.
8481
func (c *Count) Merge(ctx *sql.Context, buffer, partial sql.Row) error {
85-
span, _ := ctx.Span("aggregation.Count_Merge")
86-
defer span.Finish()
87-
8882
buffer[0] = buffer[0].(int32) + partial[0].(int32)
8983
return nil
9084
}
9185

9286
// Eval implements the Aggregation interface.
9387
func (c *Count) Eval(ctx *sql.Context, buffer sql.Row) (interface{}, error) {
94-
return buffer[0], nil
88+
span, ctx := ctx.Span("aggregation.Count_Eval")
89+
count := buffer[0]
90+
span.LogKV("count", count)
91+
span.Finish()
92+
93+
return count, nil
9594
}

0 commit comments

Comments
 (0)