From 5da0f49ba153d20f5be1883e74d0d6c9781de4e8 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Thu, 10 Nov 2022 13:49:52 +0800 Subject: [PATCH] *(engine): clean up deadcode (#7571) ref pingcap/tiflow#6849 --- engine/enginepb/errorv2.pb.go | 164 ---------------------- engine/pkg/workerpool/async_pool.go | 41 ------ engine/pkg/workerpool/async_pool_impl.go | 166 ----------------------- engine/proto/errorv2.proto | 11 -- 4 files changed, 382 deletions(-) delete mode 100644 engine/enginepb/errorv2.pb.go delete mode 100644 engine/pkg/workerpool/async_pool.go delete mode 100644 engine/pkg/workerpool/async_pool_impl.go delete mode 100644 engine/proto/errorv2.proto diff --git a/engine/enginepb/errorv2.pb.go b/engine/enginepb/errorv2.pb.go deleted file mode 100644 index bd5b4f46fd3..00000000000 --- a/engine/enginepb/errorv2.pb.go +++ /dev/null @@ -1,164 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.0 -// protoc v3.20.1 -// source: engine/proto/errorv2.proto - -package enginepb - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type ErrorV2 struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Details []byte `protobuf:"bytes,2,opt,name=details,proto3" json:"details,omitempty"` - StackTrace string `protobuf:"bytes,3,opt,name=stack_trace,json=stackTrace,proto3" json:"stack_trace,omitempty"` -} - -func (x *ErrorV2) Reset() { - *x = ErrorV2{} - if protoimpl.UnsafeEnabled { - mi := &file_engine_proto_errorv2_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ErrorV2) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ErrorV2) ProtoMessage() {} - -func (x *ErrorV2) ProtoReflect() protoreflect.Message { - mi := &file_engine_proto_errorv2_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ErrorV2.ProtoReflect.Descriptor instead. -func (*ErrorV2) Descriptor() ([]byte, []int) { - return file_engine_proto_errorv2_proto_rawDescGZIP(), []int{0} -} - -func (x *ErrorV2) GetName() string { - if x != nil { - return x.Name - } - return "" -} - -func (x *ErrorV2) GetDetails() []byte { - if x != nil { - return x.Details - } - return nil -} - -func (x *ErrorV2) GetStackTrace() string { - if x != nil { - return x.StackTrace - } - return "" -} - -var File_engine_proto_errorv2_proto protoreflect.FileDescriptor - -var file_engine_proto_errorv2_proto_rawDesc = []byte{ - 0x0a, 0x1a, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x76, 0x32, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x65, 0x6e, - 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x22, 0x58, 0x0a, 0x07, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x56, - 0x32, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, - 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x5f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x54, 0x72, 0x61, 0x63, 0x65, - 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, - 0x69, 0x6e, 0x67, 0x63, 0x61, 0x70, 0x2f, 0x74, 0x69, 0x66, 0x6c, 0x6f, 0x77, 0x2f, 0x65, 0x6e, - 0x67, 0x69, 0x6e, 0x65, 0x2f, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_engine_proto_errorv2_proto_rawDescOnce sync.Once - file_engine_proto_errorv2_proto_rawDescData = file_engine_proto_errorv2_proto_rawDesc -) - -func file_engine_proto_errorv2_proto_rawDescGZIP() []byte { - file_engine_proto_errorv2_proto_rawDescOnce.Do(func() { - file_engine_proto_errorv2_proto_rawDescData = protoimpl.X.CompressGZIP(file_engine_proto_errorv2_proto_rawDescData) - }) - return file_engine_proto_errorv2_proto_rawDescData -} - -var file_engine_proto_errorv2_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_engine_proto_errorv2_proto_goTypes = []interface{}{ - (*ErrorV2)(nil), // 0: enginepb.ErrorV2 -} -var file_engine_proto_errorv2_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_engine_proto_errorv2_proto_init() } -func file_engine_proto_errorv2_proto_init() { - if File_engine_proto_errorv2_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_engine_proto_errorv2_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ErrorV2); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_engine_proto_errorv2_proto_rawDesc, - NumEnums: 0, - NumMessages: 1, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_engine_proto_errorv2_proto_goTypes, - DependencyIndexes: file_engine_proto_errorv2_proto_depIdxs, - MessageInfos: file_engine_proto_errorv2_proto_msgTypes, - }.Build() - File_engine_proto_errorv2_proto = out.File - file_engine_proto_errorv2_proto_rawDesc = nil - file_engine_proto_errorv2_proto_goTypes = nil - file_engine_proto_errorv2_proto_depIdxs = nil -} diff --git a/engine/pkg/workerpool/async_pool.go b/engine/pkg/workerpool/async_pool.go deleted file mode 100644 index 2afaa28f226..00000000000 --- a/engine/pkg/workerpool/async_pool.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package workerpool - -import "context" - -// AsyncPool provides a simple Goroutine pool, where the order in which jobs are run is non-deterministic. -type AsyncPool interface { - // Go mimics the semantics of the "go" keyword, with the only difference being the `ctx` parameter, - // which is used to cancel **the submission of task**. - // **All** tasks successfully submitted will be run eventually, as long as Run are called infinitely many times. - // Go might block when the AsyncPool is not running. - Go(ctx context.Context, f func()) error - - // Run runs the AsyncPool. - Run(ctx context.Context) error -} diff --git a/engine/pkg/workerpool/async_pool_impl.go b/engine/pkg/workerpool/async_pool_impl.go deleted file mode 100644 index 7c75c41a3ca..00000000000 --- a/engine/pkg/workerpool/async_pool_impl.go +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package workerpool - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/pingcap/tiflow/pkg/errors" - "golang.org/x/sync/errgroup" -) - -type defaultAsyncPoolImpl struct { - workers []*asyncWorker - nextWorkerID int32 - isRunning int32 - runningLock sync.RWMutex -} - -// NewDefaultAsyncPool creates a new AsyncPool that uses the default implementation -func NewDefaultAsyncPool(numWorkers int) AsyncPool { - return newDefaultAsyncPoolImpl(numWorkers) -} - -func newDefaultAsyncPoolImpl(numWorkers int) *defaultAsyncPoolImpl { - workers := make([]*asyncWorker, numWorkers) - - return &defaultAsyncPoolImpl{ - workers: workers, - } -} - -func (p *defaultAsyncPoolImpl) Go(ctx context.Context, f func()) error { - if p.doGo(ctx, f) == nil { - return nil - } - return errors.New("fail to run") -} - -func (p *defaultAsyncPoolImpl) doGo(ctx context.Context, f func()) error { - p.runningLock.RLock() - defer p.runningLock.RUnlock() - - task := &asyncTask{f: f} - worker := p.workers[int(atomic.AddInt32(&p.nextWorkerID, 1))%len(p.workers)] - - worker.chLock.RLock() - defer worker.chLock.RUnlock() - - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case worker.inputCh <- task: - } - - return nil -} - -func (p *defaultAsyncPoolImpl) Run(ctx context.Context) error { - p.prepare() - errg := errgroup.Group{} - - p.runningLock.Lock() - atomic.StoreInt32(&p.isRunning, 1) - p.runningLock.Unlock() - - defer func() { - p.runningLock.Lock() - atomic.StoreInt32(&p.isRunning, 0) - p.runningLock.Unlock() - }() - - errCh := make(chan error, len(p.workers)) - defer close(errCh) - - for _, worker := range p.workers { - workerFinal := worker - errg.Go(func() error { - err := workerFinal.run() - if err != nil { - errCh <- err - } - return nil - }) - } - - errg.Go(func() error { - var err error - select { - case <-ctx.Done(): - err = ctx.Err() - case err = <-errCh: - } - - for _, worker := range p.workers { - worker.close() - } - - return err - }) - - return errors.Trace(errg.Wait()) -} - -func (p *defaultAsyncPoolImpl) prepare() { - for i := range p.workers { - p.workers[i] = newAsyncWorker() - } -} - -type asyncTask struct { - f func() -} - -type asyncWorker struct { - inputCh chan *asyncTask - isClosed int32 - chLock sync.RWMutex -} - -func newAsyncWorker() *asyncWorker { - return &asyncWorker{inputCh: make(chan *asyncTask, 12800)} -} - -func (w *asyncWorker) run() error { - for { - task := <-w.inputCh - task.f() - } -} - -func (w *asyncWorker) close() { - if atomic.SwapInt32(&w.isClosed, 1) == 1 { - return - } - - w.chLock.Lock() - defer w.chLock.Unlock() - - close(w.inputCh) -} diff --git a/engine/proto/errorv2.proto b/engine/proto/errorv2.proto deleted file mode 100644 index 3e3dd5d8c08..00000000000 --- a/engine/proto/errorv2.proto +++ /dev/null @@ -1,11 +0,0 @@ -syntax = "proto3"; - -package enginepb; - -option go_package = "github.com/pingcap/tiflow/engine/enginepb"; - -message ErrorV2 { - string name = 1; - bytes details = 2; - string stack_trace = 3; -}