Skip to content

Commit

Permalink
INT-1257, INT-1263 Refactoring Message History (work in progress): Me…
Browse files Browse the repository at this point in the history
…ssageHistoryWriter's writeHistory method is now static. Added HistoryProvider interface and MessageHistoryBeanPostProcessor.
  • Loading branch information
markfisher committed Aug 25, 2010
1 parent 97c292c commit f5f948d
Show file tree
Hide file tree
Showing 23 changed files with 192 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.OrderComparator;
import org.springframework.core.convert.ConversionService;
import org.springframework.integration.Message;
import org.springframework.integration.MessageDeliveryException;
import org.springframework.integration.MessagingException;
import org.springframework.integration.context.HistoryProvider;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.context.MessageHistoryWriter;
import org.springframework.integration.core.MessageBuilder;
import org.springframework.integration.core.MessageChannel;
import org.springframework.util.Assert;
Expand All @@ -43,22 +46,29 @@
* @author Mark Fisher
* @author Oleg Zhurakousky
*/
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel {
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel, HistoryProvider {

private final Log logger = LogFactory.getLog(this.getClass());

private volatile boolean shouldIncludeInHistory = false;

private final AtomicLong sendSuccessCount = new AtomicLong();

private final AtomicLong sendErrorCount = new AtomicLong();

private volatile Class<?>[] datatypes = new Class<?>[] { Object.class };

private final ChannelInterceptorList interceptors = new ChannelInterceptorList();

public String getComponentType(){


public String getComponentType() {
return "channel";
}

public void setShouldIncludeInHistory(boolean shouldIncludeInHistory) {
this.shouldIncludeInHistory = shouldIncludeInHistory;
}

/**
* Return the current count of Messages that have been sent
* to this channel successfully.
Expand Down Expand Up @@ -157,9 +167,11 @@ public final boolean send(Message<?> message) {
* time or the sending thread is interrupted.
*/
public final boolean send(Message<?> message, long timeout) {
this.writeMessageHistory(message);
Assert.notNull(message, "message must not be null");
Assert.notNull(message.getPayload(), "message payload must not be null");
if (this.shouldIncludeInHistory) {
message = MessageHistoryWriter.writeHistory(this, message);
}
message = this.convertPayloadIfNecessary(message);
message = this.interceptors.preSend(message, this);
if (message == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,15 @@

package org.springframework.integration.config;

import java.util.Map;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.context.MessageHistoryWriter;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.PollableChannel;
Expand All @@ -44,7 +39,7 @@
/**
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Josh Long
* @author Josh Long
*/
public class ConsumerEndpointFactoryBean
implements FactoryBean<AbstractEndpoint>, BeanFactoryAware, BeanNameAware, InitializingBean, SmartLifecycle {
Expand All @@ -59,9 +54,9 @@ public class ConsumerEndpointFactoryBean

private volatile boolean autoStartup = true;

private volatile MessageChannel inputChannel;
private volatile MessageChannel inputChannel;

private volatile ConfigurableBeanFactory beanFactory;
private volatile ConfigurableBeanFactory beanFactory;

private volatile AbstractEndpoint endpoint;

Expand All @@ -71,6 +66,7 @@ public class ConsumerEndpointFactoryBean

private final Object handlerMonitor = new Object();


public void setHandler(MessageHandler handler) {
Assert.notNull(handler, "handler must not be null");
synchronized (this.handlerMonitor) {
Expand All @@ -79,9 +75,9 @@ public void setHandler(MessageHandler handler) {
}
}

public void setInputChannel(MessageChannel inputChannel) {
this.inputChannel= inputChannel;
}
public void setInputChannel(MessageChannel inputChannel) {
this.inputChannel = inputChannel;
}

public void setInputChannelName(String inputChannelName) {
this.inputChannelName = inputChannelName;
Expand All @@ -100,22 +96,14 @@ public void setBeanName(String beanName) {
}

public void setBeanFactory(BeanFactory beanFactory) {
Assert.isInstanceOf(ConfigurableBeanFactory.class, beanFactory,
"a ConfigurableBeanFactory is required");
Assert.isInstanceOf(ConfigurableBeanFactory.class, beanFactory, "a ConfigurableBeanFactory is required");
this.beanFactory = (ConfigurableBeanFactory) beanFactory;
}

public void afterPropertiesSet() throws Exception {
// Will check if this.handler needs to be wrapped in a MessageHistoryAwareMessageHandler.
// Such wrapping is only required if this.beanFactory contains a bean of type MessageHistoryWriter.
Map<String, MessageHistoryWriter> historyWriters = BeanFactoryUtils.beansOfTypeIncludingAncestors(
(ListableBeanFactory) this.beanFactory, MessageHistoryWriter.class);
if (historyWriters.size() == 1) {
MessageHistoryWriter writer = historyWriters.values().iterator().next();
if (!this.beanName.startsWith("org.springframework") && this.handler instanceof IntegrationObjectSupport) {
this.handler = new MessageHistoryWritingMessageHandler(this.handler, writer, this.beanName);
}
}
if (!this.beanName.startsWith("org.springframework") && this.handler instanceof IntegrationObjectSupport) {
((IntegrationObjectSupport) this.handler).setComponentName(this.beanName);
}
this.initializeEndpoint();
}

Expand All @@ -142,34 +130,27 @@ private void initializeEndpoint() throws Exception {
if (this.initialized) {
return;
}



MessageChannel channel = null;

if(StringUtils.hasText(this.inputChannelName)) {
Assert.isTrue(this.beanFactory.containsBean(this.inputChannelName),
"no such input channel '" + this.inputChannelName + "' for endpoint '" + this.beanName + "'");
channel = this.beanFactory.getBean(this.inputChannelName, MessageChannel.class);
}
if( this.inputChannel != null ){
channel = this.inputChannel;
}

Assert.state( channel != null , "one of inputChannelName or inputChannel is required");

MessageChannel channel = null;
if (StringUtils.hasText(this.inputChannelName)) {
Assert.isTrue(this.beanFactory.containsBean(this.inputChannelName), "no such input channel '"
+ this.inputChannelName + "' for endpoint '" + this.beanName + "'");
channel = this.beanFactory.getBean(this.inputChannelName, MessageChannel.class);
}
if (this.inputChannel != null) {
channel = this.inputChannel;
}
Assert.state(channel != null, "one of inputChannelName or inputChannel is required");
if (channel instanceof SubscribableChannel) {
Assert.isNull(this.pollerMetadata, "A poller should not be specified for endpoint '" + this.beanName
+ "', since '" + channel + "' is a SubscribableChannel (not pollable).");
this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler);
}
else if (channel instanceof PollableChannel) {
PollingConsumer pollingConsumer = new PollingConsumer(
(PollableChannel) channel, this.handler);
PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) channel, this.handler);
if (this.pollerMetadata == null) {
this.pollerMetadata = IntegrationContextUtils.getDefaultPollerMetadata(this.beanFactory);
Assert.notNull(this.pollerMetadata, "No poller has been defined for endpoint '"
+ this.beanName + "', and no default poller is available within the context.");
Assert.notNull(this.pollerMetadata, "No poller has been defined for endpoint '" + this.beanName
+ "', and no default poller is available within the context.");
}
pollingConsumer.setTrigger(this.pollerMetadata.getTrigger());
pollingConsumer.setMaxMessagesPerPoll(this.pollerMetadata.getMaxMessagesPerPoll());
Expand All @@ -181,8 +162,7 @@ else if (channel instanceof PollableChannel) {
this.endpoint = pollingConsumer;
}
else {
throw new IllegalArgumentException(
"unsupported channel type: [" + channel.getClass() + "]");
throw new IllegalArgumentException("unsupported channel type: [" + channel.getClass() + "]");
}
this.endpoint.setBeanName(this.beanName);
this.endpoint.setBeanFactory(this.beanFactory);
Expand All @@ -192,6 +172,7 @@ else if (channel instanceof PollableChannel) {
}
}


/*
* SmartLifecycle implementation (delegates to the created endpoint)
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.config.xml;

import org.w3c.dom.Element;

import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.w3c.dom.Element;

/**
* @author Oleg Zhurakousky
* @author Mark Fisher
* @since 2.0
*/
public class MessageHistoryParser extends AbstractSimpleBeanDefinitionParser {
private String messageHistory;

private static final String POST_PROCESSOR_CLASSNAME = "org.springframework.integration.context.MessageHistoryBeanPostProcessor";


@Override
protected String getBeanClassName(Element element) {
return "org.springframework.integration.context.MessageHistoryWriter";
return POST_PROCESSOR_CLASSNAME;
}

@Override
protected boolean shouldGenerateId() {
return false;
}

protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext){
if (messageHistory == null){
messageHistory = BeanDefinitionReaderUtils.generateBeanName(definition, parserContext.getRegistry());
} else {
throw new BeanDefinitionStoreException("Attempt to register more then one MessageHistoryWriter");

protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) {
if (parserContext.getRegistry().containsBeanDefinition(POST_PROCESSOR_CLASSNAME)) {
throw new BeanDefinitionStoreException("At most one MessageHistoryBeanPostProcessor may be registered within a context.");
}
return messageHistory;
return POST_PROCESSOR_CLASSNAME;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.context;

/**
* @author Mark Fisher
* @since 2.0
*/
public interface HistoryProvider extends NamedComponent {

void setShouldIncludeInHistory(boolean shouldIncludeInHistory);

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.context;

import org.springframework.beans.factory.BeanFactory;
Expand All @@ -27,7 +28,6 @@

import org.springframework.util.Assert;


/**
* Utility methods for accessing common integration components from the BeanFactory.
*
Expand Down
Loading

0 comments on commit f5f948d

Please sign in to comment.