Skip to content

Commit

Permalink
fix: try fix udp lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Jun 2, 2021
1 parent 403bfdb commit 30bccf0
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 23 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ require (
github.com/prometheus/client_golang v1.9.0
github.com/urfave/cli/v2 v2.3.0
github.com/xtaci/smux v1.5.15
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/zap v1.16.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
Expand Down
6 changes: 3 additions & 3 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func (r *Relay) RunLocalUDPServer() error {
}
bc := r.TP.GetOrCreateBufferCh(addr)
bc.Ch <- buf[0:n]
if !bc.Handled {
bc.Handled = true
go r.TP.HandleUDPConn(addr, lis)
if !bc.Handled.Load() {
bc.Handled.Store(true)
go r.TP.HandleUDPConn(bc.UDPAddr, lis)
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions internal/transporter/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net"
"syscall"

"go.uber.org/atomic"

"github.com/Ehco1996/ehco/internal/constant"
"github.com/Ehco1996/ehco/internal/web"
)
Expand Down Expand Up @@ -89,12 +91,14 @@ func transport(rw1, rw2 io.ReadWriter) error {

type BufferCh struct {
Ch chan []byte
Handled bool
Handled atomic.Bool
UDPAddr *net.UDPAddr
}

func newudpBufferCh() *BufferCh {
func newudpBufferCh(clientUDPAddr *net.UDPAddr) *BufferCh {
return &BufferCh{
Ch: make(chan []byte, 100),
Handled: false,
Handled: atomic.Bool{},
UDPAddr: clientUDPAddr,
}
}
32 changes: 15 additions & 17 deletions internal/transporter/raw.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transporter

import (
"context"
"net"
"net/http"
"sync"
Expand All @@ -22,9 +23,10 @@ type Raw struct {
func (raw *Raw) GetOrCreateBufferCh(uaddr *net.UDPAddr) *BufferCh {
raw.udpmu.Lock()
defer raw.udpmu.Unlock()

bc, found := raw.UDPBufferChMap[uaddr.String()]
if !found {
bc := newudpBufferCh()
bc := newudpBufferCh(uaddr)
raw.UDPBufferChMap[uaddr.String()] = bc
return bc
}
Expand All @@ -36,10 +38,9 @@ func (raw *Raw) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
defer web.CurUDPNum.Dec()

bc := raw.GetOrCreateBufferCh(uaddr)
defer close(bc.Ch)
remote := raw.UDPRemotes.Next()

rc, err := net.Dial("udp", remote)
remoteUdp, _ := net.ResolveUDPAddr("udp", raw.UDPRemotes.Next())
rc, err := net.DialUDP("udp", nil, remoteUdp)
if err != nil {
logger.Info(err)
return
Expand All @@ -49,32 +50,26 @@ func (raw *Raw) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
delete(raw.UDPBufferChMap, uaddr.String())
}()

logger.Infof("[raw] HandleUDPConn from %s to %s", local.LocalAddr().String(), remote)
logger.Infof("[raw] HandleUDPConn from %s to %s", local.LocalAddr().String(), remoteUdp.String())

buf := BufferPool.Get()
defer BufferPool.Put(buf)

var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())

go func() {
defer wg.Done()
defer cancel()
wt := 0
for {
if serr := rc.SetDeadline(time.Now().Add(time.Second * 3)); serr != nil {
logger.Info(err)
break
}
rc.SetReadDeadline(time.Now().Add(time.Second * 15))
i, err := rc.Read(buf)
if err != nil {
logger.Info(err)
break
} else {
if serr := rc.SetDeadline(time.Now().Add(time.Second * 3)); serr != nil {
logger.Info(err)
break
}
}

if _, err := local.WriteToUDP(buf[0:i], uaddr); err != nil {
logger.Info(err)
break
Expand All @@ -88,11 +83,14 @@ func (raw *Raw) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
go func() {
defer wg.Done()
wt := 0
for b := range bc.Ch {
select {
case <-ctx.Done():
return
case b := <-bc.Ch:
wt += len(b)
if _, err := rc.Write(b); err != nil {
logger.Info(err)
break
return
}
}
web.NetWorkTransmitBytes.Add(float64(wt * 2))
Expand Down

0 comments on commit 30bccf0

Please sign in to comment.