Skip to content

Commit

Permalink
#483 code refactor; add it
Browse files Browse the repository at this point in the history
  • Loading branch information
heziai committed Sep 12, 2018
1 parent 2cc8a1d commit 0159dbb
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 73 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.common.collect.Maps;
import com.vip.saturn.job.basic.JobScheduler;
import com.vip.saturn.job.basic.SaturnConstant;
import com.vip.saturn.job.basic.SaturnExecutorContext;
import com.vip.saturn.job.exception.JobException;
import com.vip.saturn.job.exception.JobInitAlarmException;
import com.vip.saturn.job.internal.config.ConfigurationNode;
Expand All @@ -22,13 +21,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;

import static com.vip.saturn.job.executor.SaturnExecutorService.WAIT_JOBCLASS_ADDED_COUNT;

Expand All @@ -48,10 +42,16 @@ public class InitNewJobService {

private List<String> jobNames = new ArrayList<>();

/**
* record the alarm message hashcode, permanently saved, used for just raising alarm one time for one type exception
*/
private static final ConcurrentMap<String, ConcurrentMap<String, Set<Integer>>> JOB_INIT_FAILED_RECORDS = new ConcurrentHashMap<>();

public InitNewJobService(SaturnExecutorService saturnExecutorService) {
this.saturnExecutorService = saturnExecutorService;
this.executorName = saturnExecutorService.getExecutorName();
this.regCenter = saturnExecutorService.getCoordinatorRegistryCenter();
JOB_INIT_FAILED_RECORDS.putIfAbsent(executorName, new ConcurrentHashMap<String, Set<Integer>>());
}

public void start() throws Exception {
Expand Down Expand Up @@ -159,18 +159,28 @@ public Map<String, Object> constructAlarmInfo(String namespace, String jobName,
return alarmInfo;
}

private void raiseAlarmForJobInitFailed(String jobName, String message) {
try {
String namespace = regCenter.getNamespace();
AlarmUtils.raiseAlarm(constructAlarmInfo(namespace, jobName, executorName, message), namespace);
} catch (Exception e) {
log.error("exception throws during raise alarm for job init fail", e);
private void raiseAlarmForJobInitFailed(String jobName, JobInitAlarmException jobInitAlarmException) {
String message = jobInitAlarmException.getMessage();
int messageHashCode = message.hashCode();
Set<Integer> records = JOB_INIT_FAILED_RECORDS.get(executorName).get(jobName);
if (!records.contains(messageHashCode)) {
try {
String namespace = regCenter.getNamespace();
AlarmUtils.raiseAlarm(constructAlarmInfo(namespace, jobName, executorName, message), namespace);
records.add(messageHashCode);
} catch (Exception e) {
log.error("exception throws during raise alarm for job init fail", e);
}
} else {
log.info(SaturnConstant.LOG_FORMAT, jobName,
"job initialize failed but will not raise alarm as such kind of alarm already been raise before");
}
}

private boolean initJobScheduler(String jobName) {
try {
log.info(SaturnConstant.LOG_FORMAT, jobName, "start to initialize the new job");
JOB_INIT_FAILED_RECORDS.get(executorName).putIfAbsent(jobName, new HashSet<Integer>());
JobConfiguration jobConfig = new JobConfiguration(regCenter, jobName);
if (jobConfig.getSaturnJobClass() == null) {
throw new JobException(
Expand All @@ -186,17 +196,12 @@ private boolean initJobScheduler(String jobName) {
JobScheduler scheduler = new JobScheduler(regCenter, jobConfig);
scheduler.setSaturnExecutorService(saturnExecutorService);
scheduler.init();
// clear previous records when initialize job successfully
JOB_INIT_FAILED_RECORDS.get(executorName).get(jobName).clear();
return true;
} catch (JobInitAlarmException e) {
// no need to log exception stack as it should be logged in the original happen place
String message = e.getMessage();
if (!SaturnExecutorContext.containsJobInitExceptionMessage(jobName, message)) {
raiseAlarmForJobInitFailed(jobName, message);
SaturnExecutorContext.putJobInitExceptionMessage(jobName, message);
} else {
log.info(SaturnConstant.LOG_FORMAT, jobName,
"job initialize failed but will not raise alarm as such kind of alarm already been raise before");
}
raiseAlarmForJobInitFailed(jobName, e);
} catch (Throwable t) {
log.warn(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName,
"job initialize failed, but will not stop the init process"), t);
Expand All @@ -207,4 +212,8 @@ private boolean initJobScheduler(String jobName) {

}

public static boolean containsJobInitFailedRecord(String executorName, String jobName, String message) {
return JOB_INIT_FAILED_RECORDS.get(executorName).get(jobName).contains(message.hashCode());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.vip.saturn.it.job.InitMsgJobFail;

import com.vip.saturn.job.AbstractSaturnJavaJob;
import com.vip.saturn.job.SaturnJobExecutionContext;
import com.vip.saturn.job.SaturnJobReturn;

public class InitSuccessfullyJob extends AbstractSaturnJavaJob {

public static Object getObject() {
return new InitSuccessfullyJob();
}

@Override
public SaturnJobReturn handleJavaJob(String jobName, Integer shardItem, String shardParam,
SaturnJobExecutionContext shardingContext) throws InterruptedException {
return new SaturnJobReturn();
}

}
131 changes: 117 additions & 14 deletions saturn-it/src/test/java/com/vip/saturn/it/impl/InitJobFailAlarmIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@

import com.vip.saturn.it.AbstractSaturnIT;
import com.vip.saturn.it.JobType;
import com.vip.saturn.it.job.InitMsgJobFail.InitFailOfDefaultConstructorJob;
import com.vip.saturn.it.job.InitMsgJobFail.InitFailOfErrorJob;
import com.vip.saturn.it.job.InitMsgJobFail.InitFailOfGetObjectJob;
import com.vip.saturn.it.job.InitMsgJobFail.InitFailOfRuntimeExceptionJob;
import com.vip.saturn.job.basic.SaturnExecutorContext;
import com.vip.saturn.it.job.InitMsgJobFail.*;
import com.vip.saturn.job.executor.InitNewJobService;
import com.vip.saturn.job.internal.config.JobConfiguration;
import org.junit.*;
import org.junit.runners.MethodSorters;
Expand Down Expand Up @@ -34,7 +31,7 @@ public void after() throws Exception {

@Test
public void testA_InitFailOfGetObjectJob() throws Exception {
startOneNewExecutorList();
String executorName = startOneNewExecutorList().getExecutorName();
Thread.sleep(1000);
JobConfiguration jobConfiguration = new JobConfiguration("testA_InitFailOfGetObjectJob");
jobConfiguration.setCron("0/2 * * * * ?");
Expand All @@ -44,7 +41,7 @@ public void testA_InitFailOfGetObjectJob() throws Exception {
jobConfiguration.setShardingItemParameters("0=0");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(SaturnExecutorContext.containsJobInitExceptionMessage(jobConfiguration.getJobName(),
assertThat(InitNewJobService.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(),
"java.lang.ArithmeticException: / by zero")).isTrue();
removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
Expand All @@ -53,7 +50,7 @@ public void testA_InitFailOfGetObjectJob() throws Exception {

@Test
public void testB_InitFailOfDefaultConstructorJob() throws Exception {
startOneNewExecutorList();
String executorName = startOneNewExecutorList().getExecutorName();
Thread.sleep(1000);
JobConfiguration jobConfiguration = new JobConfiguration("testB_InitFailOfDefaultConstructorJob");
jobConfiguration.setCron("0/2 * * * * ?");
Expand All @@ -63,7 +60,7 @@ public void testB_InitFailOfDefaultConstructorJob() throws Exception {
jobConfiguration.setShardingItemParameters("0=0");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(SaturnExecutorContext.containsJobInitExceptionMessage(jobConfiguration.getJobName(),
assertThat(InitNewJobService.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(),
"java.lang.ArithmeticException: / by zero")).isTrue();
removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
Expand All @@ -72,7 +69,7 @@ public void testB_InitFailOfDefaultConstructorJob() throws Exception {

@Test
public void testC_InitFailOfRuntimeExceptionJob() throws Exception {
startOneNewExecutorList();
String executorName = startOneNewExecutorList().getExecutorName();
Thread.sleep(1000);
JobConfiguration jobConfiguration = new JobConfiguration("testC_InitFailOfRuntimeExceptionJob");
jobConfiguration.setCron("0/2 * * * * ?");
Expand All @@ -82,7 +79,7 @@ public void testC_InitFailOfRuntimeExceptionJob() throws Exception {
jobConfiguration.setShardingItemParameters("0=0");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(SaturnExecutorContext.containsJobInitExceptionMessage(jobConfiguration.getJobName(),
assertThat(InitNewJobService.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(),
"java.lang.RuntimeException: RuntimeException!!!")).isTrue();
removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
Expand All @@ -91,7 +88,7 @@ public void testC_InitFailOfRuntimeExceptionJob() throws Exception {

@Test
public void testD_InitFailOfErrorJob() throws Exception {
startOneNewExecutorList();
String executorName = startOneNewExecutorList().getExecutorName();
Thread.sleep(1000);
JobConfiguration jobConfiguration = new JobConfiguration("testD_InitFailOfErrorJob");
jobConfiguration.setCron("0/2 * * * * ?");
Expand All @@ -101,8 +98,114 @@ public void testD_InitFailOfErrorJob() throws Exception {
jobConfiguration.setShardingItemParameters("0=0");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(SaturnExecutorContext
.containsJobInitExceptionMessage(jobConfiguration.getJobName(), "java.lang.Error: Error!!!")).isTrue();
assertThat(InitNewJobService
.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(), "java.lang.Error: Error!!!"))
.isTrue();
removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
forceRemoveJob(jobConfiguration.getJobName());
}

@Test
public void testE_ClassNotFoundException() throws Exception {
String executorName = startOneNewExecutorList().getExecutorName();
Thread.sleep(1000);
JobConfiguration jobConfiguration = new JobConfiguration("testE_ClassNotFoundException");
jobConfiguration.setCron("0/2 * * * * ?");
jobConfiguration.setJobType(JobType.JAVA_JOB.toString());
jobConfiguration.setJobClass("WhoAmI");
jobConfiguration.setShardingTotalCount(1);
jobConfiguration.setShardingItemParameters("0=0");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(InitNewJobService.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(),
"java.lang.ClassNotFoundException: WhoAmI")).isTrue();
removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
forceRemoveJob(jobConfiguration.getJobName());
}

@Test
public void testF_jobClassIsNotSet() throws Exception {
String executorName = startOneNewExecutorList().getExecutorName();
Thread.sleep(1000);
JobConfiguration jobConfiguration = new JobConfiguration("testF_jobClassIsNotSet");
jobConfiguration.setCron("0/2 * * * * ?");
jobConfiguration.setJobType(JobType.JAVA_JOB.toString());
jobConfiguration.setJobClass("");
jobConfiguration.setShardingTotalCount(1);
jobConfiguration.setShardingItemParameters("0=0");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(InitNewJobService
.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(), "jobClass is not set"))
.isTrue();
removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
forceRemoveJob(jobConfiguration.getJobName());
}

@Test
public void testG_multiRecord() throws Exception {
String executorName = startOneNewExecutorList().getExecutorName();
Thread.sleep(1000);
JobConfiguration jobConfiguration = new JobConfiguration("testG_multiRecord");
jobConfiguration.setCron("0/2 * * * * ?");
jobConfiguration.setJobType(JobType.JAVA_JOB.toString());
jobConfiguration.setJobClass("");
jobConfiguration.setShardingTotalCount(1);
jobConfiguration.setShardingItemParameters("0=0");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(InitNewJobService
.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(), "jobClass is not set"))
.isTrue();

removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
forceRemoveJob(jobConfiguration.getJobName());

jobConfiguration.setJobClass("WhoAmI");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(InitNewJobService
.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(), "jobClass is not set"))
.isTrue();
assertThat(InitNewJobService.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(),
"java.lang.ClassNotFoundException: WhoAmI")).isTrue();

removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
forceRemoveJob(jobConfiguration.getJobName());
}

@Test
public void testH_clearRecord() throws Exception {
String executorName = startOneNewExecutorList().getExecutorName();
Thread.sleep(1000);
JobConfiguration jobConfiguration = new JobConfiguration("testH_clearRecord");
jobConfiguration.setCron("0/2 * * * * ?");
jobConfiguration.setJobType(JobType.JAVA_JOB.toString());
jobConfiguration.setJobClass("");
jobConfiguration.setShardingTotalCount(1);
jobConfiguration.setShardingItemParameters("0=0");
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(InitNewJobService
.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(), "jobClass is not set"))
.isTrue();

removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
forceRemoveJob(jobConfiguration.getJobName());

jobConfiguration.setJobClass(InitSuccessfullyJob.class.getCanonicalName());
addJob(jobConfiguration);
Thread.sleep(1000);
assertThat(InitNewJobService
.containsJobInitFailedRecord(executorName, jobConfiguration.getJobName(), "jobClass is not set"))
.isFalse();

removeJob(jobConfiguration.getJobName());
Thread.sleep(1000);
forceRemoveJob(jobConfiguration.getJobName());
Expand Down

0 comments on commit 0159dbb

Please sign in to comment.