Skip to content

Commit

Permalink
*:Fix the txn mounter of owner ddl handler and add some log (pingcap#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored Nov 7, 2019
1 parent 077ec46 commit 91dec0c
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 21 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ bin
*.iml
.idea
.DS_Store
.devcontainer
.vscode

cscope.*
**/*.swp
Expand Down
11 changes: 8 additions & 3 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/model"
Expand All @@ -30,6 +31,7 @@ import (
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -63,6 +65,11 @@ func NewCapture(pdEndpoints []string) (c *Capture, err error) {
}

id := uuid.New().String()
info := &model.CaptureInfo{
ID: id,
}

log.Info("creating capture", zap.String("capture-id", id))

manager := roles.NewOwnerManager(cli, id, CaptureOwnerKey)
pdCli, err := fNewPDCli(pdEndpoints, pd.SecurityOption{})
Expand All @@ -77,9 +84,7 @@ func NewCapture(pdEndpoints []string) (c *Capture, err error) {
etcdClient: cli,
ownerManager: manager,
ownerWorker: worker,
info: &model.CaptureInfo{
ID: id,
},
info: info,
}

return
Expand Down
1 change: 1 addition & 0 deletions cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *Server) startStatusHTTP() {

addr := fmt.Sprintf("%s:%d", s.opts.statusHost, s.opts.statusPort)
s.statusServer = &http.Server{Addr: addr, Handler: serverMux}
log.Info("status http server is running", zap.String("addr", addr))
go func() {
err := s.statusServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
Expand Down
3 changes: 1 addition & 2 deletions cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func (s *httpStatusSuite) waitUntilServerOnline(c *check.C) {
}

func (s *httpStatusSuite) TestHTTPStatus(c *check.C) {
server, err := NewServer()
c.Assert(err, check.IsNil)
server := &Server{opts: defaultServerOptions}
server.startStatusHTTP()
defer func() {
c.Assert(server.statusServer.Close(), check.IsNil)
Expand Down
11 changes: 9 additions & 2 deletions cdc/owner_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"context"
"database/sql"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/schema"
"github.com/pingcap/tidb-cdc/cdc/sink"
"github.com/pingcap/tidb-cdc/cdc/txn"
"github.com/pingcap/tidb-cdc/pkg/util"
Expand All @@ -44,9 +46,14 @@ type ddlHandler struct {
func NewDDLHandler(pdCli pd.Client) *ddlHandler {
puller := NewPuller(pdCli, 0, []util.Span{util.GetDDLSpan()})
ctx, cancel := context.WithCancel(context.Background())
// TODO this TxnMounter only mount DDL transaction, so it needn't schemaStorage
schemaStorage, _ := schema.NewStorage(nil, false)
// TODO get time loc from config
txnMounter, _ := txn.NewTxnMounter(schemaStorage, time.UTC)
h := &ddlHandler{
puller: puller,
cancel: cancel,
puller: puller,
cancel: cancel,
mounter: txnMounter,
}
// Set it up so that one failed goroutine cancels all others sharing the same ctx
errg, ctx := errgroup.WithContext(ctx)
Expand Down
2 changes: 1 addition & 1 deletion cdc/roles/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type Owner interface {
// OwnerDDLHandler defines the ddl handler for Owner
// which can pull ddl jobs and execute ddl jobs
type OwnerDDLHandler interface {

// PullDDL pulls the ddl jobs and returns resolvedTs of DDL Puller and job list.
PullDDL() (resolvedTs uint64, jobs []*txn.DDL, err error)

Expand Down Expand Up @@ -363,6 +362,7 @@ func (o *ownerImpl) assignChangeFeed(ctx context.Context, changefeedID string) (
}
result[captures[i].ID] = info
}
log.Info("assignChangeFeed", zap.Reflect("result", result))

return result, nil
}
18 changes: 11 additions & 7 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,26 @@ func NewServer(opt ...ServerOption) (*Server, error) {
for _, o := range opt {
o(&opts)
}
log.Info("creating CDC server",
zap.String("pd-addr", opts.pdEndpoints),
zap.String("status-host", opts.statusHost),
zap.Int("status-port", opts.statusPort))

s := &Server{
opts: opts,
capture, err := NewCapture(strings.Split(opts.pdEndpoints, ","))
if err != nil {
return nil, err
}

s := &Server{
opts: opts,
capture: capture,
}
return s, nil
}

// Run runs the server.
func (s *Server) Run(ctx context.Context) error {
s.startStatusHTTP()
capture, err := NewCapture(strings.Split(s.opts.pdEndpoints, ","))
if err != nil {
return err
}
s.capture = capture
return s.capture.Start(ctx)
}

Expand Down
12 changes: 6 additions & 6 deletions tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ sync-log = false
# make tikv don't split now
[coprocessor]
## When it is set to `true`, TiKV will try to split a Region with table prefix if that Region
## When it is set to \`true\`, TiKV will try to split a Region with table prefix if that Region
## crosses tables.
## It is recommended to turn off this option if there will be a large number of tables created.
split-region-on-table = false
Expand All @@ -75,15 +75,15 @@ split-region-on-table = false
## split keys in one batch.
# batch-split-limit = 10
## When Region [a,e) size exceeds `region_max_size`, it will be split into several Regions [a,b),
## [b,c), [c,d), [d,e) and the size of [a,b), [b,c), [c,d) will be `region_split_size` (or a
## When Region [a,e) size exceeds \`region_max_size\`, it will be split into several Regions [a,b),
## [b,c), [c,d), [d,e) and the size of [a,b), [b,c), [c,d) will be \`region_split_size\` (or a
## little larger).
region-max-size = "100000MB"
region-split-size = "100000MB"
## When the number of keys in Region [a,e) exceeds the `region_max_keys`, it will be split into
## When the number of keys in Region [a,e) exceeds the \`region_max_keys\`, it will be split into
## several Regions [a,b), [b,c), [c,d), [d,e) and the number of keys in [a,b), [b,c), [c,d) will be
## `region_split_keys`.
## \`region_split_keys\`.
region-max-keys = 100000000
region-split-keys = 100000000
EOF
Expand Down Expand Up @@ -153,7 +153,7 @@ EOF
done

echo "Starting CDC..."
cdc server --log-file "$OUT_DIR/cdc.log" &
cdc server --log-file "$OUT_DIR/cdc.log" --log-level info &
sleep 1
}

Expand Down

0 comments on commit 91dec0c

Please sign in to comment.