Skip to content

Commit

Permalink
[Golang API] Add support for resuming execution after transient failu…
Browse files Browse the repository at this point in the history
…res of the GRPC connection.

Summary:
The Query Broker is setup to accept ExecuteScript requests that are pointed at a particular query ID.
This diff adds support to the Golang API for resuming queries after the grpc connection fails with Unavailable or RST_STREAM.
The retry/resumption respects the original context the user passed into ExecuteScript, and as long as that context is valid, it will retry indefinitely.

Test Plan: Tested that query resumption works by deploying a vizier with Read/WriteTimeouts set to 60s on the grpc servers, and then seeing that the query failed and then was resumed after 60s.

Reviewers: michelle, vihang, philkuz, zasgar

Reviewed By: philkuz

Signed-off-by: James Bartlett <jamesbartlett@pixielabs.ai>

Differential Revision: https://phab.corp.pixielabs.ai/D12305

GitOrigin-RevId: c1989888497e0c7c6d8714634eee8933d92a8178
  • Loading branch information
JamesMBartlett authored and copybaranaut committed Oct 31, 2022
1 parent 2e68511 commit 1588c30
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/api/go/pxapi/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ go_library(
"//src/api/proto/cloudpb:cloudapi_pl_go_proto",
"//src/api/proto/vizierpb:vizier_pl_go_proto",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
],
)

Expand Down
53 changes: 53 additions & 0 deletions src/api/go/pxapi/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ package pxapi

import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"px.dev/pixie/src/api/go/pxapi/errdefs"
"px.dev/pixie/src/api/go/pxapi/types"
"px.dev/pixie/src/api/go/pxapi/utils"
Expand Down Expand Up @@ -59,6 +65,10 @@ type ScriptResults struct {
wg sync.WaitGroup

stats *ResultsStats

v *VizierClient
queryID string
origCtx context.Context
}

func newScriptResults() *ScriptResults {
Expand Down Expand Up @@ -133,6 +143,37 @@ func (s *ScriptResults) handleGRPCMsg(ctx context.Context, resp *vizierpb.Execut
return errdefs.ErrInternalUnImplementedType
}

func isTransientGRPCError(err error) bool {
s, ok := status.FromError(err)
if !ok {
return false
}
if s.Code() == codes.Internal && strings.Contains(s.Message(), "RST_STREAM") {
return true
}
return false
}

func (s *ScriptResults) reconnect() error {
if s.queryID == "" {
return errors.New("cannot reconnect to query that hasn't returned a QueryID yet")
}
req := &vizierpb.ExecuteScriptRequest{
ClusterID: s.v.vizierID,
QueryID: s.queryID,
EncryptionOptions: s.v.encOpts,
}
ctx, cancel := context.WithCancel(s.origCtx)
res, err := s.v.vzClient.ExecuteScript(s.v.cloud.cloudCtxWithMD(ctx), req)
if err != nil {
cancel()
return err
}
s.cancel = cancel
s.c = res
return nil
}

func (s *ScriptResults) run() error {
ctx := s.c.Context()
for {
Expand All @@ -143,11 +184,23 @@ func (s *ScriptResults) run() error {
// Stream has terminated.
return nil
}
if isTransientGRPCError(err) {
origErr := err
err = s.reconnect()
if err != nil {
return fmt.Errorf("streaming failed: %w, error occurred while reconnecting: %v", origErr, err)
}
ctx = s.c.Context()
continue
}
return err
}
if resp == nil {
return nil
}
if s.queryID == "" {
s.queryID = resp.QueryID
}
if err := s.handleGRPCMsg(ctx, resp); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions src/api/go/pxapi/vizier.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (v *VizierClient) ExecuteScript(ctx context.Context, pxl string, mux TableM
QueryStr: pxl,
EncryptionOptions: v.encOpts,
}
origCtx := ctx
ctx, cancel := context.WithCancel(ctx)
res, err := v.vzClient.ExecuteScript(v.cloud.cloudCtxWithMD(ctx), req)
if err != nil {
Expand All @@ -54,6 +55,8 @@ func (v *VizierClient) ExecuteScript(ctx context.Context, pxl string, mux TableM
sr.cancel = cancel
sr.tm = mux
sr.decOpts = v.decOpts
sr.v = v
sr.origCtx = origCtx

return sr, nil
}
Expand Down

0 comments on commit 1588c30

Please sign in to comment.