ActiveMQ
1.ActiveMQ介绍:
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件
2.消息中间件应用场景
异步处理 应用解耦 流量削锋
(1)异步处理
场景说明:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。
串行方式
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客 户端。

并行方式
将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客 户端。与串行的差别是,并行的方式可以提高处理的时间

异步处理
引入消息中间件,将部分的业务逻辑,进行异步处理。改造后的架构如下:

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送 短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间 可能是50毫秒。因此架构改变后,系统的吞吐量提高啦,比串行提高了3倍,比并行提高了两倍。
(2)应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。 传统的做法是,订单系统调用库存系统的接口。如下图:

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存 系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存 系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假 如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再 关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存 系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假 如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再 关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
(3)流量消峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般 会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数 b、可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请 求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理
目前常用的消息队列
特性MQ | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
生产者消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布订阅模式 | 支持 | 支持 | 支持 | 支持 |
请求回应模式 | 支持 | 支持 | 不支持 | 不支持 |
Api完备性 | 高 | 高 | 高 | 高 |
多语言支持 | 支持 | 支持 | java | 支持 |
单机吞吐量 | 万级 | 万级 | 万级 | 十万级 |
消息延迟 | 无 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
消息丢失 | 低 | 低 | 理论上不会丢失 | 理论上不会丢失 |
文档的完备性 | 高 | 高 | 高 | 高 |
提供快速入门 | 有 | 有 | 有 | 有 |
社区活跃度 | 高 | 高 | 有 | 高 |
商业支持 | 无 | 无 | 商业云 | 商业云 |
3.什么是JMS?
介绍
JMS(Java?Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。 JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。
JMS消息模型
消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)。
(1) P2P (Point to Point) 点对点模型(Queue队列模型)
(2) Publish/Subscribe(Pub/Sub) 发布/订阅模型(Topic主题模型)
常用术语
- Provider/MessageProvider:生产者
- Consumer/MessageConsumer:消费者
- PTP:Point To Point,点对点通信消息模型
- Pub/Sub:Publish/Subscribe,发布订阅消息模型
- Queue:队列,目标类型之一,和PTP结合
- Topic:主题,目标类型之一,和Pub/Sub结合
- ConnectionFactory:连接工厂,JMS用它创建连接
- Connnection:JMS Client到JMS Provider的连接
- Destination:消息目的地,由Session创建
- Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的
点对点模型
点对点模型(Pointer-to-Pointer):即生产者和消费者之间的消息往来
每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
点对点模型的特点:
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中);
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;
接收者在成功接收消息之后需向队列应答成功。
发布/订阅模型
包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发
送到topic,系统将这些消息投递到订阅此topic的订阅者
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发
布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。
发布/订阅模型的特点:
每个消息可以有多个消费者;
发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息)。
订阅者必须保持运行的状态,才能接受发布者发布的消息;
4.JMS的消息格式
JMS消息组成
-
消息头:
每个消息头字段都有相应的getter和setter方法。
-
消息属性:
如果需要除消息头字段以外的值,那么可以使用消息属性。
message.setStringProperty("Property",Property); //自定义属性
-
消息体:
JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage(对象实现序列化接口)。
ObjectMessage:
发送
//发送ObjectMessage消息
@Test
public void test2(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
User user = new User();
user.setName("小苍");
user.setAge(18);
ObjectMessage objectMessage = session.createObjectMessage(user);
return objectMessage;
}
});
}
接受
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof ObjectMessage){
ObjectMessage objectMessage = (ObjectMessage)message;
try {
User user = (User)objectMessage.getObject();
System.out.println(user.getUsername());
System.out.println(user.getPassword());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的 加入到受信任的列表。
spring:
activemq:
broker-url: tcp://192.168.66.133:61616
user: admin
password: admin
packages:
trust-all: true # 添加所有包到信任列表
两种模式比较
1 | Topic | Queue |
---|---|---|
Publish Subscribe messaging 发布 订阅消息 | Point-to-Point 点对点 | |
有无状态 | topic 数据默认不落地,是无状态的。 | Queue 数据默认会在 mq 服 务器上以文件形式保存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面。也可以配置成 DB 存储。 |
完整性保障 | 并不保证 publisher 发布的每条数 据,Subscriber 都能接受到。 | Queue 保证每条数据都能 被 receiver 接收。消息不超时。 |
消息是否会丢失 | 一般来说 publisher 发布消息到某 一个 topic 时,只有正在监听该 topic 地址的 sub 能够接收到消息;如果没 有 sub 在监听,该 topic 就丢失了。 | Sender 发 送 消 息 到 目 标 Queue, receiver 可以异步接收这 个 Queue 上的消息。Queue 上的 消息如果暂时没有 receiver 来 取,也不会丢失。前提是消息不 超时。 |
消息发布接 收策略 | 一对多的消息发布接收策略,监 听同一个topic地址的多个sub都能收 到 publisher 发送的消息。Sub 接收完 通知 mq 服务器 | 一对一的消息发布接收策 略,一个 sender 发送的消息,只 能有一个 receiver 接收。 receiver 接收完后,通知 mq 服务器已接 收,mq 服务器对 queue 里的消 息采取删除或其他操作。 |
5.ActiveMQ的安装
Linux安装流程
第一步:安装 jdk(略)
第二步:把 activemq的压缩包(apache-activemq-5.14.5-bin.tar.gz)上传到 linux 系统
第三步:解压缩压缩包
tar -zxvf apache-activemq-5.14.5-bin.tar.gz
第四步:进入apache-activemq-5.14.5的bin目录
cd apache-activemq-5.14.5/bin
第五步:启动 activemq
./activemq start (执行2次:第一次:生成配置信息;第二次:启动)
第六步:停止activemq:
./activemq stop
页面控制台: http://ip:8161 (监控)
请求地址: tcp://ip:61616 (java代码访问消息中间件)
账号:admin
密码:admin
登录后列表各列信息含义如下:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。
windows安装
下载后直接解压缩直接就能用(免安装)。
bin/是服务启动相关的命令文件所在目录
data/是默认持久化文件所在目录
docs/里面放的是用户手册
conf/是配置文件所在目录,任何配置文件修改后,必须重启ActiveMQ,才能生效.
有几个配置文件需要提下,后面会用到:
1.activemq.xml
就是spring配置文件。配置的是ActiveMQ应用使用的默认对象组件.
transportConnectors标签 - 配置链接端口信息的. 其中的端口号61616是ActiveMQ对外发布的tcp协议访问端口. 就是java代码访问ActiveMQ时使用的端口.
配置安全认证和持久化都是在这个文件里面。
2.jetty.xml
spring配置文件, ActiveMQ使用的是jetty提供HTTP服务,因此需要该文件用于配置jetty服务器的默认对象组件.
3.users.properties
内容信息: 用户名=密码
是用于配置客户端通过协议访问ActiveMQ时,使用的用户名和密码.
4.groups.properties
内容信息: 用户组=用户1,用户2(多个用户中间用逗号隔开)
双击bin\win64目录下的activemq批处理文件,即可启动
访问http://localhost:8161/admin/,输入默认的用户名和密码admin/admin,即可看到管理台页面
6.原生JMS API操作ActiveMQ
PTP模式
引入坐标
org.apache.activemq
activemq-all
5.11.2
生产者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 演示点对点模式 -- 消息生产者
*/
public class PTP_Producer {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory
= new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
/**
* 参数一:是否开启事务操作
* 参数二:消息确认机制
*/
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
Queue queue = session.createQueue("queue01");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
//createTextMessage: 文本类型
TextMessage textMessage = session.createTextMessage("test message");
//8.发送消息
producer.send(textMessage);
System.out.println("消息发送完成");
//9.释放资源
session.close();
connection.close();
}
}
消费者
方案一(1)
package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 演示点对点模式- 消息消费者(第一种方案)
*/
public class PTP_Consumer1 {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory
= new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.指定目标地址
Queue queue = session.createQueue("queue01");
//6.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
//7.接收消息
while(true){
Message message = consumer.receive();
//如果已经没有消息了,结束啦
if(message==null){
break;
}
//如果还有消息,判断什么类型的消息
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
System.out.println("接收的消息:"+textMessage.getText());
}
}
}
}
方案二(2)
package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 演示点对点模式- 消息消费者(第二种方案) -- 更加推荐
*/
public class PTP_Consumer2 {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory
= new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.指定目标地址
Queue queue = session.createQueue("queue01");
//6.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
//7.设置消息监听器来接收消息
consumer.setMessageListener(new MessageListener() {
//处理消息
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收的(2):"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
}
}
Pub/Sub模式
生产者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主题消息,消息的发送方
*/
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
Topic topic = session.createTopic("sms");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage message = session.createTextMessage("发短信...");
//8.发送消息
producer.send(message);
System.out.println("发送消息:发短信...");
session.close();;
connection.close();
}
}
消费者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主题消息,消息的消费方
*/
public class TopicConsumer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
Topic topic = session.createTopic("sms");
//6.创建消息的消费者
MessageConsumer consumer = session.createConsumer(topic);
//7.配置消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
7.SpringBoot整合ActiveMQ
生产者
1.依赖
org.springframework.boot
spring-boot-starter-activemq
2.配置:
server:
port: 9001 #端口
spring:
application:
name: activemq-producer # 服务名称
# springboot与activemq整合配置
activemq:
broker-url: tcp://192.168.66.133:61616 # 连接地址
user: admin # activemq用户名
password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
pub-sub-domain: false
3.启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 生产者启动类
*/
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class,args);
}
}
4.编写生产者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* 演示SpringBoot与ActiveMQ整合- 消息生产者
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringBootProducer {
//JmsMessagingTemplate: 用于工具类发送消息
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void ptpSender(){
/**
* 参数一:队列的名称或主题名称
* 参数二:消息内容
*/
jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot
message");
}
}
消费者
1.依赖
org.springframework.boot
spring-boot-starter-activemq
2.配置
server:
port: 9002 #端口
spring:
application:
name: activemq-consumer # 服务名称
# springboot与activemq整合配置
activemq:
broker-url: tcp://192.168.66.133:61616 # 连接地址
user: admin # activemq用户名
password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
pub-sub-domain: false
activemq:
name: springboot_queue
3.启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 消息消费者启动类
*/
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
4.监听消息类
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
* 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
*/
@Component // 放入IOC容器
public class MsgListener {
/**
* 用于接收消息的方法
* destination: 队列的名称或主题的名称
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
8.练习SpringBoot 集成 ActiveMQ
1、创建一个springboot项目,添加依赖
org.springframework.boot
spring-boot-starter-activemq
1.5.0.RELEASE
org.apache.activemq
activemq-pool
5.15.0
2、application.yml文件的配置
server:
port: 8080
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
close-timeout: 15s # 在考虑结束之前等待的时间
in-memory: true # 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
non-blocking-redelivery: false # 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
send-timeout: 0 # 等待消息发送响应的时间。设置为0等待永远。
queue-name: active.queue
topic-name: active.topic.name.model
# packages:
# trust-all: true #不配置此项,会报错
pool:
enabled: true
max-connections: 10 #连接池最大连接数
idle-timeout: 30000 #空闲的连接过期时间,默认为30秒
# jms:
# pub-sub-domain: true #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔列表(当不信任所有包时)
#spring.activemq.packages.trusted=
# 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然满,则在抛出异常前阻塞时间。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在启动时创建连接。可以在启动时用于加热池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 连接过期超时。
#spring.activemq.pool.expiry-timeout=0ms
# 连接空闲超时
#spring.activemq.pool.idle-timeout=30s
# 连接池最大连接数
#spring.activemq.pool.max-connections=1
# 每个连接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 当有"JMSException"时尝试重新连接
#spring.activemq.pool.reconnect-on-exception=true
# 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一个MessageProducer
#spring.activemq.pool.use-anonymous-producers=true
3、启动类增加 @EnableJms 注解
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms //启动消息队列
public class SpringbootActivemqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootActivemqApplication.class, args);
}
}
4、初始化和配置 ActiveMQ 的连接
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
@Configuration
publicclass BeanConfig
{
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.topic-name}")
private String password;
@Value("${spring.activemq.queue-name}")
private String queueName;
@Value("${spring.activemq.topic-name}")
private String topicName;
@Bean(name = "queue")
public Queue queue() {
return new ActiveMQQueue(queueName);
}
@Bean(name = "topic")
public Topic topic() {
return new ActiveMQTopic(topicName);
}
@Bean
public ConnectionFactory connectionFactory(){
return new ActiveMQConnectionFactory(username, password, brokerUrl); }
@Bean
public JmsMessagingTemplate jmsMessageTemplate(){
return new JmsMessagingTemplate(connectionFactory());
}
// 在Queue模式中,对消息的监听需要对containerFactory进行配置
@Bean("queueListener")
public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(false);
return factory;
}
//在Topic模式中,对消息的监听需要对containerFactory进行配置
@Bean("topicListener")
public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
}
5、生产者(queue 和 topic)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
@RestController
public class ProducerController
{
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@PostMapping("/queue/test")
public String sendQueue(@RequestBody String str) {
this.sendMessage(this.queue, str);
return "success";
}
@PostMapping("/topic/test")
public String sendTopic(@RequestBody String str) {
this.sendMessage(this.topic, str);
return "success";
}
// 发送消息,destination是发送到的队列,message是待发送的消息
private void sendMessage(Destination destination, final String message){
jmsMessagingTemplate.convertAndSend(destination, message);
}
}
6、Queue模式的消费者
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class QueueConsumerListener
{
//queue模式的消费者
@JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
public void readActiveQueue(String message) {
System.out.println("queue接受到:" + message);
}
}
7、topic模式的消费者
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumerListener
{
//topic模式的消费者
@JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
public void readActiveQueue(String message) {
System.out.println("topic接受到:" + message);
}
}
8、测试(使用Postman发消息)
(1) POST: http://localhost:8080/queue/test 消息体:{"aaa" : "queue"}
控制台打印:queue接受到:{"aaa" : "queue"}
(2) POST: http://localhost:8080/topic/test 消息体:{"aaa" : "topic"}
控制台打印:topic接受到:{"aaa" : "topic"}
topic模式有普通订阅和持久化订阅
普通订阅:在消费者启动之前发送过来的消息,消费者启动之后不会去消费;
持久化订阅: 在消费者启动之前发送过来的消息,消费者启动之后会去消费;
9.持久化
分类:
1.Memory 消息存储-基于内存的消息存储。
2.基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它提供了容量的提升和恢复 能力。
3.基于JDBC的消息存储方式-数据存储于数据库(例如:MySQL)中。
修改jms配置文件
delivery-mode: non_persistent # 非持久化(内存)
delivery-mode: persistent # 持久化(日志KahaDB)
JDBC消息存储
1)application.yml
server:
port: 9001
spring:
activemq:
broker-url: tcp://192.168.66.133:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false:点对点队列模式, true:发布/订阅模式
template:
delivery-mode: persistent # 持久化
activemq:
name: springboot-queue01
2)修改activemq.xml
3)拷贝mysql及durid数据源的jar包到activemq的lib目录下
4)重启activemq
10.消息事务
一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。 生产者、消费者与消息服务器直接都支持事务性;
一、生产者事务
/**
* 事务性发送--方案一
*/
@Test
public void sendMessageTx(){
//获取连接工厂
ConnectionFactory connectionFactory=
jmsMessagingTemplate.getConnectionFactory();
Session session=null;
try{
//创建连接
Connection connection=connectionFactory.createConnection();
/**
* 参数一:是否开启消息事务
*/a
session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//创建生产者
MessageProducer producer=
session.createProducer(session.createQueue(name));
for(int i=1;i<=10;i++){
//模拟异常
if(i==4){
int a=10/0;
}
TextMessage textMessage=session.createTextMessage("消息--"+
i);
producer.send(textMessage);
}
//注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达
MQ服务器
session.commit();
}catch(JMSException e){
e.printStackTrace();
//消息事务回滚
try{
session.rollback();
}catch(JMSException e1){
e1.printStackTrace();
}
}
}
方式二:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory
connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
}
生产者业务类:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息发送的业务类
*/
@Service
public class MessageService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${activemq.name}")
private String name;
@Transactional // 对消息发送加入事务管理(同时也对JDBC数据库的事务生效)
public void sendMessage(){
for(int i=1;i<=10;i++) {
//模拟异常
if(i==4){
int a = 10/0;
}
jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
}
}
}
测试发送方法:
@Autowired
private MessageService messageService;
/**
* 事务性发送--方案二: Spring的JmsTransactionManager功能
*/
@Test
public void sendMessageTx2(){
messageService.sendMessage();
}
二、消费者事务
/**
* 消息消费者
*/
@Component
public class Consumer {
/**
* 接收消息的方法
*/
@JmsListener(destination="${activemq.name}",containerFactory =
"jmsQueryListenerFactory")
public void receiveMessage(TextMessage textMessage,Session session) throws
JMSException {
try {
System.out.println("消息内容:" + textMessage.getText() + ",是否重发:"
+ textMessage.getJMSRedelivered());
int i = 100/0; //模拟异常
session.commit();//提交事务
} catch (JMSException e) {
try {
session.rollback();//回滚事务
} catch (JMSException e1) {
}
e.printStackTrace();
}
}
}
11.消息确认机制
MS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接 收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在 非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参 数有以下三个可选值:
Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从 MessageListener.onMessage方法成功返回的时候,会话 自动确认客户收到的消息
Session.CLIENT_ACKNOWLEDGE:客户通过消息的acknowledge方法确认消息。需要注意的 是,在这种模式中,确认是在会话层上进行:确认一个被消 费的消息将自动确认所有已被会话消费的消息。例如,如果 一个消息消费者消费了10个消息,然后确认第5个消息,那 么所有10个消息都被确认
Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝确认消息的提交。如果JMS provider失 败,那么可能会导致一些重复的消息。如果是重复的消息, 那么JMS provider必须把消息头的JMSRedelivered字段设置 为true