From 846d4fa7e80a4694ddc7c9de967cb35e72ba476a Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 17 Dec 2021 17:30:36 +0800 Subject: [PATCH] This is an automated cherry-pick of #3902 Signed-off-by: ti-chi-bot --- .golangci.yml | 41 ++++++ cdc/puller/frontier/list.go | 3 +- cdc/scheduler/util/table_set.go | 211 ++++++++++++++++++++++++++++ cdc/sink/codec/canal.go | 5 +- cdc/sink/common/flow_control.go | 2 +- integration/tests/case_date_time.go | 9 +- pkg/context/context.go | 5 +- pkg/context/context_test.go | 4 + pkg/etcd/etcdkey.go | 2 +- pkg/orchestrator/util/key_utils.go | 3 +- 10 files changed, 278 insertions(+), 7 deletions(-) create mode 100644 cdc/scheduler/util/table_set.go diff --git a/.golangci.yml b/.golangci.yml index b19a99a8dca..870ee6d43d9 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -2,3 +2,44 @@ linters: enable: - unconvert - unparam +<<<<<<< HEAD +======= + - revive + - depguard + +linters-settings: + revive: + ignore-generated-header: false + severity: error + confidence: 0.8 + error-code: -1 + warning-code: -1 + rules: + - name: blank-imports + - name: context-as-argument + - name: dot-imports + - name: error-return + - name: error-strings + - name: error-naming + - name: exported + - name: if-return + - name: var-naming + - name: package-comments + - name: range + - name: receiver-naming + - name: indent-error-flow + - name: superfluous-else + - name: modifies-parameter + - name: unreachable-code + + depguard: + list-type: blacklist + include-go-root: false + packages: + - log + - github.com/juju/errors + packages-with-error-message: + # specify an error message to output when a blacklisted package is used + - log: "logging is allowed only by pingcap/log" + - github.com/juju/errors: "error handling is allowed only by pingcap/errors" +>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902)) diff --git a/cdc/puller/frontier/list.go b/cdc/puller/frontier/list.go index 2a14aeafff8..249127d6bfb 100644 --- a/cdc/puller/frontier/list.go +++ b/cdc/puller/frontier/list.go @@ -16,11 +16,12 @@ package frontier import ( "bytes" "fmt" - "log" "math" "strings" _ "unsafe" // required by go:linkname + + "github.com/pingcap/log" ) const ( diff --git a/cdc/scheduler/util/table_set.go b/cdc/scheduler/util/table_set.go new file mode 100644 index 00000000000..963f7a30b1d --- /dev/null +++ b/cdc/scheduler/util/table_set.go @@ -0,0 +1,211 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "go.uber.org/zap" +) + +// TableSet provides a data structure to store the tables' states for the +// scheduler. +type TableSet struct { + // all tables' records + tableIDMap map[model.TableID]*TableRecord + + // a non-unique index to facilitate looking up tables + // assigned to a given capture. + captureIndex map[model.CaptureID]map[model.TableID]*TableRecord +} + +// TableRecord is a record to be inserted into TableSet. +type TableRecord struct { + TableID model.TableID + CaptureID model.CaptureID + Status TableStatus +} + +// Clone returns a copy of the TableSet. +// This method is future-proof in case we add +// something not trivially copyable. +func (r *TableRecord) Clone() *TableRecord { + return &TableRecord{ + TableID: r.TableID, + CaptureID: r.CaptureID, + Status: r.Status, + } +} + +// TableStatus is a type representing the table's replication status. +type TableStatus int32 + +const ( + AddingTable = TableStatus(iota) + 1 + RemovingTable + RunningTable +) + +// NewTableSet creates a new TableSet. +func NewTableSet() *TableSet { + return &TableSet{ + tableIDMap: map[model.TableID]*TableRecord{}, + captureIndex: map[model.CaptureID]map[model.TableID]*TableRecord{}, + } +} + +// AddTableRecord inserts a new TableRecord. +// It returns true if it succeeds. Returns false if there is a duplicate. +func (s *TableSet) AddTableRecord(record *TableRecord) (successful bool) { + if _, ok := s.tableIDMap[record.TableID]; ok { + // duplicate tableID + return false + } + recordCloned := record.Clone() + s.tableIDMap[record.TableID] = recordCloned + + captureIndexEntry := s.captureIndex[record.CaptureID] + if captureIndexEntry == nil { + captureIndexEntry = make(map[model.TableID]*TableRecord) + s.captureIndex[record.CaptureID] = captureIndexEntry + } + + captureIndexEntry[record.TableID] = recordCloned + return true +} + +// UpdateTableRecord updates an existing TableRecord. +// All modifications to a table's status should be done by this method. +func (s *TableSet) UpdateTableRecord(record *TableRecord) (successful bool) { + oldRecord, ok := s.tableIDMap[record.TableID] + if !ok { + // table does not exist + return false + } + + // If there is no need to modify the CaptureID, we simply + // update the record. + if record.CaptureID == oldRecord.CaptureID { + recordCloned := record.Clone() + s.tableIDMap[record.TableID] = recordCloned + s.captureIndex[record.CaptureID][record.TableID] = recordCloned + return true + } + + // If the CaptureID is changed, we do a proper RemoveTableRecord followed + // by AddTableRecord. + if record.CaptureID != oldRecord.CaptureID { + if ok := s.RemoveTableRecord(record.TableID); !ok { + log.Panic("unreachable", zap.Any("record", record)) + } + if ok := s.AddTableRecord(record); !ok { + log.Panic("unreachable", zap.Any("record", record)) + } + } + return true +} + +// GetTableRecord tries to obtain a record with the specified tableID. +func (s *TableSet) GetTableRecord(tableID model.TableID) (*TableRecord, bool) { + rec, ok := s.tableIDMap[tableID] + if ok { + return rec.Clone(), ok + } + return nil, false +} + +// RemoveTableRecord removes the record with tableID. Returns false +// if none exists. +func (s *TableSet) RemoveTableRecord(tableID model.TableID) bool { + record, ok := s.tableIDMap[tableID] + if !ok { + return false + } + delete(s.tableIDMap, record.TableID) + + captureIndexEntry, ok := s.captureIndex[record.CaptureID] + if !ok { + log.Panic("unreachable", zap.Int64("table-id", tableID)) + } + delete(captureIndexEntry, record.TableID) + if len(captureIndexEntry) == 0 { + delete(s.captureIndex, record.CaptureID) + } + return true +} + +// RemoveTableRecordByCaptureID removes all table records associated with +// captureID. +func (s *TableSet) RemoveTableRecordByCaptureID(captureID model.CaptureID) []*TableRecord { + captureIndexEntry, ok := s.captureIndex[captureID] + if !ok { + return nil + } + + var ret []*TableRecord + for tableID, record := range captureIndexEntry { + delete(s.tableIDMap, tableID) + // Since the record has been removed, + // there is no need to clone it before returning. + ret = append(ret, record) + } + delete(s.captureIndex, captureID) + return ret +} + +// CountTableByCaptureID counts the number of tables associated with the captureID. +func (s *TableSet) CountTableByCaptureID(captureID model.CaptureID) int { + return len(s.captureIndex[captureID]) +} + +// GetDistinctCaptures counts distinct captures with tables. +func (s *TableSet) GetDistinctCaptures() []model.CaptureID { + var ret []model.CaptureID + for captureID := range s.captureIndex { + ret = append(ret, captureID) + } + return ret +} + +// GetAllTables returns all stored information on all tables. +func (s *TableSet) GetAllTables() map[model.TableID]*TableRecord { + ret := make(map[model.TableID]*TableRecord) + for tableID, record := range s.tableIDMap { + ret[tableID] = record.Clone() + } + return ret +} + +// GetAllTablesGroupedByCaptures returns all stored information grouped by associated CaptureID. +func (s *TableSet) GetAllTablesGroupedByCaptures() map[model.CaptureID]map[model.TableID]*TableRecord { + ret := make(map[model.CaptureID]map[model.TableID]*TableRecord) + for captureID, tableIDMap := range s.captureIndex { + tableIDMapCloned := make(map[model.TableID]*TableRecord) + for tableID, record := range tableIDMap { + tableIDMapCloned[tableID] = record.Clone() + } + ret[captureID] = tableIDMapCloned + } + return ret +} + +// CountTableByStatus counts the number of tables with the given status. +func (s *TableSet) CountTableByStatus(status TableStatus) (count int) { + for _, record := range s.tableIDMap { + if record.Status == status { + count++ + } + } + return +} diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 482637bc854..6a3eff052e4 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -15,15 +15,18 @@ package codec import ( "fmt" - "log" "strconv" "strings" "github.com/golang/protobuf/proto" // nolint:staticcheck "github.com/pingcap/errors" +<<<<<<< HEAD mm "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" parser_types "github.com/pingcap/parser/types" +======= + "github.com/pingcap/log" +>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902)) "github.com/pingcap/ticdc/cdc/model" cerror "github.com/pingcap/ticdc/pkg/errors" canal "github.com/pingcap/ticdc/proto/canal" diff --git a/cdc/sink/common/flow_control.go b/cdc/sink/common/flow_control.go index 47ad19fc6b7..a008d58b25a 100644 --- a/cdc/sink/common/flow_control.go +++ b/cdc/sink/common/flow_control.go @@ -14,12 +14,12 @@ package common import ( - "log" "sync" "sync/atomic" "github.com/edwingeng/deque" "github.com/pingcap/errors" + "github.com/pingcap/log" cerrors "github.com/pingcap/ticdc/pkg/errors" "go.uber.org/zap" ) diff --git a/integration/tests/case_date_time.go b/integration/tests/case_date_time.go index 8d5f622a398..d318f7761ab 100644 --- a/integration/tests/case_date_time.go +++ b/integration/tests/case_date_time.go @@ -15,13 +15,20 @@ package tests import ( "errors" - "log" "time" +<<<<<<< HEAD:integration/tests/case_date_time.go "github.com/pingcap/ticdc/integration/framework" "github.com/pingcap/ticdc/integration/framework/avro" "github.com/pingcap/ticdc/integration/framework/canal" "github.com/pingcap/ticdc/integration/framework/mysql" +======= + "github.com/pingcap/log" + "github.com/pingcap/ticdc/tests/mq_protocol_tests/framework" + "github.com/pingcap/ticdc/tests/mq_protocol_tests/framework/avro" + "github.com/pingcap/ticdc/tests/mq_protocol_tests/framework/canal" + "github.com/pingcap/ticdc/tests/mq_protocol_tests/framework/mysql" +>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902)):tests/mq_protocol_tests/cases/case_date_time.go ) // DateTimeCase is base impl of test case for different types data diff --git a/pkg/context/context.go b/pkg/context/context.go index 9449638e4a0..af2261d688e 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -15,13 +15,16 @@ package context import ( "context" - "log" "time" +<<<<<<< HEAD "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/version" +======= + "github.com/pingcap/log" +>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902)) "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" diff --git a/pkg/context/context_test.go b/pkg/context/context_test.go index 78186e1673d..08c0886c338 100644 --- a/pkg/context/context_test.go +++ b/pkg/context/context_test.go @@ -136,7 +136,11 @@ func (s *contextSuite) TestThrowPanic(c *check.C) { defer testleak.AfterTest(c)() defer func() { panicMsg := recover() +<<<<<<< HEAD c.Assert(panicMsg, check.Equals, "an error has escaped, please report a bug{error 26 0 mock error}") +======= + require.Equal(t, panicMsg, "an error has escaped, please report a bug") +>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902)) }() stdCtx := context.Background() ctx := NewContext(stdCtx, &GlobalVars{}) diff --git a/pkg/etcd/etcdkey.go b/pkg/etcd/etcdkey.go index 51d4c91cf0a..ca7ac9cfe15 100644 --- a/pkg/etcd/etcdkey.go +++ b/pkg/etcd/etcdkey.go @@ -14,9 +14,9 @@ package etcd import ( - "log" "strings" + "github.com/pingcap/log" cerror "github.com/pingcap/ticdc/pkg/errors" ) diff --git a/pkg/orchestrator/util/key_utils.go b/pkg/orchestrator/util/key_utils.go index e9b68ca491c..008c8ae9705 100644 --- a/pkg/orchestrator/util/key_utils.go +++ b/pkg/orchestrator/util/key_utils.go @@ -14,8 +14,9 @@ package util import ( - "log" "strings" + + "github.com/pingcap/log" ) // EtcdKey represents a complete key in Etcd.