Skip to content

Commit

Permalink
*(engine): remove some deprecated code
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Oct 20, 2022
1 parent f31668b commit 7f6e8b8
Show file tree
Hide file tree
Showing 11 changed files with 14 additions and 340 deletions.
40 changes: 2 additions & 38 deletions engine/executor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/pingcap/tiflow/engine/pkg/rpcerror"
"github.com/pingcap/tiflow/engine/pkg/tenant"
"github.com/pingcap/tiflow/engine/test"
"github.com/pingcap/tiflow/engine/test/mock"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
Expand Down Expand Up @@ -72,8 +71,7 @@ const (

// Server is an executor server.
type Server struct {
cfg *Config
testCtx *test.Context
cfg *Config

tcpServer tcpserver.TCPServer
grpcSrv *grpc.Server
Expand All @@ -96,13 +94,12 @@ type Server struct {
}

// NewServer creates a new executor server instance
func NewServer(cfg *Config, ctx *test.Context) *Server {
func NewServer(cfg *Config) *Server {
log.Info("creating executor", zap.Stringer("config", cfg))

registerWorkerOnce.Do(registerWorkers)
s := Server{
cfg: cfg,
testCtx: ctx,
jobAPISrv: newJobAPIServer(),
metastores: server.NewMetastoreManager(),
}
Expand Down Expand Up @@ -374,27 +371,6 @@ func (s *Server) Stop() {
// TODO: unregister self from master.
}

func (s *Server) startForTest(ctx context.Context) (err error) {
s.mockSrv, err = mock.NewExecutorServer(s.cfg.Addr, s)
if err != nil {
return err
}

err = s.initClients()
if err != nil {
return err
}
err = s.selfRegister(ctx)
if err != nil {
return err
}
go func() {
err := s.keepHeartbeat(ctx)
log.L().Info("heartbeat quits", zap.Error(err))
}()
return nil
}

func (s *Server) startMsgService(ctx context.Context, wg *errgroup.Group) (err error) {
s.msgServer, err = p2p.NewDependentMessageRPCService(string(s.selfID), nil, s.grpcSrv)
if err != nil {
Expand All @@ -414,10 +390,6 @@ func (s *Server) isReadyToServe() bool {
// Run drives server logic in independent background goroutines, and use error
// group to collect errors.
func (s *Server) Run(ctx context.Context) error {
if test.GetGlobalTestFlag() {
return s.startForTest(ctx)
}

wg, ctx := errgroup.WithContext(ctx)
s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency)
s.taskCommitter = worker.NewTaskCommitter(s.taskRunner, defaultTaskPreDispatchRequestTTL)
Expand Down Expand Up @@ -651,14 +623,6 @@ func (s *Server) selfRegister(ctx context.Context) error {
func (s *Server) keepHeartbeat(ctx context.Context) error {
ticker := time.NewTicker(s.cfg.KeepAliveInterval)
s.lastHearbeatTime = time.Now()
defer func() {
if test.GetGlobalTestFlag() {
s.testCtx.NotifyExecutorChange(&test.ExecutorChangeEvent{
Tp: test.Delete,
Time: time.Now(),
})
}
}()
rl := rate.NewLimiter(rate.Every(time.Second*5), 1 /*burst*/)
for {
select {
Expand Down
6 changes: 3 additions & 3 deletions engine/executor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestStartTCPSrv(t *testing.T) {
require.Nil(t, err)
addr := fmt.Sprintf("127.0.0.1:%d", port)
cfg.Addr = addr
s := NewServer(cfg, nil)
s := NewServer(cfg)

s.grpcSrv = grpc.NewServer()
wg, ctx := errgroup.WithContext(context.Background())
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestCollectMetric(t *testing.T) {
require.Nil(t, err)
addr := fmt.Sprintf("127.0.0.1:%d", port)
cfg.Addr = addr
s := NewServer(cfg, nil)
s := NewServer(cfg)
s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency)

s.grpcSrv = grpc.NewServer()
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestSelfRegister(t *testing.T) {
require.Nil(t, err)
addr := fmt.Sprintf("127.0.0.1:%d", port)
cfg.AdvertiseAddr = addr
s := NewServer(cfg, nil)
s := NewServer(cfg)
mockMasterClient := newMockRegisterMasterClient(10)
s.masterClient = mockMasterClient

Expand Down
2 changes: 1 addition & 1 deletion engine/pkg/cmd/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (o *options) run(cmd *cobra.Command) error {

ticdcutil.LogHTTPProxies()

server := executor.NewServer(o.executorConfig, nil)
server := executor.NewServer(o.executorConfig)

err = server.Run(cmdconetxt.GetDefaultContext())
if err != nil && errors.Cause(err) != context.Canceled {
Expand Down
2 changes: 1 addition & 1 deletion engine/pkg/cmd/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (o *options) run(cmd *cobra.Command) error {

ticdcutil.LogHTTPProxies()

server, err := servermaster.NewServer(o.masterConfig, nil)
server, err := servermaster.NewServer(o.masterConfig)
if err != nil {
return errors.Trace(err)
}
Expand Down
22 changes: 0 additions & 22 deletions engine/pkg/containers/queue.go

This file was deleted.

16 changes: 3 additions & 13 deletions engine/servermaster/executor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
execModel "github.com/pingcap/tiflow/engine/servermaster/executormeta/model"
"github.com/pingcap/tiflow/engine/servermaster/resource"
schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model"
"github.com/pingcap/tiflow/engine/test"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/label"
"go.uber.org/zap"
Expand Down Expand Up @@ -60,9 +59,8 @@ type ExecutorManager interface {

// ExecutorManagerImpl holds all the executors' info, including liveness, status, resource usage.
type ExecutorManagerImpl struct {
testContext *test.Context
wg sync.WaitGroup
metaClient executormeta.Client
wg sync.WaitGroup
metaClient executormeta.Client

mu sync.Mutex
executors map[model.ExecutorID]*Executor
Expand All @@ -77,9 +75,8 @@ type ExecutorManagerImpl struct {
}

// NewExecutorManagerImpl creates a new ExecutorManagerImpl instance
func NewExecutorManagerImpl(metaClient executormeta.Client, initHeartbeatTTL, keepAliveInterval time.Duration, ctx *test.Context) *ExecutorManagerImpl {
func NewExecutorManagerImpl(metaClient executormeta.Client, initHeartbeatTTL, keepAliveInterval time.Duration) *ExecutorManagerImpl {
return &ExecutorManagerImpl{
testContext: ctx,
metaClient: metaClient,
executors: make(map[model.ExecutorID]*Executor),
initHeartbeatTTL: initHeartbeatTTL,
Expand All @@ -104,13 +101,6 @@ func (e *ExecutorManagerImpl) removeExecutorLocked(id model.ExecutorID) error {
e.rescMgr.Unregister(id)
log.Info("notify to offline exec")

if test.GetGlobalTestFlag() {
e.testContext.NotifyExecutorChange(&test.ExecutorChangeEvent{
Tp: test.Delete,
Time: time.Now(),
})
}

e.notifier.Notify(model.ExecutorStatusChange{
ID: id,
Tp: model.EventExecutorOffline,
Expand Down
4 changes: 2 additions & 2 deletions engine/servermaster/executor_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestExecutorManager(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
heartbeatTTL := time.Millisecond * 100
checkInterval := time.Millisecond * 10
mgr := NewExecutorManagerImpl(metaClient, heartbeatTTL, checkInterval, nil)
mgr := NewExecutorManagerImpl(metaClient, heartbeatTTL, checkInterval)

// register an executor server
executorAddr := "127.0.0.1:10001"
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestExecutorManagerWatch(t *testing.T) {
heartbeatTTL := time.Millisecond * 400
checkInterval := time.Millisecond * 50
ctx, cancel := context.WithCancel(context.Background())
mgr := NewExecutorManagerImpl(metaClient, heartbeatTTL, checkInterval, nil)
mgr := NewExecutorManagerImpl(metaClient, heartbeatTTL, checkInterval)

// register an executor server
executorAddr := "127.0.0.1:10001"
Expand Down
26 changes: 2 additions & 24 deletions engine/servermaster/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/pingcap/tiflow/engine/servermaster/scheduler"
schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model"
"github.com/pingcap/tiflow/engine/servermaster/serverutil"
"github.com/pingcap/tiflow/engine/test"
"github.com/pingcap/tiflow/engine/test/mock"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/label"
Expand Down Expand Up @@ -121,8 +120,6 @@ type Server struct {
// mocked server for test
mockGrpcServer mock.GrpcServer

testCtx *test.Context

// framework metastore client
frameMetaClient pkgOrm.Client
frameworkClientConn metaModel.ClientConn
Expand Down Expand Up @@ -155,7 +152,7 @@ func newServerMasterMetric() *serverMasterMetric {
}

// NewServer creates a new master-server.
func NewServer(cfg *Config, ctx *test.Context) (*Server, error) {
func NewServer(cfg *Config) (*Server, error) {
log.Info("creating server master", zap.Stringer("config", cfg))

id := generateNodeID(cfg.Name)
Expand All @@ -166,7 +163,6 @@ func NewServer(cfg *Config, ctx *test.Context) (*Server, error) {
id: id,
cfg: cfg,
leaderInitialized: *atomic.NewBool(false),
testCtx: ctx,
leader: atomic.Value{},
masterCli: &rpcutil.LeaderClientWithLock[multiClient]{},
msgService: msgService,
Expand Down Expand Up @@ -460,20 +456,6 @@ func (s *Server) ReportExecutorWorkload(
return &pb.ExecWorkloadResponse{}, nil
}

func (s *Server) startForTest() (err error) {
// TODO: implement mock-etcd and leader election

s.mockGrpcServer, err = mock.NewMasterServer(s.cfg.Addr, s)
if err != nil {
return err
}

// TODO: start job manager
s.leader.Store(&rpcutil.Member{Name: s.name(), IsLeader: true})
s.leaderInitialized.Store(true)
return
}

// Stop and clean resources.
// TODO: implement stop gracefully.
func (s *Server) Stop() {
Expand All @@ -500,10 +482,6 @@ func (s *Server) Stop() {

// Run the server master.
func (s *Server) Run(ctx context.Context) error {
if test.GetGlobalTestFlag() {
return s.startForTest()
}

err := s.registerMetaStore(ctx)
if err != nil {
return err
Expand All @@ -523,7 +501,7 @@ func (s *Server) Run(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
s.executorManager = NewExecutorManagerImpl(executorMetaClient, s.cfg.KeepAliveTTL, s.cfg.KeepAliveInterval, s.testCtx)
s.executorManager = NewExecutorManagerImpl(executorMetaClient, s.cfg.KeepAliveTTL, s.cfg.KeepAliveInterval)

// ResourceManagerService should be initialized after registerMetaStore.
// FIXME: We should do these work inside NewServer.
Expand Down
82 changes: 0 additions & 82 deletions engine/test/context.go

This file was deleted.

Loading

0 comments on commit 7f6e8b8

Please sign in to comment.