Skip to content

Commit 2d4d3f6

Browse files
author
ffffwh
committed
put job config on dest instead of src #592-1
1 parent b28172a commit 2d4d3f6

File tree

5 files changed

+82
-61
lines changed

5 files changed

+82
-61
lines changed

driver/common/store.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,25 @@ func (sm *StoreManager) GetGtidForJob(jobName string) (string, error) {
101101
return string(p.Value), nil
102102
}
103103

104-
func (sm *StoreManager) GetConfig(jobName string) (*MySQLDriverConfig, error) {
104+
func (sm *StoreManager) WatchConfig(jobName string, stopCh <-chan struct{}) (*MySQLDriverConfig, error) {
105105
key := fmt.Sprintf("dtle/%v/Config", jobName)
106106

107-
kv, err := sm.consulStore.Get(key)
107+
ch, err := sm.consulStore.Watch(key, stopCh)
108108
if err != nil {
109109
return nil, err
110110
}
111111

112-
config := &MySQLDriverConfig{}
113-
err = json.Unmarshal(kv.Value, config)
114-
if err != nil {
115-
return nil, err
112+
pair := <-ch
113+
if pair == nil {
114+
return nil, fmt.Errorf("failed to get config")
115+
} else {
116+
config := &MySQLDriverConfig{}
117+
err = json.Unmarshal(pair.Value, config)
118+
if err != nil {
119+
return nil, err
120+
}
121+
return config, nil
116122
}
117-
return config, nil
118123
}
119124

120125
func (sm *StoreManager) GetSourceType(jobName string) (string, error) {
@@ -205,7 +210,7 @@ func (sm *StoreManager) DstPutNats(jobName string, natsAddr string, stopCh chan
205210
return nil
206211
}
207212

208-
func (sm *StoreManager) SrcWatchNats(jobName string, stopCh chan struct{},
213+
func (sm *StoreManager) SrcWatchNats(jobName string, stopCh <-chan struct{},
209214
onWatchError func(error)) (natsAddr string, err error) {
210215
sm.logger.Debug("SrcWatchNats")
211216

driver/handle.go

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,19 +166,64 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
166166

167167
switch common.TaskTypeFromString(h.taskConfig.Name) {
168168
case common.TaskTypeSrc:
169+
if h.driverConfig.ConnectionConfig != nil {
170+
h.driverConfig.SrcConnectionConfig = h.driverConfig.ConnectionConfig
171+
}
172+
// get job config v1. Call `PutSourceType` before `SrcWatchNats`.
173+
if h.driverConfig.OracleConfig != nil {
174+
err = d.storeManager.PutSourceType(ctx.Subject, "oracle")
175+
if err != nil {
176+
return nil, errors.Wrap(err, "PutSourceType")
177+
}
178+
} else if h.driverConfig.SrcConnectionConfig != nil {
179+
err = d.storeManager.PutSourceType(ctx.Subject, "mysql")
180+
if err != nil {
181+
return nil, errors.Wrap(err, "PutSourceType")
182+
}
183+
} else {
184+
// get job config v2. No need to PutSourceType.
185+
}
186+
187+
natsAddr, err := d.storeManager.SrcWatchNats(ctx.Subject, h.ctx.Done(), func(err error) {
188+
d.logger.Error("**** SrcWatchNats error")
189+
})
190+
if err != nil {
191+
return nil, errors.Wrap(err, "SrcWatchNats")
192+
}
193+
194+
switch h.driverConfig.GetConfigFrom {
195+
case "":
196+
d.logger.Info("get job config from nomad config")
197+
case "consul":
198+
d.logger.Info("get job config from consul")
199+
h.driverConfig, err = d.storeManager.WatchConfig(ctx.Subject, h.ctx.Done())
200+
if err != nil {
201+
return nil, errors.Wrap(err, "WatchConfig")
202+
}
203+
default:
204+
return nil, fmt.Errorf("unrecognized GetConfigFrom %v", h.driverConfig.GetConfigFrom)
205+
}
206+
169207
if h.driverConfig.OracleConfig != nil {
170208
h.logger.Debug("found oracle src", "OracleConfig", h.driverConfig.OracleConfig)
171-
runner, err = extractor.NewExtractorOracle(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh)
209+
runner, err = extractor.NewExtractorOracle(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, natsAddr)
172210
if err != nil {
173211
return nil, errors.Wrap(err, "NewExtractor")
174212
}
175-
} else {
176-
runner, err = mysql.NewExtractor(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, h.ctx)
213+
} else if h.driverConfig.SrcConnectionConfig != nil {
214+
runner, err = mysql.NewExtractor(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, h.ctx, natsAddr)
177215
if err != nil {
178216
return nil, errors.Wrap(err, "NewOracleExtractor")
179217
}
218+
} else {
219+
err = fmt.Errorf("no src connection config")
220+
h.logger.Error(err.Error())
221+
return nil, err
180222
}
181223
case common.TaskTypeDest:
224+
if h.driverConfig.ConnectionConfig != nil {
225+
h.driverConfig.DestConnectionConfig = h.driverConfig.ConnectionConfig
226+
}
182227
h.logger.Debug("found dest", "allConfig", h.driverConfig)
183228
if h.driverConfig.KafkaConfig != nil {
184229
h.logger.Debug("found kafka", "KafkaConfig", h.driverConfig.KafkaConfig)

driver/mysql/applier.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -278,25 +278,24 @@ func (a *Applier) Run() {
278278
return
279279
}
280280

281-
switch a.mysqlContext.GetConfigFrom {
282-
case "":
283-
a.mysqlContext.DestConnectionConfig = a.mysqlContext.SrcConnectionConfig
284-
a.logger.Info("get job config from nomad config")
285-
case "consul":
286-
a.logger.Info("get job config from consul")
287-
a.mysqlContext, err = a.storeManager.GetConfig(a.subject)
281+
err = a.storeManager.PutConfig(a.subject, a.mysqlContext)
282+
if err != nil {
283+
a.onError(common.TaskStateDead, errors.Wrap(err, "PutConfig"))
284+
return
285+
}
286+
287+
var sourceType string
288+
if a.mysqlContext.SrcConnectionConfig != nil {
289+
sourceType = "mysql"
290+
} else if a.mysqlContext.OracleConfig != nil {
291+
sourceType = "oracle"
292+
} else {
293+
// src get config from nomad job config
294+
sourceType, err = a.storeManager.GetSourceType(a.subject)
288295
if err != nil {
289-
a.onError(common.TaskStateDead, errors.Wrap(err, "GetConfig"))
296+
a.onError(common.TaskStateDead, errors.Wrap(err, "watchSourceType"))
290297
return
291298
}
292-
default:
293-
a.onError(common.TaskStateDead, fmt.Errorf("unrecognized GetConfigFrom %v", a.mysqlContext.GetConfigFrom))
294-
}
295-
296-
sourceType, err := a.storeManager.GetSourceType(a.subject)
297-
if err != nil {
298-
a.onError(common.TaskStateDead, errors.Wrap(err, "watchSourceType"))
299-
return
300299
}
301300

302301
if sourceType == "mysql" {

driver/mysql/extractor.go

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ type Extractor struct {
109109
targetGtid string
110110
}
111111

112-
func NewExtractor(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger g.LoggerType, storeManager *common.StoreManager, waitCh chan *drivers.ExitResult, ctx context.Context) (*Extractor, error) {
112+
func NewExtractor(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger g.LoggerType,
113+
storeManager *common.StoreManager, waitCh chan *drivers.ExitResult, ctx context.Context,
114+
natsAddr string) (*Extractor, error) {
113115
logger.Info("NewExtractor", "job", execCtx.Subject)
114116

115117
e := &Extractor{
@@ -130,6 +132,7 @@ func NewExtractor(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, lo
130132
memory1: new(int64),
131133
memory2: new(int64),
132134
replicateDoDb: map[string]*common.SchemaContext{},
135+
natsAddr: natsAddr,
133136
}
134137
e.dataChannel = make(chan *common.EntryContext, cfg.ReplChanBufferSize*4)
135138
e.timestampCtx = NewTimestampContext(e.shutdownCh, e.logger, func() bool {
@@ -151,17 +154,6 @@ func NewExtractor(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, lo
151154
func (e *Extractor) Run() {
152155
var err error
153156

154-
err = e.storeManager.PutSourceType(e.subject, "mysql")
155-
if err != nil {
156-
e.onError(common.TaskStateDead, errors.Wrap(err, "PutSourceType"))
157-
return
158-
}
159-
160-
err = e.storeManager.PutConfig(e.subject, e.mysqlContext)
161-
if err != nil {
162-
e.onError(common.TaskStateDead, errors.Wrap(err, "PutKey config"))
163-
}
164-
165157
{
166158
jobStatus, _ := e.storeManager.GetJobStatus(e.subject)
167159
if jobStatus == common.TargetGtidFinished {
@@ -203,13 +195,6 @@ func (e *Extractor) Run() {
203195
}
204196
e.logger.Info("after WaitOnJob", "job2", e.mysqlContext.WaitOnJob, "firstWait", firstWait)
205197
}
206-
e.natsAddr, err = e.storeManager.SrcWatchNats(e.subject, e.shutdownCh, func(err error) {
207-
e.onError(common.TaskStateDead, err)
208-
})
209-
if err != nil {
210-
e.onError(common.TaskStateDead, errors.Wrap(err, "SrcWatchNats"))
211-
return
212-
}
213198

214199
err = common.GetGtidFromConsul(e.storeManager, e.subject, e.logger, e.mysqlContext)
215200
if err != nil {

driver/oracle/extractor/extractor_oracle.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ type OracleSchemaInfo struct {
120120
Tables map[string]*ast.CreateTableStmt
121121
}
122122

123-
func NewExtractorOracle(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger g.LoggerType, storeManager *common.StoreManager, waitCh chan *drivers.ExitResult) (*ExtractorOracle, error) {
123+
func NewExtractorOracle(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger g.LoggerType, storeManager *common.StoreManager, waitCh chan *drivers.ExitResult, natsAddr string) (*ExtractorOracle, error) {
124124
logger.Info("NewExtractorOracle", "job", execCtx.Subject)
125125

126126
e := &ExtractorOracle{
@@ -140,6 +140,7 @@ func NewExtractorOracle(execCtx *common.ExecContext, cfg *common.MySQLDriverConf
140140
memory1: new(int64),
141141
memory2: new(int64),
142142
OracleContext: new(OracleContext),
143+
natsAddr: natsAddr,
143144
}
144145
e.dataChannel = make(chan *common.EntryContext, cfg.ReplChanBufferSize*4)
145146
e.timestampCtx = NewTimestampContext(e.shutdownCh, e.logger, func() bool {
@@ -154,20 +155,6 @@ func NewExtractorOracle(execCtx *common.ExecContext, cfg *common.MySQLDriverConf
154155
func (e *ExtractorOracle) Run() {
155156
var err error
156157

157-
err = e.storeManager.PutSourceType(e.subject, "oracle")
158-
if err != nil {
159-
e.onError(common.TaskStateDead, errors.Wrap(err, "PutSourceType"))
160-
return
161-
}
162-
163-
e.logger.Info("src watch Nats")
164-
e.natsAddr, err = e.storeManager.SrcWatchNats(e.subject, e.shutdownCh, func(err error) {
165-
e.onError(common.TaskStateDead, err)
166-
})
167-
if err != nil {
168-
e.onError(common.TaskStateDead, errors.Wrap(err, "SrcWatchNats"))
169-
return
170-
}
171158
// init nats
172159
e.logger.Info("initNatsPubClient")
173160
e.logger.Debug("begin Connect nats server", "NatAddr", e.natsAddr)

0 commit comments

Comments
 (0)