From 369a4a2490b065dbbc367d42cd07930e48a737dc Mon Sep 17 00:00:00 2001 From: mtkachenko Date: Tue, 12 Nov 2019 11:26:37 +0800 Subject: [PATCH] added interrupt checks, v4.2.4 --- pom.xml | 2 +- src/main/java/ai/preferred/venom/Crawler.java | 3 ++- .../preferred/venom/fetcher/AsyncFetcher.java | 26 ++++++++++++++----- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 5d78029..775d414 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ ai.preferred venom - 4.2.3 + 4.2.4 jar ${project.groupId}:${project.artifactId} diff --git a/src/main/java/ai/preferred/venom/Crawler.java b/src/main/java/ai/preferred/venom/Crawler.java index 8e70024..ee43166 100644 --- a/src/main/java/ai/preferred/venom/Crawler.java +++ b/src/main/java/ai/preferred/venom/Crawler.java @@ -376,8 +376,9 @@ public void cancelled(final @NotNull Request request) { fetcher.fetch(crawlerRequest, callback); }); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOGGER.debug("({}) producer thread interrupted.", crawlerThread.getName(), e); + Thread.currentThread().interrupt(); break; } } diff --git a/src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java b/src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java index 2fb74e3..98f52a1 100644 --- a/src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java +++ b/src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java @@ -132,6 +132,18 @@ public final class AsyncFetcher implements Fetcher { */ private final boolean compressed; + private static Future failRequest(final FutureCallback callback, final Exception ex) { + final BasicFuture f = new BasicFuture<>(callback); + f.failed(ex); + return f; + } + + private static Future cancelRequest(final FutureCallback callback) { + final BasicFuture f = new BasicFuture<>(callback); + f.cancel(true); + return f; + } + /** * Constructs an instance of AsyncFetcher. * @@ -357,14 +369,16 @@ public void cancelled() { } }; + if (Thread.currentThread().isInterrupted()) { + return cancelRequest(futureCallback); + } + final HttpUriRequest httpReq = prepareHttpRequest(httpFetcherRequest); final HttpHost target; try { target = determineTarget(httpReq); } catch (final ClientProtocolException ex) { - final BasicFuture future = new BasicFuture<>(futureCallback); - future.failed(ex); - return future; + return failRequest(futureCallback, ex); } LOGGER.debug("Fetching URL: {}", request.getUrl()); @@ -376,10 +390,8 @@ public void cancelled() { routedValidator = null; } - if (!httpClient.isRunning()) { - final BasicFuture future = new BasicFuture<>(futureCallback); - future.cancel(true); - return future; + if (!httpClient.isRunning() || Thread.currentThread().isInterrupted()) { + return cancelRequest(futureCallback); } return httpClient.execute(