Skip to content

Commit

Permalink
2018.10.10.-2019.1.30期间重要变更合并
Browse files Browse the repository at this point in the history
  • Loading branch information
elevenqq committed Jan 31, 2019
1 parent ebb596e commit 13b7cba
Show file tree
Hide file tree
Showing 39 changed files with 1,214 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class RDBMSMapping extends AbstractMapping {
toHDFS.put("double", "double");
toHDFS.put("text", "string");
toHDFS.put("clob", "string");
toHDFS.put("blob", "string");
toHDFS.put("blob", "binary");
toHDFS.put("decimal", "decimal");
toHDFS.put("bigint identity", "bigint");
toHDFS.put("bigint unsigned", "bigint");
Expand Down Expand Up @@ -90,7 +90,6 @@ public class RDBMSMapping extends AbstractMapping {
toES.put("date", "date");
toES.put("mediumtext","string");
toES.put("integer", "integer");
toES.put("decimal","string");
toES.put("longvarchar","string");
toES.put("datetime2","date");
toES.put("nvarchar","string");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public List<SyncNode> getSyncRelationTrees(Long mediaSourceId, String mediaName)
realMediaSourceId = mediaSourceId;
}
List<SyncNode> list = treesCache.getUnchecked(mediaName);
return list.stream().filter(i -> isDbInNodeTree(realMediaSourceId, i)).collect(Collectors.toList());
return list.stream().filter(i -> isDbInNodeTree(realMediaSourceId, mediaName, i, true)).collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -493,18 +493,22 @@ private SyncNode findSyncNode(SyncNode syncNode, MediaSourceInfo mediaSourceInfo
return null;
}

private boolean isDbInNodeTree(Long mediaSourceId, SyncNode rootNode) {
private boolean isDbInNodeTree(Long mediaSourceId, String mediaName, SyncNode rootNode, Boolean isRootNode) {
if (mediaSourceId == null) {
return true;//如果没有指定具体数据源,则认为不需要过滤,直接返回true即可
}

if (rootNode.getMediaSource().getId().equals(mediaSourceId)) {
if (isRootNode) {
if (rootNode.getMediaSource().getId().equals(mediaSourceId) && (rootNode.getMappingInfo().getSourceMedia().getName().equalsIgnoreCase(mediaName) || rootNode.getMappingInfo().getSourceMedia().getName().equals("(.*)"))) {
return true;
}
} else if (rootNode.getMediaSource().getId().equals(mediaSourceId) && rootNode.getMappingInfo().getTargetMediaName().equalsIgnoreCase(mediaName)) {
return true;
}

if (rootNode.getChildren() != null && !rootNode.getChildren().isEmpty()) {
for (SyncNode node : rootNode.getChildren()) {
if (isDbInNodeTree(mediaSourceId, node)) {
if (isDbInNodeTree(mediaSourceId, mediaName, node, false)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.ucar.datalink.biz.utils;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.*;
import com.ucar.datalink.biz.service.MediaSourceService;
import com.ucar.datalink.common.errors.DatalinkException;
import com.ucar.datalink.domain.media.MediaSourceInfo;
Expand All @@ -17,6 +15,7 @@

import javax.sql.DataSource;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -30,7 +29,20 @@ public class DataSourceFactory {
private static final LoadingCache<MediaSourceInfo, DataSource> dataSources;

static {
dataSources = CacheBuilder.newBuilder().build(new CacheLoader<MediaSourceInfo, DataSource>() {
dataSources = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).removalListener(new RemovalListener<MediaSourceInfo, DataSource>() {
@Override
public void onRemoval(RemovalNotification<MediaSourceInfo, DataSource> notification) {
DataSource ds = notification.getValue();
if (ds instanceof org.apache.tomcat.jdbc.pool.DataSource) {
try {
((org.apache.tomcat.jdbc.pool.DataSource) ds).close();
logger.info("RemovalListener close datasource succeeded.");
} catch (Exception e) {
logger.error("RemovalListener close datasource failed, DataSource is " + ds, e);
}
}
}
}).build(new CacheLoader<MediaSourceInfo, DataSource>() {
@Override
public DataSource load(MediaSourceInfo sourceInfo) throws Exception {
MediaSrcParameter parameter = sourceInfo.getParameterObj();
Expand Down
3 changes: 0 additions & 3 deletions dl-biz/src/main/resources/biz/sqlmap/mapper-task.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@
<if test="leaderTaskId!=null">
LEADER_TASK_ID=#{leaderTaskId},
</if>
<if test="isLeaderTask!=null">
IS_LEADER_TASK=#{isLeaderTask},
</if>
MODIFY_TIME=now(),
</set>
where id = #{id}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.ucar.datalink.domain.event;

import com.ucar.datalink.common.event.CallbackEvent;
import com.ucar.datalink.common.utils.FutureCallback;
import com.ucar.datalink.domain.media.MediaSourceInfo;

/**
* Created by qianqian.shi on 2018/11/21.
*/
public class HBaseConfigClearEvent extends CallbackEvent {

private MediaSourceInfo mediaSourceInfo;

public HBaseConfigClearEvent(FutureCallback callback, MediaSourceInfo mediaSourceInfo) {
super(callback);
this.mediaSourceInfo = mediaSourceInfo;
}

public MediaSourceInfo getMediaSourceInfo() {
return mediaSourceInfo;
}

public void setMediaSourceInfo(MediaSourceInfo mediaSourceInfo) {
this.mediaSourceInfo = mediaSourceInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class MysqlReaderPosition extends Position {
private InetSocketAddress sourceAddress;
private Long slaveId;
protected Long timestamp;
private String latestEffectSyncLogFileName;
private Long latestEffectSyncLogFileOffset;

public Long getSlaveId() {
return slaveId;
Expand Down Expand Up @@ -73,4 +75,20 @@ public Long getTimestamp() {
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}

public String getLatestEffectSyncLogFileName() {
return latestEffectSyncLogFileName;
}

public void setLatestEffectSyncLogFileName(String latestEffectSyncLogFileName) {
this.latestEffectSyncLogFileName = latestEffectSyncLogFileName;
}

public Long getLatestEffectSyncLogFileOffset() {
return latestEffectSyncLogFileOffset;
}

public void setLatestEffectSyncLogFileOffset(Long latestEffectSyncLogFileOffset) {
this.latestEffectSyncLogFileOffset = latestEffectSyncLogFileOffset;
}
}
5 changes: 4 additions & 1 deletion dl-manager/dl-manager-core/src/main/bin/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ else
fi


echo "install cron"
echo "append manager cron script"
if [ ! -f "/var/spool/cron/root" ];then
touch /var/spool/cron/root
fi
grep "startup.sh" /var/spool/cron/root
if [ $? -ne 0 ];then
cron_asterisk="* * * * *"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ServerStatusMonitor {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
//暂时不支持动态显示切换,so do nothing
logger.info("received manager active node data change event.");
}

@Override
Expand All @@ -61,6 +62,7 @@ public void handleDataDeleted(String dataPath) throws Exception {
this.zkStateListener = new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
logger.info("received handle state change event , state is :" + state);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,11 @@ private void handleGroupCoordinatorRequest(ChannelHandlerContext ctx, Request re

Node activeNode = coordinator.getActiveGroupCoordinator();
if (activeNode == null) {
logger.trace("Coordinator不存在");
sendResponse(ctx,
new Response(responseHeader, new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode())));
} else {
logger.trace("Coordinator存在,当前Coordinator是:{}", activeNode.toString());
sendResponse(ctx,
new Response(responseHeader, new GroupCoordinatorResponse(Errors.NONE.code(), activeNode)));
}
Expand Down Expand Up @@ -139,7 +141,12 @@ private void handleHeartbeatRequest(ChannelHandlerContext ctx, Request request)
HeartbeatResponse response = new HeartbeatResponse(errorCode);
logger.trace(String.format("Sending heartbeat response %s for correlation id %d to client %s.",
response, request.getHeader().correlationId(), request.getHeader().clientId()));
sendResponse(ctx, new Response(responseHeader, response));
//当前manager不是active-manager,则结束和client的通信
if(errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()){
ctx.getChannel().disconnect();
}else {
sendResponse(ctx, new Response(responseHeader, response));
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public String toReloadDB(String mediaSourceId) {
return "success";
}
for (ClusterState.MemberData mem : memberDatas) {
String url = "http://" + mem.getWorkerState().url() + "/flush/reloadMediaSource/" + mediaSourceId;
String url = "http://" + mem.getWorkerState().url() + "/flush/reloadHBase/" + mediaSourceId;
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity request = new HttpEntity(null, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,18 @@ public void initialize(TaskWriterContext context) {
);


// 对于该线程池来说,不能使用CallerRunsPolicy,会导致Future.get方法一直阻塞,进而导致Task始终无法关闭
// 相关原因可参考该链接:https://blog.csdn.net/zero__007/article/details/78915354
// 针对我们的场景,使用固定线程数+无界队列即可,所以把之前的CallerRunsPolicy去掉
executorService = new ThreadPoolExecutor(
parameter.getPoolSize(),
parameter.getPoolSize(),
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(parameter.getPoolSize() * 4),
new LinkedBlockingQueue<>(),
new NamedThreadFactory(
MessageFormat.format("Task-{0}-Writer-{1}-load", context.taskId(), parameter.getPluginName())
),
new ThreadPoolExecutor.CallerRunsPolicy());
));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,10 @@ public interface PositionManager {
* @return
*/
<T extends Position> T getPosition(String taskId);

/**
* 如果存在的话,将某个任务的Position信息废弃,即:停止进行位点更新操作,避免因并发update导致的数据一致性问题
* @param taskId
*/
void discardPosition(String taskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,55 @@ public SqlServerTemplate(DbDialect dbDialect) {

@Override
public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames, String[] viewColumnNames) {

//判断是否有自增列且自增列不是主键
Table table = getDbDialect().findTable(schemaName, tableName);
Column[] columns = table.getAutoIncrementColumns();
Boolean flag = false;
if(columns != null){
for (Column column : columns){
if(!column.isPrimaryKey()){
flag = true;
break;
}
}
}
//有自增列,且不是主键
if(flag){
return getSpecialInsertSql(schemaName,tableName,pkNames,columnNames);
}
//正常
else {
return getMergeSql(schemaName,tableName,pkNames,columnNames);
}
}

//有自增列,且不是主键
public String getSpecialInsertSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
StringBuilder sql = new StringBuilder();
sql.append("set IDENTITY_INSERT ").append(tableName).append(" on; ");

sql.append("insert into " + getFullName(schemaName, tableName) + "(");
String[] allColumns = new String[pkNames.length + columnNames.length];
System.arraycopy(columnNames, 0, allColumns, 0, columnNames.length);
System.arraycopy(pkNames, 0, allColumns, columnNames.length, pkNames.length);

int size = allColumns.length;
for (int i = 0; i < size; i++) {
sql.append(appendEscape(allColumns[i])).append((i + 1 < size) ? "," : "");
}

sql.append(") values (");
appendColumnQuestions(sql, allColumns);
sql.append(");");

sql.append(" set IDENTITY_INSERT ").append(tableName).append(" off ;");

return sql.toString().intern();// intern优化,避免出现大量相同的字符串
}

//正常
public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
final String aliasA = "a";
final String aliasB = "b";
StringBuilder sql = new StringBuilder();
Expand Down
5 changes: 4 additions & 1 deletion dl-worker/dl-worker-core/src/main/bin/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ else
echo "worker conf("$worker_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
fi

echo "install cron"
echo "append worker cron script"
if [ ! -f "/var/spool/cron/root" ];then
touch /var/spool/cron/root
fi
grep "startup.sh" /var/spool/cron/root
if [ $? -ne 0 ];then
cron_asterisk="* * * * *"
Expand Down
Loading

0 comments on commit 13b7cba

Please sign in to comment.