Skip to content

Commit

Permalink
Implement .getTriggerState
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed May 25, 2012
1 parent 36542dd commit a71b6a1
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 24 deletions.
91 changes: 69 additions & 22 deletions src/main/java/com/novemberain/quartz/mongodb/MongoDBJobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,13 @@ public class MongoDBJobStore implements JobStore {
private static final String LOCK_INSTANCE_ID = "instanceId";
private static final String LOCK_TIME = "time";

private static final String WAITING_TRIGGER_STATE = "waiting";
private static final String ACQUIRED_TRIGGER_STATE = "acquired";
private static final String PAUSED_TRIGGER_STATE = "paused";
private static final String BLOCKED_TRIGGER_STATE = "blocked";
public static final String STATE_WAITING = "waiting";
public static final String STATE_DELETED = "deleted";
public static final String STATE_COMPLETE = "complete";
public static final String STATE_PAUSED = "paused";
public static final String STATE_PAUSED_BLOCKED = "pausedBlocked";
public static final String STATE_BLOCKED = "blocked";
public static final String STATE_ERROR = "error";

private Mongo mongo;
private String collectionPrefix = "quartz_";
Expand Down Expand Up @@ -149,7 +152,7 @@ public void storeJobsAndTriggers(Map<JobDetail, List<Trigger>> triggersAndJobs,
}

public boolean removeJob(JobKey jobKey) throws JobPersistenceException {
BasicDBObject keyObject = keyAsDBObject(jobKey);
BasicDBObject keyObject = keyToDBObject(jobKey);
DBCursor find = jobCollection.find(keyObject);
while (find.hasNext()) {
DBObject jobObj = find.next();
Expand All @@ -170,7 +173,7 @@ public boolean removeJobs(List<JobKey> jobKeys) throws JobPersistenceException {
}

public JobDetail retrieveJob(JobKey jobKey) throws JobPersistenceException {
DBObject dbObject = findJobByKey(jobKey);
DBObject dbObject = findJobDocumentByKey(jobKey);

try {
Class<Job> jobClass = (Class<Job>) getJobClassLoader().loadClass((String) dbObject.get(JOB_CLASS));
Expand Down Expand Up @@ -202,7 +205,7 @@ public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting) th
throw new JobPersistenceException("Trigger must be associated with a job. Please specify a JobKey.");
}

DBObject dbObject = jobCollection.findOne(keyAsDBObject(newTrigger.getJobKey()));
DBObject dbObject = jobCollection.findOne(keyToDBObject(newTrigger.getJobKey()));
if (dbObject != null) {
storeTrigger(newTrigger, (ObjectId) dbObject.get("_id"), replaceExisting);
} else {
Expand All @@ -211,7 +214,7 @@ public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting) th
}

public boolean removeTrigger(TriggerKey triggerKey) throws JobPersistenceException {
BasicDBObject dbObject = keyAsDBObject(triggerKey);
BasicDBObject dbObject = keyToDBObject(triggerKey);
DBCursor find = triggerCollection.find(dbObject);
if (find.count() > 0) {
triggerCollection.remove(dbObject);
Expand All @@ -236,19 +239,19 @@ public boolean replaceTrigger(TriggerKey triggerKey, OperableTrigger newTrigger)
}

public OperableTrigger retrieveTrigger(TriggerKey triggerKey) throws JobPersistenceException {
DBObject dbObject = triggerCollection.findOne(keyAsDBObject(triggerKey));
DBObject dbObject = triggerCollection.findOne(keyToDBObject(triggerKey));
if (dbObject == null) {
return null;
}
return toTrigger(triggerKey, dbObject);
}

public boolean checkExists(JobKey jobKey) throws JobPersistenceException {
return jobCollection.find(keyAsDBObject(jobKey)).count() > 0;
return jobCollection.find(keyToDBObject(jobKey)).count() > 0;
}

public boolean checkExists(TriggerKey triggerKey) throws JobPersistenceException {
return triggerCollection.find(keyAsDBObject(triggerKey)).count() > 0;
return triggerCollection.find(keyToDBObject(triggerKey)).count() > 0;
}

public void clearAllSchedulingData() throws JobPersistenceException {
Expand Down Expand Up @@ -284,6 +287,7 @@ public boolean removeCalendar(String calName) throws JobPersistenceException {
}

public Calendar retrieveCalendar(String calName) throws JobPersistenceException {
// TODO
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -342,7 +346,7 @@ public List<String> getCalendarNames() throws JobPersistenceException {
}

public List<OperableTrigger> getTriggersForJob(JobKey jobKey) throws JobPersistenceException {
DBObject dbObject = findJobByKey(jobKey);
DBObject dbObject = findJobDocumentByKey(jobKey);

List<OperableTrigger> triggers = new ArrayList<OperableTrigger>();
DBCursor cursor = triggerCollection.find(new BasicDBObject(TRIGGER_JOB_ID, dbObject.get("_id")));
Expand All @@ -354,13 +358,13 @@ public List<OperableTrigger> getTriggersForJob(JobKey jobKey) throws JobPersiste
}

public TriggerState getTriggerState(TriggerKey triggerKey) throws JobPersistenceException {
// TODO
throw new UnsupportedOperationException();
DBObject doc = findTriggerDocumentByKey(triggerKey);

return triggerStateForValue((String) doc.get(TRIGGER_STATE));
}

public void pauseTrigger(TriggerKey triggerKey) throws JobPersistenceException {
// TODO
throw new UnsupportedOperationException();
triggerCollection.update(keyToDBObject(triggerKey), updateThatSetsTriggerStateTo(STATE_PAUSED));
}

public Collection<String> pauseTriggers(GroupMatcher<TriggerKey> matcher) throws JobPersistenceException {
Expand Down Expand Up @@ -813,7 +817,7 @@ private void ensureIndexes() {

protected void storeTrigger(OperableTrigger newTrigger, ObjectId jobId, boolean replaceExisting) throws ObjectAlreadyExistsException {
BasicDBObject trigger = new BasicDBObject();
trigger.put(TRIGGER_STATE, WAITING_TRIGGER_STATE);
trigger.put(TRIGGER_STATE, STATE_WAITING);
trigger.put(TRIGGER_CALENDAR_NAME, newTrigger.getCalendarName());
trigger.put(TRIGGER_CLASS, newTrigger.getClass().getName());
trigger.put(TRIGGER_DESCRIPTION, newTrigger.getDescription());
Expand All @@ -837,7 +841,7 @@ protected void storeTrigger(OperableTrigger newTrigger, ObjectId jobId, boolean
} catch (DuplicateKey key) {
if (replaceExisting) {
trigger.remove("_id");
triggerCollection.update(keyAsDBObject(newTrigger.getKey()), trigger);
triggerCollection.update(keyToDBObject(newTrigger.getKey()), trigger);
} else {
throw new ObjectAlreadyExistsException(newTrigger);
}
Expand All @@ -847,7 +851,7 @@ protected void storeTrigger(OperableTrigger newTrigger, ObjectId jobId, boolean
protected ObjectId storeJobInMongo(JobDetail newJob, boolean replaceExisting) throws ObjectAlreadyExistsException {
JobKey key = newJob.getKey();

BasicDBObject job = keyAsDBObject(key);
BasicDBObject job = keyToDBObject(key);

if (replaceExisting) {
DBObject result = jobCollection.findOne(job);
Expand All @@ -872,7 +876,7 @@ protected ObjectId storeJobInMongo(JobDetail newJob, boolean replaceExisting) th
}
}

protected BasicDBObject keyAsDBObject(Key key) {
protected BasicDBObject keyToDBObject(Key key) {
BasicDBObject job = new BasicDBObject();
job.put(JOB_KEY_NAME, key.getName());
job.put(JOB_KEY_GROUP, key.getGroup());
Expand Down Expand Up @@ -903,8 +907,12 @@ private JobDetail retrieveJob(OperableTrigger trigger) throws JobPersistenceExce
}
}

protected DBObject findJobByKey(JobKey jobKey) {
return jobCollection.findOne(keyAsDBObject(jobKey));
protected DBObject findJobDocumentByKey(JobKey key) {
return jobCollection.findOne(keyToDBObject(key));
}

protected DBObject findTriggerDocumentByKey(TriggerKey key) {
return triggerCollection.findOne(keyToDBObject(key));
}

private void initializeHelpers() {
Expand All @@ -915,4 +923,43 @@ private void initializeHelpers() {

this.queryHelper = new QueryHelper();
}

private TriggerState triggerStateForValue(String ts) {
if (ts == null) {
return TriggerState.NONE;
}

if (ts.equals(STATE_DELETED)) {
return TriggerState.NONE;
}

if (ts.equals(STATE_COMPLETE)) {
return TriggerState.COMPLETE;
}

if (ts.equals(STATE_PAUSED)) {
return TriggerState.PAUSED;
}

if (ts.equals(STATE_PAUSED_BLOCKED)) {
return TriggerState.PAUSED;
}

if (ts.equals(STATE_ERROR)) {
return TriggerState.ERROR;
}

if (ts.equals(STATE_BLOCKED)) {
return TriggerState.BLOCKED;
}

// waiting or acquired
return TriggerState.NORMAL;
}

private DBObject updateThatSetsTriggerStateTo(String state) {
return BasicDBObjectBuilder.
start("$set", new BasicDBObject(TRIGGER_STATE, state)).
get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
(:use clojure.test
[clj-time.core :only [months from-now]])
(:import org.quartz.simpl.SimpleClassLoadHelper
com.mulesoft.quartz. mongodb.MongoDBJobStore)
com.novemberain.quartz.mongodb.MongoDBJobStore)
)

(use-fixtures :each h/purge-quartz-store)
Expand Down Expand Up @@ -78,9 +78,10 @@
job (qj/build
(qj/of-type NoOpJob)
(qj/with-identity "test-storing-triggers1" "tests"))
tk (qt/key "test-storing-triggers1" "tests")
tr (qt/build
(qt/start-now)
(qt/with-identity "test-storing-triggers1" "tests")
(qt/with-identity tk)
(qt/with-description desc)
(qt/for-job job)
(qt/with-schedule (s/schedule
Expand All @@ -93,6 +94,7 @@
(doto store
(.storeJob job false)
(.storeTrigger tr false))
(is (= "NORMAL" (str (.getTriggerState store tk))))
(is (= 1
(mgc/count jobs-collection)
(mgc/count triggers-collection)
Expand Down

0 comments on commit a71b6a1

Please sign in to comment.