Skip to content

Commit

Permalink
Merge pull request internetarchive#99 from nlevitt/aitfive-39
Browse files Browse the repository at this point in the history
AMQPCrawlLogFeed, DecideRuleSequenceWithAMQPFeed, DecideRuleSequence.logExtraInfo
  • Loading branch information
vonrosen committed Oct 16, 2014
2 parents 5f7dc49 + a27dbd1 commit 5909042
Show file tree
Hide file tree
Showing 12 changed files with 710 additions and 156 deletions.
112 changes: 112 additions & 0 deletions contrib/src/main/java/org/archive/modules/AMQPProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* This file is part of the Heritrix web crawler (crawler.archive.org).
*
* Licensed to the Internet Archive (IA) by one or more individual
* contributors.
*
* The IA licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.archive.modules;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Publishes messages to a single AMQP exchange/routing key.
 *
 * <p>One {@link Connection} is shared by all callers and is opened lazily (and
 * reopened if it is found closed). Each calling thread gets its own
 * {@link Channel}, kept in a {@link ThreadLocal}. Access to the shared
 * connection is serialized by synchronizing on this instance.
 */
public class AMQPProducer {
    static protected final Logger logger = Logger.getLogger(AMQPProducer.class.getName());

    protected String amqpUri;
    protected String exchange;
    protected String routingKey;

    /**
     * Creates a producer; no connection is opened until a channel is first
     * requested.
     *
     * @param amqpUri    URI handed to {@link ConnectionFactory#setUri(String)}
     * @param exchange   exchange that messages are published to
     * @param routingKey routing key used for every published message
     */
    public AMQPProducer(String amqpUri, String exchange, String routingKey) {
        this.amqpUri = amqpUri;
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    transient protected Connection connection = null;
    transient protected ThreadLocal<Channel> threadChannel =
            new ThreadLocal<Channel>();

    /**
     * Returns the calling thread's channel, creating it (and, if needed, the
     * shared connection) on demand. A channel that is no longer open is
     * discarded and replaced.
     *
     * @return the calling thread's channel; may be {@code null} if the
     *         connection could not supply a channel
     * @throws IOException if connecting or creating the channel fails
     */
    protected synchronized Channel channel() throws IOException {
        Channel current = threadChannel.get();
        if (current != null && !current.isOpen()) {
            // Stale channel: forget it so that a fresh one is created below.
            threadChannel.set(null);
            current = null;
        }

        if (current == null) {
            if (connection == null || !connection.isOpen()) {
                connect();
            }
            try {
                if (connection != null) {
                    threadChannel.set(connection.createChannel());
                }
            } catch (IOException e) {
                throw new IOException("Attempting to create channel for AMQP connection failed!", e);
            }
        }

        return threadChannel.get();
    }

    /** Remembers a failed connection attempt so that recovery can be logged once. */
    private final AtomicBoolean serverLooksDown = new AtomicBoolean(false);

    /**
     * Opens a new connection to {@link #amqpUri}, replacing {@link #connection}.
     * Any failure is rethrown as an {@link IOException} and leaves
     * {@link #connection} set to {@code null}.
     */
    private synchronized void connect() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        try {
            factory.setUri(amqpUri);
            connection = factory.newConnection();
            boolean wasDown = serverLooksDown.getAndSet(false);
            if (wasDown) {
                logger.info(amqpUri + " is back up, connected successfully!");
            }
        } catch (Exception e) {
            connection = null;
            serverLooksDown.set(true);
            throw new IOException("Attempting to connect to AMQP server failed!", e);
        }
    }

    /**
     * Closes the shared connection if it is open. A failure to close is logged
     * rather than propagated.
     */
    synchronized public void stop() {
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
                connection = null;
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Attempting to close AMQP connection failed!", e);
        }
    }

    /**
     * Publish the message with the supplied properties. If this method returns
     * without throwing an exception, the message was published successfully.
     *
     * <p>The exchange is (re)declared as a durable "direct" exchange before
     * each publish.
     *
     * @param message
     *            message body
     * @param props
     *            AMQP message properties, passed through to the client
     * @throws IOException
     *             if message is not published successfully for any reason,
     *             including when no channel could be obtained
     */
    public void publishMessage(byte[] message, BasicProperties props)
            throws IOException {
        Channel channel = channel();
        if (channel == null) {
            // createChannel() may hand back null when no channel is available;
            // report that as the documented IOException instead of an NPE.
            throw new IOException("Attempting to create channel for AMQP connection failed! No channel available.");
        }
        channel.exchangeDeclare(exchange, "direct", true);
        channel.basicPublish(exchange, routingKey, props, message);
    }
}
126 changes: 126 additions & 0 deletions contrib/src/main/java/org/archive/modules/AMQPProducerProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* This file is part of the Heritrix web crawler (crawler.archive.org).
*
* Licensed to the Internet Archive (IA) by one or more individual
* contributors.
*
* The IA licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.archive.modules;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Base class for processors that publish one AMQP message per processed URI.
 * Subclasses supply the message body via {@link #buildMessage(CrawlURI)} and
 * may override {@link #amqpMessageProperties()}, {@link #success} and
 * {@link #fail} to customize behavior.
 *
 * @contributor nlevitt
 */
public abstract class AMQPProducerProcessor extends Processor {

    protected final Logger logger = Logger.getLogger(getClass().getName());

    protected String amqpUri = "amqp://guest:guest@localhost:5672/%2f";
    public String getAmqpUri() {
        return this.amqpUri;
    }
    public void setAmqpUri(String uri) {
        this.amqpUri = uri;
    }

    protected String exchange;
    public String getExchange() {
        return exchange;
    }
    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    protected String routingKey;
    public String getRoutingKey() {
        return routingKey;
    }
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    transient protected AMQPProducer amqpProducer;

    /**
     * Returns the producer, creating it on first use from the currently
     * configured URI, exchange and routing key.
     *
     * <p>Synchronized so that concurrent callers cannot each create their own
     * producer; a second instance would be overwritten in the field and any
     * connection it opened would never be closed by {@link #stop()}.
     */
    protected synchronized AMQPProducer amqpProducer() {
        if (amqpProducer == null) {
            amqpProducer = new AMQPProducer(getAmqpUri(), getExchange(), getRoutingKey());
        }
        return amqpProducer;
    }

    /**
     * Stops the processor and then shuts down the producer, if one was
     * created. Does nothing when the processor is not running.
     */
    @Override
    synchronized public void stop() {
        if (!isRunning) {
            return;
        }

        super.stop();

        if (amqpProducer != null) {
            amqpProducer.stop();
        }
    }

    /**
     * Builds and publishes the message for {@code curi}, then reports the
     * outcome through {@link #success} or {@link #fail}. A publish failure
     * does not interrupt processing: the result is always
     * {@code ProcessResult.PROCEED}.
     */
    @Override
    protected ProcessResult innerProcessResult(CrawlURI curi)
            throws InterruptedException {
        byte[] message = buildMessage(curi);
        BasicProperties props = amqpMessageProperties();
        try {
            amqpProducer().publishMessage(message, props);
            success(curi, message, props);
        } catch (IOException e) {
            fail(curi, message, props, e);
        }

        return ProcessResult.PROCEED;
    }

    /**
     * Properties attached to each published message; {@code null} by default.
     */
    protected BasicProperties amqpMessageProperties() {
        return null;
    }

    /** Not used: the work is done in {@link #innerProcessResult(CrawlURI)}. */
    @Override
    protected void innerProcess(CrawlURI uri) throws InterruptedException {
        throw new RuntimeException("should never be called");
    }

    /**
     * Produces the body of the message to publish for the given URI.
     */
    abstract protected byte[] buildMessage(CrawlURI curi);

    /**
     * Called after a message was published; logs the message at FINE level.
     */
    protected void success(CrawlURI curi, byte[] message, BasicProperties props) {
        if (logger.isLoggable(Level.FINE)) {
            try {
                logger.fine("sent to amqp exchange=" + getExchange()
                        + " routingKey=" + routingKey + ": " + new String(message, "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                // Fallback when the body cannot be decoded: log its size instead.
                logger.fine("sent to amqp exchange=" + getExchange()
                        + " routingKey=" + routingKey + ": " + message + " (" + message.length + " bytes)");
            }
        }
    }

    /**
     * Called when publishing failed; logs the failure at SEVERE level.
     */
    protected void fail(CrawlURI curi, byte[] message, BasicProperties props, Throwable e) {
        logger.log(Level.SEVERE, "failed to send message to amqp for URI " + curi, e);
    }
}
Loading

0 comments on commit 5909042

Please sign in to comment.