Skip to content

Commit

Permalink
cfg: support hot reload cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Sep 5, 2021
1 parent ac23007 commit c1822ca
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 45 deletions.
146 changes: 107 additions & 39 deletions cmd/ehco/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -155,73 +156,121 @@ func main() {

func start(ctx *cli.Context) error {
cfg := loadConfig()
initTls(cfg)

if cfg.WebPort > 0 {
go web.StartWebServer(cfg)
}
return startAndWatchRelayServers(cfg.Configs)
return startAndWatchRelayServers(cfg)
}

func startAndWatchRelayServers(relayConfigList []config.RelayConfig) error {
// relay name -> relay
relayM := make(map[string]*relay.Relay)
for idx := range relayConfigList {
r, err := relay.NewRelay(&relayConfigList[idx])
if err != nil {
return err
func startAndWatchRelayServers(cfg *config.Config) error {
// init main ctx
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// relay ListenAddress -> relay
var relayM sync.Map

// func used to start one relay
var startOneRelayFunc = func(r *relay.Relay) {
addr := r.ListenAddress
relayM.Store(addr, r)
if err := r.ListenAndServe(); err != nil && !errors.Is(err, net.ErrClosed) {
logger.Errorf("[relay] Name=%s ListenAndServe err=%s", r.Name, err)
}
relayM[r.Name] = r
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for name := range relayM {
go func(name string) {
err := relayM[name].ListenAndServe()
if err != nil {
if _, ok := err.(*net.OpError); !ok {
logger.Errorf("[relay] name=%s ListenAndServe err=%s", name, err)
}
}
cancel()
}(name)
var stopOneRelayFunc = func(r *relay.Relay) {
addr := r.ListenAddress
r.Close()
relayM.Delete(addr)
}

// init relay map
for idx := range cfg.Configs {
r, err := relay.NewRelay(&cfg.Configs[idx])
if err != nil {
return err
}
go startOneRelayFunc(r)
}
// wg to control sub goroutine
var wg sync.WaitGroup

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

var wg sync.WaitGroup
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()

select {
case <-ctx.Done():
logger.Info("Relay server exit")
logger.Info("ctx cancelled relay server exit")
case <-sigs:
for name := range relayM {
logger.Infof("[relay] Stop %s ...", name)
relayM[name].Close()
relayM.Range(func(key, value interface{}) bool {
r := value.(*relay.Relay)
r.Close()
return true
})
cancel()
}
}(ctx)

// start reload loop
reloadCH := make(chan os.Signal, 1)
signal.Notify(reloadCH, syscall.SIGHUP)
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-reloadCH:
logger.Info("[cfg-reload] Got A HUP Signal! Now Reloading Conf ...")
newCfg := loadConfig()

var newRelayAddrList []string
for idx := range newCfg.Configs {
r, err := relay.NewRelay(&newCfg.Configs[idx])
if err != nil {
logger.Fatalf("[cfg-reload] reload new relay failed err=%s", err.Error())
}
newRelayAddrList = append(newRelayAddrList, r.ListenAddress)

// reload old relay
if oldR, ok := relayM.Load(r.ListenAddress); ok {
oldR := oldR.(*relay.Relay)
if oldR.ListenAddress != r.ListenAddress {
logger.Infof("[cfg-reload] close old relay name=%s", oldR.Name)
stopOneRelayFunc(oldR)
go startOneRelayFunc(r)
}
continue // no need to reload
}
// start bread new relay that not in old relayM
logger.Infof("[cfg-reload] starr new relay name=%s", r.Name)
go startOneRelayFunc(r)
}
// closed relay not in new config
relayM.Range(func(key, value interface{}) bool {
oldAddr := key.(string)
if !InArray(oldAddr, newRelayAddrList) {
v, _ := relayM.Load(oldAddr)
oldR := v.(*relay.Relay)
stopOneRelayFunc(oldR)
}
return true
})
}
}
}(ctx)

//TODO refine this
web.EhcoAlive.Set(web.EhcoAliveStateRunning)
wg.Wait()
return nil
}

func initTls(cfg *config.Config) {
for _, cfg := range cfg.Configs {
if cfg.ListenType == constant.Listen_WSS || cfg.ListenType == constant.Listen_MWSS ||
cfg.TransportType == constant.Transport_WSS || cfg.TransportType == constant.Transport_MWSS {
tls.InitTlsCfg()
break
}
}
}

func loadConfig() (cfg *config.Config) {
if ConfigPath != "" {
cfg = config.NewConfigByPath(ConfigPath)
Expand All @@ -245,5 +294,24 @@ func loadConfig() (cfg *config.Config) {
},
}
}

// init tls
for _, cfg := range cfg.Configs {
if cfg.ListenType == constant.Listen_WSS || cfg.ListenType == constant.Listen_MWSS ||
cfg.TransportType == constant.Transport_WSS || cfg.TransportType == constant.Transport_MWSS {
tls.InitTlsCfg()
break
}
}

return cfg
}

func InArray(ele string, array []string) bool {
for _, v := range array {
if v == ele {
return true
}
}
return false
}
2 changes: 1 addition & 1 deletion config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"web_port": 9000,
"web_token": "",
"enable_ping": true,
"enable_ping": false,

"relay_configs": [
{
Expand Down
10 changes: 6 additions & 4 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type Relay struct {
closeTcpF func() error
closeUdpF func() error

Name string
Name string
ListenAddress string
}

func NewRelay(cfg *config.RelayConfig) (*Relay, error) {
Expand Down Expand Up @@ -73,8 +74,9 @@ func NewRelay(cfg *config.RelayConfig) (*Relay, error) {
lb.NewRoundRobin(udpNodeList),
),
}
r.Name = fmt.Sprintf("[At=%s Over=%s To=%s Through=%s]",
r.LocalTCPAddr, r.ListenType, r.cfg.TCPRemotes, r.TransportType)
r.ListenAddress = r.LocalTCPAddr.String()
r.Name = fmt.Sprintf("[At=%s Over=%s TCP-To=%s UDP-To=%s Through=%s]",
r.LocalTCPAddr, r.ListenType, r.cfg.TCPRemotes, r.cfg.UDPRemotes, r.TransportType)
return r, nil
}

Expand Down Expand Up @@ -108,6 +110,7 @@ func (r *Relay) ListenAndServe() error {
}

func (r *Relay) Close() {
logger.Infof("[relay] Close relay %s", r.Name)
if r.closeUdpF != nil {
err := r.closeUdpF()
if err != nil {
Expand All @@ -120,7 +123,6 @@ func (r *Relay) Close() {
logger.Errorf(err.Error())
}
}

}

func (r *Relay) RunLocalTCPServer() error {
Expand Down
3 changes: 3 additions & 0 deletions internal/tls/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ var (
)

func InitTlsCfg() {
if DefaultTLSConfig != nil {
return
}
cert, err := genCertificate()
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/transporter/wss.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ func (s *Wss) HandleTCPConn(c *net.TCPConn) error {
return err
}
defer wsc.Close()
logger.Infof("[ws] HandleTCPConn from %s to %s", c.LocalAddr().String(), remote.Label)
logger.Infof("[wss] HandleTCPConn from %s to %s", c.LocalAddr().String(), remote.Label)
return transportWithDeadline(c, wsc, remote.Label)
}

0 comments on commit c1822ca

Please sign in to comment.