消息中间件


目录
  • 消息中间件
    • 什么是消息中间件
      • 概述
      • 消息中间件应用场景
      • 同步与异步技术
        • 同步技术
        • 异步技术
    • JMS
      • 概述:
      • jms消息服务器同类型技术
      • JMS中支持的消息类型
      • JMS中的两种发送模式
        • 点对点模式
        • 订阅发布模式
    • ActiveMQ安装
      • 启动
    • 快速入门
      • 创建普通Jar工程
      • 引入pom依赖
      • 点对点模式Queue
        • 创建QueueProducer
        • 创建QueueConsumer
      • 订阅发布模式Topic
        • TopicConsumer1
        • TopicConsumer2
        • TopicProducer
    • 消息中间件在工程里的应用
      • 添加中间所需要的依赖在common工程当中
      • 在service_sellergoods工程当中添加配置文件
        • spring/applicationContext-jms.xml
      • 在修改商品状态和删除商品时发送消息
      • 在service_page工程当中监听消息生成静态页面
        • 添加配置文件
          • spring/applicationContext-jms-consumer.xml
      • 编写监听器
      • 在service_search工程当中监听消息添加和删除solr商品信息
        • 添加配置文件
          • spring/applicationContext-jms-consumer.xml
        • 添加监听器
          • ItemSearchListener
          • ItemDeleteListener

消息中间件

什么是消息中间件

概述

  • 消息中间件可以理解成就是一个服务软件,保存信息的容器,比如生活中的快递云柜.
  • 我们把数据放到消息中间件当中, 然后通知对应的服务进行获取
  • 消息中间件是在消息的传输过程中保存信息的容器

消息中间件应用场景

  • 使用消息服务器当做大的队列使用, 先进先出, 来处理高并发写入操作
  • 使用消息服务器可以将业务系统的串行执行改为并行执行, 处理效率高, 更合理的榨取服务器的性能.

同步与异步技术

同步技术

  • dubbo是一中同步技术, 实时性高, controller调用service项目, 调用就执行,
  • 如果service项目中的代码没有执行完, controller里面的代码一致等待结果.

异步技术

  • mq消息中间件技术(jms) 是一种异步技术, 消息发送方, 将消息发送给消息服务器,
  • 消息服务器未必立即处理.什么时候去处理, 主要看消息服务器是否繁忙,
  • 消息进入服务器后会进入队列中, 先进先出.实时性不高.

JMS

概述:

  • jms的全称叫做Java message service (Java消息服务) jms是jdk底层定义的规范
  • 各大厂商都是实现这个规范的技术

jms消息服务器同类型技术

ActiveMQ
	是apache的一个比较老牌的消息中间件, 它比较均衡, 既不是最安全的, 也不是最快的.
	
RabbitMQ
	是阿里巴巴的一个消息中间件, 更适合金融类业务, 它对数据的安全性比较高.能够保证数据不丢失.
	
Kafka
	Apache下的一个子项目。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;适合处理海量数据。

JMS中支持的消息类型

TextMessage
	一个字符串对象
	
MapMessage
	key-value
	
ObjectMessage
	一个序列化的 Java 对象
	
BytesMessage
	一个字节的数据流
	
StreamMessage
	Java 原始值的数据流

JMS中的两种发送模式

点对点模式

  • 一个发送方, 一个接收方.
  • 也可以多个发送方, 一个接收方, 主要是接收方必须是第一个.

订阅发布模式

  • 一个发送方, 多个接收方.
  • 发送方也可以是多个, 主要看接收方, 接收方必须是多个

ActiveMQ安装

(1)将apache-activemq-5.12.0-bin.tar.gz 上传至服务器/usr/local目录下
(2)解压此文件
	cd /usr/local
	tar  zxvf  apache-activemq-5.12.0-bin.tar.gz
(3)为apache-activemq-5.12.0目录赋权
	chmod 777 apache-activemq-5.12.0
(4)进入apache-activemq-5.12.0/bin目录赋与执行权限
	cd apache-activemq-5.12.0/bin
	chmod 755 activemq 
  (5)    启动
	 ./activemq start

启动

在浏览器当中输入http://192.168.1.88:8161/
进入管理页面
用户名和密码都是 admin

说明
	Number Of Pending Messages  :等待消费的消息 这个是当前未出队列的数量。
	Number Of Consumers  :消费者 这个是消费者端的消费者数量
	Messages Enqueued  :进入队列的消息  进入队列的总数量,包括出队列的。
	Messages Dequeued  :出了队列的消息  可以理解为是消费这消费掉的数量。

快速入门

创建普通Jar工程

引入pom依赖


	
		org.apache.activemq
		activemq-client
		5.13.4
	

点对点模式Queue

创建QueueProducer

public static void main(String[] args) throws Exception{
	//1.创建连接工厂
	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
	//2.获取连接
	Connection connection = connectionFactory.createConnection();
	//3.启动连接
	connection.start();
	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	//5.创建队列对象, 指定发送的队列名称, 队列名称可以随意起名, 但是发送到哪里, 就要从哪里去接收
	Queue queue = session.createQueue("test-queue");
	//6.创建消息生产者
	MessageProducer producer = session.createProducer(queue);
	//7.创建消息
	TextMessage textMessage = session.createTextMessage("Hello ActiveMQ");
	//8.发送消息
	producer.send(textMessage);
	//9.关闭资源
	producer.close();
	session.close();
	connection.close();
}

创建QueueConsumer

public static void main(String[] args) throws Exception{
	//1.创建连接工厂
	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
	//2.获取连接
	Connection connection = connectionFactory.createConnection();
	//3.启动连接
	connection.start();
	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	//5.创建队列对象
	Queue queue = session.createQueue("test-queue");
	//6.创建消息消费
	MessageConsumer consumer = session.createConsumer(queue);
	//7.监听消息
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(Message message) {
			TextMessage textMessage=(TextMessage)message;
			try {
				System.out.println("接收到消息:"+textMessage.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	});
	//8.等待键盘输入
	System.in.read();
	//9.关闭资源
	consumer.close();
	session.close();
	connection.close();

}

订阅发布模式Topic

TopicConsumer1

public static void main(String[] args) throws Exception {
//1.创建连接工厂
	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
	//2.获取连接
	Connection connection = connectionFactory.createConnection();
	//3.启动连接
	connection.start();
	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	//5.创建主题对象
	Topic topic = session.createTopic("test-topic");
	//6.创建消息消费
	MessageConsumer consumer = session.createConsumer(topic);
	//7.监听消息
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(Message message) {
			TextMessage textMessage=(TextMessage)message;
			try {
				System.out.println("接收到消息:"+textMessage.getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	});
	//8.等待键盘输入
	System.in.read();
	//9.关闭资源
	consumer.close();
	session.close();
	connection.close();
}

TopicConsumer2

public static void main(String[] args) throws Exception {
	//1.创建连接工厂
	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
	//2.获取连接
	Connection connection = connectionFactory.createConnection();
	//3.启动连接
	connection.start();
	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	//5.创建主题对象
	Topic topic = session.createTopic("test-topic");
	//6.创建消息消费
	MessageConsumer consumer = session.createConsumer(topic);

    //7.监听消息
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            TextMessage textMessage=(TextMessage)message;
            try {
                System.out.println("接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
    //8.等待键盘输入
    System.in.read();
    //9.关闭资源
    consumer.close();
    session.close();
    connection.close();

}

TopicProducer

public static void main(String[] args) throws Exception {
	//1.创建连接工厂
	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
	//2.获取连接
	Connection connection = connectionFactory.createConnection();
	//3.启动连接
	connection.start();
	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	//5.创建主题对象
	Topic topic = session.createTopic("test-topic");
	//6.创建消息生产者
	MessageProducer producer = session.createProducer(topic);
	//7.创建消息
	TextMessage textMessage = session.createTextMessage("Hello Topic ActiveMQ");
	//8.发送消息
	producer.send(textMessage);
	//9.关闭资源
	producer.close();
	session.close();
	connection.close();
}

消息中间件在工程里的应用

添加中间所需要的依赖在common工程当中



	org.apache.activemq
	activemq-client
	5.13.4


	org.springframework
	spring-jms


	javax.jms
	javax.jms-api
	2.0.1

在service_sellergoods工程当中添加配置文件

spring/applicationContext-jms.xml

<?xml version="1.0" encoding="UTF-8"?>


      
      
        
    

      
      
      
          
      

      
      
          
          
       

    
    
        
        
    

      
    
        
        
      

在修改商品状态和删除商品时发送消息

//注入属性		
@Autowired
private JmsTemplate jmsTemplate;
//为商品上架使用
@Autowired
private ActiveMQTopic topicPageAndSolrDestination;
//为商品下架使用
@Autowired
private ActiveMQQueue queueSolrDeleteDestination;
 //商品审核通过		
@Override
public void updateStatus(Long[] ids, String status) {
	if (ids != null) {
		for (final Long id : ids) {
			//1. 根据商品id修改商品对象状态码
			Goods goods  = new Goods();
			goods.setId(id);
			goods.setAuditStatus(status);
			goodsDao.updateByPrimaryKeySelective(goods);
			//2. 根据商品id修改库存集合对象状态码
			Item item = new Item();
			item.setStatus(status);
			ItemQuery query = new ItemQuery();
			ItemQuery.Criteria criteria = query.createCriteria();
			criteria.andGoodsIdEqualTo(id);
			itemDao.updateByExampleSelective(item, query);
			/**
			 * 将商品id作为消息发送给消息服务器
			 */
			if ("2".equals(status)) {
				jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {
					@Override
					public Message createMessage(Session session) throws JMSException {
						TextMessage textMessage = session.createTextMessage(String.valueOf(id));
						return textMessage;
					}
				});
			}
		}
	}
}

//删除商品时
public void delete(Long[] ids) {
	if (ids != null) {
		for (final Long id : ids) {
			// 1. 到数据库中对商品进行逻辑删除
			Goods goods = new Goods();
			goods.setId(id);
			goods.setIsDelete("1");
			goodsDao.updateByPrimaryKeySelective(goods);
			//2 将商品id作为消息发送给消息服务器
			jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() {
				@Override
				public Message createMessage(Session session) throws JMSException {
					TextMessage textMessage = session.createTextMessage(String.valueOf(id));
					return textMessage;
				}
			});

		}
	}
}

在service_page工程当中监听消息生成静态页面

添加配置文件

spring/applicationContext-jms-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>


      
      
        
    

      
      
      
          
      

      
    
        
        
        

    
    
        
        
        
    
    


编写监听器

public class PageListener implements MessageListener {
    @Autowired
    private CmsService cmsService;
    @Override
    public void onMessage(Message message) {
        ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
        try {
            String goodsId = atm.getText();
            Map goodsData = cmsService.findGoodsData(Long.parseLong(goodsId));
            cmsService.createStaticPage(Long.parseLong(goodsId), goodsData);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在service_search工程当中监听消息添加和删除solr商品信息

添加配置文件

spring/applicationContext-jms-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>


      
      
        
    

      
      
      
          
      

      
    
        
        
        

    
    
        
        
        
    
    

    
    
        
        
    

    
    
        
        
        
    
    


添加监听器

ItemSearchListener
public class ItemSearchListener implements MessageListener {
    @Autowired
    private SolrManagerService solrManagerService;
    @Autowired
    private ItemDao itemDao;
    @Override
    public void onMessage(Message message) {
        //为了方便获取文本消息, 将原生的消息对象转换成activeMq的文本消息对象
        ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
        try {
            String goodsId = atm.getText();
            ItemQuery query = new ItemQuery();
            ItemQuery.Criteria criteria = query.createCriteria();
            //查询指定商品的库存数据
            criteria.andGoodsIdEqualTo(Long.parseLong(goodsId));
            List items = itemDao.selectByExample(query);
            if (items != null) {
                for (Item item : items) {
                    //获取规格json格式字符串
                    String specJsonStr = item.getSpec();
                    Map map = JSON.parseObject(specJsonStr, Map.class);
                    item.setSpecMap(map);
                }
            }
            solrManagerService.saveItemToSolr(items);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
ItemDeleteListener
public class ItemDeleteListener implements MessageListener {
    @Autowired
    private SolrManagerService solrManagerService;
    @Override
    public void onMessage(Message message) {
        ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
        try {
            String goodsId = atm.getText();
            ArrayList itemGoodsID = new ArrayList<>();
            itemGoodsID.add(Long.parseLong(goodsId));
            solrManagerService.deleteItemByGoodsId(itemGoodsID);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}