66import com .path .android .jobqueue .config .Configuration ;
77import com .path .android .jobqueue .log .JqLog ;
88
9+ import java .util .concurrent .ConcurrentHashMap ;
910import java .util .concurrent .TimeUnit ;
1011import java .util .concurrent .atomic .AtomicInteger ;
1112
@@ -21,7 +22,8 @@ public class JobConsumerExecutor {
2122 private final Contract contract ;
2223 private final int keepAliveSeconds ;
2324 private final AtomicInteger activeConsumerCount = new AtomicInteger (0 );
24- private final AtomicInteger runningJobCount = new AtomicInteger (0 );
25+ // key : id + (isPersistent)
26+ private final ConcurrentHashMap <String , JobHolder > runningJobHolders ;
2527
2628
2729 public JobConsumerExecutor (Configuration config , Contract contract ) {
@@ -31,6 +33,7 @@ public JobConsumerExecutor(Configuration config, Contract contract) {
3133 this .keepAliveSeconds = config .getConsumerKeepAlive ();
3234 this .contract = contract ;
3335 threadGroup = new ThreadGroup ("JobConsumers" );
36+ runningJobHolders = new ConcurrentHashMap <String , JobHolder >();
3437 }
3538
3639 /**
@@ -92,17 +95,43 @@ private boolean isAboveLoadFactor(boolean inConsumerThread) {
9295 int consumerCnt = activeConsumerCount .intValue () - (inConsumerThread ? 1 : 0 );
9396 boolean res =
9497 consumerCnt < minConsumerSize ||
95- consumerCnt * loadFactor < contract .countRemainingReadyJobs () + runningJobCount . get ();
98+ consumerCnt * loadFactor < contract .countRemainingReadyJobs () + runningJobHolders . size ();
9699 if (JqLog .isDebugEnabled ()) {
97100 JqLog .d ("%s: load factor check. %s = (%d < %d)|| (%d * %d < %d + %d). consumer thread: %s" , Thread .currentThread ().getName (), res ,
98101 consumerCnt , minConsumerSize ,
99- consumerCnt , loadFactor , contract .countRemainingReadyJobs (), runningJobCount . get (), inConsumerThread );
102+ consumerCnt , loadFactor , contract .countRemainingReadyJobs (), runningJobHolders . size (), inConsumerThread );
100103 }
101104 return res ;
102105 }
103106
104107 }
105108
109+ private void onBeforeRun (JobHolder jobHolder ) {
110+ runningJobHolders .put (createrunningJobHolderKey (jobHolder ), jobHolder );
111+ }
112+
113+ private void onAfterRun (JobHolder jobHolder ) {
114+ runningJobHolders .remove (createrunningJobHolderKey (jobHolder ));
115+ }
116+
117+ private String createrunningJobHolderKey (JobHolder jobHolder ) {
118+ return createrunningJobHolderKey (jobHolder .getId (), jobHolder .getBaseJob ().isPersistent ());
119+ }
120+
121+ private String createrunningJobHolderKey (long id , boolean isPersistent ) {
122+ return id + "_" + (isPersistent ? "t" : "f" );
123+ }
124+
125+ /**
126+ * returns true if job is currently handled by one of the executor threads
127+ * @param id id of the job
128+ * @param persistent boolean flag to distinguish id conflicts
129+ * @return true if job is currently handled here
130+ */
131+ public boolean isRunning (long id , boolean persistent ) {
132+ return runningJobHolders .containsKey (createrunningJobHolderKey (id , persistent ));
133+ }
134+
106135 /**
107136 * contract between the {@link JobManager} and {@link JobConsumerExecutor}
108137 */
@@ -168,13 +197,13 @@ public void run() {
168197 do {
169198 nextJob = contract .isRunning () ? contract .getNextJob (executor .keepAliveSeconds , TimeUnit .SECONDS ) : null ;
170199 if (nextJob != null ) {
171- executor .runningJobCount . incrementAndGet ( );
200+ executor .onBeforeRun ( nextJob );
172201 if (nextJob .safeRun (nextJob .getRunCount ())) {
173202 contract .removeJob (nextJob );
174203 } else {
175204 contract .insertOrReplace (nextJob );
176205 }
177- executor .runningJobCount . decrementAndGet ( );
206+ executor .onAfterRun ( nextJob );
178207 }
179208 } while (nextJob != null );
180209 } finally {
0 commit comments