Skip to content

Commit

Permalink
Use JobAttribute to specify different job requirements (#8)
Browse files Browse the repository at this point in the history
* Separated Scheduler interface

* Remove excess code in ProxyProvider

* Refactor job/Scheduler to QueueScheduler

* Updated UrlRouter to threadsafe

* Fixed CrawlerTest

* Minor refactoring

* Minor refactoring

* Modified AsyncFetcher.Builder and Crawler.Builder to fail fast

* Added tests for Crawler.Builder and AsyncFetcher.Builder

* Updates to job package

* Updated Javadoc

* Fixed warnings

* Removed whitespace

* Updated Job

* Updated Job

* Added tests

* Create backward compatibility

* Updated Javadoc

* Only allow NotNull JobAttributes

* Updated AsyncFetcher

* Renamed addJobAttribute to setJobAttribute

* Remove removeAndAdd() from QueueScheduler

* Replace PendingJobs with AtomicInteger and rename
  • Loading branch information
lwj5 authored and sitfoxfly committed Sep 10, 2019
1 parent 029b155 commit 6a8b394
Show file tree
Hide file tree
Showing 34 changed files with 1,251 additions and 542 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>ai.preferred</groupId>
<artifactId>venom</artifactId>
<version>4.1.4-SNAPSHOT</version>
<version>4.2.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
39 changes: 23 additions & 16 deletions src/main/java/ai/preferred/venom/Crawler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* This class handles the coordination between classes during the pre and
Expand Down Expand Up @@ -89,7 +93,7 @@ public final class Crawler implements Interruptible {
* The scheduler used.
*/
@NotNull
private final QueueScheduler<? extends Job> queueScheduler;
private final QueueScheduler queueScheduler;

/**
* The maximum number of simultaneous connections.
Expand Down Expand Up @@ -125,7 +129,7 @@ public final class Crawler implements Interruptible {
* A list of pending futures.
*/
@NotNull
private final Set<Job> pendingJobs;
private final AtomicInteger jobsPending;

/**
* The list of fatal exceptions occurred during response handling.
Expand Down Expand Up @@ -158,7 +162,7 @@ private Crawler(final Builder builder) {
true
);
workerManager = builder.workerManager == null ? new ThreadedWorkerManager(threadPool) : builder.workerManager;
pendingJobs = ConcurrentHashMap.newKeySet();
jobsPending = new AtomicInteger();
fatalHandlerExceptions = Collections.synchronizedList(new ArrayList<>());
}

Expand Down Expand Up @@ -262,7 +266,7 @@ private void handle(final Job job, final Response response) {
} catch (final Exception e) {
LOGGER.error("An exception occurred in handler when parsing response: {}", job.getRequest().getUrl(), e);
} finally {
pendingJobs.remove(job);
jobsPending.decrementAndGet();
}
}

Expand All @@ -276,12 +280,14 @@ private void except(final Job job, final Throwable ex) {
if ((ex instanceof ValidationException && ((ValidationException) ex).getStatus() == Validator.Status.STOP)
|| ex instanceof StopCodeException
|| ex instanceof CancellationException) {
pendingJobs.remove(job);
jobsPending.decrementAndGet();
} else {
synchronized (pendingJobs) { // Synchronisation required to prevent crawler stopping incorrectly.
pendingJobs.remove(job);
synchronized (jobsPending) { // Synchronisation required to prevent crawler stopping incorrectly.
jobsPending.decrementAndGet();
if (job.getTryCount() < maxTries) {
job.reQueue();
job.prepareRetry();
queueScheduler.add(job);
LOGGER.debug("Job {} - {} re-queued.", Integer.toHexString(job.hashCode()), job.getRequest().getUrl());
} else {
LOGGER.error("Max retries reached for request: {}", job.getRequest().getUrl());
}
Expand All @@ -299,13 +305,13 @@ private void run() {
try {
final Job job = queueScheduler.poll(100, TimeUnit.MILLISECONDS);
if (job == null) {
if (pendingJobs.size() != 0) {
if (jobsPending.get() > 0) {
continue;
}
// This should only run if pendingJob == 0 && job == null
synchronized (pendingJobs) {
synchronized (jobsPending) {
LOGGER.debug("({}) Checking for exit conditions.", crawlerThread.getName());
if (queueScheduler.peek() == null && pendingJobs.size() == 0 && exitWhenDone.get()) {
if (queueScheduler.peek() == null && jobsPending.get() <= 0 && exitWhenDone.get()) {
break;
}
}
Expand All @@ -316,13 +322,14 @@ private void run() {
lastRequestTime = System.nanoTime();

connections.acquire();
pendingJobs.add(job);
jobsPending.incrementAndGet();
threadPool.execute(() -> {
LOGGER.debug("Preparing job {} - {} (try {}/{}).",
Integer.toHexString(job.hashCode()), job.getRequest().getUrl(), job.getTryCount(), maxTries);
final CrawlerRequest crawlerRequest = prepareRequest(job.getRequest(), job.getTryCount());
if (Thread.currentThread().isInterrupted()) {
pendingJobs.remove(job);
connections.release();
jobsPending.decrementAndGet();
LOGGER.debug("The thread pool is interrupted");
return;
}
Expand Down Expand Up @@ -558,7 +565,7 @@ public static final class Builder {
/**
* The scheduler used.
*/
private QueueScheduler<? extends Job> queueScheduler;
private QueueScheduler queueScheduler;

/**
* The sleep scheduler used.
Expand Down Expand Up @@ -649,7 +656,7 @@ public Builder setWorkerManager(final @NotNull WorkerManager workerManager) {
* @param queueScheduler scheduler to be used.
* @return this
*/
public Builder setScheduler(final @NotNull QueueScheduler<? extends Job> queueScheduler) {
public Builder setScheduler(final @NotNull QueueScheduler queueScheduler) {
if (queueScheduler == null) {
throw new IllegalStateException("Attribute 'queueScheduler' cannot be null.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ public Builder setSslContext(final SSLContext sslContext) {
* @param codes A list of stop codes.
* @return this
*/
public Builder setStopCodes(final int... codes) {
public Builder setStopCodes(final @NotNull int... codes) {
if (codes == null) {
throw new IllegalStateException("Attribute 'codes' cannot be null.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2019 Preferred.AI
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.preferred.venom.job;

import javax.annotation.Nonnull;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author Ween Jiann Lee
*/
public abstract class AbstractPriorityQueueScheduler extends AbstractQueueScheduler {

/**
* Constructs an instance of AbstractQueueScheduler.
*/
protected AbstractPriorityQueueScheduler() {
super(new PriorityBlockingQueue<>(11,
Comparator.comparing(o -> (o.getJobAttribute(PriorityJobAttribute.class)))));
}

/**
* Check the job for {@see PriorityJobAttribute}, if missing,
* adds it to the job.
*
* @param job the job to check.
* @return the input job.
*/
private Job ensurePriorityJobAttribute(final Job job) {
if (job.getJobAttribute(PriorityJobAttribute.class) == null) {
job.setJobAttribute(new PriorityJobAttribute());
}
return job;
}

@Override
public final void put(final @Nonnull Job job) throws InterruptedException {
getQueue().put(ensurePriorityJobAttribute(job));
}

@Override
public final boolean offer(final Job job, final long timeout, final @Nonnull TimeUnit unit)
throws InterruptedException {
return getQueue().offer(ensurePriorityJobAttribute(job), timeout, unit);
}

@Override
public final boolean offer(final @Nonnull Job job) {
return getQueue().offer(ensurePriorityJobAttribute(job));
}


}
72 changes: 3 additions & 69 deletions src/main/java/ai/preferred/venom/job/AbstractQueueScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@

package ai.preferred.venom.job;

import ai.preferred.venom.Handler;
import ai.preferred.venom.request.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.util.AbstractQueue;
import java.util.Collection;
Expand All @@ -31,7 +26,8 @@
* @author Ween Jiann Lee
* @author Maksim Tkachenko
*/
public abstract class AbstractQueueScheduler extends AbstractQueue<Job> implements QueueScheduler<Job> {
@SuppressWarnings("NullableProblems")
public abstract class AbstractQueueScheduler extends AbstractQueue<Job> implements QueueScheduler {

/**
* The queue used for this scheduler.
Expand All @@ -50,7 +46,7 @@ public abstract class AbstractQueueScheduler extends AbstractQueue<Job> implemen
*/
protected AbstractQueueScheduler(final BlockingQueue<Job> queue) {
this.queue = queue;
this.scheduler = new JobScheduler(queue);
this.scheduler = new JobScheduler(this);
}

@Override
Expand Down Expand Up @@ -90,11 +86,6 @@ public final int drainTo(final @Nonnull Collection<? super Job> c, final int max
return queue.drainTo(c, maxElements);
}

@Override
public final boolean offer(final @Nonnull Job job) {
return queue.offer(job);
}

@Override
public final Job peek() {
return queue.peek();
Expand All @@ -109,61 +100,4 @@ protected final BlockingQueue<Job> getQueue() {
return queue;
}

/**
* An implementation of ai.preferred.venom.job.Scheduler using BasicJob.
*/
public static class JobScheduler implements Scheduler {

/**
* Logger.
*/
private static final Logger LOGGER = LoggerFactory.getLogger(JobScheduler.class);

/**
* The queue used for this scheduler.
*/
private final BlockingQueue<Job> queue;

/**
* Constructs an instance of JobScheduler.
*
* @param queue an instance of BlockingQueue
*/
public JobScheduler(final BlockingQueue<Job> queue) {
this.queue = queue;
}

@Override
public final void add(final Request r, final Handler h, final Priority p, final Priority pf) {
final Job job = new BasicJob(r, h, p, pf, queue);
queue.add(job);
LOGGER.debug("Added job {} - {} to queue.", Integer.toHexString(job.hashCode()), r.getUrl());
}

@Override
public final void add(final Request r, final Handler h, final Priority p) {
add(r, h, p, Priority.FLOOR);
}

@Override
public final void add(final Request r, final Handler h) {
add(r, h, Priority.DEFAULT);
}

@Override
public final void add(final Request r, final Priority p, final Priority pf) {
add(r, null, p, pf);
}

@Override
public final void add(final Request r, final Priority p) {
add(r, null, p, Priority.FLOOR);
}

@Override
public final void add(final Request r) {
add(r, null, Priority.DEFAULT, Priority.FLOOR);
}

}
}
Loading

0 comments on commit 6a8b394

Please sign in to comment.