ActiveMQ


1.ActiveMQ介绍:

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件

2.消息中间件应用场景

异步处理 应用解耦 流量削锋

(1)异步处理

场景说明:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。

串行方式

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客 户端。

image-20211227110519740

并行方式

将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客 户端。与串行的差别是,并行的方式可以提高处理的时间

image-20211227110546100

异步处理

引入消息中间件,将部分的业务逻辑,进行异步处理。改造后的架构如下:

image-20211227110614553

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送 短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间 可能是50毫秒。因此架构改变后,系统的吞吐量提高啦,比串行提高了3倍,比并行提高了两倍。

(2)应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。 传统的做法是,订单系统调用库存系统的接口。如下图:

image-20211227110646914

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存 系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图

image-20211227110703030

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存 系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假 如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再 关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存 系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假 如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再 关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

(3)流量消峰

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般 会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

a、可以控制活动的人数 b、可以缓解短时间内高流量压垮应用

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请 求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理

目前常用的消息队列

特性MQ ActiveMQ RabbitMQ RocketMQ Kafka
生产者消费者模式 支持 支持 支持 支持
发布订阅模式 支持 支持 支持 支持
请求回应模式 支持 支持 不支持 不支持
Api完备性
多语言支持 支持 支持 java 支持
单机吞吐量 万级 万级 万级 十万级
消息延迟 微秒级 毫秒级 毫秒级
可用性 高(主从) 高(主从) 非常高(分布式) 非常高(分布式)
消息丢失 理论上不会丢失 理论上不会丢失
文档的完备性
提供快速入门
社区活跃度
商业支持 商业云 商业云

3.什么是JMS?

介绍

JMS(Java?Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。 JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。

JMS消息模型
消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)。
(1) P2P (Point to Point) 点对点模型(Queue队列模型)
(2) Publish/Subscribe(Pub/Sub) 发布/订阅模型(Topic主题模型)

常用术语

  • Provider/MessageProvider:生产者
  • Consumer/MessageConsumer:消费者
  • PTP:Point To Point,点对点通信消息模型
  • Pub/Sub:Publish/Subscribe,发布订阅消息模型
  • Queue:队列,目标类型之一,和PTP结合
  • Topic:主题,目标类型之一,和Pub/Sub结合
  • ConnectionFactory:连接工厂,JMS用它创建连接
  • Connnection:JMS Client到JMS Provider的连接
  • Destination:消息目的地,由Session创建
  • Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的

点对点模型

点对点模型(Pointer-to-Pointer):即生产者和消费者之间的消息往来

image-20211227111119559

每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

点对点模型的特点:
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中);
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;
接收者在成功接收消息之后需向队列应答成功。

image-20211227111204627

发布/订阅模型

包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发
送到topic,系统将这些消息投递到订阅此topic的订阅者

image-20211227111228835

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发
布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

发布/订阅模型的特点:
每个消息可以有多个消费者;
发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息)。
订阅者必须保持运行的状态,才能接受发布者发布的消息;

4.JMS的消息格式

JMS消息组成

  • 消息头:

    每个消息头字段都有相应的getter和setter方法。

  • 消息属性:

    如果需要除消息头字段以外的值,那么可以使用消息属性。

message.setStringProperty("Property",Property); //自定义属性
  • 消息体:

    JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage(对象实现序列化接口)。

ObjectMessage:

发送

//发送ObjectMessage消息
@Test
public void test2(){
jmsTemplate.send(name, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
        User user = new User();
        user.setName("小苍");
        user.setAge(18);
        ObjectMessage objectMessage = session.createObjectMessage(user);
        return objectMessage;
        }
    });
}

接受

@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
    if(message instanceof ObjectMessage){
        ObjectMessage objectMessage = (ObjectMessage)message;
        try {
            User user = (User)objectMessage.getObject();
            System.out.println(user.getUsername());
            System.out.println(user.getPassword());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的 加入到受信任的列表。

spring:
	activemq:
        broker-url: tcp://192.168.66.133:61616
        user: admin
        password: admin
        packages:
        	trust-all: true # 添加所有包到信任列表

两种模式比较

1 Topic Queue
Publish Subscribe messaging 发布 订阅消息 Point-to-Point 点对点
有无状态 topic 数据默认不落地,是无状态的。 Queue 数据默认会在 mq 服 务器上以文件形式保存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面。也可以配置成 DB 存储。
完整性保障 并不保证 publisher 发布的每条数 据,Subscriber 都能接受到。 Queue 保证每条数据都能 被 receiver 接收。消息不超时。
消息是否会丢失 一般来说 publisher 发布消息到某 一个 topic 时,只有正在监听该 topic 地址的 sub 能够接收到消息;如果没 有 sub 在监听,该 topic 就丢失了。 Sender 发 送 消 息 到 目 标 Queue, receiver 可以异步接收这 个 Queue 上的消息。Queue 上的 消息如果暂时没有 receiver 来 取,也不会丢失。前提是消息不 超时。
消息发布接 收策略 一对多的消息发布接收策略,监 听同一个topic地址的多个sub都能收 到 publisher 发送的消息。Sub 接收完 通知 mq 服务器 一对一的消息发布接收策 略,一个 sender 发送的消息,只 能有一个 receiver 接收。 receiver 接收完后,通知 mq 服务器已接 收,mq 服务器对 queue 里的消 息采取删除或其他操作。

5.ActiveMQ的安装

Linux安装流程

第一步:安装 jdk(略)
第二步:把 activemq的压缩包(apache-activemq-5.14.5-bin.tar.gz)上传到 linux 系统
第三步:解压缩压缩包
tar -zxvf apache-activemq-5.14.5-bin.tar.gz
第四步:进入apache-activemq-5.14.5的bin目录
cd apache-activemq-5.14.5/bin
第五步:启动 activemq
./activemq start (执行2次:第一次:生成配置信息;第二次:启动)
第六步:停止activemq:
./activemq stop
页面控制台: http://ip:8161 (监控)
请求地址: tcp://ip:61616 (java代码访问消息中间件)
账号:admin
密码:admin
登录后列表各列信息含义如下:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。

windows安装

下载后直接解压缩直接就能用(免安装)。

bin/是服务启动相关的命令文件所在目录

data/是默认持久化文件所在目录

docs/里面放的是用户手册

conf/是配置文件所在目录,任何配置文件修改后,必须重启ActiveMQ,才能生效.
有几个配置文件需要提下,后面会用到:
1.activemq.xml
就是spring配置文件。配置的是ActiveMQ应用使用的默认对象组件.
transportConnectors标签 - 配置链接端口信息的. 其中的端口号61616是ActiveMQ对外发布的tcp协议访问端口. 就是java代码访问ActiveMQ时使用的端口.

配置安全认证和持久化都是在这个文件里面。

2.jetty.xml
spring配置文件, ActiveMQ使用的是jetty提供HTTP服务,因此需要该文件用于配置jetty服务器的默认对象组件.

3.users.properties
内容信息: 用户名=密码

是用于配置客户端通过协议访问ActiveMQ时,使用的用户名和密码.

4.groups.properties

内容信息: 用户组=用户1,用户2(多个用户中间用逗号隔开)

双击bin\win64目录下的activemq批处理文件,即可启动

访问http://localhost:8161/admin/,输入默认的用户名和密码admin/admin,即可看到管理台页面

6.原生JMS API操作ActiveMQ

PTP模式

引入坐标


	
		org.apache.activemq
		activemq-all
		5.11.2
	

生产者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 演示点对点模式 -- 消息生产者
 */
public class PTP_Producer {
    public static void main(String[] args) throws JMSException {
//1.创建连接工厂
        ConnectionFactory factory
                = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.创建连接
        Connection connection = factory.createConnection();
//3.打开连接
        connection.start();
//4.创建session
/**
 * 参数一:是否开启事务操作
 * 参数二:消息确认机制
 */
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
        Queue queue = session.createQueue("queue01");
//6.创建消息生产者
        MessageProducer producer = session.createProducer(queue);
//7.创建消息
//createTextMessage: 文本类型
        TextMessage textMessage = session.createTextMessage("test message");
//8.发送消息
        producer.send(textMessage);
        System.out.println("消息发送完成");
//9.释放资源
        session.close();
        connection.close();
    }
}
消费者

方案一(1)

package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 演示点对点模式- 消息消费者(第一种方案)
 */
public class PTP_Consumer1 {
    public static void main(String[] args) throws JMSException {
//1.创建连接工厂
        ConnectionFactory factory
                = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.创建连接
        Connection connection = factory.createConnection();
//3.打开连接
        connection.start();
//4.创建session
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.指定目标地址
        Queue queue = session.createQueue("queue01");
//6.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
//7.接收消息
        while(true){
            Message message = consumer.receive();
//如果已经没有消息了,结束啦
            if(message==null){
                break;
            }
//如果还有消息,判断什么类型的消息
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
                System.out.println("接收的消息:"+textMessage.getText());
            }
        }
    }
}

方案二(2)

package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 演示点对点模式- 消息消费者(第二种方案) -- 更加推荐
 */
public class PTP_Consumer2 {
    public static void main(String[] args) throws JMSException {
//1.创建连接工厂
        ConnectionFactory factory
                = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.创建连接
        Connection connection = factory.createConnection();
//3.打开连接
        connection.start();
//4.创建session
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.指定目标地址
        Queue queue = session.createQueue("queue01");
//6.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
//7.设置消息监听器来接收消息
        consumer.setMessageListener(new MessageListener() {
            //处理消息
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println("接收的(2):"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
//注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
    }
}

Pub/Sub模式

生产者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 主题消息,消息的发送方
 */
public class TopicProducer {
    public static void main(String[] args) throws Exception {
//1.创建连接工厂
        ConnectionFactory factory = new
                ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.创建连接
        Connection connection = factory.createConnection();
//3.打开连接
        connection.start();
//4.创建session
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
        Topic topic = session.createTopic("sms");
//6.创建消息生产者
        MessageProducer producer = session.createProducer(topic);
//7.创建消息
        TextMessage message = session.createTextMessage("发短信...");
//8.发送消息
        producer.send(message);
        System.out.println("发送消息:发短信...");
        session.close();;
        connection.close();
    }
}
消费者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 主题消息,消息的消费方
 */
public class TopicConsumer {
    public static void main(String[] args) throws Exception {
//1.创建连接工厂
        ConnectionFactory factory = new
                ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.创建连接
        Connection connection = factory.createConnection();
//3.打开连接
        connection.start();
//4.创建session
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
        Topic topic = session.createTopic("sms");
//6.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(topic);
//7.配置消息监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

7.SpringBoot整合ActiveMQ

生产者

1.依赖

	org.springframework.boot
	spring-boot-starter-activemq

2.配置:
server:
	port: 9001 #端口
spring:
	application:
		name: activemq-producer # 服务名称
# springboot与activemq整合配置
	activemq:
		broker-url: tcp://192.168.66.133:61616 # 连接地址
		user: admin # activemq用户名
		password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
	jms:
		pub-sub-domain: false
3.启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * 生产者启动类
 */
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class,args);
    }
}
4.编写生产者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * 演示SpringBoot与ActiveMQ整合- 消息生产者
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringBootProducer {
    //JmsMessagingTemplate: 用于工具类发送消息
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Test
    public void ptpSender(){
/**
 * 参数一:队列的名称或主题名称
 * 参数二:消息内容
 */
        jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot
                message");
    }
}

消费者

1.依赖

	org.springframework.boot
	spring-boot-starter-activemq

2.配置
server:
	port: 9002 #端口
spring:
	application:
		name: activemq-consumer # 服务名称
# springboot与activemq整合配置
	activemq:
		broker-url: tcp://192.168.66.133:61616 # 连接地址
		user: admin # activemq用户名
		password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
	jms:
		pub-sub-domain: false
activemq:
	name: springboot_queue

3.启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * 消息消费者启动类
 */
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class,args);
    }
}
4.监听消息类
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
 * 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
 */
@Component // 放入IOC容器
public class MsgListener {
    /**
     * 用于接收消息的方法
     * destination: 队列的名称或主题的名称
     */
    @JmsListener(destination = "${activemq.name}")
    public void receiveMessage(Message message){
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message;
            try {
                System.out.println("接收消息:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

8.练习SpringBoot 集成 ActiveMQ

1、创建一个springboot项目,添加依赖

        
        
            org.springframework.boot
            spring-boot-starter-activemq
            1.5.0.RELEASE
        
        
        
            org.apache.activemq
            activemq-pool
            5.15.0
        

2、application.yml文件的配置

server:
  port: 8080

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    close-timeout: 15s   # 在考虑结束之前等待的时间
    in-memory: true      # 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
    non-blocking-redelivery: false  # 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
    send-timeout: 0     # 等待消息发送响应的时间。设置为0等待永远。
    queue-name: active.queue
    topic-name: active.topic.name.model

#  packages:
#    trust-all: true #不配置此项,会报错
  pool:
    enabled: true
    max-connections: 10   #连接池最大连接数
    idle-timeout: 30000   #空闲的连接过期时间,默认为30秒

 # jms:
 #   pub-sub-domain: true  #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置

# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔列表(当不信任所有包时)
#spring.activemq.packages.trusted=
# 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然满,则在抛出异常前阻塞时间。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在启动时创建连接。可以在启动时用于加热池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 连接过期超时。
#spring.activemq.pool.expiry-timeout=0ms
# 连接空闲超时
#spring.activemq.pool.idle-timeout=30s
# 连接池最大连接数
#spring.activemq.pool.max-connections=1
# 每个连接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 当有"JMSException"时尝试重新连接
#spring.activemq.pool.reconnect-on-exception=true
# 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一个MessageProducer
#spring.activemq.pool.use-anonymous-producers=true

3、启动类增加 @EnableJms 注解

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms    //启动消息队列
public class SpringbootActivemqApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootActivemqApplication.class, args);
    }

}

4、初始化和配置 ActiveMQ 的连接

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

@Configuration
publicclass BeanConfig
{

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.topic-name}")
    private String password;

    @Value("${spring.activemq.queue-name}")
    private String queueName;

    @Value("${spring.activemq.topic-name}")
    private String topicName;

    @Bean(name = "queue")
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    @Bean(name = "topic")
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }

    @Bean
    public ConnectionFactory connectionFactory(){
        return new ActiveMQConnectionFactory(username, password, brokerUrl);    }

    @Bean
    public JmsMessagingTemplate jmsMessageTemplate(){
        return new JmsMessagingTemplate(connectionFactory());
    }

    // 在Queue模式中,对消息的监听需要对containerFactory进行配置
    @Bean("queueListener")
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }

    //在Topic模式中,对消息的监听需要对containerFactory进行配置
    @Bean("topicListener")
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

5、生产者(queue 和 topic)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;

@RestController
public class ProducerController
{
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    @PostMapping("/queue/test")
    public String sendQueue(@RequestBody String str) {
        this.sendMessage(this.queue, str);
        return "success";
    }

    @PostMapping("/topic/test")
    public String sendTopic(@RequestBody String str) {
        this.sendMessage(this.topic, str);
        return "success";
    }

    // 发送消息,destination是发送到的队列,message是待发送的消息
    private void sendMessage(Destination destination, final String message){
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
}

6、Queue模式的消费者

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class QueueConsumerListener
{
    //queue模式的消费者
    @JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
    public void readActiveQueue(String message) {
        System.out.println("queue接受到:" + message);
    }
}

7、topic模式的消费者

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumerListener
{
    //topic模式的消费者
    @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
    public void readActiveQueue(String message) {
        System.out.println("topic接受到:" + message);
    }
}

8、测试(使用Postman发消息)

(1) POST: http://localhost:8080/queue/test 消息体:{"aaa" : "queue"}

控制台打印:queue接受到:{"aaa" : "queue"}img

(2) POST: http://localhost:8080/topic/test 消息体:{"aaa" : "topic"}

控制台打印:topic接受到:{"aaa" : "topic"}

img

topic模式有普通订阅和持久化订阅

普通订阅:在消费者启动之前发送过来的消息,消费者启动之后不会去消费;

持久化订阅: 在消费者启动之前发送过来的消息,消费者启动之后会去消费;

9.持久化

分类:

1.Memory 消息存储-基于内存的消息存储。

2.基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它提供了容量的提升和恢复 能力。

3.基于JDBC的消息存储方式-数据存储于数据库(例如:MySQL)中。

修改jms配置文件
delivery-mode: non_persistent # 非持久化(内存)

delivery-mode: persistent # 持久化(日志KahaDB)

JDBC消息存储

1)application.yml

server:
	port: 9001
spring:
    activemq:
        broker-url: tcp://192.168.66.133:61616
        user: admin
        password: admin
    jms:
        pub-sub-domain: false # false:点对点队列模式, true:发布/订阅模式
        template:
    		delivery-mode: persistent # 持久化
activemq:
    name: springboot-queue01

2)修改activemq.xml



    
    
    
    



	


3)拷贝mysql及durid数据源的jar包到activemq的lib目录下

4)重启activemq

10.消息事务

一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。 生产者、消费者与消息服务器直接都支持事务性;

一、生产者事务

/**
 * 事务性发送--方案一
 */
@Test
public void sendMessageTx(){
	//获取连接工厂
    ConnectionFactory connectionFactory=
    jmsMessagingTemplate.getConnectionFactory();
    Session session=null;
    try{
	//创建连接
        Connection connection=connectionFactory.createConnection();
	/**
	 * 参数一:是否开启消息事务
	 */a
        session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
		//创建生产者
        MessageProducer producer=
        session.createProducer(session.createQueue(name));
        for(int i=1;i<=10;i++){
        //模拟异常
            if(i==4){
            int a=10/0;
        }
        TextMessage textMessage=session.createTextMessage("消息--"+
        i);
        producer.send(textMessage);
       	}
		//注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达
        MQ服务器
        session.commit();
        }catch(JMSException e){
        	e.printStackTrace();
			//消息事务回滚
        try{
        	session.rollback();
        }catch(JMSException e1){
        	e1.printStackTrace();
        }
    }
}

方式二:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory
    connectionFactory) {
    	return new JmsTransactionManager(connectionFactory);
    }
}

生产者业务类:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息发送的业务类
*/
@Service
public class MessageService {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Value("${activemq.name}")
    private String name;
    @Transactional // 对消息发送加入事务管理(同时也对JDBC数据库的事务生效)
    public void sendMessage(){
        for(int i=1;i<=10;i++) {
                //模拟异常
                if(i==4){
                    int a = 10/0;
                }
            jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
        }
    }
}

测试发送方法:

@Autowired
private MessageService messageService;
/**
* 事务性发送--方案二: Spring的JmsTransactionManager功能
*/
@Test
public void sendMessageTx2(){
    messageService.sendMessage();
}

二、消费者事务

/**
* 消息消费者
*/
@Component
public class Consumer {
    /**
    * 接收消息的方法
    */
    @JmsListener(destination="${activemq.name}",containerFactory =
    "jmsQueryListenerFactory")
    public void receiveMessage(TextMessage textMessage,Session session) throws
    JMSException {
        try {
                System.out.println("消息内容:" + textMessage.getText() + ",是否重发:"
                + textMessage.getJMSRedelivered());
                int i = 100/0; //模拟异常
                session.commit();//提交事务
            } catch (JMSException e) {
            try {
            	session.rollback();//回滚事务
            } catch (JMSException e1) {
            }
            e.printStackTrace();
        }
    }
}

11.消息确认机制

MS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接 收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在 非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参 数有以下三个可选值:

Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从 MessageListener.onMessage方法成功返回的时候,会话 自动确认客户收到的消息

Session.CLIENT_ACKNOWLEDGE:客户通过消息的acknowledge方法确认消息。需要注意的 是,在这种模式中,确认是在会话层上进行:确认一个被消 费的消息将自动确认所有已被会话消费的消息。例如,如果 一个消息消费者消费了10个消息,然后确认第5个消息,那 么所有10个消息都被确认

Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝确认消息的提交。如果JMS provider失 败,那么可能会导致一些重复的消息。如果是重复的消息, 那么JMS provider必须把消息头的JMSRedelivered字段设置 为true