Skip to content

Commit

Permalink
#396 不适用自定义的retry机制,依赖curator的retry重试
Browse files Browse the repository at this point in the history
  • Loading branch information
kfchu committed Apr 28, 2018
1 parent 42a29c3 commit 21ace27
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import com.vip.saturn.job.internal.storage.JobNodePath;
import com.vip.saturn.job.trigger.SaturnScheduler;
import com.vip.saturn.job.trigger.SaturnTrigger;
import com.vip.saturn.job.utils.RetriableTask;
import com.vip.saturn.job.utils.RetryCallable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.quartz.SchedulerException;
Expand Down Expand Up @@ -195,8 +193,8 @@ private boolean checkIfZkLostAfterExecution(final Integer item) {
CuratorFramework curatorFramework = (CuratorFramework) executionService.getCoordinatorRegistryCenter().getRawClient();
try {
String runningPath = JobNodePath.getNodeFullPath(jobName, ExecutionNode.getRunningNode(item));
Stat itemStat = tryTheBestToGetItemStat(curatorFramework, runningPath);
long sessionId = tryTheBestToGetSessionId(curatorFramework);
Stat itemStat = curatorFramework.checkExists().forPath(runningPath);
long sessionId = curatorFramework.getZookeeperClient().getZooKeeper().getSessionId();
// 有itemStat的情况
if (itemStat != null) {
long ephemeralOwner = itemStat.getEphemeralOwner();
Expand All @@ -217,28 +215,6 @@ private boolean checkIfZkLostAfterExecution(final Integer item) {
}
}

private long tryTheBestToGetSessionId(final CuratorFramework curatorFramework) throws Exception {
RetriableTask<Long> retriableTask = new RetriableTask<>(new RetryCallable<Long>() {
@Override
public Long call() throws Exception {
return curatorFramework.getZookeeperClient().getZooKeeper().getSessionId();
}
});

return retriableTask.call();
}

private Stat tryTheBestToGetItemStat(final CuratorFramework curatorFramework, final String runningPath) throws Exception {
RetriableTask<Stat> retriableTask = new RetriableTask<>(new RetryCallable<Stat>() {
@Override
public Stat call() throws Exception {
return curatorFramework.checkExists().forPath(runningPath);
}
});

return retriableTask.call();
}

protected abstract void executeJob(final JobExecutionMultipleShardingContext shardingContext);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.vip.saturn.job.internal.control.ReportService;
import com.vip.saturn.job.internal.failover.FailoverNode;
import com.vip.saturn.job.reg.exception.RegException;
import com.vip.saturn.job.utils.RetriableTask;
import com.vip.saturn.job.utils.RetryCallable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -160,8 +158,8 @@ public void registerJobCompletedReportInfoByItem(
* 注册作业完成信息.
*
*/
public void registerJobCompletedControlInfoByItem(final JobExecutionMultipleShardingContext jobExecutionShardingContext, int item)
throws Exception {
public void registerJobCompletedControlInfoByItem(
final JobExecutionMultipleShardingContext jobExecutionShardingContext, int item) {

boolean isEnabledReport = configService.isEnabledReport();
if (!isEnabledReport) {
Expand All @@ -172,19 +170,7 @@ public void registerJobCompletedControlInfoByItem(final JobExecutionMultipleShar
// create completed node
createCompletedNode(item);
// remove running node
tryTheBestToRemoveRunningNode(item);
}

private void tryTheBestToRemoveRunningNode(final int item) throws Exception {
RetriableTask<Void> retriableTask = new RetriableTask<>(new RetryCallable<Void>() {
@Override
public Void call() throws Exception {
getJobNodeStorage().removeJobNodeIfExisted(ExecutionNode.getRunningNode(item));
return null;
}
});

retriableTask.call();
getJobNodeStorage().removeJobNodeIfExisted(ExecutionNode.getRunningNode(item));
}

private void createCompletedNode(int item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import com.vip.saturn.job.internal.storage.JobNodePath;
import com.vip.saturn.job.internal.storage.LeaderExecutionCallback;
import com.vip.saturn.job.sharding.node.SaturnExecutorsNode;
import com.vip.saturn.job.utils.RetriableTask;
import com.vip.saturn.job.utils.RetryCallable;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,16 +94,8 @@ && getJobNodeStorage().isJobNodeExisted(ConfigurationNode.ENABLED)
*
* @param item 执行完毕失效转移的分片项列表
*/
public void updateFailoverComplete(final Integer item) throws Exception {
RetriableTask<Void> retriableTask = new RetriableTask<>(new RetryCallable<Void>() {
@Override
public Void call() throws Exception {
getJobNodeStorage().removeJobNodeIfExisted(FailoverNode.getExecutionFailoverNode(item));
return null;
}
});

retriableTask.call();
public void updateFailoverComplete(final Integer item) {
getJobNodeStorage().removeJobNodeIfExisted(FailoverNode.getExecutionFailoverNode(item));
}

/**
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit 21ace27

Please sign in to comment.