Skip to content

Commit 027b082

Browse files
author
YunaiV
committed
增加 activemq 示例
1 parent 94a6bc1 commit 027b082

File tree

14 files changed

+369
-0
lines changed

14 files changed

+369
-0
lines changed

lab-32/lab-32-activemq-demo/pom.xml

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<groupId>org.springframework.boot</groupId>
7+
<artifactId>spring-boot-starter-parent</artifactId>
8+
<version>2.2.1.RELEASE</version>
9+
<relativePath/> <!-- lookup parent from repository -->
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>lab-32-activemq-demo</artifactId>
14+
15+
<dependencies>
16+
<!-- 实现对 ActiveMQ 的自动化配置 -->
17+
<dependency>
18+
<groupId>org.springframework.boot</groupId>
19+
<artifactId>spring-boot-starter-activemq</artifactId>
20+
</dependency>
21+
22+
<!-- 方便等会写单元测试 -->
23+
<dependency>
24+
<groupId>org.springframework.boot</groupId>
25+
<artifactId>spring-boot-starter-test</artifactId>
26+
<scope>test</scope>
27+
</dependency>
28+
</dependencies>
29+
30+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class Application {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(Application.class, args);
11+
}
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.config;
2+
3+
import cn.iocoder.springboot.lab32.activemqdemo.message.BroadcastMessage;
4+
import cn.iocoder.springboot.lab32.activemqdemo.message.ClusteringMessage;
5+
import org.apache.activemq.command.ActiveMQQueue;
6+
import org.apache.activemq.command.ActiveMQTopic;
7+
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
11+
import org.springframework.jms.core.JmsMessagingTemplate;
12+
import org.springframework.jms.core.JmsTemplate;
13+
14+
import javax.jms.ConnectionFactory;
15+
import javax.jms.Queue;
16+
import javax.jms.Topic;
17+
18+
@Configuration
19+
public class RabbitConfig {
20+
21+
public static final String CLUSTERING_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "clusteringJmsListenerContainerFactory";
22+
public static final String BROADCAST_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "broadcastJmsListenerContainerFactory";
23+
24+
public static final String CLUSTERING_JMS_TEMPLATE_BEAN_NAME = "clusteringJmsTemplate";
25+
public static final String BROADCAST_JMS_TEMPLATE_BEAN_NAME = "broadcastJmsTemplate";
26+
27+
// ========== 集群消费 ==========
28+
29+
@Bean
30+
public Queue clusteringQueue() {
31+
return new ActiveMQQueue(ClusteringMessage.QUEUE);
32+
}
33+
34+
@Bean(CLUSTERING_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
35+
public DefaultJmsListenerContainerFactory clusteringJmsListenerContainerFactory(
36+
DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
37+
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
38+
configurer.configure(factory, connectionFactory);
39+
factory.setPubSubDomain(false);
40+
return factory;
41+
}
42+
43+
@Bean(CLUSTERING_JMS_TEMPLATE_BEAN_NAME)
44+
public JmsMessagingTemplate clusteringJmsTemplate(ConnectionFactory connectionFactory) {
45+
// 创建 JmsTemplate 对象
46+
JmsTemplate template = new JmsTemplate(connectionFactory);
47+
template.setPubSubDomain(false);
48+
// 创建 JmsMessageTemplate
49+
return new JmsMessagingTemplate(template);
50+
}
51+
52+
// ========== 广播消费 ==========
53+
54+
@Bean
55+
public Topic broadcastTopic() {
56+
return new ActiveMQTopic(BroadcastMessage.TOPIC);
57+
}
58+
59+
@Bean(BROADCAST_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
60+
public DefaultJmsListenerContainerFactory broadcastJmsListenerContainerFactory(
61+
DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
62+
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
63+
configurer.configure(factory, connectionFactory);
64+
factory.setPubSubDomain(true);
65+
return factory;
66+
}
67+
68+
@Bean(BROADCAST_JMS_TEMPLATE_BEAN_NAME)
69+
public JmsMessagingTemplate broadcastJmsTemplate(ConnectionFactory connectionFactory) {
70+
// 创建 JmsTemplate 对象
71+
JmsTemplate template = new JmsTemplate(connectionFactory);
72+
template.setPubSubDomain(true);
73+
// 创建 JmsMessageTemplate
74+
return new JmsMessagingTemplate(template);
75+
}
76+
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.consumer;
2+
3+
import cn.iocoder.springboot.lab32.activemqdemo.config.RabbitConfig;
4+
import cn.iocoder.springboot.lab32.activemqdemo.message.BroadcastMessage;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.jms.annotation.JmsListener;
8+
import org.springframework.stereotype.Component;
9+
10+
@Component
11+
public class BroadcastConsumer {
12+
13+
private Logger logger = LoggerFactory.getLogger(getClass());
14+
15+
@JmsListener(destination = BroadcastMessage.TOPIC,
16+
containerFactory = RabbitConfig.BROADCAST_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
17+
public void onMessage(BroadcastMessage message) {
18+
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
19+
}
20+
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.consumer;
2+
3+
import cn.iocoder.springboot.lab32.activemqdemo.config.RabbitConfig;
4+
import cn.iocoder.springboot.lab32.activemqdemo.message.ClusteringMessage;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.jms.annotation.JmsListener;
8+
import org.springframework.stereotype.Component;
9+
10+
@Component
11+
public class ClusteringConsumer {
12+
13+
private Logger logger = LoggerFactory.getLogger(getClass());
14+
15+
@JmsListener(destination = ClusteringMessage.QUEUE,
16+
containerFactory = RabbitConfig.CLUSTERING_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
17+
public void onMessage(ClusteringMessage message) {
18+
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
19+
}
20+
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.message;
2+
3+
import java.io.Serializable;
4+
5+
/**
6+
* 广播消费的消息示例
7+
*/
8+
public class BroadcastMessage implements Serializable {
9+
10+
public static final String TOPIC = "TOPIC_BROADCASTING";
11+
12+
/**
13+
* 编号
14+
*/
15+
private Integer id;
16+
17+
public BroadcastMessage setId(Integer id) {
18+
this.id = id;
19+
return this;
20+
}
21+
22+
public Integer getId() {
23+
return id;
24+
}
25+
26+
@Override
27+
public String toString() {
28+
return "BroadcastMessage{" +
29+
"id=" + id +
30+
'}';
31+
}
32+
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.message;
2+
3+
import java.io.Serializable;
4+
5+
/**
6+
* 广播消费的消息示例
7+
*/
8+
public class ClusteringMessage implements Serializable {
9+
10+
public static final String QUEUE = "QUEUE_CLUSTERING";
11+
12+
/**
13+
* 编号
14+
*/
15+
private Integer id;
16+
17+
public ClusteringMessage setId(Integer id) {
18+
this.id = id;
19+
return this;
20+
}
21+
22+
public Integer getId() {
23+
return id;
24+
}
25+
26+
@Override
27+
public String toString() {
28+
return "ClusteringtMessage{" +
29+
"id=" + id +
30+
'}';
31+
}
32+
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.producer;
2+
3+
import cn.iocoder.springboot.lab32.activemqdemo.config.RabbitConfig;
4+
import cn.iocoder.springboot.lab32.activemqdemo.message.BroadcastMessage;
5+
import org.springframework.jms.core.JmsMessagingTemplate;
6+
import org.springframework.stereotype.Component;
7+
8+
import javax.annotation.Resource;
9+
10+
@Component
11+
public class BroadcastProducer {
12+
13+
@Resource(name = RabbitConfig.BROADCAST_JMS_TEMPLATE_BEAN_NAME)
14+
private JmsMessagingTemplate jmsMessagingTemplate;
15+
16+
public void syncSend(Integer id) {
17+
// 创建 BroadcastMessage 消息
18+
BroadcastMessage message = new BroadcastMessage();
19+
message.setId(id);
20+
// 同步发送消息
21+
jmsMessagingTemplate.convertAndSend(BroadcastMessage.TOPIC, message);
22+
}
23+
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.producer;
2+
3+
import cn.iocoder.springboot.lab32.activemqdemo.config.RabbitConfig;
4+
import cn.iocoder.springboot.lab32.activemqdemo.message.ClusteringMessage;
5+
import org.springframework.jms.core.JmsMessagingTemplate;
6+
import org.springframework.stereotype.Component;
7+
8+
import javax.annotation.Resource;
9+
10+
@Component
11+
public class ClusteringProducer {
12+
13+
@Resource(name = RabbitConfig.CLUSTERING_JMS_TEMPLATE_BEAN_NAME)
14+
private JmsMessagingTemplate rabbitTemplate;
15+
16+
public void syncSend(Integer id) {
17+
// 创建 ClusteringMessage 消息
18+
ClusteringMessage message = new ClusteringMessage();
19+
message.setId(id);
20+
// 同步发送消息
21+
rabbitTemplate.convertAndSend(ClusteringMessage.QUEUE, message);
22+
}
23+
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
spring:
2+
# ActiveMQ 配置项,对应 ActiveMQProperties 配置类
3+
activemq:
4+
broker-url: tcp://127.0.0.1:61616 # RabbitMQ Broker 的地址
5+
user: admin # 账号
6+
password: admin # 密码
7+
packages:
8+
trust-all: true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.producer;
2+
3+
import cn.iocoder.springboot.lab32.activemqdemo.Application;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.boot.test.context.SpringBootTest;
10+
import org.springframework.test.context.junit4.SpringRunner;
11+
12+
import java.util.concurrent.CountDownLatch;
13+
14+
@RunWith(SpringRunner.class)
15+
@SpringBootTest(classes = Application.class)
16+
public class BroadcastProducerTest {
17+
18+
private Logger logger = LoggerFactory.getLogger(getClass());
19+
20+
@Autowired
21+
private BroadcastProducer producer;
22+
23+
@Test
24+
public void mock() throws InterruptedException {
25+
// 阻塞等待,保证消费
26+
new CountDownLatch(1).await();
27+
}
28+
29+
@Test
30+
public void testSyncSend() throws InterruptedException {
31+
for (int i = 0; i < 3; i++) {
32+
int id = (int) (System.currentTimeMillis() / 1000);
33+
producer.syncSend(id);
34+
logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);
35+
}
36+
37+
// 阻塞等待,保证消费
38+
new CountDownLatch(1).await();
39+
}
40+
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package cn.iocoder.springboot.lab32.activemqdemo.producer;
2+
3+
import cn.iocoder.springboot.lab32.activemqdemo.Application;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.boot.test.context.SpringBootTest;
10+
import org.springframework.test.context.junit4.SpringRunner;
11+
12+
import java.util.concurrent.CountDownLatch;
13+
14+
@RunWith(SpringRunner.class)
15+
@SpringBootTest(classes = Application.class)
16+
public class ClusteringProducerTest {
17+
18+
private Logger logger = LoggerFactory.getLogger(getClass());
19+
20+
@Autowired
21+
private ClusteringProducer producer;
22+
23+
@Test
24+
public void mock() throws InterruptedException {
25+
// 阻塞等待,保证消费
26+
new CountDownLatch(1).await();
27+
}
28+
29+
@Test
30+
public void testSyncSend() throws InterruptedException {
31+
// 发送 3 条消息
32+
for (int i = 0; i < 3; i++) {
33+
int id = (int) (System.currentTimeMillis() / 1000);
34+
producer.syncSend(id);
35+
logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);
36+
}
37+
38+
// 阻塞等待,保证消费
39+
new CountDownLatch(1).await();
40+
}
41+
42+
}

lab-32/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
<packaging>pom</packaging>
1414
<modules>
1515
<module>lab-32-activemq-native</module>
16+
<module>lab-32-activemq-demo</module>
1617
</modules>
1718

1819

0 commit comments

Comments
 (0)