ActiveMQ原生Demo

Tip:可以先看一下最后的总结,因为这些代码真的很没有意思…

1. 依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.5</version>
</dependency>

2. QueueDemo

Queue模式生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class QueueProducer {
public static final String NAME = "user";
public static final String PASSWORD = "123456";
public static final String URL = "tcp://127.0.0.1:61616";

public void queueSend(String target,String msg) throws JMSException {
//1.创建工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(NAME,PASSWORD,URL);
//2.创建连接
Connection connection = factory.createConnection();
connection.start();
//3.创建回话,开启事务
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//4.创建点对点(Queue)消息队列,并指明消息目的地
Destination destination = session.createQueue(target);
//5.创建消息生产者
MessageProducer producer = session.createProducer(destination);
//6.创建一条消息
TextMessage text = session.createTextMessage(msg);
//7.生产者发送消息
producer.send(text);
//8.因为设置了事务,所以需要提交会话
session.commit();
//9.关闭连接
connection.close();
}
}
Queue模式消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class QueueConsumer {
public static final String NAME = "user";
public static final String PASSWORD = "123456";
public static final String URL = "tcp://127.0.0.1:61616";

public void queueReceive(String target) throws JMSException {
//1.创建工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(NAME,PASSWORD,URL);
//2.创建连接
Connection connection = factory.createConnection();
connection.start();
//3.创建回话,关闭事务
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4.创建点对点(Queue)消息队列,并指明消息目的地
Destination destination = session.createQueue(target);
//5.创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//6.设置监听器
consumer.setMessageListener(message -> {
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});

//7.关闭连接,因为要监听,所以不能关闭连接,所以我注释掉了
//connection.close();
}
}
测试

测试方法可以自己写一个,因为Queue模式是不需要订阅的,所以生产者消费者哪个先启动都可以,最终消费者都可以接收到消息。

3. TopicDemo

Topic模式生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TopicProducer {
public static final String NAME = "user";
public static final String PASSWORD = "123456";
public static final String URL = "tcp://127.0.0.1:61616";

public void topicSend(String target,String msg) throws JMSException {
//1.创建工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(NAME,PASSWORD,URL);
//2.创建连接
Connection connection = factory.createConnection();
connection.start();
//3.创建回话,开启事务
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//4.创建广播(Topic)消息队列,指明目的地
Destination destination = session.createTopic(target);
//5.创建消息生产者
MessageProducer producer = session.createProducer(destination);
//6.创建一条消息
TextMessage text = session.createTextMessage(msg);
//7.生产者发送消息
producer.send(text);
//8.因为设置了事务,所以需要提交会话
session.commit();
//9.关闭连接
connection.close();
}
}
Topic模式消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TopicConsumer {
public static final String NAME = "user";
public static final String PASSWORD = "123456";
public static final String URL = "tcp://127.0.0.1:61616";

public void topicReceive(String target) throws JMSException {
//1.创建工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(NAME,PASSWORD,URL);
//2.创建连接
Connection connection = factory.createConnection();
connection.start();
//3.创建回话,关闭事务
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4.创建广播(Topic)消息队列,指明目的地,与生产者地址一致
Destination destination = session.createTopic(target);
//5.创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//6.设置监听器
consumer.setMessageListener(message -> {
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});

//7.关闭连接,因为要监听,所以不能关闭连接,所以我注释掉了
//connection.close();
}
}
测试

因为Topic模式消费者需要先订阅消息,所以需要先启动消费者,再启动生产者。

总结

不难发现Queue模式和Topic的代码几乎是完全一样的,就只是在创建消息队列时一个是createQueue另一个是createTopic,仅此而已!

本案例代码已上传至Github,欢迎下载查看。