ActiveMQ(二)之Java实现简单的ActiveMQ通讯(queue模式)
Java实现简单的ActiveMQ通讯(queue模式)
我对学习的理解:先理解,再记忆(在脑海中回忆4遍记忆,或者是默写4遍)
说出你遇到的问题。以及不懂的地方,欢迎大家评论
- 
你会学到:
- 如何用java实现一个简单的生产者生产消息
 - 如何用java实现一个简单的消费者消费消息
 - 消费者消费消息的二种方式(同步阻塞、异步非阻塞)
 - 消费者和生产者谁先启动会造成什么影响?
 
 - 
前提条件:你必须在centos安装了activeMQ,并且能够得到web界面,如果你没有安装,请看这篇
 - 
需要的maven依赖
 
    org.apache.activemq 
    activemq-all 
    5.15.11 
 
    org.apache.xbean 
    xbean-spring 
    4.15 
 
- ???请记住下面这张图片(下面的代码都是基于这张图片的)
 

准备好了,那么我们开始吧!
1. 生产者生产消息
生成生产流程(依据上面这张图):
- 创建给定ActiveMQ服务连接工厂
 - 由连接工厂生成出connection连接对象
 - 由连接对象创建出session会话对象
 - 创建目的地(队列或者主题)
 - 创建消息生产者(需要与目的地建立连接)
 - 发送消息(创建消息、发送消息)
 - 释放资源
 
public class JSMProducor {
    // ActiveMQ服务地址 (它采用的是tcp协议)
    public static final String ACTIVEMQ_URL = "tcp://182.61.35.6:61617";
    // 消息队列名称
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        // 1. 创建工厂对象 (这个工厂对象连接到哪里?ACTIVEMQ_URL)
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2. 创建连接对象, 并启动访问
        Connection connection = factory.createConnection();
        connection.start();
        // 3. 通过连接对象创建session对象,第一个参数为是否开启事务,第二个参数为签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(队列或者主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5. 创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        // 6. 发送消息
        for (int i = 0; i < 3; i++) {
            // 6.1 创建消息
            TextMessage textMessage = session.createTextMessage("msg信息:" + i + ":hello world");
            // 6.2 将消息发送到MQ
            producer.send(textMessage);
        }
        // 7. 关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("success.....");
    }
}
这里就可以显示消息发送成功
2. 消费者消费消息
消费者消费流程(依据上面那张图):
- 创建工厂对象(思考:这个工厂连接着哪里?)
 - 根据工厂生产出连接对象,并建立连接
 - 根据连接对象创建session会话对象
 - 创建目的地(需要与消费者建立通道)
 - 创建消费者(与目的地建立连接)
 - 消费消息
 - 释放资源
 
class JSMConsumer{
    // ActiveMQ服务地址 (它采用的是tcp协议)
    public static final String ACTIVEMQ_URL = "tcp://182.61.35.6:61617";
    // 消息队列名称
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
//        while(true){
//            TextMessage receive = (TextMessage)consumer.receive();
//            TextMessage receive1 = (TextMessage)consumer.receive(3000L);
//            if(receive != null) {
//                System.out.println("消息内容是:" + receive.getText());
//            }else {
//                break;
//            }
//        }
//        // 关闭资源
//        session.close();
//        consumer.close();
//        connection.close();
        // 方式二:设置监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (message != null && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听器接受到的消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 说明为什么要使用 ystem.in.read();
        // 因为设置监听器是异步非阻塞,线程不需要等它接收消息就会往下执行
        // 所以这里设置了等待,防止他直接释放资源,接收线程了
        System.in.read();
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}
下面就说明了消费者消费成功了
3. 思考一个问题?
生产者先启动:
? 出现一个消费者?怎么办?直接消费。
? 出现两个消费者? 怎么办?先启动的消费者消费。
2个消费者先启动:
? 生成者生成的消息均摊到2个消费者上面去。
4. 总结 + 几个知识点和注意
- 
请记住上面的那张图,那么敲这些代码就简单了
 - 
解释一下下面的几个点

 - 
解释一些消费者的receive方法

 - 
消费者消费的两种方式
- 同步阻塞:通过receive()方法消费
 - 同步非阻塞:通过设置监听器(这种方式是异步非阻塞的)来消费消息
 
 - 
生产者和消费者的先后启动问题