Skip to content

Commit

Permalink
This is an automated cherry-pick of #3902
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
overvenus authored and ti-chi-bot committed Dec 17, 2021
1 parent 0c853a6 commit 846d4fa
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 7 deletions.
41 changes: 41 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,44 @@ linters:
enable:
- unconvert
- unparam
<<<<<<< HEAD
=======
- revive
- depguard

linters-settings:
revive:
ignore-generated-header: false
severity: error
confidence: 0.8
error-code: -1
warning-code: -1
rules:
- name: blank-imports
- name: context-as-argument
- name: dot-imports
- name: error-return
- name: error-strings
- name: error-naming
- name: exported
- name: if-return
- name: var-naming
- name: package-comments
- name: range
- name: receiver-naming
- name: indent-error-flow
- name: superfluous-else
- name: modifies-parameter
- name: unreachable-code

depguard:
list-type: blacklist
include-go-root: false
packages:
- log
- github.com/juju/errors
packages-with-error-message:
# specify an error message to output when a blacklisted package is used
- log: "logging is allowed only by pingcap/log"
- github.com/juju/errors: "error handling is allowed only by pingcap/errors"
>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902))
3 changes: 2 additions & 1 deletion cdc/puller/frontier/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package frontier
import (
"bytes"
"fmt"
"log"
"math"
"strings"

_ "unsafe" // required by go:linkname

"github.com/pingcap/log"
)

const (
Expand Down
211 changes: 211 additions & 0 deletions cdc/scheduler/util/table_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"go.uber.org/zap"
)

// TableSet provides a data structure to store the tables' states for the
// scheduler.
type TableSet struct {
// all tables' records
tableIDMap map[model.TableID]*TableRecord

// a non-unique index to facilitate looking up tables
// assigned to a given capture.
captureIndex map[model.CaptureID]map[model.TableID]*TableRecord
}

// TableRecord is a record to be inserted into TableSet.
type TableRecord struct {
TableID model.TableID
CaptureID model.CaptureID
Status TableStatus
}

// Clone returns a copy of the TableSet.
// This method is future-proof in case we add
// something not trivially copyable.
func (r *TableRecord) Clone() *TableRecord {
return &TableRecord{
TableID: r.TableID,
CaptureID: r.CaptureID,
Status: r.Status,
}
}

// TableStatus is a type representing the table's replication status.
type TableStatus int32

const (
AddingTable = TableStatus(iota) + 1
RemovingTable
RunningTable
)

// NewTableSet creates a new TableSet.
func NewTableSet() *TableSet {
return &TableSet{
tableIDMap: map[model.TableID]*TableRecord{},
captureIndex: map[model.CaptureID]map[model.TableID]*TableRecord{},
}
}

// AddTableRecord inserts a new TableRecord.
// It returns true if it succeeds. Returns false if there is a duplicate.
func (s *TableSet) AddTableRecord(record *TableRecord) (successful bool) {
if _, ok := s.tableIDMap[record.TableID]; ok {
// duplicate tableID
return false
}
recordCloned := record.Clone()
s.tableIDMap[record.TableID] = recordCloned

captureIndexEntry := s.captureIndex[record.CaptureID]
if captureIndexEntry == nil {
captureIndexEntry = make(map[model.TableID]*TableRecord)
s.captureIndex[record.CaptureID] = captureIndexEntry
}

captureIndexEntry[record.TableID] = recordCloned
return true
}

// UpdateTableRecord updates an existing TableRecord.
// All modifications to a table's status should be done by this method.
func (s *TableSet) UpdateTableRecord(record *TableRecord) (successful bool) {
oldRecord, ok := s.tableIDMap[record.TableID]
if !ok {
// table does not exist
return false
}

// If there is no need to modify the CaptureID, we simply
// update the record.
if record.CaptureID == oldRecord.CaptureID {
recordCloned := record.Clone()
s.tableIDMap[record.TableID] = recordCloned
s.captureIndex[record.CaptureID][record.TableID] = recordCloned
return true
}

// If the CaptureID is changed, we do a proper RemoveTableRecord followed
// by AddTableRecord.
if record.CaptureID != oldRecord.CaptureID {
if ok := s.RemoveTableRecord(record.TableID); !ok {
log.Panic("unreachable", zap.Any("record", record))
}
if ok := s.AddTableRecord(record); !ok {
log.Panic("unreachable", zap.Any("record", record))
}
}
return true
}

// GetTableRecord tries to obtain a record with the specified tableID.
func (s *TableSet) GetTableRecord(tableID model.TableID) (*TableRecord, bool) {
rec, ok := s.tableIDMap[tableID]
if ok {
return rec.Clone(), ok
}
return nil, false
}

// RemoveTableRecord removes the record with tableID. Returns false
// if none exists.
func (s *TableSet) RemoveTableRecord(tableID model.TableID) bool {
record, ok := s.tableIDMap[tableID]
if !ok {
return false
}
delete(s.tableIDMap, record.TableID)

captureIndexEntry, ok := s.captureIndex[record.CaptureID]
if !ok {
log.Panic("unreachable", zap.Int64("table-id", tableID))
}
delete(captureIndexEntry, record.TableID)
if len(captureIndexEntry) == 0 {
delete(s.captureIndex, record.CaptureID)
}
return true
}

// RemoveTableRecordByCaptureID removes all table records associated with
// captureID.
func (s *TableSet) RemoveTableRecordByCaptureID(captureID model.CaptureID) []*TableRecord {
captureIndexEntry, ok := s.captureIndex[captureID]
if !ok {
return nil
}

var ret []*TableRecord
for tableID, record := range captureIndexEntry {
delete(s.tableIDMap, tableID)
// Since the record has been removed,
// there is no need to clone it before returning.
ret = append(ret, record)
}
delete(s.captureIndex, captureID)
return ret
}

// CountTableByCaptureID counts the number of tables associated with the captureID.
func (s *TableSet) CountTableByCaptureID(captureID model.CaptureID) int {
return len(s.captureIndex[captureID])
}

// GetDistinctCaptures counts distinct captures with tables.
func (s *TableSet) GetDistinctCaptures() []model.CaptureID {
var ret []model.CaptureID
for captureID := range s.captureIndex {
ret = append(ret, captureID)
}
return ret
}

// GetAllTables returns all stored information on all tables.
func (s *TableSet) GetAllTables() map[model.TableID]*TableRecord {
ret := make(map[model.TableID]*TableRecord)
for tableID, record := range s.tableIDMap {
ret[tableID] = record.Clone()
}
return ret
}

// GetAllTablesGroupedByCaptures returns all stored information grouped by associated CaptureID.
func (s *TableSet) GetAllTablesGroupedByCaptures() map[model.CaptureID]map[model.TableID]*TableRecord {
ret := make(map[model.CaptureID]map[model.TableID]*TableRecord)
for captureID, tableIDMap := range s.captureIndex {
tableIDMapCloned := make(map[model.TableID]*TableRecord)
for tableID, record := range tableIDMap {
tableIDMapCloned[tableID] = record.Clone()
}
ret[captureID] = tableIDMapCloned
}
return ret
}

// CountTableByStatus counts the number of tables with the given status.
func (s *TableSet) CountTableByStatus(status TableStatus) (count int) {
for _, record := range s.tableIDMap {
if record.Status == status {
count++
}
}
return
}
5 changes: 4 additions & 1 deletion cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ package codec

import (
"fmt"
"log"
"strconv"
"strings"

"github.com/golang/protobuf/proto" // nolint:staticcheck
"github.com/pingcap/errors"
<<<<<<< HEAD
mm "github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
parser_types "github.com/pingcap/parser/types"
=======
"github.com/pingcap/log"
>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902))
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
canal "github.com/pingcap/ticdc/proto/canal"
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/common/flow_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package common

import (
"log"
"sync"
"sync/atomic"

"github.com/edwingeng/deque"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"go.uber.org/zap"
)
Expand Down
9 changes: 8 additions & 1 deletion integration/tests/case_date_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@ package tests

import (
"errors"
"log"
"time"

<<<<<<< HEAD:integration/tests/case_date_time.go
"github.com/pingcap/ticdc/integration/framework"
"github.com/pingcap/ticdc/integration/framework/avro"
"github.com/pingcap/ticdc/integration/framework/canal"
"github.com/pingcap/ticdc/integration/framework/mysql"
=======
"github.com/pingcap/log"
"github.com/pingcap/ticdc/tests/mq_protocol_tests/framework"
"github.com/pingcap/ticdc/tests/mq_protocol_tests/framework/avro"
"github.com/pingcap/ticdc/tests/mq_protocol_tests/framework/canal"
"github.com/pingcap/ticdc/tests/mq_protocol_tests/framework/mysql"
>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902)):tests/mq_protocol_tests/cases/case_date_time.go
)

// DateTimeCase is base impl of test case for different types data
Expand Down
5 changes: 4 additions & 1 deletion pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ package context

import (
"context"
"log"
"time"

<<<<<<< HEAD
"github.com/pingcap/ticdc/pkg/pdtime"

"github.com/pingcap/ticdc/pkg/version"

=======
"github.com/pingcap/log"
>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902))
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
Expand Down
4 changes: 4 additions & 0 deletions pkg/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ func (s *contextSuite) TestThrowPanic(c *check.C) {
defer testleak.AfterTest(c)()
defer func() {
panicMsg := recover()
<<<<<<< HEAD
c.Assert(panicMsg, check.Equals, "an error has escaped, please report a bug{error 26 0 mock error}")
=======
require.Equal(t, panicMsg, "an error has escaped, please report a bug")
>>>>>>> 2ed02366d (pkg,cdc: do not use log package (#3902))
}()
stdCtx := context.Background()
ctx := NewContext(stdCtx, &GlobalVars{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/etcdkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
package etcd

import (
"log"
"strings"

"github.com/pingcap/log"
cerror "github.com/pingcap/ticdc/pkg/errors"
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/orchestrator/util/key_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
package util

import (
"log"
"strings"

"github.com/pingcap/log"
)

// EtcdKey represents a complete key in Etcd.
Expand Down

0 comments on commit 846d4fa

Please sign in to comment.