消息中间件
目录
- 消息中间件
- 什么是消息中间件
- 概述
- 消息中间件应用场景
- 同步与异步技术
- 同步技术
- 异步技术
- JMS
- 概述:
- jms消息服务器同类型技术
- JMS中支持的消息类型
- JMS中的两种发送模式
- 点对点模式
- 订阅发布模式
- ActiveMQ安装
- 启动
- 快速入门
- 创建普通Jar工程
- 引入pom依赖
- 点对点模式Queue
- 创建QueueProducer
- 创建QueueConsumer
- 订阅发布模式Topic
- TopicConsumer1
- TopicConsumer2
- TopicProducer
- 消息中间件在工程里的应用
- 添加中间所需要的依赖在common工程当中
- 在service_sellergoods工程当中添加配置文件
- spring/applicationContext-jms.xml
- 在修改商品状态和删除商品时发送消息
- 在service_page工程当中监听消息生成静态页面
- 添加配置文件
- spring/applicationContext-jms-consumer.xml
- 添加配置文件
- 编写监听器
- 在service_search工程当中监听消息添加和删除solr商品信息
- 添加配置文件
- spring/applicationContext-jms-consumer.xml
- 添加监听器
- ItemSearchListener
- ItemDeleteListener
- 添加配置文件
- 什么是消息中间件
消息中间件
什么是消息中间件
概述
- 消息中间件可以理解成就是一个服务软件,保存信息的容器,比如生活中的快递云柜.
- 我们把数据放到消息中间件当中, 然后通知对应的服务进行获取
- 消息中间件是在消息的传输过程中保存信息的容器
消息中间件应用场景
- 使用消息服务器当做大的队列使用, 先进先出, 来处理高并发写入操作
- 使用消息服务器可以将业务系统的串行执行改为并行执行, 处理效率高, 更合理的榨取服务器的性能.
同步与异步技术
同步技术
- dubbo是一中同步技术, 实时性高, controller调用service项目, 调用就执行,
- 如果service项目中的代码没有执行完, controller里面的代码一致等待结果.
异步技术
- mq消息中间件技术(jms) 是一种异步技术, 消息发送方, 将消息发送给消息服务器,
- 消息服务器未必立即处理.什么时候去处理, 主要看消息服务器是否繁忙,
- 消息进入服务器后会进入队列中, 先进先出.实时性不高.
JMS
概述:
- jms的全称叫做Java message service (Java消息服务) jms是jdk底层定义的规范
- 各大厂商都是实现这个规范的技术
jms消息服务器同类型技术
ActiveMQ
是apache的一个比较老牌的消息中间件, 它比较均衡, 既不是最安全的, 也不是最快的.
RabbitMQ
是阿里巴巴的一个消息中间件, 更适合金融类业务, 它对数据的安全性比较高.能够保证数据不丢失.
Kafka
Apache下的一个子项目。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;适合处理海量数据。
JMS中支持的消息类型
TextMessage
一个字符串对象
MapMessage
key-value
ObjectMessage
一个序列化的 Java 对象
BytesMessage
一个字节的数据流
StreamMessage
Java 原始值的数据流
JMS中的两种发送模式
点对点模式
- 一个发送方, 一个接收方.
- 也可以多个发送方, 一个接收方, 主要是接收方必须是第一个.
订阅发布模式
- 一个发送方, 多个接收方.
- 发送方也可以是多个, 主要看接收方, 接收方必须是多个
ActiveMQ安装
(1)将apache-activemq-5.12.0-bin.tar.gz 上传至服务器/usr/local目录下
(2)解压此文件
cd /usr/local
tar zxvf apache-activemq-5.12.0-bin.tar.gz
(3)为apache-activemq-5.12.0目录赋权
chmod 777 apache-activemq-5.12.0
(4)进入apache-activemq-5.12.0/bin目录赋与执行权限
cd apache-activemq-5.12.0/bin
chmod 755 activemq
(5) 启动
./activemq start
启动
在浏览器当中输入http://192.168.1.88:8161/
进入管理页面
用户名和密码都是 admin
说明
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。
快速入门
创建普通Jar工程
引入pom依赖
org.apache.activemq
activemq-client
5.13.4
点对点模式Queue
创建QueueProducer
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象, 指定发送的队列名称, 队列名称可以随意起名, 但是发送到哪里, 就要从哪里去接收
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("Hello ActiveMQ");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
创建QueueConsumer
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
订阅发布模式Topic
TopicConsumer1
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
TopicConsumer2
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
TopicProducer
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("Hello Topic ActiveMQ");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
消息中间件在工程里的应用
添加中间所需要的依赖在common工程当中
org.apache.activemq
activemq-client
5.13.4
org.springframework
spring-jms
javax.jms
javax.jms-api
2.0.1
在service_sellergoods工程当中添加配置文件
spring/applicationContext-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
在修改商品状态和删除商品时发送消息
//注入属性
@Autowired
private JmsTemplate jmsTemplate;
//为商品上架使用
@Autowired
private ActiveMQTopic topicPageAndSolrDestination;
//为商品下架使用
@Autowired
private ActiveMQQueue queueSolrDeleteDestination;
//商品审核通过
@Override
public void updateStatus(Long[] ids, String status) {
if (ids != null) {
for (final Long id : ids) {
//1. 根据商品id修改商品对象状态码
Goods goods = new Goods();
goods.setId(id);
goods.setAuditStatus(status);
goodsDao.updateByPrimaryKeySelective(goods);
//2. 根据商品id修改库存集合对象状态码
Item item = new Item();
item.setStatus(status);
ItemQuery query = new ItemQuery();
ItemQuery.Criteria criteria = query.createCriteria();
criteria.andGoodsIdEqualTo(id);
itemDao.updateByExampleSelective(item, query);
/**
* 将商品id作为消息发送给消息服务器
*/
if ("2".equals(status)) {
jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(String.valueOf(id));
return textMessage;
}
});
}
}
}
}
//删除商品时
public void delete(Long[] ids) {
if (ids != null) {
for (final Long id : ids) {
// 1. 到数据库中对商品进行逻辑删除
Goods goods = new Goods();
goods.setId(id);
goods.setIsDelete("1");
goodsDao.updateByPrimaryKeySelective(goods);
//2 将商品id作为消息发送给消息服务器
jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(String.valueOf(id));
return textMessage;
}
});
}
}
}
在service_page工程当中监听消息生成静态页面
添加配置文件
spring/applicationContext-jms-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
编写监听器
public class PageListener implements MessageListener {
@Autowired
private CmsService cmsService;
@Override
public void onMessage(Message message) {
ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
try {
String goodsId = atm.getText();
Map goodsData = cmsService.findGoodsData(Long.parseLong(goodsId));
cmsService.createStaticPage(Long.parseLong(goodsId), goodsData);
} catch (Exception e) {
e.printStackTrace();
}
}
}
在service_search工程当中监听消息添加和删除solr商品信息
添加配置文件
spring/applicationContext-jms-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
添加监听器
ItemSearchListener
public class ItemSearchListener implements MessageListener {
@Autowired
private SolrManagerService solrManagerService;
@Autowired
private ItemDao itemDao;
@Override
public void onMessage(Message message) {
//为了方便获取文本消息, 将原生的消息对象转换成activeMq的文本消息对象
ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
try {
String goodsId = atm.getText();
ItemQuery query = new ItemQuery();
ItemQuery.Criteria criteria = query.createCriteria();
//查询指定商品的库存数据
criteria.andGoodsIdEqualTo(Long.parseLong(goodsId));
List- items = itemDao.selectByExample(query);
if (items != null) {
for (Item item : items) {
//获取规格json格式字符串
String specJsonStr = item.getSpec();
Map map = JSON.parseObject(specJsonStr, Map.class);
item.setSpecMap(map);
}
}
solrManagerService.saveItemToSolr(items);
} catch (Exception e) {
e.printStackTrace();
}
}
}
ItemDeleteListener
public class ItemDeleteListener implements MessageListener {
@Autowired
private SolrManagerService solrManagerService;
@Override
public void onMessage(Message message) {
ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
try {
String goodsId = atm.getText();
ArrayList