Skip to content

Bugfix/unbounded send queue #224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
Apr 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c5fa214
Add unbounded send-queue. (WIP. Panics.)
lthibault Feb 20, 2022
7f0b180
Remove sender lock (WIP. Panics due to negative waitgroup.)
lthibault Feb 20, 2022
7f61672
Fix waitgroup panic.
lthibault Feb 20, 2022
7d8532f
Hold lock during entire duration of Bootstrap call.
lthibault Feb 21, 2022
426d126
Add fine-grained locking in Conn.Bootstrap.
lthibault Feb 21, 2022
ffecdfc
Rename package 'errors' to 'exc' and move to project root.
lthibault Feb 21, 2022
605d61a
Refactor exception annotation in package rpc.
lthibault Feb 21, 2022
1ec61a2
Refactor exception annotation in main package + server.
lthibault Feb 21, 2022
d0021f6
Rename exc.Factory to exc.Annotator.
lthibault Feb 21, 2022
33f4b44
Wrap errors returned from rpc_test.pipe with %w in order for rpc.Conn to
lthibault Feb 21, 2022
c31b854
Wrap errors returned by stream transport with %w to allow inspection.
lthibault Feb 21, 2022
825d368
Fix errgroup wait condition.
lthibault Feb 22, 2022
a51146a
Clean up level0_test
lthibault Feb 22, 2022
c5fdc8a
Run level 0 tests in parallel.
lthibault Feb 22, 2022
e41432f
Simplify locking in rpc.Conn.Bootstrap
lthibault Feb 22, 2022
0d709f1
Fix deadlock in shutdown.
lthibault Feb 22, 2022
6e3eef3
Clean up Conn.shutdown()
lthibault Feb 22, 2022
fbb544b
Remove ExcAlreadyClosed.
lthibault Feb 22, 2022
c0985d5
Fix deadlock in tests.
lthibault Mar 5, 2022
921e2fd
Pool error channels to avoid allocs during high RPC load.
lthibault Mar 5, 2022
e6c3969
Revert .AddRef() removal from debugging https://github.com/capnproto/…
lthibault Mar 5, 2022
c7fdb9d
WIP. Async send.
lthibault Apr 12, 2022
8bc121b
bugfix: test for non-nil error in question.PipelineCall
lthibault Apr 13, 2022
61a838b
Bugfix:
lthibault Apr 13, 2022
dc40266
Eagerly allocate RPC parameters.
lthibault Apr 15, 2022
6388313
Use generic mpsc.Queue
lthibault Apr 15, 2022
28a7aae
Return 'disconnected' exception when aborting an async send.
lthibault Apr 15, 2022
1d9a449
Tidy up Conn.Bootstrap
lthibault Apr 15, 2022
0258bd5
Bugfix. Stop tasks before draining send queue.
lthibault Apr 15, 2022
69f4f84
Tidy up rpc package.
lthibault Apr 15, 2022
30be2d2
Fix data race in rpc_test.pipe.
lthibault Apr 15, 2022
8fc6544
Clean up unit tests for packed encoding.
lthibault Apr 16, 2022
42a1070
Style fixes.
lthibault Apr 16, 2022
cae6c0c
Document invariants for receive-goroutine blocking behavior.
lthibault Apr 16, 2022
fa02f1f
Uncomment ErrorReporter in BenchmarkPingPong
lthibault Apr 16, 2022
b798fba
Bugfix. Add call to rpc.Conn.setAnswerQuestion in rpc.Conn.Bootstrap.
lthibault Apr 16, 2022
f2c1a7f
Fix wording of answer.sendError and answer.sendException docstrings.
lthibault Apr 16, 2022
dc14c47
Add docstring to sendMessage.
lthibault Apr 16, 2022
2f8e37f
Remove ReportAllocs() in rpc/bench_test.go
lthibault Apr 16, 2022
f2a1209
Bugfix. Release lock before calling rpc.Conn.drainQueue.
lthibault Apr 16, 2022
279179f
Bugfix. Remove deadlock from handleReturn. Replace netPipe with net.P…
lthibault Apr 16, 2022
3ee412c
Disable slow tests when running race detector.
lthibault Apr 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ jobs:
uses: actions/setup-go@v1
with:
go-version: ${{ matrix.go }}
- run: go test -v -race ./...
- run: go test -v -race -short ./...
- run: go test -v ./...
8 changes: 4 additions & 4 deletions answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strconv"
"sync"

"capnproto.org/go/capnp/v3/internal/errors"
"capnproto.org/go/capnp/v3/exc"
"capnproto.org/go/capnp/v3/internal/syncutil"
)

Expand Down Expand Up @@ -766,11 +766,11 @@ type resolution struct {
// ptr obtains a Ptr by applying a transform.
func (r resolution) ptr(transform []PipelineOp) (Ptr, error) {
if r.err != nil {
return Ptr{}, errors.Annotate("", r.method.String(), r.err)
return Ptr{}, exc.Annotate("", r.method.String(), r.err)
}
p, err := Transform(r.result, transform)
if err != nil {
return Ptr{}, errors.Annotate("", r.method.String(), err)
return Ptr{}, exc.Annotate("", r.method.String(), err)
}
return p, nil
}
Expand All @@ -789,7 +789,7 @@ func (r resolution) client(transform []PipelineOp) *Client {
}
iface := p.Interface()
if p.IsValid() && !iface.IsValid() {
return ErrorClient(newError("not a capability"))
return ErrorClient(errorf("not a capability"))
}
return iface.Client()
}
Expand Down
12 changes: 6 additions & 6 deletions canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Canonicalize(s Struct) ([]byte, error) {
return nil, errorf("canonicalize: %v", err)
}
if err := fillCanonicalStruct(root, s); err != nil {
return nil, annotate(err).errorf("canonicalize")
return nil, annotatef(err, "canonicalize")
}
return seg.Data(), nil
}
Expand Down Expand Up @@ -55,14 +55,14 @@ func fillCanonicalStruct(dst, s Struct) error {
for i := uint16(0); i < dst.size.PointerCount; i++ {
p, err := s.Ptr(i)
if err != nil {
return annotate(err).errorf("struct pointer %d", i)
return annotatef(err, "struct pointer %d", i)
}
cp, err := canonicalPtr(dst.seg, p)
if err != nil {
return annotate(err).errorf("struct pointer %d", i)
return annotatef(err, "struct pointer %d", i)
}
if err := dst.SetPtr(i, cp); err != nil {
return annotate(err).errorf("struct pointer %d", i)
return annotatef(err, "struct pointer %d", i)
}
}
return nil
Expand Down Expand Up @@ -124,7 +124,7 @@ func canonicalList(dst *Segment, l List) (List, error) {
}
cp, err := canonicalPtr(dst, p)
if err != nil {
return List{}, annotate(err).errorf("list element %d", i)
return List{}, annotatef(err, "list element %d", i)
}
if err := cl.Set(i, cp); err != nil {
return List{}, errorf("list element %d: %v", i, err)
Expand All @@ -150,7 +150,7 @@ func canonicalList(dst *Segment, l List) (List, error) {
}
for i := 0; i < cl.Len(); i++ {
if err := fillCanonicalStruct(cl.Struct(i), l.Struct(i)); err != nil {
return List{}, annotate(err).errorf("list element %d", i)
return List{}, annotatef(err, "list element %d", i)
}
}
return cl, nil
Expand Down
16 changes: 11 additions & 5 deletions capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,10 @@ func (c *Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) {
h, _, released, finish := c.startCall()
defer finish()
if released {
return ErrorAnswer(s.Method, newError("call on released client")), func() {}
return ErrorAnswer(s.Method, errorf("call on released client")), func() {}
}
if h == nil {
return ErrorAnswer(s.Method, newError("call on null client")), func() {}
return ErrorAnswer(s.Method, errorf("call on null client")), func() {}
}

limiter := c.GetFlowLimiter()
Expand Down Expand Up @@ -345,11 +345,11 @@ func (c *Client) RecvCall(ctx context.Context, r Recv) PipelineCaller {
h, _, released, finish := c.startCall()
defer finish()
if released {
r.Reject(newError("call on released client"))
r.Reject(errorf("call on released client"))
return nil
}
if h == nil {
r.Reject(newError("call on null client"))
r.Reject(errorf("call on null client"))
return nil
}
return h.Recv(ctx, r)
Expand Down Expand Up @@ -384,11 +384,13 @@ func (c *Client) Resolve(ctx context.Context) error {
for {
h, released, resolved := c.peek()
if released {
return newError("cannot resolve released client")
return errorf("cannot resolve released client")
}

if resolved {
return nil
}

select {
case <-h.resolved:
case <-ctx.Done():
Expand Down Expand Up @@ -606,6 +608,10 @@ type ClientPromise struct {
h *clientHook
}

func (cp *ClientPromise) Reject(err error) {
cp.Fulfill(ErrorClient(err))
}

// Fulfill resolves the client promise to c. After Fulfill returns,
// then all future calls to the client created by NewPromisedClient will
// be sent to c. It is guaranteed that the hook passed to
Expand Down
35 changes: 14 additions & 21 deletions error.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,42 @@
package capnp

import (
"fmt"
"capnproto.org/go/capnp/v3/exc"
)

"capnproto.org/go/capnp/v3/internal/errors"
var (
capnperr = exc.Annotator("capnp")
)

// TODO(someday): progressively remove exported functions and instead
// rely on package 'exc'.

// Unimplemented returns an error that formats as the given text and
// will report true when passed to IsUnimplemented.
func Unimplemented(s string) error {
return errors.New(errors.Unimplemented, "", s)
return exc.New(exc.Unimplemented, "", s)
}

// IsUnimplemented reports whether e indicates that functionality is unimplemented.
func IsUnimplemented(e error) bool {
return errors.TypeOf(e) == errors.Unimplemented
return exc.TypeOf(e) == exc.Unimplemented
}

// Disconnected returns an error that formats as the given text and
// will report true when passed to IsDisconnected.
func Disconnected(s string) error {
return errors.New(errors.Disconnected, "", s)
return exc.New(exc.Disconnected, "", s)
}

// IsDisconnected reports whether e indicates a failure due to loss of a necessary capability.
func IsDisconnected(e error) bool {
return errors.TypeOf(e) == errors.Disconnected
}

func newError(msg string) error {
return errors.New(errors.Failed, "capnp", msg)
return exc.TypeOf(e) == exc.Disconnected
}

func errorf(format string, args ...interface{}) error {
return newError(fmt.Sprintf(format, args...))
}

type annotater struct {
err error
}

func annotate(err error) annotater {
return annotater{err}
return capnperr.Failedf(format, args...)
}

func (a annotater) errorf(format string, args ...interface{}) error {
return errors.Annotate("capnp", fmt.Sprintf(format, args...), a.err)
func annotatef(err error, format string, args ...interface{}) error {
return capnperr.Annotatef(err, format, args...)
}
113 changes: 113 additions & 0 deletions exc/exc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Package exc provides an error type for capnp exceptions.
package exc

import (
"errors"
"fmt"
)

// Exception is an error that designates a Cap'n Proto exception.
type Exception struct {
Type Type
Prefix string
Cause error
}

// New creates a new error that formats as "<prefix>: <msg>".
// The type can be recovered using the TypeOf() function.
func New(typ Type, prefix, msg string) *Exception {
return &Exception{typ, prefix, errors.New(msg)}
}

func (e Exception) Error() string {
if e.Prefix == "" {
return e.Cause.Error()
}

return fmt.Sprintf("%s: %v", e.Prefix, e.Cause)
}

func (e Exception) Unwrap() error { return e.Cause }

func (e Exception) GoString() string {
return fmt.Sprintf("errors.Error{Type: %s, Prefix: %q, Cause: fmt.Errorf(%q)}",
e.Type.GoString(),
e.Prefix,
e.Cause)
}

// Annotate is creates a new error that formats as "<prefix>: <msg>: <e>".
// If e.Prefix == prefix, the prefix will not be duplicated.
// The returned Error.Type == e.Type.
func (e Exception) Annotate(prefix, msg string) *Exception {
if prefix != e.Prefix {
return &Exception{e.Type, prefix, fmt.Errorf("%s: %w", msg, e)}
}

return &Exception{e.Type, prefix, fmt.Errorf("%s: %w", msg, e.Cause)}
}

// Annotate creates a new error that formats as "<prefix>: <msg>: <err>".
// If err has the same prefix, then the prefix won't be duplicated.
// The returned error's type will match err's type.
func Annotate(prefix, msg string, err error) *Exception {
if err == nil {
return nil
}

if ce, ok := err.(*Exception); ok {
return ce.Annotate(prefix, msg)
}

return &Exception{
Type: Failed,
Prefix: prefix,
Cause: fmt.Errorf("%s: %w", msg, err),
}
}

type Annotator string

func (f Annotator) New(t Type, err error) *Exception {
if err == nil {
return nil
}

return &Exception{
Type: t,
Prefix: string(f),
Cause: err,
}
}

func (f Annotator) Failed(err error) *Exception {
return f.New(Failed, err)
}

func (f Annotator) Failedf(format string, args ...interface{}) *Exception {
return f.Failed(fmt.Errorf(format, args...))
}

func (f Annotator) Disconnected(err error) *Exception {
return f.New(Disconnected, err)
}

func (f Annotator) Disconnectedf(format string, args ...interface{}) *Exception {
return f.Disconnected(fmt.Errorf(format, args...))
}

func (f Annotator) Unimplemented(err error) *Exception {
return f.New(Unimplemented, err)
}

func (f Annotator) Unimplementedf(format string, args ...interface{}) *Exception {
return f.Unimplemented(fmt.Errorf(format, args...))
}

func (f Annotator) Annotate(err error, msg string) *Exception {
return Annotate(string(f), msg, err)
}

func (f Annotator) Annotatef(err error, format string, args ...interface{}) *Exception {
return f.Annotate(err, fmt.Sprintf(format, args...))
}
23 changes: 2 additions & 21 deletions internal/errors/errors_test.go → exc/exc_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package errors
package exc

import (
"errors"
Expand All @@ -13,7 +13,7 @@ func TestUnwrap(t *testing.T) {
var (
errGeneric = errors.New("something went wrong")
err = Annotate("annotated", "test", errGeneric)
exc Error
exc = new(Exception)
)

assert.EqualError(t, errors.Unwrap(err), "test: something went wrong")
Expand Down Expand Up @@ -43,25 +43,6 @@ func TestErrorString(t *testing.T) {
}
}

func TestTypeOf(t *testing.T) {
t.Parallel()

tests := []struct {
err error
want Type
}{
{nil, Failed},
{errors.New("generic"), Failed},
{New(Failed, "capnp", "failed error"), Failed},
{New(Overloaded, "capnp", "overloaded error"), Overloaded},
{New(Disconnected, "capnp", "disconnected error"), Disconnected},
{New(Unimplemented, "capnp", "unimplemented error"), Unimplemented},
}
for _, test := range tests {
assert.Equal(t, test.want, TypeOf(test.err))
}
}

func TestAnnotate(t *testing.T) {
t.Parallel()

Expand Down
Loading