Skip to content

Commit

Permalink
added interrupt checks, v4.2.4
Browse files Browse the repository at this point in the history
  • Loading branch information
sitfoxfly committed Nov 12, 2019
1 parent 68f6497 commit 369a4a2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>ai.preferred</groupId>
<artifactId>venom</artifactId>
<version>4.2.3</version>
<version>4.2.4</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/ai/preferred/venom/Crawler.java
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,9 @@ public void cancelled(final @NotNull Request request) {

fetcher.fetch(crawlerRequest, callback);
});
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
LOGGER.debug("({}) producer thread interrupted.", crawlerThread.getName(), e);
Thread.currentThread().interrupt();
break;
}
}
Expand Down
26 changes: 19 additions & 7 deletions src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ public final class AsyncFetcher implements Fetcher {
*/
private final boolean compressed;

private static Future<Response> failRequest(final FutureCallback<Response> callback, final Exception ex) {
final BasicFuture<Response> f = new BasicFuture<>(callback);
f.failed(ex);
return f;
}

private static Future<Response> cancelRequest(final FutureCallback<Response> callback) {
final BasicFuture<Response> f = new BasicFuture<>(callback);
f.cancel(true);
return f;
}

/**
* Constructs an instance of AsyncFetcher.
*
Expand Down Expand Up @@ -357,14 +369,16 @@ public void cancelled() {
}
};

if (Thread.currentThread().isInterrupted()) {
return cancelRequest(futureCallback);
}

final HttpUriRequest httpReq = prepareHttpRequest(httpFetcherRequest);
final HttpHost target;
try {
target = determineTarget(httpReq);
} catch (final ClientProtocolException ex) {
final BasicFuture<Response> future = new BasicFuture<>(futureCallback);
future.failed(ex);
return future;
return failRequest(futureCallback, ex);
}

LOGGER.debug("Fetching URL: {}", request.getUrl());
Expand All @@ -376,10 +390,8 @@ public void cancelled() {
routedValidator = null;
}

if (!httpClient.isRunning()) {
final BasicFuture<Response> future = new BasicFuture<>(futureCallback);
future.cancel(true);
return future;
if (!httpClient.isRunning() || Thread.currentThread().isInterrupted()) {
return cancelRequest(futureCallback);
}

return httpClient.execute(
Expand Down

0 comments on commit 369a4a2

Please sign in to comment.