Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fb/latency(cdc): merge master to resolve conflicts. #5677

Merged
merged 37 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
377f802
kv(ticdc): set debug level for resolved ts fallback log (#5558)
overvenus May 24, 2022
745d924
codec(ticdc): update goavro to 2.11.1 to solve avro decimal encoding …
zhangyangyu May 24, 2022
1dc5c2d
e2e(engine): test two running DM jobs (#5581)
lance6716 May 25, 2022
e533cd7
metrics(cdc): tracking memory consumption at processor level (#5538)
3AceShowHand May 25, 2022
de03021
externalresources(engine): implement GCCoordinator & GCRunner (#5557)
liuzix May 25, 2022
03a259f
DM(dmctl/openapi): fix incremental task's source meta check (#5552)
WizardXiao May 25, 2022
98fbefd
executer_manager(engine): Support watching executor online/offline ev…
liuzix May 25, 2022
664b773
sink/mq(ticdc): Add support for Confluent Cloud Kafka (#5553)
zhaoxinyu May 25, 2022
1c5e550
server(cdc): avoid printing help messages when cdc server exits (#5524)
May 26, 2022
e87ea0f
lib/master(engine): fix PreDispatch failure handling (#5603)
liuzix May 26, 2022
4ed4693
codec(ticdc): use handlekey instead of primary key for avro (#5601)
zhangyangyu May 26, 2022
ef41531
api(ticdc): setup open api v2 (#5585)
sdojjy May 26, 2022
960ef59
dm: fix typo (#5582)
buchuitoudegou May 26, 2022
ab18af2
syncer: fix table checkpoint panic problem (#5452)
lichunzhu May 26, 2022
7609fea
pipeline(ticdc): fix table actor cyclic node bug (#5600)
sdojjy May 27, 2022
d957fe8
validator(dm): support start from time & fix & integration test (#5534)
D3Hunter May 27, 2022
c467834
redo(ticdc): use uuid in s3 log file to avoid name conflict (#5595)
amyangfei May 27, 2022
2e75a32
externalresource(engine): Integrate all parts related to GC (Part I) …
liuzix May 27, 2022
240fde8
test(pkg/cmd): migrate test-infra to testify for pkg/cmd (#5547)
CharlesCheung96 May 27, 2022
80c1532
redo(ticdc): fix resolved moves too fast when part of tables are not …
amyangfei May 27, 2022
21a78f8
relay, syncer(dm): rename UUIDSuffix to RelaySubDirSuffix and others …
lance6716 May 27, 2022
dd7944e
pipeline(ticdc): delete pipeline and node runner (#5592)
sdojjy May 27, 2022
3faaeb6
tls(dm): support init tls without client kay/certs (#5393)
Ehco1996 May 28, 2022
9b29eef
redo(ticdc): fix a bug that flush log executed before writing logs (#…
amyangfei May 29, 2022
e0a2280
pb(engine): rename to enginepb to avoid name conflicts (#5627)
amyangfei May 30, 2022
c7bc1a5
promutil(engine): extract wrapping factory for DM (#5616)
lance6716 May 30, 2022
b091498
test(ticdc): fix log suffix in run_cdc_server script (#5638)
amyangfei May 30, 2022
780f244
externalresource(engine): implement Discard local resource & OnWorker…
liuzix May 30, 2022
a76d3f6
metrics(*): fix for promtheus_client breaking change (#5643)
Ehco1996 May 30, 2022
728c693
test(engine): remove unit test from Github Action. (#5651)
amyangfei May 30, 2022
bc4d01c
test: use `T.TempDir` to create temporary test directory (#5660)
Juneezee May 31, 2022
8925486
engine: normalize the csv job package path name (#5662)
lonng May 31, 2022
dd41f0f
sink/mq(ticdc): flush waits for all events to be acked (#5657)
Rustin170506 May 31, 2022
329230a
lib/worker(engine): Move ExitController to another package and add UT…
liuzix May 31, 2022
2ccee24
test(dm): show full result of ps on kill fail (#5667)
D3Hunter May 31, 2022
1e87b56
Merge branch 'master' into merge-master-531
3AceShowHand May 31, 2022
bdf989e
remove pipeline
3AceShowHand May 31, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 0 additions & 35 deletions .github/workflows/dataflow_engine_ut.yaml

This file was deleted.

5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ tools/bin/msgp: tools/check/go.mod
tools/bin/protoc:
./scripts/download-protoc.sh

tools/bin/goimports:
cd tools/check && $(GO) build -mod=mod -o ../bin/goimports golang.org/x/tools/cmd/goimports

check_failpoint_ctl: tools/bin/failpoint-ctl

failpoint-enable: check_failpoint_ctl
Expand All @@ -473,7 +476,7 @@ failpoint-disable: check_failpoint_ctl

engine: df-master df-executor df-master-client df-demo

df-proto:
df-proto: tools/bin/protoc tools/bin/protoc-gen-gogofaster tools/bin/goimports
./engine/generate-proto.sh

df-master:
Expand Down
38 changes: 38 additions & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"github.com/gin-gonic/gin"
"github.com/pingcap/tiflow/cdc/api/middleware"
"github.com/pingcap/tiflow/cdc/capture"
)

// OpenAPIV2 provides CDC v2 APIs
type OpenAPIV2 struct {
capture *capture.Capture
}

// NewOpenAPI creates a new openAPIs.
func NewOpenAPI(c *capture.Capture) *OpenAPIV2 {
return &OpenAPIV2{capture: c}
}

// RegisterOpenAPIRoutes registers routes for OpenAPI
func RegisterOpenAPIRoutes(router *gin.Engine, api *OpenAPIV2) {
v2 := router.Group("/api/v2")

v2.Use(middleware.LogMiddleware())
v2.Use(middleware.ErrorHandleMiddleware())
}
14 changes: 14 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2
20 changes: 20 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

// Tso contains timestamp get from PD
type Tso struct {
Timestamp int64 `json:"timestamp"`
LogicTime int64 `json:"logic-time"`
}
14 changes: 6 additions & 8 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,12 @@ func (c *Capture) reset(ctx context.Context) error {
if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
}
if conf.Debug.EnableTableActor {
c.tableActorSystem = system.NewSystem()
err = c.tableActorSystem.Start(ctx)
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create table actor system")
}
c.tableActorSystem = system.NewSystem()
err = c.tableActorSystem.Start(ctx)
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create table actor system")
}
if conf.Debug.EnableDBSorter {
if c.sorterSystem != nil {
Expand Down
20 changes: 10 additions & 10 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,24 +247,24 @@ func unflatten(datum types.Datum, ft *types.FieldType, loc *time.Location) (type
if datum.IsNull() {
return datum, nil
}
switch ft.Tp {
switch ft.GetType() {
case mysql.TypeFloat:
datum.SetFloat32(float32(datum.GetFloat64()))
return datum, nil
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob,
mysql.TypeMediumBlob, mysql.TypeBlob, mysql.TypeLongBlob:
datum.SetString(datum.GetString(), ft.Collate)
datum.SetString(datum.GetString(), ft.GetCollate())
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeYear, mysql.TypeInt24,
mysql.TypeLong, mysql.TypeLonglong, mysql.TypeDouble:
return datum, nil
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp:
t := types.NewTime(types.ZeroCoreTime, ft.Tp, ft.Decimal)
t := types.NewTime(types.ZeroCoreTime, ft.GetType(), ft.GetDecimal())
var err error
err = t.FromPackedUint(datum.GetUint64())
if err != nil {
return datum, cerror.WrapError(cerror.ErrDatumUnflatten, err)
}
if ft.Tp == mysql.TypeTimestamp && !t.IsZero() {
if ft.GetType() == mysql.TypeTimestamp && !t.IsZero() {
err = t.ConvertTimeZone(time.UTC, loc)
if err != nil {
return datum, cerror.WrapError(cerror.ErrDatumUnflatten, err)
Expand All @@ -274,27 +274,27 @@ func unflatten(datum types.Datum, ft *types.FieldType, loc *time.Location) (type
datum.SetMysqlTime(t)
return datum, nil
case mysql.TypeDuration: // duration should read fsp from column meta data
dur := types.Duration{Duration: time.Duration(datum.GetInt64()), Fsp: ft.Decimal}
dur := types.Duration{Duration: time.Duration(datum.GetInt64()), Fsp: ft.GetDecimal()}
datum.SetMysqlDuration(dur)
return datum, nil
case mysql.TypeEnum:
// ignore error deliberately, to read empty enum value.
enum, err := types.ParseEnumValue(ft.Elems, datum.GetUint64())
enum, err := types.ParseEnumValue(ft.GetElems(), datum.GetUint64())
if err != nil {
enum = types.Enum{}
}
datum.SetMysqlEnum(enum, ft.Collate)
datum.SetMysqlEnum(enum, ft.GetCollate())
return datum, nil
case mysql.TypeSet:
set, err := types.ParseSetValue(ft.Elems, datum.GetUint64())
set, err := types.ParseSetValue(ft.GetElems(), datum.GetUint64())
if err != nil {
return datum, cerror.WrapError(cerror.ErrDatumUnflatten, err)
}
datum.SetMysqlSet(set, ft.Collate)
datum.SetMysqlSet(set, ft.GetCollate())
return datum, nil
case mysql.TypeBit:
val := datum.GetUint64()
byteSize := (ft.Flen + 7) >> 3
byteSize := (ft.GetFlen() + 7) >> 3
datum.SetUint64(0)
datum.SetMysqlBit(types.NewBinaryLiteralFromUint(val, byteSize))
}
Expand Down
14 changes: 7 additions & 7 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colSize += size
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
Name: colName,
Type: colInfo.Tp,
Charset: colInfo.Charset,
Type: colInfo.GetType(),
Charset: colInfo.GetCharset(),
Value: colValue,
Default: defaultValue,
Flag: tableInfo.ColumnsFlag[colInfo.ID],
Expand Down Expand Up @@ -395,7 +395,7 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) (
if datum.IsNull() {
return nil, 0, "", nil
}
switch col.Tp {
switch col.GetType() {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
v := datum.GetMysqlTime().String()
return v, sizeOfString(v), "", nil
Expand Down Expand Up @@ -472,24 +472,24 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, e
return d.GetValue(), sizeOfDatum(d), "", nil
}

if !mysql.HasNotNullFlag(col.Flag) {
if !mysql.HasNotNullFlag(col.GetFlag()) {
// NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx",
// ref: https://github.com/pingcap/ticdc/issues/3929
// must use null if TiDB not write the column value when default value is null
// and the value is null, see https://github.com/pingcap/tidb/issues/9304
d = types.NewDatum(nil)
} else {
switch col.Tp {
switch col.GetType() {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d = types.NewDatum(col.FieldType.Elems[0])
d = types.NewDatum(col.FieldType.GetElem(0))
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes, sizeOfEmptyBytes, "", nil
default:
d = table.GetZeroValue(col)
if d.IsNull() {
log.Error("meet unsupported column type", zap.String("columnInfo", col.String()))
log.Error("meet unsupported column type", zap.String("columnInfo", col.FieldType.String()))
}
}
}
Expand Down
Loading