Skip to content

Commit

Permalink
Kill driver on parse ctx.Done
Browse files Browse the repository at this point in the history
Signed-off-by: kuba-- <kuba@sourced.tech>
  • Loading branch information
kuba-- authored and dennwc committed Oct 1, 2019
1 parent 274fcb1 commit d82e5f7
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 16 deletions.
1 change: 1 addition & 0 deletions daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
)

// Public API metrics
// TODO(dennwc): add parse timeout metric
var (
parseCalls = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "bblfshd_parse_total",
Expand Down
28 changes: 16 additions & 12 deletions daemon/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,17 @@ func (dp *DriverPool) setIdle(d Driver) {
}

// killDriver stops and removes the driver from the queue.
func (dp *DriverPool) killDriver(d Driver) {
func (dp *DriverPool) killDriver(d Driver, info string, err error) {
if dp.metrics.spawn.kill != nil {
dp.metrics.spawn.kill.Add(1)
}

if err != nil {
dp.Logger.Errorf(err, "killDriver(%s): %s", d.ID(), info)
} else {
dp.Logger.Infof("killDriver(%s): %s", d.ID(), info)
}

dp.drivers.Lock()
delete(dp.drivers.all, d)
delete(dp.drivers.idle, d)
Expand Down Expand Up @@ -461,13 +467,13 @@ func (dp *DriverPool) scale() {
// prefer to kill drivers that are returned by clients instead of idle ones;
// the idle map may be accessed without the management goroutine, thus it's more
// valuable to keep it full
dp.killDriver(d)
dp.killDriver(d, "scale down - kill driver returned by client", nil)
continue
default:
}
// only idle drivers remain - start killing those
if d, ok := dp.peekIdle(); ok {
dp.killDriver(d)
dp.killDriver(d, "scale down - kill idle driver", nil)
continue
}
// no drivers are idle, only way to downscale is to wait for clients to put
Expand All @@ -479,7 +485,7 @@ func (dp *DriverPool) scale() {
dp.rescaleLater(req)
return
case d := <-dp.put:
dp.killDriver(d)
dp.killDriver(d, "scale down - no drivers are idle", nil)
}
}
return
Expand Down Expand Up @@ -561,7 +567,7 @@ func (dp *DriverPool) scaleDown(req driverRequest, exact bool) bool {
// scaling policy asks us to drain and then asks to scale
// back up - we could have returned this driver to the
// client instead
dp.killDriver(d)
dp.killDriver(d, "scaleDown", nil)
case <-dp.rescale:
// worth to re-evaluate scaling conditions
}
Expand Down Expand Up @@ -628,11 +634,11 @@ func (dp *DriverPool) drain() {
if !ok {
break
}
dp.killDriver(d)
dp.killDriver(d, "drain-peekIdle", nil)
}
for dp.running.Value() > 0 {
d := <-dp.put
dp.killDriver(d)
dp.killDriver(d, "drain-put", nil)
}
}

Expand Down Expand Up @@ -718,7 +724,7 @@ func (dp *DriverPool) putDriver(d Driver) error {
}
select {
case <-dp.poolCtx.Done():
dp.killDriver(d)
dp.killDriver(d, "putDriver", dp.poolCtx.Err())
return ErrPoolClosed.New()
case dp.put <- d:
}
Expand All @@ -730,12 +736,10 @@ func (dp *DriverPool) putDriver(d Driver) error {
func (dp *DriverPool) checkStatus(d Driver) error {
status, err := d.Status()
if err != nil {
dp.Logger.Errorf(err, "error getting driver status, removing")
dp.killDriver(d)
dp.killDriver(d, "error getting driver status, removing", err)
return err
} else if status != protocol.Running {
dp.Logger.Debugf("removing stopped driver")
dp.killDriver(d)
dp.killDriver(d, "removing stopped driver", nil)
return errDriverStopped.New()
}
return nil
Expand Down
70 changes: 68 additions & 2 deletions daemon/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
var (
	_ protocol2.DriverServer     = (*ServiceV2)(nil)
	_ protocol2.DriverHostServer = (*ServiceV2)(nil)

	// parseKillDelay is the grace period a driver is given to finish an
	// in-flight parse after its request context expires; if the driver still
	// hasn't responded after this delay, it is killed and removed from the pool.
	parseKillDelay = time.Second
)

type ServiceV2 struct {
Expand Down Expand Up @@ -72,7 +74,7 @@ func (s *ServiceV2) Parse(rctx xcontext.Context, req *protocol2.ParseRequest) (r
req.Language = language

err = dp.ExecuteCtx(ctx, func(ctx context.Context, driver Driver) error {
resp, err = driver.ServiceV2().Parse(ctx, req)
resp, err = parseV2(ctx, dp, driver, req)
return err
})
if err != nil {
Expand All @@ -84,6 +86,38 @@ func (s *ServiceV2) Parse(rctx xcontext.Context, req *protocol2.ParseRequest) (r
return resp, err
}

// parseV2 runs the driver's v2 Parse call in a separate goroutine and enforces
// the request context: if ctx expires (deadline or cancellation) before the
// driver responds, the driver is granted an extra parseKillDelay grace period
// to finish, and is killed via the pool if it still hasn't returned.
//
// It returns the driver's response/error when the driver finishes in time, or
// (nil, ctx.Err()) when the driver had to be killed.
func parseV2(ctx context.Context, pool *DriverPool, drv Driver, req *protocol2.ParseRequest) (*protocol2.ParseResponse, error) {
	var (
		resp *protocol2.ParseResponse
		err  error
	)
	done := make(chan struct{})
	go func() {
		resp, err = drv.ServiceV2().Parse(ctx, req)
		close(done)
	}()

	select {
	case <-done:
		return resp, err
	case <-ctx.Done():
		// fall through to the grace period below
	}

	// The request context expired. Always grant the driver a parseKillDelay
	// grace period before killing it; previously the delay was only applied
	// when ctx carried a deadline, so a plain cancellation killed the driver
	// immediately, defeating the purpose of parseKillDelay.
	timer := time.NewTimer(parseKillDelay)
	defer timer.Stop()
	select {
	case <-done:
		return resp, err
	case <-timer.C:
		pool.killDriver(drv, "parseV2", ctx.Err())
		return nil, ctx.Err()
	}
}

// ServerVersion implements protocol2.DriverHostServer.
func (s *ServiceV2) ServerVersion(rctx xcontext.Context, _ *protocol2.VersionRequest) (*protocol2.VersionResponse, error) {
versionCallsV2.Add(1)
Expand Down Expand Up @@ -228,7 +262,7 @@ func (d *Service) Parse(req *protocol1.ParseRequest) *protocol1.ParseResponse {
req.Language = language

err = dp.Execute(func(ctx context.Context, driver Driver) error {
resp, err = driver.Service().Parse(ctx, req)
resp, err = parseV1(ctx, dp, driver, req)
return err
}, req.Timeout)

Expand All @@ -241,6 +275,38 @@ func (d *Service) Parse(req *protocol1.ParseRequest) *protocol1.ParseResponse {
return resp
}

// parseV1 runs the driver's v1 Parse call in a separate goroutine and enforces
// the request context: if ctx expires (deadline or cancellation) before the
// driver responds, the driver is granted an extra parseKillDelay grace period
// to finish, and is killed via the pool if it still hasn't returned.
//
// It returns the driver's response/error when the driver finishes in time, or
// (nil, ctx.Err()) when the driver had to be killed.
func parseV1(ctx context.Context, pool *DriverPool, drv Driver, req *protocol1.ParseRequest) (*protocol1.ParseResponse, error) {
	var (
		resp *protocol1.ParseResponse
		err  error
	)
	done := make(chan struct{})
	go func() {
		resp, err = drv.Service().Parse(ctx, req)
		close(done)
	}()

	select {
	case <-done:
		return resp, err
	case <-ctx.Done():
		// fall through to the grace period below
	}

	// The request context expired. Always grant the driver a parseKillDelay
	// grace period before killing it; previously the delay was only applied
	// when ctx carried a deadline, so a plain cancellation killed the driver
	// immediately, defeating the purpose of parseKillDelay.
	timer := time.NewTimer(parseKillDelay)
	defer timer.Stop()
	select {
	case <-done:
		return resp, err
	case <-timer.C:
		pool.killDriver(drv, "parseV1", ctx.Err())
		return nil, ctx.Err()
	}
}

func (d *Service) logResponse(s protocol1.Status, filename string, language string, size int, elapsed time.Duration) {
fields := log.Fields{"elapsed": elapsed}
if filename != "" {
Expand Down
33 changes: 31 additions & 2 deletions daemon/service_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package daemon

import (
"context"
"os"
"testing"
"time"
Expand All @@ -18,11 +19,39 @@ func TestServiceParse(t *testing.T) {
s := NewService(d)
resp := s.Parse(&protocol.ParseRequest{Filename: "foo.py", Content: "foo"})
require.Len(resp.Errors, 0)
require.Equal(resp.UAST.Token, "foo")
require.Equal(resp.Language, "python")
require.Equal("foo", resp.UAST.Token)
require.Equal("python", resp.Language)
require.True(resp.Elapsed.Nanoseconds() > 0)
}

// TODO(dennwc): Add test cases for V2
// TestServiceParseV1 verifies that parseV1 still delivers a successful parse
// when the request context is already canceled: the parseKillDelay grace
// period lets the driver finish before it would be killed.
func TestServiceParseV1(t *testing.T) {
	require := require.New(t)

	d, tmp := buildMockedDaemon(t)
	defer os.RemoveAll(tmp)

	s := NewService(d)
	req := &protocol.ParseRequest{Filename: "foo.py", Content: "foo"}

	lang, dp, err := s.selectPool(context.TODO(), req.Language, req.Content, req.Filename)
	require.NoError(err)
	require.Equal("python", lang)

	resp := &protocol.ParseResponse{}
	err = dp.Execute(func(ctx context.Context, driver Driver) error {
		canceled, cancel := context.WithCancel(ctx)
		cancel() // simulate context.Done

		// thanks to parseKillDelay the driver gets a grace period, so we
		// ultimately receive a response without errors
		resp, err = parseV1(canceled, dp, driver, req)
		return err
	}, req.Timeout)

	require.NoError(err)
	require.Len(resp.Errors, 0)
	require.Equal("foo", resp.UAST.Token)
}

func TestServiceNativeParse(t *testing.T) {
require := require.New(t)

Expand Down

0 comments on commit d82e5f7

Please sign in to comment.