Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Fix for Websocket behind Proxy Issue (#196)
Browse files Browse the repository at this point in the history
* Fix lastError race

* Move websocket pkg out of std lib to deal with proxy.

* Add WebsocketEOF to connection closed error check.

* add comment

* added ctx to flow to ns.newClient

Co-authored-by: Alexander Pashkov <alexpashkov123@gmail.com>
Co-authored-by: Chris <christopher.mcmillon@halliburton.com>
Co-authored-by: Joel Hendrix <jhendrix@microsoft.com>
  • Loading branch information
4 people authored Dec 2, 2020
1 parent c0e1f59 commit ff0e3f0
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 19 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ require (
github.com/joho/godotenv v1.3.0
github.com/mitchellh/mapstructure v1.3.3
github.com/stretchr/testify v1.6.1
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect
nhooyr.io/websocket v1.8.6
)
54 changes: 52 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,85 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mitchellh/mapstructure v1.3.3 h1:SzB1nHZ2Xi+17FP0zVQBHIZqvwRN9408fJO8h+eeNA8=
github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k=
nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
19 changes: 11 additions & 8 deletions namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@ import (
"context"
"crypto/tls"
"fmt"
"runtime"
"strings"

"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v3/cbs"
"github.com/Azure/azure-amqp-common-go/v3/conn"
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/azure"
"golang.org/x/net/websocket"
"nhooyr.io/websocket"
"runtime"
"strings"
)

const (
Expand Down Expand Up @@ -194,7 +193,9 @@ func NewNamespace(opts ...NamespaceOption) (*Namespace, error) {
return ns, nil
}

func (ns *Namespace) newClient() (*amqp.Client, error) {
func (ns *Namespace) newClient(ctx context.Context) (*amqp.Client, error) {
ctx, span := ns.startSpanFromContext(ctx, "sb.namespace.newClient")
defer span.End()
defaultConnOptions := []amqp.ConnOption{
amqp.ConnSASLAnonymous(),
amqp.ConnMaxSessions(65535),
Expand All @@ -215,13 +216,15 @@ func (ns *Namespace) newClient() (*amqp.Client, error) {

if ns.useWebSocket {
wssHost := ns.getWSSHostURI() + "$servicebus/websocket"
wssConn, err := websocket.Dial(wssHost, "amqp", "http://localhost/")
opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}}
wssConn, _, err := websocket.Dial(ctx, wssHost, opts)

if err != nil {
return nil, err
}
nConn := websocket.NetConn(context.Background(), wssConn, websocket.MessageBinary)

wssConn.PayloadType = websocket.BinaryFrame
return amqp.New(wssConn, append(defaultConnOptions, amqp.ConnServerHostname(ns.getHostname()))...)
return amqp.New(nConn, append(defaultConnOptions, amqp.ConnServerHostname(ns.getHostname()))...)
}

return amqp.Dial(ns.getAMQPHostURI(), defaultConnOptions...)
Expand Down
3 changes: 2 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,9 @@ func (q *Queue) Close(ctx context.Context) error {
return lastErr
}

// failed to close WebSocket: failed to read frame header: EOF returned for websocket closing frm net conn.
func isConnectionClosed(err error) bool {
return err.Error() == "amqp: connection closed"
return err.Error() == "amqp: connection closed" || err.Error() == "failed to close WebSocket: failed to read frame header: EOF"
}

func (q *Queue) newReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error) {
Expand Down
19 changes: 14 additions & 5 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type (
useSessions bool
sessionID *string
lastError error
lastErrorMu sync.RWMutex
mode ReceiveMode
prefetch uint32
DefaultDisposition DispositionAction
Expand Down Expand Up @@ -242,7 +243,7 @@ func (r *Receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
if err != nil {
_, span := r.startConsumerSpanFromContext(ctx, optName)
span.Logger().Error(err)
r.lastError = err
r.setLastError(err)
if r.doneListening != nil {
r.doneListening()
}
Expand All @@ -259,7 +260,7 @@ func (r *Receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler

if err := handler.Handle(ctx, event); err != nil {
// stop handling messages since the message consumer ran into an unexpected error
r.lastError = err
r.setLastError(err)
if r.doneListening != nil {
r.doneListening()
}
Expand All @@ -283,7 +284,7 @@ func (r *Receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
// if an error is returned by the default disposition, then we must alert the message consumer as we can't
// be sure the final message disposition.
tab.For(ctx).Error(err)
r.lastError = err
r.setLastError(err)
if r.doneListening != nil {
r.doneListening()
}
Expand Down Expand Up @@ -329,7 +330,7 @@ func (r *Receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes

if retryErr != nil {
tab.For(ctx).Debug("retried, but error was unrecoverable")
r.lastError = retryErr
r.setLastError(retryErr)
if err := r.Close(ctx); err != nil {
tab.For(ctx).Error(err)
}
Expand All @@ -340,6 +341,12 @@ func (r *Receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
}
}

func (r *Receiver) setLastError(err error) {
r.lastErrorMu.Lock()
r.lastError = err
r.lastErrorMu.Unlock()
}

func (r *Receiver) listenForMessage(ctx context.Context) (*amqp.Message, error) {
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.listenForMessage")
defer span.End()
Expand Down Expand Up @@ -381,7 +388,7 @@ func (r *Receiver) newSessionAndLink(ctx context.Context) error {
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.newSessionAndLink")
defer span.End()

client, err := r.namespace.newClient()
client, err := r.namespace.newClient(ctx)
if err != nil {
tab.For(ctx).Error(err)
return err
Expand Down Expand Up @@ -501,6 +508,8 @@ func (lc *ListenerHandle) Done() <-chan struct{} {

// Err will return the last error encountered
func (lc *ListenerHandle) Err() error {
lc.r.lastErrorMu.RLock()
defer lc.r.lastErrorMu.RUnlock()
if lc.r.lastError != nil {
return lc.r.lastError
}
Expand Down
2 changes: 1 addition & 1 deletion rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *rpcClient) ensureConn(ctx context.Context) error {
r.clientMu.Lock()
defer r.clientMu.Unlock()

client, err := r.ec.Namespace().newClient()
client, err := r.ec.Namespace().newClient(ctx)
err = r.ec.Namespace().negotiateClaim(ctx, client, r.ec.ManagementPath())
if err != nil {
tab.For(ctx).Error(err)
Expand Down
2 changes: 1 addition & 1 deletion sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (s *Sender) newSessionAndLink(ctx context.Context) error {
ctx, span := s.startProducerSpanFromContext(ctx, "sb.Sender.newSessionAndLink")
defer span.End()

connection, err := s.namespace.newClient()
connection, err := s.namespace.newClient(ctx)
if err != nil {
tab.For(ctx).Error(err)
return err
Expand Down

0 comments on commit ff0e3f0

Please sign in to comment.