Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

无主键索引表的增量同步 #61

Merged
merged 2 commits into from
Dec 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 171 additions & 142 deletions src/main/java/com/taobao/yugong/common/model/YuGongContext.java
Original file line number Diff line number Diff line change
@@ -1,142 +1,171 @@
package com.taobao.yugong.common.model;

import javax.sql.DataSource;

import com.taobao.yugong.common.db.meta.Table;
import com.taobao.yugong.common.model.position.Position;

/**
* yugong数据处理上下文
*
* @author agapple 2013-9-12 下午5:04:57
*/
public class YuGongContext {

// 具体每张表的同步
private Position lastPosition; // 最后一次同步的position记录
private Table tableMeta; // 对应的meta
private boolean ignoreSchema = false; // 同步时是否忽略schema,oracle迁移到mysql可能schema不同,可设置为忽略

// 全局共享
private RunMode runMode;
private int onceCrawNum; // 每次提取的记录数
private int tpsLimit = 0; // <=0代表不限制
private DataSource sourceDs; // 源数据库链接
private DataSource targetDs; // 目标数据库链接
private boolean batchApply = false;
private boolean skipApplierException = false; // 是否允许跳过applier异常
private String sourceEncoding = "UTF-8";
private String targetEncoding = "UTF-8";

public Position getLastPosition() {
return lastPosition;
}

public void setLastPosition(Position lastPosition) {
this.lastPosition = lastPosition;
}

public int getOnceCrawNum() {
return onceCrawNum;
}

public void setOnceCrawNum(int onceCrawNum) {
this.onceCrawNum = onceCrawNum;
}

public DataSource getSourceDs() {
return sourceDs;
}

public void setSourceDs(DataSource sourceDs) {
this.sourceDs = sourceDs;
}

public DataSource getTargetDs() {
return targetDs;
}

public void setTargetDs(DataSource targetDs) {
this.targetDs = targetDs;
}

public boolean isBatchApply() {
return batchApply;
}

public void setBatchApply(boolean batchApply) {
this.batchApply = batchApply;
}

public String getSourceEncoding() {
return sourceEncoding;
}

public void setSourceEncoding(String sourceEncoding) {
this.sourceEncoding = sourceEncoding;
}

public String getTargetEncoding() {
return targetEncoding;
}

public void setTargetEncoding(String targetEncoding) {
this.targetEncoding = targetEncoding;
}

public Table getTableMeta() {
return tableMeta;
}

public void setTableMeta(Table tableMeta) {
this.tableMeta = tableMeta;
}

public int getTpsLimit() {
return tpsLimit;
}

public void setTpsLimit(int tpsLimit) {
this.tpsLimit = tpsLimit;
}

public RunMode getRunMode() {
return runMode;
}

public void setRunMode(RunMode runMode) {
this.runMode = runMode;
}

public boolean isIgnoreSchema() {
return ignoreSchema;
}

public void setIgnoreSchema(boolean ignoreSchema) {
this.ignoreSchema = ignoreSchema;
}

public boolean isSkipApplierException() {
return skipApplierException;
}

public void setSkipApplierException(boolean skipApplierException) {
this.skipApplierException = skipApplierException;
}

public YuGongContext cloneGlobalContext() {
YuGongContext context = new YuGongContext();
context.setRunMode(runMode);
context.setBatchApply(batchApply);
context.setSourceDs(sourceDs);
context.setTargetDs(targetDs);
context.setSourceEncoding(sourceEncoding);
context.setTargetEncoding(targetEncoding);
context.setOnceCrawNum(onceCrawNum);
context.setTpsLimit(tpsLimit);
context.setIgnoreSchema(ignoreSchema);
context.setSkipApplierException(skipApplierException);
return context;
}

}
package com.taobao.yugong.common.model;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import com.taobao.yugong.common.db.meta.Table;
import com.taobao.yugong.common.model.position.Position;

/**
* yugong数据处理上下文
*
* @author agapple 2013-9-12 下午5:04:57
*/
public class YuGongContext {

// 具体每张表的同步
private Position lastPosition; // 最后一次同步的position记录
private Table tableMeta; // 对应的meta
private boolean ignoreSchema = false; // 同步时是否忽略schema,oracle迁移到mysql可能schema不同,可设置为忽略

// 全局共享
private RunMode runMode;
private int onceCrawNum; // 每次提取的记录数
private int tpsLimit = 0; // <=0代表不限制
private DataSource sourceDs; // 源数据库链接
private DataSource targetDs; // 目标数据库链接
private boolean batchApply = false;
private boolean skipApplierException = false; // 是否允许跳过applier异常
private String sourceEncoding = "UTF-8";
private String targetEncoding = "UTF-8";



private String mViewLogType="";// 创建物化视图日志的类型 PK主键 默认。 或ROWID

private Map<String,String[]> tablepks=new HashMap(); //没有主键 实时同步时指定的判断字段



public Map<String, String[]> getTablepks() {
return tablepks;
}

public void setTablepks(Map<String, String[]> tablepks) {
this.tablepks = tablepks;
}

public String getmViewLogType() {
return mViewLogType;
}

public void setmViewLogType(String mViewLogType) {
this.mViewLogType = mViewLogType;
}

public Position getLastPosition() {
return lastPosition;
}

public void setLastPosition(Position lastPosition) {
this.lastPosition = lastPosition;
}

public int getOnceCrawNum() {
return onceCrawNum;
}

public void setOnceCrawNum(int onceCrawNum) {
this.onceCrawNum = onceCrawNum;
}

public DataSource getSourceDs() {
return sourceDs;
}

public void setSourceDs(DataSource sourceDs) {
this.sourceDs = sourceDs;
}

public DataSource getTargetDs() {
return targetDs;
}

public void setTargetDs(DataSource targetDs) {
this.targetDs = targetDs;
}

public boolean isBatchApply() {
return batchApply;
}

public void setBatchApply(boolean batchApply) {
this.batchApply = batchApply;
}

public String getSourceEncoding() {
return sourceEncoding;
}

public void setSourceEncoding(String sourceEncoding) {
this.sourceEncoding = sourceEncoding;
}

public String getTargetEncoding() {
return targetEncoding;
}

public void setTargetEncoding(String targetEncoding) {
this.targetEncoding = targetEncoding;
}

public Table getTableMeta() {
return tableMeta;
}

public void setTableMeta(Table tableMeta) {
this.tableMeta = tableMeta;
}

public int getTpsLimit() {
return tpsLimit;
}

public void setTpsLimit(int tpsLimit) {
this.tpsLimit = tpsLimit;
}

public RunMode getRunMode() {
return runMode;
}

public void setRunMode(RunMode runMode) {
this.runMode = runMode;
}

public boolean isIgnoreSchema() {
return ignoreSchema;
}

public void setIgnoreSchema(boolean ignoreSchema) {
this.ignoreSchema = ignoreSchema;
}

public boolean isSkipApplierException() {
return skipApplierException;
}

public void setSkipApplierException(boolean skipApplierException) {
this.skipApplierException = skipApplierException;
}

public YuGongContext cloneGlobalContext() {
YuGongContext context = new YuGongContext();
context.setRunMode(runMode);
context.setBatchApply(batchApply);
context.setSourceDs(sourceDs);
context.setTargetDs(targetDs);
context.setSourceEncoding(sourceEncoding);
context.setTargetEncoding(targetEncoding);
context.setOnceCrawNum(onceCrawNum);
context.setTpsLimit(tpsLimit);
context.setIgnoreSchema(ignoreSchema);
context.setSkipApplierException(skipApplierException);
context.setmViewLogType(mViewLogType);
context.setTablepks(tablepks);
return context;
}

}
26 changes: 25 additions & 1 deletion src/main/java/com/taobao/yugong/controller/YuGongController.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -458,9 +460,31 @@ private YuGongContext initGlobalContext() {
context.setIgnoreSchema(config.getBoolean("yugong.table.ignoreSchema", false));
context.setSkipApplierException(config.getBoolean("yugong.table.skipApplierException", false));
context.setRunMode(runMode);
context.setmViewLogType(config.getString("yugong.table.inc.mviewlogtype","PK"));
context.setTablepks(getTablePKs(config.getString("yugong.table.inc.tablepks")));
return context;
}


private Map<String,String[]> getTablePKs(String tablepks){
if(StringUtils.isBlank(tablepks)){
return null;
}else{
Map<String,String[]> tps=new HashMap();
String[] tables=tablepks.split("\\|");
for(String table:tables){
String[] tablev=table.split("&");
String tableName=tablev[0];
String[] pks=new String[tablev.length-1];
for(int i=1;i<tablev.length;i++){
pks[i-1]=new String(tablev[i]).toUpperCase().toString();
}
tps.put(new String(tableName).toUpperCase().toString(), pks);
}
return tps;
}
}


private DataSource initDataSource(String type) {
String username = config.getString("yugong.database." + type + ".username");
String password = config.getString("yugong.database." + type + ".password");
Expand Down
Loading