Skip to content

Commit

Permalink
refine: refine smux conn #40 (#41)
Browse files Browse the repository at this point in the history
* refine: refine smux conn

* add cancel in copybuffer

* use round_robin as lb algorithm #31 #40
  • Loading branch information
Ehco1996 authored May 7, 2021
1 parent 6ee3a16 commit a0047f5
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 278 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ go get -u "github.com/Ehco1996/ehco/cmd/ehco"
}
```

## Benchmark
## Benchmark(Apple m1)

iperf:

Expand Down Expand Up @@ -140,5 +140,5 @@ iperf3 -c 0.0.0.0 -p 1234 -u -b 1G --length 1024

| iperf | raw | relay(raw) | relay(ws) |relay(wss) | relay(mwss)|
| ---- | ---- | ---- | ---- | ---- | ---- |
| tcp | 62.6 Gbits/sec | 23.9 Gbits/sec | 14.65 Gbits/sec | 4.22 Gbits/sec | 2.43 Gbits/sec |
| tcp | 123 Gbits/sec | 55 Gbits/sec | 41 Gbits/sec | 10 Gbits/sec | 5.78 Gbits/sec |
| udp | 14.5 Gbits/sec | 3.3 Gbits/sec | 直接转发 | 直接转发 | 直接转发 |
98 changes: 0 additions & 98 deletions internal/lb/lb.go

This file was deleted.

39 changes: 0 additions & 39 deletions internal/lb/lb_test.go

This file was deleted.

24 changes: 24 additions & 0 deletions internal/lb/round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package lb

import (
"sync/atomic"
)

// RoundRobin is an interface for representing round-robin balancing.
type RoundRobin interface {
Next() string
}

type roundrobin struct {
remotes []string
next uint32
}

func NewRBRemotes(remotes []string) RoundRobin {
return &roundrobin{remotes: remotes}
}

func (r *roundrobin) Next() string {
n := atomic.AddUint32(&r.next, 1)
return r.remotes[(int(n)-1)%len(r.remotes)]
}
19 changes: 19 additions & 0 deletions internal/lb/round_robin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package lb

import (
"testing"
)

func Test_roundrobin_Next(t *testing.T) {
remotes := []string{
"127.0.0.1",
"127.0.0.2",
"127.0.0.3",
}
rb := NewRBRemotes(remotes)
for i := 0; i < len(remotes); i++ {
if res := rb.Next(); res != remotes[i] {
t.Fatalf("need %s got %s", remotes[i], res)
}
}
}
24 changes: 11 additions & 13 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func NewRelay(cfg *config.RelayConfig) (*Relay, error) {

TP: transporter.PickTransporter(
cfg.TransportType,
lb.New(cfg.TCPRemotes),
lb.New(cfg.UDPRemotes),
lb.NewRBRemotes(cfg.TCPRemotes),
lb.NewRBRemotes(cfg.UDPRemotes),
),
}

Expand Down Expand Up @@ -179,35 +179,33 @@ func (r *Relay) RunLocalWSSServer() error {
func (r *Relay) RunLocalMWSSServer() error {
r.LogRelay()
tp := r.TP.(*transporter.Raw)
s := &transporter.MWSSServer{
ConnChan: make(chan net.Conn, 1024),
ErrChan: make(chan error, 1),
}
mwssServer := transporter.NewMWSSServer()
mux := mux.NewRouter()
mux.Handle("/", http.HandlerFunc(web.Index))
mux.Handle("/mwss/", http.HandlerFunc(s.Upgrade))
server := &http.Server{
mux.Handle("/mwss/", http.HandlerFunc(mwssServer.Upgrade))
httpServer := &http.Server{
Addr: r.LocalTCPAddr.String(),
Handler: mux,
TLSConfig: mytls.DefaultTLSConfig,
ReadHeaderTimeout: 30 * time.Second,
}
s.Server = server
mwssServer.Server = httpServer

ln, err := net.Listen("tcp", r.LocalTCPAddr.String())
if err != nil {
return err
}
go func() {
err := server.Serve(tls.NewListener(ln, server.TLSConfig))
err := httpServer.Serve(tls.NewListener(ln, httpServer.TLSConfig))
if err != nil {
s.ErrChan <- err
mwssServer.ErrChan <- err
}
close(s.ErrChan)
close(mwssServer.ErrChan)
}()

var tempDelay time.Duration
for {
conn, e := s.Accept()
conn, e := mwssServer.Accept()
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
Expand Down
22 changes: 18 additions & 4 deletions internal/transporter/buffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transporter

import (
"context"
"errors"
"io"
"net"
Expand Down Expand Up @@ -72,15 +73,28 @@ func copyBuffer(dst io.Writer, src io.Reader, bufferPool *sync.Pool) (written in

// NOTE must call setdeadline before use this func or may goroutine leak
func transport(rw1, rw2 io.ReadWriter) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errc := make(chan error, 1)
go func() {
_, err := copyBuffer(rw1, rw2, InboundBufferPool)
errc <- err
select {
case <-ctx.Done():
println("ctx done exits copy1")
default:
_, err := copyBuffer(rw1, rw2, InboundBufferPool)
errc <- err
}
}()

go func() {
_, err := copyBuffer(rw2, rw1, OutboundBufferPool)
errc <- err
select {
case <-ctx.Done():
println("ctx done exit copy1")
default:
_, err := copyBuffer(rw2, rw1, InboundBufferPool)
errc <- err
}
}()
err := <-errc
// NOTE 我们不关心operror 比如 eof/reset/broken pipe
Expand Down
Loading

0 comments on commit a0047f5

Please sign in to comment.