From 7f6e8b881a1965351a7bee3b4051ddcf5e0ab18a Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 20 Oct 2022 15:33:22 +0800 Subject: [PATCH] *(engine): remove some deprecated code --- engine/executor/server.go | 40 +----- engine/executor/server_test.go | 6 +- engine/pkg/cmd/executor/executor.go | 2 +- engine/pkg/cmd/master/master.go | 2 +- engine/pkg/containers/queue.go | 22 ---- engine/servermaster/executor_manager.go | 16 +-- engine/servermaster/executor_manager_test.go | 4 +- engine/servermaster/server.go | 26 +--- engine/test/context.go | 82 ------------- engine/test/flag.go | 32 ----- engine/test/util_test.go | 122 ------------------- 11 files changed, 14 insertions(+), 340 deletions(-) delete mode 100644 engine/pkg/containers/queue.go delete mode 100644 engine/test/context.go delete mode 100644 engine/test/flag.go delete mode 100644 engine/test/util_test.go diff --git a/engine/executor/server.go b/engine/executor/server.go index d50380ea6b9..7d199b5ef8c 100644 --- a/engine/executor/server.go +++ b/engine/executor/server.go @@ -44,7 +44,6 @@ import ( "github.com/pingcap/tiflow/engine/pkg/promutil" "github.com/pingcap/tiflow/engine/pkg/rpcerror" "github.com/pingcap/tiflow/engine/pkg/tenant" - "github.com/pingcap/tiflow/engine/test" "github.com/pingcap/tiflow/engine/test/mock" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" @@ -72,8 +71,7 @@ const ( // Server is an executor server. type Server struct { - cfg *Config - testCtx *test.Context + cfg *Config tcpServer tcpserver.TCPServer grpcSrv *grpc.Server @@ -96,13 +94,12 @@ type Server struct { } // NewServer creates a new executor server instance -func NewServer(cfg *Config, ctx *test.Context) *Server { +func NewServer(cfg *Config) *Server { log.Info("creating executor", zap.Stringer("config", cfg)) registerWorkerOnce.Do(registerWorkers) s := Server{ cfg: cfg, - testCtx: ctx, jobAPISrv: newJobAPIServer(), metastores: server.NewMetastoreManager(), } @@ -374,27 +371,6 @@ func (s *Server) Stop() { // TODO: unregister self from master. } -func (s *Server) startForTest(ctx context.Context) (err error) { - s.mockSrv, err = mock.NewExecutorServer(s.cfg.Addr, s) - if err != nil { - return err - } - - err = s.initClients() - if err != nil { - return err - } - err = s.selfRegister(ctx) - if err != nil { - return err - } - go func() { - err := s.keepHeartbeat(ctx) - log.L().Info("heartbeat quits", zap.Error(err)) - }() - return nil -} - func (s *Server) startMsgService(ctx context.Context, wg *errgroup.Group) (err error) { s.msgServer, err = p2p.NewDependentMessageRPCService(string(s.selfID), nil, s.grpcSrv) if err != nil { @@ -414,10 +390,6 @@ func (s *Server) isReadyToServe() bool { // Run drives server logic in independent background goroutines, and use error // group to collect errors. func (s *Server) Run(ctx context.Context) error { - if test.GetGlobalTestFlag() { - return s.startForTest(ctx) - } - wg, ctx := errgroup.WithContext(ctx) s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency) s.taskCommitter = worker.NewTaskCommitter(s.taskRunner, defaultTaskPreDispatchRequestTTL) @@ -651,14 +623,6 @@ func (s *Server) selfRegister(ctx context.Context) error { func (s *Server) keepHeartbeat(ctx context.Context) error { ticker := time.NewTicker(s.cfg.KeepAliveInterval) s.lastHearbeatTime = time.Now() - defer func() { - if test.GetGlobalTestFlag() { - s.testCtx.NotifyExecutorChange(&test.ExecutorChangeEvent{ - Tp: test.Delete, - Time: time.Now(), - }) - } - }() rl := rate.NewLimiter(rate.Every(time.Second*5), 1 /*burst*/) for { select { diff --git a/engine/executor/server_test.go b/engine/executor/server_test.go index 888b4effdd9..e86ad117e8d 100644 --- a/engine/executor/server_test.go +++ b/engine/executor/server_test.go @@ -62,7 +62,7 @@ func TestStartTCPSrv(t *testing.T) { require.Nil(t, err) addr := fmt.Sprintf("127.0.0.1:%d", port) cfg.Addr = addr - s := NewServer(cfg, nil) + s := NewServer(cfg) s.grpcSrv = grpc.NewServer() wg, ctx := errgroup.WithContext(context.Background()) @@ -127,7 +127,7 @@ func TestCollectMetric(t *testing.T) { require.Nil(t, err) addr := fmt.Sprintf("127.0.0.1:%d", port) cfg.Addr = addr - s := NewServer(cfg, nil) + s := NewServer(cfg) s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency) s.grpcSrv = grpc.NewServer() @@ -194,7 +194,7 @@ func TestSelfRegister(t *testing.T) { require.Nil(t, err) addr := fmt.Sprintf("127.0.0.1:%d", port) cfg.AdvertiseAddr = addr - s := NewServer(cfg, nil) + s := NewServer(cfg) mockMasterClient := newMockRegisterMasterClient(10) s.masterClient = mockMasterClient diff --git a/engine/pkg/cmd/executor/executor.go b/engine/pkg/cmd/executor/executor.go index e9490706b3d..bb793c5197e 100644 --- a/engine/pkg/cmd/executor/executor.go +++ b/engine/pkg/cmd/executor/executor.go @@ -76,7 +76,7 @@ func (o *options) run(cmd *cobra.Command) error { ticdcutil.LogHTTPProxies() - server := executor.NewServer(o.executorConfig, nil) + server := executor.NewServer(o.executorConfig) err = server.Run(cmdconetxt.GetDefaultContext()) if err != nil && errors.Cause(err) != context.Canceled { diff --git a/engine/pkg/cmd/master/master.go b/engine/pkg/cmd/master/master.go index cfbdeaca1d2..729d19fbf97 100644 --- a/engine/pkg/cmd/master/master.go +++ b/engine/pkg/cmd/master/master.go @@ -76,7 +76,7 @@ func (o *options) run(cmd *cobra.Command) error { ticdcutil.LogHTTPProxies() - server, err := servermaster.NewServer(o.masterConfig, nil) + server, err := servermaster.NewServer(o.masterConfig) if err != nil { return errors.Trace(err) } diff --git a/engine/pkg/containers/queue.go b/engine/pkg/containers/queue.go deleted file mode 100644 index c93aec63442..00000000000 --- a/engine/pkg/containers/queue.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package containers - -// Queue abstracts a generics FIFO queue, which is thread-safe -type Queue[T any] interface { - Push(elem T) - Pop() (T, bool) - Peek() (T, bool) - Size() int -} diff --git a/engine/servermaster/executor_manager.go b/engine/servermaster/executor_manager.go index af71db33582..0990335bc1a 100644 --- a/engine/servermaster/executor_manager.go +++ b/engine/servermaster/executor_manager.go @@ -27,7 +27,6 @@ import ( execModel "github.com/pingcap/tiflow/engine/servermaster/executormeta/model" "github.com/pingcap/tiflow/engine/servermaster/resource" schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model" - "github.com/pingcap/tiflow/engine/test" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/label" "go.uber.org/zap" @@ -60,9 +59,8 @@ type ExecutorManager interface { // ExecutorManagerImpl holds all the executors' info, including liveness, status, resource usage. type ExecutorManagerImpl struct { - testContext *test.Context - wg sync.WaitGroup - metaClient executormeta.Client + wg sync.WaitGroup + metaClient executormeta.Client mu sync.Mutex executors map[model.ExecutorID]*Executor @@ -77,9 +75,8 @@ type ExecutorManagerImpl struct { } // NewExecutorManagerImpl creates a new ExecutorManagerImpl instance -func NewExecutorManagerImpl(metaClient executormeta.Client, initHeartbeatTTL, keepAliveInterval time.Duration, ctx *test.Context) *ExecutorManagerImpl { +func NewExecutorManagerImpl(metaClient executormeta.Client, initHeartbeatTTL, keepAliveInterval time.Duration) *ExecutorManagerImpl { return &ExecutorManagerImpl{ - testContext: ctx, metaClient: metaClient, executors: make(map[model.ExecutorID]*Executor), initHeartbeatTTL: initHeartbeatTTL, @@ -104,13 +101,6 @@ func (e *ExecutorManagerImpl) removeExecutorLocked(id model.ExecutorID) error { e.rescMgr.Unregister(id) log.Info("notify to offline exec") - if test.GetGlobalTestFlag() { - e.testContext.NotifyExecutorChange(&test.ExecutorChangeEvent{ - Tp: test.Delete, - Time: time.Now(), - }) - } - e.notifier.Notify(model.ExecutorStatusChange{ ID: id, Tp: model.EventExecutorOffline, diff --git a/engine/servermaster/executor_manager_test.go b/engine/servermaster/executor_manager_test.go index 54e464cc618..70515491f0b 100644 --- a/engine/servermaster/executor_manager_test.go +++ b/engine/servermaster/executor_manager_test.go @@ -35,7 +35,7 @@ func TestExecutorManager(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) heartbeatTTL := time.Millisecond * 100 checkInterval := time.Millisecond * 10 - mgr := NewExecutorManagerImpl(metaClient, heartbeatTTL, checkInterval, nil) + mgr := NewExecutorManagerImpl(metaClient, heartbeatTTL, checkInterval) // register an executor server executorAddr := "127.0.0.1:10001" @@ -108,7 +108,7 @@ func TestExecutorManagerWatch(t *testing.T) { heartbeatTTL := time.Millisecond * 400 checkInterval := time.Millisecond * 50 ctx, cancel := context.WithCancel(context.Background()) - mgr := NewExecutorManagerImpl(metaClient, heartbeatTTL, checkInterval, nil) + mgr := NewExecutorManagerImpl(metaClient, heartbeatTTL, checkInterval) // register an executor server executorAddr := "127.0.0.1:10001" diff --git a/engine/servermaster/server.go b/engine/servermaster/server.go index 57e49826cf0..241f1c58650 100644 --- a/engine/servermaster/server.go +++ b/engine/servermaster/server.go @@ -51,7 +51,6 @@ import ( "github.com/pingcap/tiflow/engine/servermaster/scheduler" schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model" "github.com/pingcap/tiflow/engine/servermaster/serverutil" - "github.com/pingcap/tiflow/engine/test" "github.com/pingcap/tiflow/engine/test/mock" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/label" @@ -121,8 +120,6 @@ type Server struct { // mocked server for test mockGrpcServer mock.GrpcServer - testCtx *test.Context - // framework metastore client frameMetaClient pkgOrm.Client frameworkClientConn metaModel.ClientConn @@ -155,7 +152,7 @@ func newServerMasterMetric() *serverMasterMetric { } // NewServer creates a new master-server. -func NewServer(cfg *Config, ctx *test.Context) (*Server, error) { +func NewServer(cfg *Config) (*Server, error) { log.Info("creating server master", zap.Stringer("config", cfg)) id := generateNodeID(cfg.Name) @@ -166,7 +163,6 @@ func NewServer(cfg *Config, ctx *test.Context) (*Server, error) { id: id, cfg: cfg, leaderInitialized: *atomic.NewBool(false), - testCtx: ctx, leader: atomic.Value{}, masterCli: &rpcutil.LeaderClientWithLock[multiClient]{}, msgService: msgService, @@ -460,20 +456,6 @@ func (s *Server) ReportExecutorWorkload( return &pb.ExecWorkloadResponse{}, nil } -func (s *Server) startForTest() (err error) { - // TODO: implement mock-etcd and leader election - - s.mockGrpcServer, err = mock.NewMasterServer(s.cfg.Addr, s) - if err != nil { - return err - } - - // TODO: start job manager - s.leader.Store(&rpcutil.Member{Name: s.name(), IsLeader: true}) - s.leaderInitialized.Store(true) - return -} - // Stop and clean resources. // TODO: implement stop gracefully. func (s *Server) Stop() { @@ -500,10 +482,6 @@ func (s *Server) Stop() { // Run the server master. func (s *Server) Run(ctx context.Context) error { - if test.GetGlobalTestFlag() { - return s.startForTest() - } - err := s.registerMetaStore(ctx) if err != nil { return err @@ -523,7 +501,7 @@ func (s *Server) Run(ctx context.Context) error { if err != nil { return errors.Trace(err) } - s.executorManager = NewExecutorManagerImpl(executorMetaClient, s.cfg.KeepAliveTTL, s.cfg.KeepAliveInterval, s.testCtx) + s.executorManager = NewExecutorManagerImpl(executorMetaClient, s.cfg.KeepAliveTTL, s.cfg.KeepAliveInterval) // ResourceManagerService should be initialized after registerMetaStore. // FIXME: We should do these work inside NewServer. diff --git a/engine/test/context.go b/engine/test/context.go deleted file mode 100644 index 243a84b1b69..00000000000 --- a/engine/test/context.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package test - -import ( - "context" - "time" -) - -// ExecutorChangeType defines executor change type, used in test only -type ExecutorChangeType int - -// Defines all ExecutorChangeType -const ( - Delete ExecutorChangeType = iota -) - -// ExecutorChangeEvent contains executor change type and time -type ExecutorChangeEvent struct { - Tp ExecutorChangeType - Time time.Time -} - -// Context is used in test only, containing some essential data structure -type Context struct { - executorChangeCh chan *ExecutorChangeEvent - dataCh chan interface{} -} - -// NewContext creates a new Context instance -func NewContext() *Context { - return &Context{ - executorChangeCh: make(chan *ExecutorChangeEvent, 1024), - dataCh: make(chan interface{}, 128), - } -} - -// ExecutorChange returns the notify channel of executor change -func (c *Context) ExecutorChange() <-chan *ExecutorChangeEvent { - return c.executorChangeCh -} - -// NotifyExecutorChange adds notification to executor change channel -func (c *Context) NotifyExecutorChange(event *ExecutorChangeEvent) { - c.executorChangeCh <- event -} - -// SendRecord adds data to data channel -func (c *Context) SendRecord(data interface{}) { - c.dataCh <- data -} - -// RecvRecord receives data from data channel in a blocking way -func (c *Context) RecvRecord(ctx context.Context) interface{} { - select { - case data := <-c.dataCh: - return data - case <-ctx.Done(): - return nil - } -} - -// TryRecvRecord tries to receive one record from data channel -func (c *Context) TryRecvRecord() interface{} { - select { - case data := <-c.dataCh: - return data - default: - return nil - } -} diff --git a/engine/test/flag.go b/engine/test/flag.go deleted file mode 100644 index 5aa26049436..00000000000 --- a/engine/test/flag.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package test - -import ( - "go.uber.org/atomic" -) - -// globalTestFlag indicates if this program is in test mode. -// If so, we use mock-grpc rather than a real one. -var globalTestFlag = *atomic.NewBool(false) - -// GetGlobalTestFlag returns the value of global test flag -func GetGlobalTestFlag() bool { - return globalTestFlag.Load() -} - -// SetGlobalTestFlag sets global test flag to given value -func SetGlobalTestFlag(val bool) { - globalTestFlag.Store(val) -} diff --git a/engine/test/util_test.go b/engine/test/util_test.go deleted file mode 100644 index 1e2a0b5cc77..00000000000 --- a/engine/test/util_test.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package test_test - -import ( - "context" - "fmt" - "time" - - "github.com/phayes/freeport" - . "github.com/pingcap/check" - "github.com/pingcap/tiflow/engine/executor" - "github.com/pingcap/tiflow/engine/servermaster" - "github.com/pingcap/tiflow/engine/test" - "github.com/pingcap/tiflow/engine/test/mock" -) - -// TODO: support multi master / executor -type MiniCluster struct { - master *servermaster.Server - masterCancel func() - - exec *executor.Server - execCancel func() -} - -func (c *MiniCluster) CreateMaster(cfg *servermaster.Config) (*test.Context, error) { - masterCtx := test.NewContext() - master, err := servermaster.NewServer(cfg, masterCtx) - c.master = master - return masterCtx, err -} - -func (c *MiniCluster) AsyncStartMaster() error { - ctx := context.Background() - masterCtx, masterCancel := context.WithCancel(ctx) - err := c.master.Run(masterCtx) - c.masterCancel = masterCancel - return err -} - -func (c *MiniCluster) CreateExecutor(cfg *executor.Config) *test.Context { - execContext := test.NewContext() - exec := executor.NewServer(cfg, execContext) - c.exec = exec - return execContext -} - -func (c *MiniCluster) AsyncStartExector() error { - ctx := context.Background() - execCtx, execCancel := context.WithCancel(ctx) - err := c.exec.Run(execCtx) - c.execCancel = execCancel - return err -} - -func (c *MiniCluster) StopExec() { - c.execCancel() - c.exec.Stop() -} - -func (c *MiniCluster) StopMaster() { - c.masterCancel() - c.master.Stop() -} - -// Start 1 master 1 executor. -func (c *MiniCluster) Start1M1E(cc *C) ( - masterAddr string, workerAddr string, - masterCtx *test.Context, workerCtx *test.Context, -) { - ports, err := freeport.GetFreePorts(2) - cc.Assert(err, IsNil) - masterAddr = fmt.Sprintf("127.0.0.1:%d", ports[0]) - workerAddr = fmt.Sprintf("127.0.0.1:%d", ports[1]) - masterCfg := &servermaster.Config{ - Addr: masterAddr, - AdvertiseAddr: masterAddr, - KeepAliveTTL: 20000000 * time.Second, - KeepAliveInterval: 200 * time.Millisecond, - RPCTimeout: time.Second, - } - // one master + one executor - executorCfg := &executor.Config{ - Join: masterAddr, - Addr: workerAddr, - AdvertiseAddr: workerAddr, - KeepAliveTTL: 20000000 * time.Second, - KeepAliveInterval: 200 * time.Millisecond, - RPCTimeout: time.Second, - } - - masterCtx, err = c.CreateMaster(masterCfg) - cc.Assert(err, IsNil) - workerCtx = c.CreateExecutor(executorCfg) - // Start cluster - err = c.AsyncStartMaster() - cc.Assert(err, IsNil) - - err = c.AsyncStartExector() - cc.Assert(err, IsNil) - - time.Sleep(2 * time.Second) - return -} - -func (c *MiniCluster) StopCluster() { - c.StopExec() - c.StopMaster() - mock.ResetGrpcCtx() -}