Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config advertise-addr for drainer #634

Merged
merged 9 commits into from
Jun 18, 2019
Merged
5 changes: 4 additions & 1 deletion cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

# addr (i.e. 'host:port') to listen on for drainer connections
# will register this addr into etcd
# addr = "127.0.0.1:8249"
addr = "127.0.0.1:8249"

# addr(i.e. 'host:port') to advertise to the public
advertise-addr = ""

# the interval time (in seconds) of detect pumps' status
detect-interval = 10
Expand Down
39 changes: 26 additions & 13 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Config struct {
*flag.FlagSet `json:"-"`
LogLevel string `toml:"log-level" json:"log-level"`
ListenAddr string `toml:"addr" json:"addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
DataDir string `toml:"data-dir" json:"data-dir"`
DetectInterval int `toml:"detect-interval" json:"detect-interval"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
Expand Down Expand Up @@ -109,6 +110,7 @@ func NewConfig() *Config {
fs.PrintDefaults()
}
fs.StringVar(&cfg.ListenAddr, "addr", util.DefaultListenAddr(8249), "addr (i.e. 'host:port') to listen on for drainer connections")
fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", "addr(i.e. 'host:port') to advertise to the public, default to be the same value as -addr")
fs.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "drainer data directory path (default data.drainer)")
fs.IntVar(&cfg.DetectInterval, "detect-interval", defaultDetectInterval, "the interval time (in seconds) of detect pumps' status")
fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints")
Expand Down Expand Up @@ -224,19 +226,11 @@ func (cfg *Config) configFromFile(path string) error {

// validate checks whether the configuration is valid
func (cfg *Config) validate() error {
// check ListenAddr
urllis, err := url.Parse(cfg.ListenAddr)
if err != nil {
return errors.Errorf("parse ListenAddr error: %s, %v", cfg.ListenAddr, err)
}

var host string
if host, _, err = net.SplitHostPort(urllis.Host); err != nil {
return errors.Errorf("bad ListenAddr host format: %s, %v", urllis.Host, err)
if err := validateAddr(cfg.ListenAddr); err != nil {
return errors.Annotate(err, "invalid addr")
}

if !util.IsValidateListenHost(host) {
log.Warn("pump may not be able to access drainer using this listen addr config", zap.String("listen addr", host))
if err := validateAddr(cfg.AdvertiseAddr); err != nil {
return errors.Annotate(err, "invalid advertise-addr")
}

// check EtcdEndpoints
Expand Down Expand Up @@ -264,7 +258,9 @@ func (cfg *Config) validate() error {
func (cfg *Config) adjustConfig() error {
// adjust configuration
util.AdjustString(&cfg.ListenAddr, util.DefaultListenAddr(8249))
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
util.AdjustString(&cfg.AdvertiseAddr, cfg.ListenAddr)
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
cfg.AdvertiseAddr = "http://" + cfg.AdvertiseAddr // add 'http:' scheme to facilitate parsing
util.AdjustString(&cfg.DataDir, defaultDataDir)
util.AdjustInt(&cfg.DetectInterval, defaultDetectInterval)

Expand Down Expand Up @@ -349,3 +345,20 @@ func (cfg *Config) adjustConfig() error {

return nil
}

func validateAddr(addr string) error {
urllis, err := url.Parse(addr)
if err != nil {
return errors.Annotatef(err, "failed to parse addr %v", addr)
}

var host string
if host, _, err = net.SplitHostPort(urllis.Host); err != nil {
return errors.Annotatef(err, "invalid host %v", urllis.Host)
}

if !util.IsValidateListenHost(host) {
log.Warn("pump may not be able to access drainer using this addr", zap.String("listen addr", addr))
}
return nil
}
27 changes: 24 additions & 3 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/coreos/etcd/integration"
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/util"
)

var testEtcdCluster *integration.ClusterV3
Expand All @@ -36,17 +37,19 @@ type testDrainerSuite struct{}

func (t *testDrainerSuite) TestConfig(c *C) {
args := []string{
"-metrics-addr", "127.0.0.1:9091",
"-metrics-addr", "192.168.15.10:9091",
"-txn-batch", "1",
"-data-dir", "data.drainer",
"-dest-db-type", "mysql",
"-config", "../cmd/drainer/drainer.toml",
"-addr", "192.168.15.10:8257",
"-advertise-addr", "192.168.15.10:8257",
}

cfg := NewConfig()
err := cfg.Parse(args)
c.Assert(err, IsNil)
c.Assert(cfg.MetricsAddr, Equals, "127.0.0.1:9091")
c.Assert(cfg.MetricsAddr, Equals, "192.168.15.10:9091")
c.Assert(cfg.DataDir, Equals, "data.drainer")
c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql")
Expand All @@ -61,9 +64,13 @@ func (t *testDrainerSuite) TestValidate(c *C) {

cfg.ListenAddr = "http://123:9091"
err := cfg.validate()
c.Assert(err, ErrorMatches, ".*ListenAddr.*")
c.Assert(err, ErrorMatches, ".*invalid addr.*")

cfg.ListenAddr = "http://192.168.10.12:9091"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*invalid advertise-addr.*")

cfg.AdvertiseAddr = "http://192.168.10.12:9091"
cfg.EtcdURLs = "127.0.0.1:2379,127.0.0.1:2380"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*EtcdURLs.*")
Expand Down Expand Up @@ -96,4 +103,18 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
c.Assert(cfg.SyncerCfg.DisableDispatch, IsTrue)

cfg = NewConfig()
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249))
c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr)

cfg = NewConfig()
cfg.ListenAddr = "0.0.0.0:8257"
cfg.AdvertiseAddr = "192.168.15.12:8257"
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257")
c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257")
}
4 changes: 2 additions & 2 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func NewServer(cfg *Config) (*Server, error) {
)
}

advURL, err := url.Parse(cfg.ListenAddr)
advURL, err := url.Parse(cfg.AdvertiseAddr)
if err != nil {
return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.ListenAddr)
return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.AdvertiseAddr)
}

status := node.NewStatus(ID, advURL.Host, node.Online, 0, syncer.GetLatestCommitTS(), util.GetApproachTS(latestTS, latestTime))
Expand Down