Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
golang.org/x/net v0.48.0
golang.org/x/sync v0.19.0
golang.org/x/sys v0.39.0
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173
google.golang.org/grpc v1.78.0
google.golang.org/protobuf v1.36.11
Expand All @@ -50,7 +51,6 @@ require (
golang.org/x/text v0.32.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/tools v0.39.0 // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
130 changes: 115 additions & 15 deletions proxy/wireguard/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,26 @@ type netBindClient struct {
ctx context.Context
dialer internet.Dialer
reserved []byte

// Track all peer connections for unified reading
connMutex sync.RWMutex
conns map[*netEndpoint]net.Conn
dataChan chan *receivedData
closeChan chan struct{}
closeOnce sync.Once
}

const (
// Buffer size for dataChan - allows some buffering of received packets
// while dispatcher matches them with read requests
dataChannelBufferSize = 100
)

type receivedData struct {
data []byte
n int
endpoint *netEndpoint
err error
}

func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
Expand All @@ -133,34 +153,114 @@ func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
}
endpoint.conn = c

go func(readQueue <-chan *netReadInfo, endpoint *netEndpoint) {
// Initialize channels on first connection
bind.connMutex.Lock()
if bind.conns == nil {
bind.conns = make(map[*netEndpoint]net.Conn)
bind.dataChan = make(chan *receivedData, dataChannelBufferSize)
bind.closeChan = make(chan struct{})

// Start unified reader dispatcher
go bind.unifiedReader()
}
bind.conns[endpoint] = c
bind.connMutex.Unlock()

// Start a reader goroutine for this specific connection
go func(conn net.Conn, endpoint *netEndpoint) {
const maxPacketSize = 1500
for {
v, ok := <-readQueue
if !ok {
select {
case <-bind.closeChan:
return
default:
}
i, err := c.Read(v.buff)

if i > 3 {
v.buff[1] = 0
v.buff[2] = 0
v.buff[3] = 0

buf := make([]byte, maxPacketSize)
n, err := conn.Read(buf)

// Send only the valid data portion to dispatcher
dataToSend := buf
if n > 0 && n < len(buf) {
dataToSend = buf[:n]
}

v.bytes = i
v.endpoint = endpoint
v.err = err
v.waiter.Done()

// Send received data to dispatcher
select {
case bind.dataChan <- &receivedData{
data: dataToSend,
n: n,
endpoint: endpoint,
err: err,
}:
case <-bind.closeChan:
return
}

if err != nil {
bind.connMutex.Lock()
delete(bind.conns, endpoint)
endpoint.conn = nil
bind.connMutex.Unlock()
return
}
}
}(bind.readQueue, endpoint)
}(c, endpoint)

return nil
}

// unifiedReader dispatches received data to waiting read requests
func (bind *netBindClient) unifiedReader() {
for {
select {
case data := <-bind.dataChan:
// Bounds check to prevent panic
if data.n > len(data.data) {
data.n = len(data.data)
}

// Wait for a read request with timeout to prevent blocking forever
select {
case v := <-bind.readQueue:
// Copy data to request buffer
n := copy(v.buff, data.data[:data.n])

// Clear reserved bytes if needed
if n > 3 {
v.buff[1] = 0
v.buff[2] = 0
v.buff[3] = 0
}

v.bytes = n
v.endpoint = data.endpoint
v.err = data.err
v.waiter.Done()
case <-bind.closeChan:
return
}
case <-bind.closeChan:
return
}
}
}

// Close implements conn.Bind.Close for netBindClient
func (bind *netBindClient) Close() error {
// Use sync.Once to prevent double-close panic
bind.closeOnce.Do(func() {
bind.connMutex.Lock()
if bind.closeChan != nil {
close(bind.closeChan)
}
bind.connMutex.Unlock()
})

// Call parent Close
return bind.netBind.Close()
}

func (bind *netBindClient) Send(buff [][]byte, endpoint conn.Endpoint) error {
var err error

Expand Down
10 changes: 8 additions & 2 deletions proxy/wireguard/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,22 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
}

// bind := conn.NewStdNetBind() // TODO: conn.Bind wrapper for dialer
// Set workers to number of peers if not explicitly configured
// This allows concurrent packet reception from multiple peers
workers := int(h.conf.NumWorkers)
if workers <= 0 && len(h.conf.Peers) > 0 {
workers = len(h.conf.Peers)
}
h.bind = &netBindClient{
netBind: netBind{
dns: h.dns,
dnsOption: dns.IPOption{
IPv4Enable: h.hasIPv4,
IPv6Enable: h.hasIPv6,
},
workers: int(h.conf.NumWorkers),
workers: workers,
},
ctx: ctx,
ctx: core.ToBackgroundDetachedContext(ctx),
dialer: dialer,
reserved: h.conf.Reserved,
}
Expand Down