Skip to content

Commit

Permalink
优化 pool 的数据结构,以及健康检查逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
mingcheng committed Jul 9, 2022
1 parent 1eadd79 commit 594aa30
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 512 deletions.
10 changes: 5 additions & 5 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type BackendCheckConfig struct {
}

type Backend struct {
Addr string `yaml:"addr"`
Socks5UserName string `yaml:"username"`
Socks5Password string `yaml:"password"`
CheckConfig *BackendCheckConfig `yaml:"check_config"`
Addr string `yaml:"addr"`
UserName string `yaml:"username"`
Password string `yaml:"password"`
CheckConfig *BackendCheckConfig `yaml:"check_config"`

mux sync.RWMutex
alive bool
Expand Down Expand Up @@ -82,7 +82,7 @@ func (b *Backend) httpProxyClient() (*http.Client, error) {

// socks5Client to create http client with socks5 proxy
func (b *Backend) socks5Client(timeout int) (*socks5.Client, error) {
return socks5.NewClient(b.Addr, b.Socks5UserName, b.Socks5Password, timeout, timeout)
return socks5.NewClient(string(b.Addr), b.UserName, b.Password, timeout, timeout)
}

// Socks5Conn to create a connection by specific params
Expand Down
22 changes: 6 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,19 @@ require (
github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed
github.com/judwhite/go-svc v1.2.1
github.com/sirupsen/logrus v1.8.1
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.7.1
github.com/txthinking/socks5 v0.0.0-20220615051428-39268faee3e6
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9
gopkg.in/yaml.v3 v3.0.0
)

require (
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/txthinking/runnergroup v0.0.0-20210608031112-152c7c4432bf // indirect
github.com/txthinking/x v0.0.0-20210326105829-476fab902fbe // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
457 changes: 4 additions & 453 deletions go.sum

Large diffs are not rendered by default.

64 changes: 54 additions & 10 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,40 @@
package socks5lb

import (
"fmt"
log "github.com/sirupsen/logrus"
"sync"
"sync/atomic"
)

type Pool struct {
backends []*Backend
backends map[string]*Backend
current uint64
}

func (b *Pool) Add(backend *Backend) {
b.backends = append(b.backends, backend)
func (b *Pool) Add(backend *Backend) (err error) {
if b.backends[backend.Addr] != nil {
return fmt.Errorf("%v is already exists, remove it first", backend.Addr)
}

b.backends[backend.Addr] = backend
return
}

func (b *Pool) Remove(addr string) (err error) {
delete(b.backends, addr)
return
}

// AllHealthy returns all healthy backends
func (b *Pool) AllHealthy() (backends []*Backend) {
for _, v := range b.backends {
if v.Alive() {
backends = append(backends, v)
}
}

return
}

func (b *Pool) NextIndex() int {
Expand All @@ -31,20 +54,32 @@ func (b *Pool) NextIndex() int {
// Next returns the next index in the pool if there is one available
// Only supports round-robin operations by default
func (b *Pool) Next() *Backend {

// return healthy backends first
backends := b.AllHealthy()
log.Tracef("found all %d available backends", len(backends))

// can not found any backends available
if len(backends) <= 0 {
return nil
}

// loop entire backends to find out an Alive backend
next := b.NextIndex()
// start from next and move a full cycle
l := len(b.backends) + next
l := len(backends) + next

for i := next; i < l; i++ {
// take an index by modding
idx := i % len(b.backends)
idx := i % len(backends)

// if we have an alive backend, use it and store if its not the original one
if b.backends[idx].Alive() {
if backends[idx].Alive() {
if i != next {
atomic.StoreUint64(&b.current, uint64(idx))
}

return b.backends[idx]
return backends[idx]
}
}

Expand All @@ -63,9 +98,18 @@ func (b *Pool) Check() {
}
}

var (
instance *Pool
once sync.Once
)

// NewPool instance for a new Pools instance
func NewPool() *Pool {
return &Pool{
backends: []*Backend{},
}
once.Do(func() {
instance = &Pool{
backends: make(map[string]*Backend),
}
})

return instance
}
75 changes: 50 additions & 25 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,43 @@ package socks5lb

import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)

func TestPool_HealthCheck(t *testing.T) {
func NewProxyPool(t *testing.T) (pool *Pool, err error) {
pool = NewPool()

pool := Pool{
backends: []*Backend{
{
Addr: "10.0.20.254:1086",
},
{
Addr: "10.0.20.254:1086",
},
{
Addr: "192.168.100.254:1086",
},
{
Addr: "10.0.11.254:1086",
},
{
Addr: "192.168.1.254:1086",
},
{
Addr: "172.16.1.254:1086",
},
},
}

pool.Check("https://www.google.com/robots.txt")
proxies := []string{
"10.0.20.254:1086",
"192.168.100.254:1086",
"192.168.1.254:1086",
"172.16.1.254:1086",
}

for _, v := range proxies {
err = pool.Add(NewBackend(v, BackendCheckConfig{
CheckURL: "https://www.google.com/robots.txt",
Timeout: 5,
InitialAlive: false,
}))
}

for i := 0; i < 100; i++ {
p := NewPool()
assert.Equal(t, &pool, &p, "proxyPool should be singleton")
}

return
}

func TestPool_HealthCheck(t *testing.T) {
pool, err := NewProxyPool(t)
if err != nil {
t.Error(err)
}

pool.Check()
for i := 0; i < 100; i++ {
b := pool.Next()
if b != nil {
Expand All @@ -40,3 +47,21 @@ func TestPool_HealthCheck(t *testing.T) {
}
}
}

func TestPool_NextCheck(t *testing.T) {
pool, err := NewProxyPool(t)
if err != nil {
t.Error(err)
}

for i := 0; i < 1000; i++ {
pool.Add(NewBackend(fmt.Sprintf("%d", i), BackendCheckConfig{
InitialAlive: true,
}))
}

for i := 0; i < 10000; i++ {
next := pool.Next()
assert.NotNil(t, next)
}
}
4 changes: 3 additions & 1 deletion redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

package socks5lb

import "fmt"

// ListenProxy is not implemented by default
func (s *Server) ListenTProxy(_ string) error {
return error.New("sorry transparent proxy is ONLY supports Linux platform")
return fmt.Errorf("sorry transparent proxy is ONLY supports Linux platform")
}
2 changes: 1 addition & 1 deletion socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *Server) ListenSocks5(addr string) (err error) {
}

//log.Tracef("[socks5-tcp] %s -> %s", socks5Conn.RemoteAddr(), socks5Conn.LocalAddr())
backendConn, err := net.Dial("tcp", backend.Addr)
backendConn, err := net.Dial("tcp", string(backend.Addr))
if err != nil {
log.Error(err)
return
Expand Down
7 changes: 6 additions & 1 deletion socks5lb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ backends:
check_url: https://www.google.com/robots.txt
initial_alive: true
timeout: 3
- addr: 192.168.1.254:1080
- addr: 192.168.1.254:1086
check_config:
check_url: https://twitter.com/robots.txt
initial_alive: true
timeout: 3
- addr: 172.16.1.254:1086
check_config:
check_url: https://twitter.com/robots.txt
initial_alive: true
Expand Down

0 comments on commit 594aa30

Please sign in to comment.