ActiveMQ整合SpringBoot

0. 依赖

可以在创建SpringBoot项目的时候直接勾选JMS,也可以后期添加依赖。

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

1. 配置

1
2
3
4
5
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: user
password: 123456

2. QueueDemo

生产者
1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class Producer {

@Autowired
JmsMessagingTemplate template;

public void sendQueueText(String destinationName,String text){
System.out.println(">>>>>发送至"+destinationName+" 内容:"+text);
Destination destination = new ActiveMQQueue(destinationName);
template.convertAndSend(destination,text);
}
}
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class Consumer {
//监听来自test.queue的消息队列
@JmsListener(destination = "test.queue")
public void receiver1(Message message){
TextMessage text = (TextMessage)message;
try {
System.out.println("<<<<< 1 来自于test.queue 内容:"+text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

3. TopicDemo

生产者
1
2
3
4
5
public void sendTopicText(String destinationName,String text){
System.out.println(">>>>>发送至"+destinationName+" 内容:"+text);
Destination destination = new ActiveMQTopic(destinationName);
template.convertAndSend(destination,text);
}
消费者
1
2
3
4
5
6
7
8
9
@JmsListener(destination = "test.topic" , containerFactory = "subJmsListenerContainerFactory")
public void receiver3(Message message){
TextMessage text = (TextMessage)message;
try {
System.out.println("<<<<< 3 来自于test.topic 内容:"+text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

Topic模式的消费者因为需要先订阅消息,所以注解这里用到了containerFactory,这里需要配置一个containerFactory的Bean。代码如下:

1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class FactoryConfig {
//广播(Topic)消息的消费者需要配置ContainerFactory
@Bean
public JmsListenerContainerFactory subJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory simpleJmsListenerContainerFactory = new SimpleJmsListenerContainerFactory();
simpleJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
simpleJmsListenerContainerFactory.setPubSubDomain(true);
return simpleJmsListenerContainerFactory;
}
}

4. 测试

SpringBoot启动后,消费者是不会自动断开链接的,所以只需要运行生产者即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqDemoApplicationTests {
@Autowired
Producer producer;
@Autowired
Consumer consumer;

@Test
public void testQueue() throws InterruptedException {
for (int i = 1 ; i <= 10 ; i++){
producer.sendQueueText("test.queue","test "+i);
Thread.sleep(1000);
}
}

@Test
public void testTopic() throws InterruptedException {
for (int i = 1 ; i <= 10 ; i++){
producer.sendTopicText("test.topic","test "+i);
Thread.sleep(1000);
}
}

}

5. 总结

呃…只能说SpringBoot把一切都变的太简单了,几乎不用怎么写代码。

本案例代码已上传至Github,欢迎下载查看。