diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 120915044..5cde179b1 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -2,7 +2,10 @@ # addr (i.e. 'host:port') to listen on for drainer connections # will register this addr into etcd -# addr = "127.0.0.1:8249" +addr = "127.0.0.1:8249" + +# addr(i.e. 'host:port') to advertise to the public +advertise-addr = "" # the interval time (in seconds) of detect pumps' status detect-interval = 10 diff --git a/drainer/config.go b/drainer/config.go index fb34591dc..42e4a212f 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -79,6 +79,7 @@ type Config struct { *flag.FlagSet `json:"-"` LogLevel string `toml:"log-level" json:"log-level"` ListenAddr string `toml:"addr" json:"addr"` + AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` DataDir string `toml:"data-dir" json:"data-dir"` DetectInterval int `toml:"detect-interval" json:"detect-interval"` EtcdURLs string `toml:"pd-urls" json:"pd-urls"` @@ -109,6 +110,7 @@ func NewConfig() *Config { fs.PrintDefaults() } fs.StringVar(&cfg.ListenAddr, "addr", util.DefaultListenAddr(8249), "addr (i.e. 'host:port') to listen on for drainer connections") + fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", "addr(i.e. 'host:port') to advertise to the public, default to be the same value as -addr") fs.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "drainer data directory path (default data.drainer)") fs.IntVar(&cfg.DetectInterval, "detect-interval", defaultDetectInterval, "the interval time (in seconds) of detect pumps' status") fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints") @@ -224,19 +226,11 @@ func (cfg *Config) configFromFile(path string) error { // validate checks whether the configuration is valid func (cfg *Config) validate() error { - // check ListenAddr - urllis, err := url.Parse(cfg.ListenAddr) - if err != nil { - return errors.Errorf("parse ListenAddr error: %s, %v", cfg.ListenAddr, err) - } - - var host string - if host, _, err = net.SplitHostPort(urllis.Host); err != nil { - return errors.Errorf("bad ListenAddr host format: %s, %v", urllis.Host, err) + if err := validateAddr(cfg.ListenAddr); err != nil { + return errors.Annotate(err, "invalid addr") } - - if !util.IsValidateListenHost(host) { - log.Warn("pump may not be able to access drainer using this listen addr config", zap.String("listen addr", host)) + if err := validateAddr(cfg.AdvertiseAddr); err != nil { + return errors.Annotate(err, "invalid advertise-addr") } // check EtcdEndpoints @@ -264,7 +258,9 @@ func (cfg *Config) validate() error { func (cfg *Config) adjustConfig() error { // adjust configuration util.AdjustString(&cfg.ListenAddr, util.DefaultListenAddr(8249)) - cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing + util.AdjustString(&cfg.AdvertiseAddr, cfg.ListenAddr) + cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing + cfg.AdvertiseAddr = "http://" + cfg.AdvertiseAddr // add 'http:' scheme to facilitate parsing util.AdjustString(&cfg.DataDir, defaultDataDir) util.AdjustInt(&cfg.DetectInterval, defaultDetectInterval) @@ -349,3 +345,20 @@ func (cfg *Config) adjustConfig() error { return nil } + +func validateAddr(addr string) error { + urllis, err := url.Parse(addr) + if err != nil { + return errors.Annotatef(err, "failed to parse addr %v", addr) + } + + var host string + if host, _, err = net.SplitHostPort(urllis.Host); err != nil { + return errors.Annotatef(err, "invalid host %v", urllis.Host) + } + + if !util.IsValidateListenHost(host) { + log.Warn("pump may not be able to access drainer using this addr", zap.String("listen addr", addr)) + } + return nil +} diff --git a/drainer/config_test.go b/drainer/config_test.go index 349702f63..5ae65dd83 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -19,6 +19,7 @@ import ( "github.com/coreos/etcd/integration" . "github.com/pingcap/check" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-binlog/pkg/util" ) var testEtcdCluster *integration.ClusterV3 @@ -36,17 +37,19 @@ type testDrainerSuite struct{} func (t *testDrainerSuite) TestConfig(c *C) { args := []string{ - "-metrics-addr", "127.0.0.1:9091", + "-metrics-addr", "192.168.15.10:9091", "-txn-batch", "1", "-data-dir", "data.drainer", "-dest-db-type", "mysql", "-config", "../cmd/drainer/drainer.toml", + "-addr", "192.168.15.10:8257", + "-advertise-addr", "192.168.15.10:8257", } cfg := NewConfig() err := cfg.Parse(args) c.Assert(err, IsNil) - c.Assert(cfg.MetricsAddr, Equals, "127.0.0.1:9091") + c.Assert(cfg.MetricsAddr, Equals, "192.168.15.10:9091") c.Assert(cfg.DataDir, Equals, "data.drainer") c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1) c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql") @@ -61,9 +64,13 @@ func (t *testDrainerSuite) TestValidate(c *C) { cfg.ListenAddr = "http://123:9091" err := cfg.validate() - c.Assert(err, ErrorMatches, ".*ListenAddr.*") + c.Assert(err, ErrorMatches, ".*invalid addr.*") + cfg.ListenAddr = "http://192.168.10.12:9091" + err = cfg.validate() + c.Assert(err, ErrorMatches, ".*invalid advertise-addr.*") + cfg.AdvertiseAddr = "http://192.168.10.12:9091" cfg.EtcdURLs = "127.0.0.1:2379,127.0.0.1:2380" err = cfg.validate() c.Assert(err, ErrorMatches, ".*EtcdURLs.*") @@ -96,4 +103,18 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) { c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file") c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1) c.Assert(cfg.SyncerCfg.DisableDispatch, IsTrue) + + cfg = NewConfig() + err = cfg.adjustConfig() + c.Assert(err, IsNil) + c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249)) + c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr) + + cfg = NewConfig() + cfg.ListenAddr = "0.0.0.0:8257" + cfg.AdvertiseAddr = "192.168.15.12:8257" + err = cfg.adjustConfig() + c.Assert(err, IsNil) + c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257") + c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257") } diff --git a/drainer/server.go b/drainer/server.go index 27a7653af..a1014d2ee 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -143,9 +143,9 @@ func NewServer(cfg *Config) (*Server, error) { ) } - advURL, err := url.Parse(cfg.ListenAddr) + advURL, err := url.Parse(cfg.AdvertiseAddr) if err != nil { - return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.ListenAddr) + return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.AdvertiseAddr) } status := node.NewStatus(ID, advURL.Host, node.Online, 0, syncer.GetLatestCommitTS(), util.GetApproachTS(latestTS, latestTime))