Skip to content

Commit

Permalink
无主键索引表,增量功能支持。
Browse files Browse the repository at this point in the history
  • Loading branch information
dotaalready committed Oct 19, 2017
1 parent 2fd0dc1 commit f6e31fa
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 149 deletions.
313 changes: 171 additions & 142 deletions src/main/java/com/taobao/yugong/common/model/YuGongContext.java
Original file line number Diff line number Diff line change
@@ -1,142 +1,171 @@
package com.taobao.yugong.common.model;

import javax.sql.DataSource;

import com.taobao.yugong.common.db.meta.Table;
import com.taobao.yugong.common.model.position.Position;

/**
* yugong数据处理上下文
*
* @author agapple 2013-9-12 下午5:04:57
*/
public class YuGongContext {

// 具体每张表的同步
private Position lastPosition; // 最后一次同步的position记录
private Table tableMeta; // 对应的meta
private boolean ignoreSchema = false; // 同步时是否忽略schema,oracle迁移到mysql可能schema不同,可设置为忽略

// 全局共享
private RunMode runMode;
private int onceCrawNum; // 每次提取的记录数
private int tpsLimit = 0; // <=0代表不限制
private DataSource sourceDs; // 源数据库链接
private DataSource targetDs; // 目标数据库链接
private boolean batchApply = false;
private boolean skipApplierException = false; // 是否允许跳过applier异常
private String sourceEncoding = "UTF-8";
private String targetEncoding = "UTF-8";

public Position getLastPosition() {
return lastPosition;
}

public void setLastPosition(Position lastPosition) {
this.lastPosition = lastPosition;
}

public int getOnceCrawNum() {
return onceCrawNum;
}

public void setOnceCrawNum(int onceCrawNum) {
this.onceCrawNum = onceCrawNum;
}

public DataSource getSourceDs() {
return sourceDs;
}

public void setSourceDs(DataSource sourceDs) {
this.sourceDs = sourceDs;
}

public DataSource getTargetDs() {
return targetDs;
}

public void setTargetDs(DataSource targetDs) {
this.targetDs = targetDs;
}

public boolean isBatchApply() {
return batchApply;
}

public void setBatchApply(boolean batchApply) {
this.batchApply = batchApply;
}

public String getSourceEncoding() {
return sourceEncoding;
}

public void setSourceEncoding(String sourceEncoding) {
this.sourceEncoding = sourceEncoding;
}

public String getTargetEncoding() {
return targetEncoding;
}

public void setTargetEncoding(String targetEncoding) {
this.targetEncoding = targetEncoding;
}

public Table getTableMeta() {
return tableMeta;
}

public void setTableMeta(Table tableMeta) {
this.tableMeta = tableMeta;
}

public int getTpsLimit() {
return tpsLimit;
}

public void setTpsLimit(int tpsLimit) {
this.tpsLimit = tpsLimit;
}

public RunMode getRunMode() {
return runMode;
}

public void setRunMode(RunMode runMode) {
this.runMode = runMode;
}

public boolean isIgnoreSchema() {
return ignoreSchema;
}

public void setIgnoreSchema(boolean ignoreSchema) {
this.ignoreSchema = ignoreSchema;
}

public boolean isSkipApplierException() {
return skipApplierException;
}

public void setSkipApplierException(boolean skipApplierException) {
this.skipApplierException = skipApplierException;
}

public YuGongContext cloneGlobalContext() {
YuGongContext context = new YuGongContext();
context.setRunMode(runMode);
context.setBatchApply(batchApply);
context.setSourceDs(sourceDs);
context.setTargetDs(targetDs);
context.setSourceEncoding(sourceEncoding);
context.setTargetEncoding(targetEncoding);
context.setOnceCrawNum(onceCrawNum);
context.setTpsLimit(tpsLimit);
context.setIgnoreSchema(ignoreSchema);
context.setSkipApplierException(skipApplierException);
return context;
}

}
package com.taobao.yugong.common.model;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import com.taobao.yugong.common.db.meta.Table;
import com.taobao.yugong.common.model.position.Position;

/**
* yugong数据处理上下文
*
* @author agapple 2013-9-12 下午5:04:57
*/
public class YuGongContext {

// 具体每张表的同步
private Position lastPosition; // 最后一次同步的position记录
private Table tableMeta; // 对应的meta
private boolean ignoreSchema = false; // 同步时是否忽略schema,oracle迁移到mysql可能schema不同,可设置为忽略

// 全局共享
private RunMode runMode;
private int onceCrawNum; // 每次提取的记录数
private int tpsLimit = 0; // <=0代表不限制
private DataSource sourceDs; // 源数据库链接
private DataSource targetDs; // 目标数据库链接
private boolean batchApply = false;
private boolean skipApplierException = false; // 是否允许跳过applier异常
private String sourceEncoding = "UTF-8";
private String targetEncoding = "UTF-8";



private String mViewLogType="";// 创建物化视图日志的类型 PK主键 默认。 或ROWID

private Map<String,String[]> tablepks=new HashMap(); //没有主键 实时同步时指定的判断字段



public Map<String, String[]> getTablepks() {
return tablepks;
}

public void setTablepks(Map<String, String[]> tablepks) {
this.tablepks = tablepks;
}

public String getmViewLogType() {
return mViewLogType;
}

public void setmViewLogType(String mViewLogType) {
this.mViewLogType = mViewLogType;
}

public Position getLastPosition() {
return lastPosition;
}

public void setLastPosition(Position lastPosition) {
this.lastPosition = lastPosition;
}

public int getOnceCrawNum() {
return onceCrawNum;
}

public void setOnceCrawNum(int onceCrawNum) {
this.onceCrawNum = onceCrawNum;
}

public DataSource getSourceDs() {
return sourceDs;
}

public void setSourceDs(DataSource sourceDs) {
this.sourceDs = sourceDs;
}

public DataSource getTargetDs() {
return targetDs;
}

public void setTargetDs(DataSource targetDs) {
this.targetDs = targetDs;
}

public boolean isBatchApply() {
return batchApply;
}

public void setBatchApply(boolean batchApply) {
this.batchApply = batchApply;
}

public String getSourceEncoding() {
return sourceEncoding;
}

public void setSourceEncoding(String sourceEncoding) {
this.sourceEncoding = sourceEncoding;
}

public String getTargetEncoding() {
return targetEncoding;
}

public void setTargetEncoding(String targetEncoding) {
this.targetEncoding = targetEncoding;
}

public Table getTableMeta() {
return tableMeta;
}

public void setTableMeta(Table tableMeta) {
this.tableMeta = tableMeta;
}

public int getTpsLimit() {
return tpsLimit;
}

public void setTpsLimit(int tpsLimit) {
this.tpsLimit = tpsLimit;
}

public RunMode getRunMode() {
return runMode;
}

public void setRunMode(RunMode runMode) {
this.runMode = runMode;
}

public boolean isIgnoreSchema() {
return ignoreSchema;
}

public void setIgnoreSchema(boolean ignoreSchema) {
this.ignoreSchema = ignoreSchema;
}

public boolean isSkipApplierException() {
return skipApplierException;
}

public void setSkipApplierException(boolean skipApplierException) {
this.skipApplierException = skipApplierException;
}

public YuGongContext cloneGlobalContext() {
YuGongContext context = new YuGongContext();
context.setRunMode(runMode);
context.setBatchApply(batchApply);
context.setSourceDs(sourceDs);
context.setTargetDs(targetDs);
context.setSourceEncoding(sourceEncoding);
context.setTargetEncoding(targetEncoding);
context.setOnceCrawNum(onceCrawNum);
context.setTpsLimit(tpsLimit);
context.setIgnoreSchema(ignoreSchema);
context.setSkipApplierException(skipApplierException);
context.setmViewLogType(mViewLogType);
context.setTablepks(tablepks);
return context;
}

}
26 changes: 25 additions & 1 deletion src/main/java/com/taobao/yugong/controller/YuGongController.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -458,9 +460,31 @@ private YuGongContext initGlobalContext() {
context.setIgnoreSchema(config.getBoolean("yugong.table.ignoreSchema", false));
context.setSkipApplierException(config.getBoolean("yugong.table.skipApplierException", false));
context.setRunMode(runMode);
context.setmViewLogType(config.getString("yugong.table.inc.mviewlogtype","PK"));
context.setTablepks(getTablePKs(config.getString("yugong.table.inc.tablepks")));
return context;
}


private Map<String,String[]> getTablePKs(String tablepks){
if(StringUtils.isBlank(tablepks)){
return null;
}else{
Map<String,String[]> tps=new HashMap();
String[] tables=tablepks.split("\\|");
for(String table:tables){
String[] tablev=table.split("&");
String tableName=tablev[0];
String[] pks=new String[tablev.length-1];
for(int i=1;i<tablev.length;i++){
pks[i-1]=new String(tablev[i]).toUpperCase().toString();
}
tps.put(new String(tableName).toUpperCase().toString(), pks);
}
return tps;
}
}


private DataSource initDataSource(String type) {
String username = config.getString("yugong.database." + type + ".username");
String password = config.getString("yugong.database." + type + ".password");
Expand Down
Loading

0 comments on commit f6e31fa

Please sign in to comment.