-
Notifications
You must be signed in to change notification settings - Fork 81
/
Copy pathproxy.go
146 lines (134 loc) · 3.35 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package proxy
import (
"context"
"fmt"
"io"
"net"
"strconv"
"time"
"github.com/xxf098/lite-proxy/common"
N "github.com/xxf098/lite-proxy/common/net"
"github.com/xxf098/lite-proxy/common/pool"
"github.com/xxf098/lite-proxy/log"
"github.com/xxf098/lite-proxy/tunnel"
"github.com/xxf098/lite-proxy/utils"
)
// proxy http/socks to vmess

// Name is the constant string "PROXY".
// NOTE(review): presumably the identifier of this proxy component; its
// consumers are not visible in this file — confirm against callers.
const Name = "PROXY"
// Proxy accepts connections from a set of source servers and relays each of
// them to its destination, dialing through a single sink client unless the
// destination is a private IP address (see relayConnLoop).
type Proxy struct {
// sources are the inbound servers whose connections are accepted and relayed.
sources []tunnel.Server
// sink is the outbound client used to dial destinations that are not private IPs.
sink tunnel.Client
// ctx is waited on by Run and checked by the accept loops to detect shutdown.
ctx context.Context
// cancel is called first by Close; presumably it cancels ctx (both are
// supplied by the caller of NewProxy).
cancel context.CancelFunc
// pool is the worker pool created by relayConnLoop; it is nil until then.
pool *utils.WorkerPool
}
// Run starts the connection relay loops and then blocks until the proxy's
// context is done. It always returns nil.
func (p *Proxy) Run() error {
	p.relayConnLoop()
	// p.relayPacketLoop()

	// relayConnLoop only spawns goroutines, so park here until shutdown.
	done := p.ctx.Done()
	<-done
	return nil
}
// Close shuts the proxy down: it calls the cancel function, closes the sink,
// closes every source and, if the worker pool has been created, stops it.
// Results of the individual Close calls are ignored; Close always returns nil.
func (p *Proxy) Close() error {
	p.cancel()
	p.sink.Close()
	for i := range p.sources {
		p.sources[i].Close()
	}
	// The pool only exists once relayConnLoop has run.
	if workers := p.pool; workers != nil {
		workers.Stop()
	}
	return nil
}
// relayConnLoop forwards connections from the socks/http sources to the
// vmess/trojan sink.
//
// It creates and starts the worker pool, stores it in p.pool, and spawns one
// accept goroutine per source; the method itself returns immediately. Every
// accepted inbound connection is handed to the pool, whose worker dials the
// destination found in the connection metadata and relays data in both
// directions until the relay ends. Destinations with a private IP are dialed
// directly, everything else is dialed through the sink.
//
// An accept goroutine exits when AcceptConn returns an error after p.ctx is
// done; other accept errors are logged and the loop continues.
//
// TODO: bypass cn
func (p *Proxy) relayConnLoop() {
	// Named "workers" rather than "pool" so the imported pool package is not
	// shadowed inside this method.
	workers := &utils.WorkerPool{
		WorkerFunc: func(inbound tunnel.Conn) error {
			defer inbound.Close()
			addr := inbound.Metadata().Address
			var outbound net.Conn
			var err error
			start := time.Now()
			if addr.IP != nil && N.IsPrivateAddress(addr.IP) {
				// Private destinations bypass the sink and are dialed directly.
				networkType := addr.NetworkType
				if networkType == "" {
					networkType = "tcp"
				}
				add := net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
				// Dial with the proxy context so that shutting the proxy down
				// aborts a dial that is still in flight. The context only
				// governs the dial itself, not the established connection.
				var dialer net.Dialer
				outbound, err = dialer.DialContext(p.ctx, networkType, add)
			} else {
				outbound, err = p.sink.DialConn(addr, nil)
			}
			if err != nil {
				log.Error(common.NewError("proxy failed to dial connection").Base(err))
				return err
			}
			elapsed := fmt.Sprintf("%dms", time.Since(start).Milliseconds())
			log.D("connect to:", addr, elapsed)
			defer outbound.Close()
			// relay
			return relay(outbound, inbound)
		},
		MaxWorkersCount:       2000,
		LogAllErrors:          false,
		MaxIdleWorkerDuration: 2 * time.Minute,
	}
	p.pool = workers
	workers.Start()
	for _, source := range p.sources {
		// The source is passed as an argument so each goroutine owns its copy.
		go func(source tunnel.Server) {
			for {
				inbound, err := source.AcceptConn(nil)
				if err != nil {
					// An accept error after shutdown is expected: stop quietly.
					select {
					case <-p.ctx.Done():
						log.D("exiting")
						return
					default:
					}
					log.Error(common.NewError("failed to accept connection").Base(err))
					continue
				}
				workers.Serve(inbound)
			}
		}(source)
	}
}
// relay copies data between leftConn and rightConn in both directions and
// returns once both copies have stopped.
//
// The rightConn -> leftConn copy runs in its own goroutine while the
// leftConn -> rightConn copy runs on the calling goroutine. Each direction
// uses a buffer taken from the buffer pool, logs a copy error if one occurs,
// and afterwards sets the read deadline of the connection it was writing to
// to the current time; that connection is the one the opposite direction
// reads from.
//
// The returned error is the result of the rightConn -> leftConn copy. The
// error of the leftConn -> rightConn copy is only logged.
func relay(leftConn, rightConn net.Conn) error {
	// pipe performs one direction of the relay: src is copied into dst.
	pipe := func(dst, src net.Conn) error {
		scratch := pool.Get(pool.RelayBufferSize)
		// Wrapping to avoid using *net.TCPConn.(ReadFrom)
		// See also https://github.com/Dreamacro/clash/pull/1209
		_, copyErr := io.CopyBuffer(N.WriteOnlyWriter{Writer: dst}, N.ReadOnlyReader{Reader: src}, scratch)
		if copyErr != nil {
			log.E(copyErr.Error())
		}
		pool.Put(scratch)
		dst.SetReadDeadline(time.Now())
		return copyErr
	}

	result := make(chan error)
	go func() {
		result <- pipe(leftConn, rightConn)
	}()

	// The result of this direction is intentionally not returned; it has
	// already been logged inside pipe.
	pipe(rightConn, leftConn)

	return <-result
}
// NewProxy builds a Proxy from the given context, cancel function, source
// servers and sink client. The worker pool is left unset; it is created when
// the relay loop starts.
func NewProxy(ctx context.Context, cancel context.CancelFunc, sources []tunnel.Server, sink tunnel.Client) *Proxy {
	p := new(Proxy)
	p.ctx, p.cancel = ctx, cancel
	p.sources, p.sink = sources, sink
	return p
}
// A Dialer is a means to establish a connection.
// Custom dialers should also implement ContextDialer.
// NOTE(review): ContextDialer is not declared in the visible part of this
// file — confirm which package's interface is meant.
type Dialer interface {
// Dial connects to the given address via the proxy.
// It takes the network and address as strings and returns the established
// connection, or an error.
Dial(network, addr string) (c net.Conn, err error)
}