Skip to content

Commit

Permalink
trace err (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored Dec 16, 2019
1 parent 80ee381 commit 2c8f044
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 30 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ _testmain.go
*.exe
*.test
*.prof
*.log

bin
*.iml
Expand Down
2 changes: 1 addition & 1 deletion cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *Capture) Start(ctx context.Context) (err error) {
// TODO: better channgefeed model with etcd storage
err = c.register(ctx)
if err != nil {
return err
return errors.Trace(err)
}

err = c.ownerManager.CampaignOwner(ctx)
Expand Down
6 changes: 3 additions & 3 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (c *CDCClient) partialRegionFeed(

if err != nil {
log.Warn("get meta failed", zap.Error(err))
return err
return errors.Trace(err)
}

maxTs, err := c.singleEventFeed(ctx, regionInfo.span, regionInfo.ts, regionInfo.meta, eventCh)
Expand All @@ -241,13 +241,13 @@ func (c *CDCClient) partialRegionFeed(
case *eventError:
if eerr.GetNotLeader() != nil {
regionInfo.meta = nil
return err
return errors.Trace(err)
} else if eerr.GetEpochNotMatch() != nil {
regionSplitCounter.Inc()
return c.divideAndSendEventFeedToRegions(ctx, regionInfo.span, ts, regionCh)
} else if eerr.GetRegionNotFound() != nil {
regionInfo.meta = nil
return err
return errors.Trace(err)
} else {
log.Warn("receive empty or unknown error msg", zap.Stringer("error", eerr))
return errors.Annotate(err, "receive empty or unknow error msg")
Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func PutChangeFeedStatus(
key := GetEtcdKeyChangeFeedStatus(changefeedID)
value, err := info.Marshal()
if err != nil {
return err
return errors.Trace(err)
}
_, err = client.Put(ctx, key, value, opts...)
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ waitCheckpointTsLoop:
zap.String("ChangeFeedID", changeFeedID),
zap.Error(err),
zap.Reflect("ddlJob", todoDDLJob))
return err
return errors.Trace(err)
}
log.Info("Execute DDL succeeded",
zap.String("ChangeFeedID", changeFeedID),
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,5 @@ func (h *ddlHandler) ExecDDL(ctx context.Context, sinkURI string, ddl *model.DDL
func (h *ddlHandler) Close() error {
h.cancel()
err := h.wg.Wait()
return err
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ func (p *pullerImpl) Run(ctx context.Context) error {
}

if err := p.buf.AddKVEntry(ctx, kv); err != nil {
return err
return errors.Trace(err)
}
} else if e.Checkpoint != nil {
cp := e.Checkpoint
if err := p.buf.AddResolved(ctx, cp.Span, cp.ResolvedTs); err != nil {
return err
return errors.Trace(err)
}
}
case <-ctx.Done():
Expand Down
8 changes: 4 additions & 4 deletions cdc/roles/storage/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (rw *ChangeFeedInfoRWriter) Write(ctx context.Context, infos map[model.Chan
for changefeedID, info := range infos {
storeVal, err := info.Marshal()
if err != nil {
return err
return errors.Trace(err)
}
key := kv.GetEtcdKeyChangeFeedStatus(changefeedID)
ops = append(ops, clientv3.OpPut(key, storeVal))
Expand Down Expand Up @@ -184,7 +184,7 @@ func (rw *ProcessorTsEtcdRWriter) WriteInfoIntoStorage(
key := kv.GetEtcdKeySubChangeFeed(rw.changefeedID, rw.captureID)
value, err := rw.info.Marshal()
if err != nil {
return err
return errors.Trace(err)
}

resp, err := rw.etcdClient.KV.Txn(ctx).If(
Expand Down Expand Up @@ -328,7 +328,7 @@ func (ow *OwnerSubCFInfoEtcdWriter) Write(
err = retry.Run(func() error {
value, err := newInfo.Marshal()
if err != nil {
return err
return errors.Trace(err)
}

resp, err := ow.etcdClient.KV.Txn(ctx).If(
Expand All @@ -350,7 +350,7 @@ func (ow *OwnerSubCFInfoEtcdWriter) Write(
case nil:
return errors.Trace(model.ErrWriteSubChangeFeedInfoConlict)
default:
return err
return errors.Trace(err)
}
}

Expand Down
12 changes: 6 additions & 6 deletions cdc/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (w *ChangeFeedWatcher) processPutKv(kv *mvccpb.KeyValue) (bool, string, mod
func (w *ChangeFeedWatcher) processDeleteKv(kv *mvccpb.KeyValue) error {
changefeedID, err := util.ExtractKeySuffix(string(kv.Key))
if err != nil {
return err
return errors.Trace(err)
}
w.lock.Lock()
delete(w.details, changefeedID)
Expand All @@ -93,12 +93,12 @@ func (w *ChangeFeedWatcher) Watch(ctx context.Context, cb processorCallback) err

revision, details, err := kv.GetChangeFeeds(ctx, w.etcdCli)
if err != nil {
return err
return errors.Trace(err)
}
for changefeedID, kv := range details {
needRunWatcher, _, detail, err := w.processPutKv(kv)
if err != nil {
return err
return errors.Trace(err)
}
if needRunWatcher {
runProcessorWatcher(ctx, changefeedID, w.captureID, w.pdEndpoints, w.etcdCli, detail, errCh, cb)
Expand All @@ -111,7 +111,7 @@ func (w *ChangeFeedWatcher) Watch(ctx context.Context, cb processorCallback) err
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
return err
return errors.Trace(err)
case resp, ok := <-watchCh:
if !ok {
log.Info("watcher is closed")
Expand All @@ -126,15 +126,15 @@ func (w *ChangeFeedWatcher) Watch(ctx context.Context, cb processorCallback) err
case mvccpb.PUT:
needRunWatcher, changefeedID, detail, err := w.processPutKv(ev.Kv)
if err != nil {
return err
return errors.Trace(err)
}
if needRunWatcher {
runProcessorWatcher(ctx, changefeedID, w.captureID, w.pdEndpoints, w.etcdCli, detail, errCh, cb)
}
case mvccpb.DELETE:
err := w.processDeleteKv(ev.Kv)
if err != nil {
return err
return errors.Trace(err)
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDL) error {

tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
return errors.Trace(err)
}

if shouldSwitchDB {
Expand All @@ -194,19 +194,19 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDL) error {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.Error(err))
}
return err
return errors.Trace(err)
}
}

if _, err = tx.ExecContext(ctx, ddl.Job.Query); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", ddl.Job.Query), zap.Error(err))
}
return err
return errors.Trace(err)
}

if err = tx.Commit(); err != nil {
return err
return errors.Trace(err)
}

log.Info("Exec DDL succeeded", zap.String("sql", ddl.Job.Query))
Expand All @@ -216,7 +216,7 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDL) error {
func (s *mysqlSink) execDMLs(ctx context.Context, dmls []*model.DML) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
return errors.Trace(err)
}

for _, dml := range dmls {
Expand All @@ -234,18 +234,19 @@ func (s *mysqlSink) execDMLs(ctx context.Context, dmls []*model.DML) error {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.Error(err))
}
return err
return errors.Trace(err)
}
log.Debug("exec dml", zap.String("sql", query), zap.Any("args", args))
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", query), zap.Error(err))
}
return err
return errors.Trace(err)
}
}

if err = tx.Commit(); err != nil {
return err
return errors.Trace(err)
}

log.Info("Exec DML succeeded", zap.Int("num of DMLs", len(dmls)))
Expand Down
7 changes: 4 additions & 3 deletions cdc/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util"
Expand All @@ -41,7 +42,7 @@ func CollectRawTxns(
for {
be, err := inputFn(ctx)
if err != nil {
return err
return errors.Trace(err)
}
if be.KV != nil {
entryGroups[be.KV.Ts] = append(entryGroups[be.KV.Ts], be.KV)
Expand All @@ -68,7 +69,7 @@ func CollectRawTxns(
for _, t := range readyTxns {
err := outputFn(ctx, t)
if err != nil {
return err
return errors.Trace(err)
}
}
if len(readyTxns) == 0 {
Expand All @@ -79,7 +80,7 @@ func CollectRawTxns(
}
err := outputFn(ctx, fakeTxn)
if err != nil {
return err
return errors.Trace(err)
}
}
}
Expand Down

0 comments on commit 2c8f044

Please sign in to comment.