From 18603c9a46abcc78a3e9b34e32efef379ed07dbc Mon Sep 17 00:00:00 2001 From: Dreamacro <305009791@qq.com> Date: Sun, 26 Apr 2020 22:38:15 +0800 Subject: [PATCH] Improve: provider can be auto GC --- adapters/provider/fetcher.go | 154 ++++++++++++++++++++++++ adapters/provider/provider.go | 215 ++++++++++------------------------ config/config.go | 9 -- hub/executor/executor.go | 7 -- 4 files changed, 218 insertions(+), 167 deletions(-) create mode 100644 adapters/provider/fetcher.go diff --git a/adapters/provider/fetcher.go b/adapters/provider/fetcher.go new file mode 100644 index 0000000000..aeaf1f4257 --- /dev/null +++ b/adapters/provider/fetcher.go @@ -0,0 +1,154 @@ +package provider + +import ( + "bytes" + "crypto/md5" + "io/ioutil" + "os" + "time" + + "github.com/Dreamacro/clash/log" +) + +var ( + fileMode os.FileMode = 0666 +) + +type parser = func([]byte) (interface{}, error) + +type fetcher struct { + name string + vehicle Vehicle + updatedAt *time.Time + ticker *time.Ticker + hash [16]byte + parser parser + onUpdate func(interface{}) +} + +func (f *fetcher) Name() string { + return f.name +} + +func (f *fetcher) VehicleType() VehicleType { + return f.vehicle.Type() +} + +func (f *fetcher) Initial() (interface{}, error) { + var buf []byte + var err error + var isLocal bool + if stat, err := os.Stat(f.vehicle.Path()); err == nil { + buf, err = ioutil.ReadFile(f.vehicle.Path()) + modTime := stat.ModTime() + f.updatedAt = &modTime + isLocal = true + } else { + buf, err = f.vehicle.Read() + } + + if err != nil { + return nil, err + } + + proxies, err := f.parser(buf) + if err != nil { + if !isLocal { + return nil, err + } + + // parse local file error, fallback to remote + buf, err = f.vehicle.Read() + if err != nil { + return nil, err + } + + proxies, err = f.parser(buf) + if err != nil { + return nil, err + } + } + + if err := ioutil.WriteFile(f.vehicle.Path(), buf, fileMode); err != nil { + return nil, err + } + + f.hash = md5.Sum(buf) + + // pull proxies automatically + if f.ticker != nil { + go f.pullLoop() + } + + return proxies, nil +} + +func (f *fetcher) Update() (interface{}, bool, error) { + buf, err := f.vehicle.Read() + if err != nil { + return nil, false, err + } + + now := time.Now() + hash := md5.Sum(buf) + if bytes.Equal(f.hash[:], hash[:]) { + f.updatedAt = &now + return nil, true, nil + } + + proxies, err := f.parser(buf) + if err != nil { + return nil, false, err + } + + if err := ioutil.WriteFile(f.vehicle.Path(), buf, fileMode); err != nil { + return nil, false, err + } + + f.updatedAt = &now + f.hash = hash + + return proxies, false, nil +} + +func (f *fetcher) Destroy() error { + if f.ticker != nil { + f.ticker.Stop() + } + return nil +} + +func (f *fetcher) pullLoop() { + for range f.ticker.C { + elm, same, err := f.Update() + if err != nil { + log.Warnln("[Provider] %s pull error: %s", f.Name(), err.Error()) + continue + } + + if same { + log.Debugln("[Provider] %s's proxies doesn't change", f.Name()) + continue + } + + log.Infoln("[Provider] %s's proxies update", f.Name()) + if f.onUpdate != nil { + f.onUpdate(elm) + } + } +} + +func newFetcher(name string, interval time.Duration, vehicle Vehicle, parser parser, onUpdate func(interface{})) *fetcher { + var ticker *time.Ticker + if interval != 0 { + ticker = time.NewTicker(interval) + } + + return &fetcher{ + name: name, + ticker: ticker, + vehicle: vehicle, + parser: parser, + onUpdate: onUpdate, + } +} diff --git a/adapters/provider/provider.go b/adapters/provider/provider.go index 8ec4a1089b..4968a48dda 100644 --- a/adapters/provider/provider.go +++ b/adapters/provider/provider.go @@ -1,26 +1,20 @@ package provider import ( - "bytes" - "crypto/md5" "encoding/json" "errors" "fmt" - "io/ioutil" - "os" + "runtime" "time" "github.com/Dreamacro/clash/adapters/outbound" C "github.com/Dreamacro/clash/constant" - "github.com/Dreamacro/clash/log" "gopkg.in/yaml.v2" ) const ( ReservedName = "default" - - fileMode = 0666 ) // Provider Type @@ -49,8 +43,7 @@ type Provider interface { VehicleType() VehicleType Type() ProviderType Initial() error - Reload() error - Destroy() error + Update() error } // ProxyProvider interface @@ -58,24 +51,24 @@ type ProxyProvider interface { Provider Proxies() []C.Proxy HealthCheck() - Update() error } type ProxySchema struct { Proxies []map[string]interface{} `yaml:"proxies"` } +// for auto gc type ProxySetProvider struct { - name string - vehicle Vehicle - hash [16]byte + *proxySetProvider +} + +type proxySetProvider struct { + *fetcher proxies []C.Proxy healthCheck *HealthCheck - ticker *time.Ticker - updatedAt *time.Time } -func (pp *ProxySetProvider) MarshalJSON() ([]byte, error) { +func (pp *proxySetProvider) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]interface{}{ "name": pp.Name(), "type": pp.Type().String(), @@ -85,134 +78,41 @@ func (pp *ProxySetProvider) MarshalJSON() ([]byte, error) { }) } -func (pp *ProxySetProvider) Name() string { +func (pp *proxySetProvider) Name() string { return pp.name } -func (pp *ProxySetProvider) Reload() error { - return nil -} - -func (pp *ProxySetProvider) HealthCheck() { +func (pp *proxySetProvider) HealthCheck() { pp.healthCheck.check() } -func (pp *ProxySetProvider) Update() error { - return pp.pull() -} - -func (pp *ProxySetProvider) Destroy() error { - pp.healthCheck.close() - - if pp.ticker != nil { - pp.ticker.Stop() +func (pp *proxySetProvider) Update() error { + elm, same, err := pp.fetcher.Update() + if err == nil && !same { + pp.onUpdate(elm) } - - return nil + return err } -func (pp *ProxySetProvider) Initial() error { - var buf []byte - var err error - var isLocal bool - if stat, err := os.Stat(pp.vehicle.Path()); err == nil { - buf, err = ioutil.ReadFile(pp.vehicle.Path()) - modTime := stat.ModTime() - pp.updatedAt = &modTime - isLocal = true - } else { - buf, err = pp.vehicle.Read() - } - - if err != nil { - return err - } - - proxies, err := pp.parse(buf) +func (pp *proxySetProvider) Initial() error { + elm, err := pp.fetcher.Initial() if err != nil { - if !isLocal { - return err - } - - // parse local file error, fallback to remote - buf, err = pp.vehicle.Read() - if err != nil { - return err - } - - proxies, err = pp.parse(buf) - if err != nil { - return err - } - } - - if err := ioutil.WriteFile(pp.vehicle.Path(), buf, fileMode); err != nil { return err } - pp.hash = md5.Sum(buf) - pp.setProxies(proxies) - - // pull proxies automatically - if pp.ticker != nil { - go pp.pullLoop() - } - + pp.onUpdate(elm) return nil } -func (pp *ProxySetProvider) VehicleType() VehicleType { - return pp.vehicle.Type() -} - -func (pp *ProxySetProvider) Type() ProviderType { +func (pp *proxySetProvider) Type() ProviderType { return Proxy } -func (pp *ProxySetProvider) Proxies() []C.Proxy { +func (pp *proxySetProvider) Proxies() []C.Proxy { return pp.proxies } -func (pp *ProxySetProvider) pullLoop() { - for range pp.ticker.C { - if err := pp.pull(); err != nil { - log.Warnln("[Provider] %s pull error: %s", pp.Name(), err.Error()) - } - } -} - -func (pp *ProxySetProvider) pull() error { - buf, err := pp.vehicle.Read() - if err != nil { - return err - } - - now := time.Now() - hash := md5.Sum(buf) - if bytes.Equal(pp.hash[:], hash[:]) { - log.Debugln("[Provider] %s's proxies doesn't change", pp.Name()) - pp.updatedAt = &now - return nil - } - - proxies, err := pp.parse(buf) - if err != nil { - return err - } - log.Infoln("[Provider] %s's proxies update", pp.Name()) - - if err := ioutil.WriteFile(pp.vehicle.Path(), buf, fileMode); err != nil { - return err - } - - pp.updatedAt = &now - pp.hash = hash - pp.setProxies(proxies) - - return nil -} - -func (pp *ProxySetProvider) parse(buf []byte) ([]C.Proxy, error) { +func proxiesParse(buf []byte) (interface{}, error) { schema := &ProxySchema{} if err := yaml.Unmarshal(buf, schema); err != nil { @@ -239,38 +139,52 @@ func (pp *ProxySetProvider) parse(buf []byte) ([]C.Proxy, error) { return proxies, nil } -func (pp *ProxySetProvider) setProxies(proxies []C.Proxy) { +func (pp *proxySetProvider) setProxies(proxies []C.Proxy) { pp.proxies = proxies pp.healthCheck.setProxy(proxies) go pp.healthCheck.check() } -func NewProxySetProvider(name string, interval time.Duration, vehicle Vehicle, hc *HealthCheck) *ProxySetProvider { - var ticker *time.Ticker - if interval != 0 { - ticker = time.NewTicker(interval) - } +func stopProxyProvider(pd *ProxySetProvider) { + pd.healthCheck.close() + pd.fetcher.Destroy() +} +func NewProxySetProvider(name string, interval time.Duration, vehicle Vehicle, hc *HealthCheck) *ProxySetProvider { if hc.auto() { go hc.process() } - return &ProxySetProvider{ - name: name, - vehicle: vehicle, + pd := &proxySetProvider{ proxies: []C.Proxy{}, healthCheck: hc, - ticker: ticker, } + + onUpdate := func(elm interface{}) { + ret := elm.([]C.Proxy) + pd.setProxies(ret) + } + + fetcher := newFetcher(name, interval, vehicle, proxiesParse, onUpdate) + pd.fetcher = fetcher + + wrapper := &ProxySetProvider{pd} + runtime.SetFinalizer(wrapper, stopProxyProvider) + return wrapper } +// for auto gc type CompatibleProvider struct { + *compatibleProvider +} + +type compatibleProvider struct { name string healthCheck *HealthCheck proxies []C.Proxy } -func (cp *CompatibleProvider) MarshalJSON() ([]byte, error) { +func (cp *compatibleProvider) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]interface{}{ "name": cp.Name(), "type": cp.Type().String(), @@ -279,43 +193,38 @@ func (cp *CompatibleProvider) MarshalJSON() ([]byte, error) { }) } -func (cp *CompatibleProvider) Name() string { +func (cp *compatibleProvider) Name() string { return cp.name } -func (cp *CompatibleProvider) Reload() error { - return nil -} - -func (cp *CompatibleProvider) Destroy() error { - cp.healthCheck.close() - return nil -} - -func (cp *CompatibleProvider) HealthCheck() { +func (cp *compatibleProvider) HealthCheck() { cp.healthCheck.check() } -func (cp *CompatibleProvider) Update() error { +func (cp *compatibleProvider) Update() error { return nil } -func (cp *CompatibleProvider) Initial() error { +func (cp *compatibleProvider) Initial() error { return nil } -func (cp *CompatibleProvider) VehicleType() VehicleType { +func (cp *compatibleProvider) VehicleType() VehicleType { return Compatible } -func (cp *CompatibleProvider) Type() ProviderType { +func (cp *compatibleProvider) Type() ProviderType { return Proxy } -func (cp *CompatibleProvider) Proxies() []C.Proxy { +func (cp *compatibleProvider) Proxies() []C.Proxy { return cp.proxies } +func stopCompatibleProvider(pd *CompatibleProvider) { + pd.healthCheck.close() +} + func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*CompatibleProvider, error) { if len(proxies) == 0 { return nil, errors.New("Provider need one proxy at least") @@ -325,9 +234,13 @@ func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*Co go hc.process() } - return &CompatibleProvider{ + pd := &compatibleProvider{ name: name, proxies: proxies, healthCheck: hc, - }, nil + } + + wrapper := &CompatibleProvider{pd} + runtime.SetFinalizer(wrapper, stopCompatibleProvider) + return wrapper, nil } diff --git a/config/config.go b/config/config.go index 3c4821959b..0321629272 100644 --- a/config/config.go +++ b/config/config.go @@ -268,15 +268,6 @@ func parseProxies(cfg *RawConfig) (proxies map[string]C.Proxy, providersMap map[ providersConfig = cfg.ProxyProviderOld } - defer func() { - // Destroy already created provider when err != nil - if err != nil { - for _, provider := range providersMap { - provider.Destroy() - } - } - }() - proxies["DIRECT"] = outbound.NewProxy(outbound.NewDirect()) proxies["REJECT"] = outbound.NewProxy(outbound.NewReject()) proxyList = append(proxyList, "DIRECT", "REJECT") diff --git a/hub/executor/executor.go b/hub/executor/executor.go index 5916bfbb26..f073c1ce96 100644 --- a/hub/executor/executor.go +++ b/hub/executor/executor.go @@ -158,13 +158,6 @@ func updateHosts(tree *trie.Trie) { } func updateProxies(proxies map[string]C.Proxy, providers map[string]provider.ProxyProvider) { - oldProviders := tunnel.Providers() - - // close providers goroutine - for _, provider := range oldProviders { - provider.Destroy() - } - tunnel.UpdateProxies(proxies, providers) }