diff --git a/contrib/src/main/java/org/archive/modules/AMQPProducer.java b/contrib/src/main/java/org/archive/modules/AMQPProducer.java new file mode 100644 index 000000000..b111e656d --- /dev/null +++ b/contrib/src/main/java/org/archive/modules/AMQPProducer.java @@ -0,0 +1,112 @@ +/* + * This file is part of the Heritrix web crawler (crawler.archive.org). + * + * Licensed to the Internet Archive (IA) by one or more individual + * contributors. + * + * The IA licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.archive.modules; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class AMQPProducer { + static protected final Logger logger = Logger.getLogger(AMQPProducer.class.getName()); + + protected String amqpUri; + protected String exchange; + protected String routingKey; + + public AMQPProducer(String amqpUri, String exchange, String routingKey) { + this.amqpUri = amqpUri; + this.exchange = exchange; + this.routingKey = routingKey; + } + + transient protected Connection connection = null; + transient protected ThreadLocal threadChannel = + new ThreadLocal(); + + protected synchronized Channel channel() throws IOException { + if (threadChannel.get() != null && !threadChannel.get().isOpen()) { + threadChannel.set(null); + } + + if (threadChannel.get() == null) { + if (connection == null || !connection.isOpen()) { + connect(); + } + try { + if (connection != null) { + threadChannel.set(connection.createChannel()); + } + } catch (IOException e) { + throw new IOException("Attempting to create channel for AMQP connection failed!", e); + } + } + + return threadChannel.get(); + } + + private AtomicBoolean serverLooksDown = new AtomicBoolean(false); + private synchronized void connect() throws IOException { + ConnectionFactory factory = new ConnectionFactory(); + try { + factory.setUri(amqpUri); + connection = factory.newConnection(); + boolean wasDown = serverLooksDown.getAndSet(false); + if (wasDown) { + logger.info(amqpUri + " is back up, connected successfully!"); + } + } catch (Exception e) { + connection = null; + serverLooksDown.getAndSet(true); + throw new IOException("Attempting to connect to AMQP server failed!", e); + } + } + + synchronized public void stop() { + try { + if (connection != null && connection.isOpen()) { + connection.close(); + connection = null; + } + } catch (IOException e) { + logger.log(Level.SEVERE, "Attempting to close AMQP connection failed!", e); + } + } + + /** + * Publish the message with the supplied properties. If this method returns + * without throwing an exception, the message was published successfully. + * + * @param message + * @param props + * @throws IOException + * if message is not published successfully for any reason + */ + public void publishMessage(byte[] message, BasicProperties props) + throws IOException { + Channel channel = channel(); + channel.exchangeDeclare(exchange, "direct", true); + channel.basicPublish(exchange, routingKey, props, message); + } +} diff --git a/contrib/src/main/java/org/archive/modules/AMQPProducerProcessor.java b/contrib/src/main/java/org/archive/modules/AMQPProducerProcessor.java new file mode 100644 index 000000000..05faa758d --- /dev/null +++ b/contrib/src/main/java/org/archive/modules/AMQPProducerProcessor.java @@ -0,0 +1,126 @@ +/* + * This file is part of the Heritrix web crawler (crawler.archive.org). + * + * Licensed to the Internet Archive (IA) by one or more individual + * contributors. + * + * The IA licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.archive.modules; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * @contributor nlevitt + */ +public abstract class AMQPProducerProcessor extends Processor { + + protected final Logger logger = Logger.getLogger(getClass().getName()); + + protected String amqpUri = "amqp://guest:guest@localhost:5672/%2f"; + public String getAmqpUri() { + return this.amqpUri; + } + public void setAmqpUri(String uri) { + this.amqpUri = uri; + } + + protected String exchange; + public String getExchange() { + return exchange; + } + public void setExchange(String exchange) { + this.exchange = exchange; + } + + protected String routingKey; + public String getRoutingKey() { + return routingKey; + } + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } + + transient protected AMQPProducer amqpProducer; + + protected AMQPProducer amqpProducer() { + if (amqpProducer == null) { + amqpProducer = new AMQPProducer(getAmqpUri(), getExchange(), getRoutingKey()); + } + return amqpProducer; + } + + @Override + synchronized public void stop() { + if (!isRunning) { + return; + } + + super.stop(); + + if (amqpProducer != null) { + amqpProducer.stop(); + } + } + + @Override + protected ProcessResult innerProcessResult(CrawlURI curi) + throws InterruptedException { + byte[] message = null; + BasicProperties props = null; + + message = buildMessage(curi); + props = amqpMessageProperties(); + try { + amqpProducer().publishMessage(message, props); + success(curi, message, props); + } catch (IOException e) { + fail(curi, message, props, e); + } + + return ProcessResult.PROCEED; + } + + protected BasicProperties amqpMessageProperties() { + return null; + } + + @Override + protected void innerProcess(CrawlURI uri) throws InterruptedException { + throw new RuntimeException("should never be called"); + } + + abstract protected byte[] buildMessage(CrawlURI curi); + + protected void success(CrawlURI curi, byte[] message, BasicProperties props) { + if (logger.isLoggable(Level.FINE)) { + try { + logger.fine("sent to amqp exchange=" + getExchange() + + " routingKey=" + routingKey + ": " + new String(message, "UTF-8")); + } catch (UnsupportedEncodingException e) { + logger.fine("sent to amqp exchange=" + getExchange() + + " routingKey=" + routingKey + ": " + message + " (" + message.length + " bytes)"); + } + } + } + + protected void fail(CrawlURI curi, byte[] message, BasicProperties props, Throwable e) { + logger.log(Level.SEVERE, "failed to send message to amqp for URI " + curi, e); + } +} diff --git a/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java b/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java index 3646f399f..8582da0fc 100644 --- a/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java +++ b/contrib/src/main/java/org/archive/modules/AMQPPublishProcessor.java @@ -21,12 +21,10 @@ import static org.archive.modules.CoreAttributeConstants.A_HERITABLE_KEYS; -import java.io.IOException; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; import org.apache.commons.httpclient.URIException; import org.archive.crawler.frontier.AMQPUrlReceiver; @@ -35,53 +33,23 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; /** * @author eldondev * @contributor nlevitt */ -public class AMQPPublishProcessor extends Processor { +public class AMQPPublishProcessor extends AMQPProducerProcessor implements Serializable { - @SuppressWarnings("unused") - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; - private static final Logger logger = - Logger.getLogger(AMQPPublishProcessor.class.getName()); - - public static final int S_SENT_TO_AMQP = 10001; // artificial fetch status public static final String A_SENT_TO_AMQP = "sentToAMQP"; // annotation - protected String amqpUri = "amqp://guest:guest@localhost:5672/%2f"; - public String getAmqpUri() { - return this.amqpUri; - } - public void setAmqpUri(String uri) { - this.amqpUri = uri; + public AMQPPublishProcessor() { + // set default values + exchange = "umbra"; + routingKey = "urls"; } - transient protected Connection connection = null; - transient protected ThreadLocal threadChannel = - new ThreadLocal(); - - protected String exchange = "umbra"; - public String getExchange() { - return exchange; - } - public void setExchange(String exchange) { - this.exchange = exchange; - } - - protected String routingKey = "url"; - public String getRoutingKey() { - return routingKey; - } - public void setRoutingKey(String routingKey) { - this.routingKey = routingKey; - } - protected String clientId = null; public String getClientId() { return clientId; @@ -110,33 +78,6 @@ protected boolean shouldProcess(CrawlURI curi) { } } - @Override - protected ProcessResult innerProcessResult(CrawlURI curi) - throws InterruptedException { - try { - Channel channel = channel(); - if (channel != null) { - JSONObject message = buildJsonMessage(curi); - - BasicProperties props = new AMQP.BasicProperties.Builder(). - contentType("application/json").build(); - channel.exchangeDeclare(getExchange(), "direct", true); - channel.basicPublish(getExchange(), getRoutingKey(), props, - message.toString().getBytes("UTF-8")); - if (logger.isLoggable(Level.FINE)) { - logger.fine("sent to amqp exchange=" + getExchange() - + " routingKey=" + routingKey + ": " + message); - } - - curi.getAnnotations().add(A_SENT_TO_AMQP); - } - } catch (Exception e) { - logger.log(Level.SEVERE, "Attempting to send URI to AMQP server failed! " + curi, e); - } - - return ProcessResult.PROCEED; - } - /** * Constructs the json to send via AMQP. This includes the url, and some * metadata from the CrawlURI. The metadata should be passed back to @@ -167,62 +108,30 @@ protected JSONObject buildJsonMessage(CrawlURI curi) { metadata.put("heritableData", heritableData); message.put("metadata", metadata); - + return message; } @Override - protected void innerProcess(CrawlURI uri) throws InterruptedException { - throw new RuntimeException("should never be called"); - } - - protected synchronized Channel channel() { - if (threadChannel.get() != null && !threadChannel.get().isOpen()) { - threadChannel.set(null); - } - - if (threadChannel.get() == null) { - if (connection == null || !connection.isOpen()) { - connect(); - } - try { - if (connection != null) { - threadChannel.set(connection.createChannel()); - } - } catch (IOException e) { - logger.log(Level.SEVERE, "Attempting to create channel for AMQP connection failed!", e); - } + protected byte[] buildMessage(CrawlURI curi) { + try { + return buildJsonMessage(curi).toString().getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); } - return threadChannel.get(); } - - private AtomicBoolean serverLooksDown = new AtomicBoolean(false); - private synchronized void connect() { - ConnectionFactory factory = new ConnectionFactory(); - try { - factory.setUri(getAmqpUri()); - connection = factory.newConnection(); - boolean wasDown = serverLooksDown.getAndSet(false); - if (wasDown) { - logger.info(getAmqpUri() + " is back up, connected successfully!"); - } - } catch (Exception e) { - connection = null; - boolean wasAlreadyDown = serverLooksDown.getAndSet(true); - if (!wasAlreadyDown) { - logger.log(Level.SEVERE, "Attempting to connect to AMQP server failed!", e); - } - } + @Override + protected void success(CrawlURI curi, byte[] message, BasicProperties props) { + super.success(curi, message, props); + curi.getAnnotations().add(A_SENT_TO_AMQP); } - synchronized public void stop() { - try { - if(connection != null && connection.isOpen()) { - connection.close(); - } - } catch (IOException e) { - logger.log(Level.SEVERE, "Attempting to close AMQP connection failed!", e); - } + protected BasicProperties props = new AMQP.BasicProperties.Builder(). + contentType("application/json").build(); + + @Override + protected BasicProperties amqpMessageProperties() { + return props; } } diff --git a/contrib/src/main/java/org/archive/modules/deciderules/DecideRuleSequenceWithAMQPFeed.java b/contrib/src/main/java/org/archive/modules/deciderules/DecideRuleSequenceWithAMQPFeed.java new file mode 100644 index 000000000..7c42e1c29 --- /dev/null +++ b/contrib/src/main/java/org/archive/modules/deciderules/DecideRuleSequenceWithAMQPFeed.java @@ -0,0 +1,149 @@ +/* + * This file is part of the Heritrix web crawler (crawler.archive.org). + * + * Licensed to the Internet Archive (IA) by one or more individual + * contributors. + * + * The IA licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.archive.modules.deciderules; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.archive.modules.AMQPProducer; +import org.archive.modules.CrawlURI; +import org.archive.modules.net.CrawlHost; +import org.archive.modules.net.ServerCache; +import org.archive.util.ArchiveUtils; +import org.json.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; + +public class DecideRuleSequenceWithAMQPFeed extends DecideRuleSequence { + private static final long serialVersionUID = 1L; + + private static final Logger logger = + Logger.getLogger(DecideRuleSequenceWithAMQPFeed.class.getName()); + + protected String amqpUri = "amqp://guest:guest@localhost:5672/%2f"; + public String getAmqpUri() { + return this.amqpUri; + } + public void setAmqpUri(String uri) { + this.amqpUri = uri; + } + + protected String exchange = "heritrix.realTimeFeed"; + public String getExchange() { + return exchange; + } + public void setExchange(String exchange) { + this.exchange = exchange; + } + + protected String routingKey = "scopeLog"; + public String getRoutingKey() { + return routingKey; + } + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } + + protected ServerCache serverCache; + public ServerCache getServerCache() { + return this.serverCache; + } + @Autowired + public void setServerCache(ServerCache serverCache) { + this.serverCache = serverCache; + } + + transient protected AMQPProducer amqpProducer; + + protected AMQPProducer amqpProducer() { + if (amqpProducer == null) { + amqpProducer = new AMQPProducer(getAmqpUri(), getExchange(), getRoutingKey()); + } + return amqpProducer; + } + + @Override + synchronized public void stop() { + if (!isRunning) { + return; + } + + super.stop(); + + if (amqpProducer != null) { + amqpProducer.stop(); + } + } + + @Override + protected void decisionMade(CrawlURI curi, DecideRule decisiveRule, + int decisiveRuleNumber, DecideResult result) { + super.decisionMade(curi, decisiveRule, decisiveRuleNumber, result); + + JSONObject jo = buildJson(curi, decisiveRuleNumber, decisiveRule, + result); + + byte[] message; + try { + message = jo.toString().getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + + try { + amqpProducer().publishMessage(message, props); + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "sent message to amqp: " + jo); + } + } catch (IOException e) { + logger.log(Level.WARNING, "failed to send message to amqp: " + jo, e); + } + } + + protected BasicProperties props = new AMQP.BasicProperties.Builder(). + contentType("application/json").build(); + + protected JSONObject buildJson(CrawlURI curi, int decisiveRuleNumber, + DecideRule decisiveRule, DecideResult result) { + JSONObject jo = new JSONObject(); + + jo.put("timestamp", ArchiveUtils.getLog17Date(System.currentTimeMillis())); + + jo.put("decisiveRuleNo", decisiveRuleNumber); + jo.put("decisiveRule", decisiveRule.getClass().getSimpleName()); + jo.put("result", result.toString()); + + jo.put("url", curi.toString()); + + CrawlHost host = getServerCache().getHostFor(curi.getUURI()); + if (host != null) { + jo.put("host", host.fixUpName()); + } else { + jo.put("host", JSONObject.NULL); + } + + jo.put("sourceSeed", curi.getSourceTag()); + jo.put("via", curi.flattenVia()); + return jo; + } +} diff --git a/contrib/src/main/java/org/archive/modules/postprocessor/AMQPCrawlLogFeed.java b/contrib/src/main/java/org/archive/modules/postprocessor/AMQPCrawlLogFeed.java new file mode 100644 index 000000000..281aa613d --- /dev/null +++ b/contrib/src/main/java/org/archive/modules/postprocessor/AMQPCrawlLogFeed.java @@ -0,0 +1,202 @@ +/* + * This file is part of the Heritrix web crawler (crawler.archive.org). + * + * Licensed to the Internet Archive (IA) by one or more individual + * contributors. + * + * The IA licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.archive.modules.postprocessor; + +import java.io.UnsupportedEncodingException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.collections.Closure; +import org.apache.commons.lang.StringUtils; +import org.archive.crawler.framework.Frontier; +import org.archive.crawler.frontier.AbstractFrontier; +import org.archive.crawler.frontier.BdbFrontier; +import org.archive.crawler.io.UriProcessingFormatter; +import org.archive.modules.AMQPProducerProcessor; +import org.archive.modules.CoreAttributeConstants; +import org.archive.modules.CrawlURI; +import org.archive.modules.net.CrawlHost; +import org.archive.modules.net.ServerCache; +import org.archive.util.ArchiveUtils; +import org.archive.util.MimetypeUtils; +import org.json.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.Lifecycle; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * @see UriProcessingFormatter + * @contributor nlevitt + */ +public class AMQPCrawlLogFeed extends AMQPProducerProcessor implements Lifecycle { + + protected Frontier frontier; + public Frontier getFrontier() { + return this.frontier; + } + /** Autowired frontier, needed to determine when a url is finished. */ + @Autowired + public void setFrontier(Frontier frontier) { + this.frontier = frontier; + } + + protected ServerCache serverCache; + public ServerCache getServerCache() { + return this.serverCache; + } + @Autowired + public void setServerCache(ServerCache serverCache) { + this.serverCache = serverCache; + } + + protected Map extraFields; + public Map getExtraFields() { + return extraFields; + } + public void setExtraFields(Map extraFields) { + this.extraFields = extraFields; + } + + protected boolean dumpPendingAtClose = false; + public boolean getDumpPendingAtClose() { + return dumpPendingAtClose; + } + /** + * If true, publish all pending urls (i.e. queued urls still in the + * frontier) when crawl job is stopping. They are recognizable by the status + * field which has the value 0. + * + * @see BdbFrontier#setDumpPendingAtClose(boolean) + */ + public void setDumpPendingAtClose(boolean dumpPendingAtClose) { + this.dumpPendingAtClose = dumpPendingAtClose; + } + + public AMQPCrawlLogFeed() { + // set default values + exchange = "heritrix.realTimeFeed"; + routingKey = "crawlLog"; + } + + protected Object checkForNull(Object o) { + return o != null ? o : JSONObject.NULL; + } + + @Override + protected byte[] buildMessage(CrawlURI curi) { + JSONObject jo = new JSONObject(); + + jo.put("timestamp", ArchiveUtils.getLog17Date(System.currentTimeMillis())); + + for (Entry entry: getExtraFields().entrySet()) { + jo.put(entry.getKey(), entry.getValue()); + } + + jo.put("bytes_received", curi.isHttpTransaction() && curi.getContentLength() >= 0 ? curi.getContentLength() : JSONObject.NULL); + jo.put("stored_size", curi.getContentSize() > 0 ? curi.getContentSize() : JSONObject.NULL); + + jo.put("response_code", checkForNull(curi.getFetchStatus())); + jo.put("document_url", checkForNull(curi.getUURI().toString())); + jo.put("hop_path", checkForNull(curi.getPathFromSeed())); + jo.put("via", checkForNull(curi.flattenVia())); + jo.put("mimetype", checkForNull(MimetypeUtils.truncate(curi.getContentType()))); + jo.put("thread", checkForNull(curi.getThreadNumber())); + + if (curi.containsDataKey(CoreAttributeConstants.A_FETCH_COMPLETED_TIME)) { + long beganTime = curi.getFetchBeginTime(); + String fetchBeginDuration = ArchiveUtils.get17DigitDate(beganTime) + + "+" + (curi.getFetchCompletedTime() - beganTime); + jo.put("start_time_plus_duration", fetchBeginDuration); + } else { + jo.put("start_time_plus_duration", JSONObject.NULL); + } + + jo.put("payload_hash", checkForNull(curi.getContentDigestSchemeString())); + jo.put("seed", checkForNull(curi.getSourceTag())); + + CrawlHost host = getServerCache().getHostFor(curi.getUURI()); + if (host != null) { + jo.put("host", host.fixUpName()); + } else { + jo.put("host", JSONObject.NULL); + } + + jo.put("annotations", checkForNull(StringUtils.join(curi.getAnnotations(), ","))); + + jo.put("extra_info", checkForNull(curi.getExtraInfo())); + + String str = jo.toString(); + try { + return str.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + @Override + protected boolean shouldProcess(CrawlURI curi) { + if (frontier instanceof AbstractFrontier) { + return !((AbstractFrontier) frontier).needsReenqueuing(curi); + } else { + return false; + } + } + + private transient long pendingDumpedCount = 0l; + @Override + public synchronized void stop() { + if (!isRunning) { + return; + } + + if (dumpPendingAtClose) { + if (frontier instanceof BdbFrontier) { + + Closure closure = new Closure() { + public void execute(Object curi) { + try { + innerProcessResult((CrawlURI) curi); + pendingDumpedCount++; + } catch (InterruptedException e) { + } + } + }; + + logger.info("dumping " + frontier.queuedUriCount() + " queued urls to amqp feed"); + ((BdbFrontier) frontier).forAllPendingDo(closure); + logger.info("dumped " + pendingDumpedCount + " queued urls to amqp feed"); + } else { + logger.warning("frontier is not a BdbFrontier, cannot dumpPendingAtClose"); + } + } + + // closes amqp connection + super.stop(); + } + + protected BasicProperties props = new AMQP.BasicProperties.Builder(). + contentType("application/json").build(); + + @Override + protected BasicProperties amqpMessageProperties() { + return props; + } +} diff --git a/engine/src/main/java/org/archive/crawler/frontier/AbstractFrontier.java b/engine/src/main/java/org/archive/crawler/frontier/AbstractFrontier.java index 2c5244466..3408ec440 100644 --- a/engine/src/main/java/org/archive/crawler/frontier/AbstractFrontier.java +++ b/engine/src/main/java/org/archive/crawler/frontier/AbstractFrontier.java @@ -1082,7 +1082,7 @@ protected boolean isDisregarded(CrawlURI curi) { * The CrawlURI to check * @return True if we need to retry. */ - protected boolean needsReenqueuing(CrawlURI curi) { + public boolean needsReenqueuing(CrawlURI curi) { if (overMaxRetries(curi)) { return false; } diff --git a/engine/src/main/java/org/archive/crawler/frontier/BdbFrontier.java b/engine/src/main/java/org/archive/crawler/frontier/BdbFrontier.java index bc57aac06..5806df75a 100644 --- a/engine/src/main/java/org/archive/crawler/frontier/BdbFrontier.java +++ b/engine/src/main/java/org/archive/crawler/frontier/BdbFrontier.java @@ -382,9 +382,13 @@ public void execute(Object curi) { log((CrawlURI) curi); } }; - pendingUris.forAllPendingDo(tolog); + forAllPendingDo(tolog); } - + + public void forAllPendingDo(Closure closure) { + pendingUris.forAllPendingDo(closure); + } + /** * Run a self-consistency check over queue collections, queues-of-queues, * etc. for testing purposes. Requires one of the same locks as for PAUSE, diff --git a/engine/src/main/java/org/archive/crawler/io/UriProcessingFormatter.java b/engine/src/main/java/org/archive/crawler/io/UriProcessingFormatter.java index d463aefda..02cb9a671 100644 --- a/engine/src/main/java/org/archive/crawler/io/UriProcessingFormatter.java +++ b/engine/src/main/java/org/archive/crawler/io/UriProcessingFormatter.java @@ -37,15 +37,20 @@ public class UriProcessingFormatter extends Formatter implements Preformatter, CoreAttributeConstants { private final static String NA = "-"; + /** - * Guess at line length (URIs are assumed avg. of 128 bytes). - * Used to preallocated the buffer we accumulate the log line - * in. Hopefully we get it right most of the time and no need - * to enlarge except in the rare case. + * Guess at line length. Used to preallocated the buffer we accumulate the + * log line in. Hopefully we get it right most of the time and no need to + * enlarge except in the rare case. + * + *

+ * In a sampling of actual Aug 2014 Archive-It crawl logs I found that a + * line length 1000 characters was around the 99th percentile (only 1 in 100 + * is longer than that). We put more information in the crawl log now than + * was originally estimated. Exactly what goes in can depend on the + * configuration as well. */ - private final static int GUESS_AT_LOG_LENGTH = - 17 + 1 + 3 + 1 + 10 + 128 + + 1 + 10 + 1 + 128 + 1 + 10 + 1 + 3 + - 14 + 1 + 32 + 4 + 128 + 1; + private final static int GUESS_AT_LINE_LENGTH = 1000; /** * Reusable assembly buffer. @@ -54,7 +59,7 @@ public class UriProcessingFormatter new ThreadLocal() { @Override protected StringBuilder initialValue() { - return new StringBuilder(GUESS_AT_LOG_LENGTH); + return new StringBuilder(GUESS_AT_LINE_LENGTH); } }; diff --git a/engine/src/main/java/org/archive/crawler/postprocessor/DispositionProcessor.java b/engine/src/main/java/org/archive/crawler/postprocessor/DispositionProcessor.java index 2735564ff..a9c779e3a 100644 --- a/engine/src/main/java/org/archive/crawler/postprocessor/DispositionProcessor.java +++ b/engine/src/main/java/org/archive/crawler/postprocessor/DispositionProcessor.java @@ -167,13 +167,11 @@ public DispositionProcessor() { @Override protected boolean shouldProcess(CrawlURI puri) { - return puri instanceof CrawlURI; + return true; } @Override - protected void innerProcess(CrawlURI puri) { - CrawlURI curi = (CrawlURI)puri; - + protected void innerProcess(CrawlURI curi) { // Tally per-server, per-host, per-frontier-class running totals CrawlServer server = serverCache.getServerFor(curi.getUURI()); diff --git a/engine/src/main/java/org/archive/crawler/reporting/HostsReport.java b/engine/src/main/java/org/archive/crawler/reporting/HostsReport.java index e62b5627d..4370e4257 100644 --- a/engine/src/main/java/org/archive/crawler/reporting/HostsReport.java +++ b/engine/src/main/java/org/archive/crawler/reporting/HostsReport.java @@ -20,8 +20,6 @@ package org.archive.crawler.reporting; import java.io.PrintWriter; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -39,18 +37,6 @@ public class HostsReport extends Report { private final static Logger logger = Logger.getLogger(HostsReport.class.getName()); - protected String fixup(String hostName) { - if ("dns:".equals(hostName) || "whois:".equals(hostName)) { - return hostName; - } else { - try { - return URLEncoder.encode(hostName, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - } - @Override public void write(final PrintWriter writer, StatisticsTracker stats) { // TODO: only perform sorting on manageable number of hosts @@ -63,7 +49,7 @@ public void write(final PrintWriter writer, StatisticsTracker stats) { writeReportLine(writer, host.getSubstats().getFetchSuccesses(), host.getSubstats().getTotalBytes(), - fixup(host.getHostName()), + host.fixUpName(), host.getSubstats().getRobotsDenials(), host.getSubstats().getRemaining(), host.getSubstats().getNovelUrls(), diff --git a/modules/src/main/java/org/archive/modules/deciderules/DecideRuleSequence.java b/modules/src/main/java/org/archive/modules/deciderules/DecideRuleSequence.java index c13b48387..e90e9a3ad 100644 --- a/modules/src/main/java/org/archive/modules/deciderules/DecideRuleSequence.java +++ b/modules/src/main/java/org/archive/modules/deciderules/DecideRuleSequence.java @@ -25,21 +25,24 @@ import org.archive.modules.CrawlURI; import org.archive.modules.SimpleFileLoggerProvider; +import org.archive.modules.net.CrawlHost; +import org.archive.modules.net.ServerCache; +import org.json.JSONObject; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.Lifecycle; public class DecideRuleSequence extends DecideRule implements BeanNameAware, Lifecycle { final private static Logger LOGGER = - Logger.getLogger(DecideRuleSequence.class.getName()); + Logger.getLogger(DecideRuleSequence.class.getName()); private static final long serialVersionUID = 3L; - + protected transient Logger fileLogger = null; /** * If enabled, log decisions to file named logs/{spring-bean-id}.log. Format * is: [timestamp] [decisive-rule-num] [decisive-rule-class] [decision] - * [uri] + * [uri] [extraInfo] * * Relies on Spring Lifecycle to initialize the log. Only top-level * beans get the Lifecycle treatment from Spring, so bean must be top-level @@ -56,6 +59,19 @@ public void setLogToFile(boolean enabled) { kp.put("logToFile",enabled); } + /** + * Whether to include the "extra info" field for each entry in crawl.log. + * "Extra info" is a json object with entries "host", "via", "source" and + * "hopPath". + */ + protected boolean logExtraInfo = false; + public boolean getLogExtraInfo() { + return logExtraInfo; + } + public void setLogExtraInfo(boolean logExtraInfo) { + this.logExtraInfo = logExtraInfo; + } + // provided by CrawlerLoggerModule which is in heritrix-engine, inaccessible // from here, thus the need for the SimpleFileLoggerProvider interface protected SimpleFileLoggerProvider loggerModule; @@ -66,7 +82,7 @@ public SimpleFileLoggerProvider getLoggerModule() { public void setLoggerModule(SimpleFileLoggerProvider loggerModule) { this.loggerModule = loggerModule; } - + @SuppressWarnings("unchecked") public List getRules() { return (List) kp.get("rules"); @@ -75,13 +91,22 @@ public void setRules(List rules) { kp.put("rules", rules); } + protected ServerCache serverCache; + public ServerCache getServerCache() { + return this.serverCache; + } + @Autowired + public void setServerCache(ServerCache serverCache) { + this.serverCache = serverCache; + } + public DecideResult innerDecide(CrawlURI uri) { DecideRule decisiveRule = null; int decisiveRuleNumber = -1; DecideResult result = DecideResult.NONE; List rules = getRules(); int max = rules.size(); - + for (int i = 0; i < max; i++) { DecideRule rule = rules.get(i); if (rule.onlyDecision(uri) != result) { @@ -98,13 +123,37 @@ public DecideResult innerDecide(CrawlURI uri) { } } - if (fileLogger != null) { - fileLogger.info(decisiveRuleNumber + " " + decisiveRule.getClass().getSimpleName() + " " + result + " " + uri); - } + decisionMade(uri, decisiveRule, decisiveRuleNumber, result); return result; } - + + protected void decisionMade(CrawlURI uri, DecideRule decisiveRule, + int decisiveRuleNumber, DecideResult result) { + if (fileLogger != null) { + JSONObject extraInfo = null; + if (logExtraInfo) { + CrawlHost crawlHost = getServerCache().getHostFor(uri.getUURI()); + String host = "-"; + if (crawlHost != null) { + host = crawlHost.fixUpName(); + } + + extraInfo = new JSONObject(); + extraInfo.put("hopPath", uri.getPathFromSeed()); + extraInfo.put("via", uri.getVia()); + extraInfo.put("seed", uri.getSourceTag()); + extraInfo.put("host", host); + } + + fileLogger.info(decisiveRuleNumber + + " " + decisiveRule.getClass().getSimpleName() + + " " + result + + " " + uri + + (extraInfo != null ? " " + extraInfo : "")); + } + } + protected String beanName; public String getBeanName() { return this.beanName; @@ -113,7 +162,7 @@ public String getBeanName() { public void setBeanName(String name) { this.beanName = name; } - + protected boolean isRunning = false; @Override public boolean isRunning() { diff --git a/modules/src/main/java/org/archive/modules/net/CrawlHost.java b/modules/src/main/java/org/archive/modules/net/CrawlHost.java index bdf377db2..c674d6148 100644 --- a/modules/src/main/java/org/archive/modules/net/CrawlHost.java +++ b/modules/src/main/java/org/archive/modules/net/CrawlHost.java @@ -20,8 +20,10 @@ package org.archive.modules.net; import java.io.Serializable; +import java.io.UnsupportedEncodingException; import java.net.Inet4Address; import java.net.InetAddress; +import java.net.URLEncoder; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.logging.Level; @@ -282,5 +284,17 @@ public void makeDirty() { @Override public void setIdentityCache(ObjectIdentityCache cache) { this.cache = cache; - } + } + + public String fixUpName() { + if ("dns:".equals(getHostName()) || "whois:".equals(getHostName())) { + return getHostName(); + } else { + try { + return URLEncoder.encode(getHostName(), "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + } }