ActiveMQ 的入门使用


简介:

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

特点:

  1. 支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议
  2. 完全支持JMS客户端和Message Broker中的企业集成模式
  3. 支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
  4. 完全支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
  5. Spring支持,以便ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
  6. 专为高性能集群,客户端 - 服务器,基于对等的通信而设计
  7. CXF和Axis支持,以便ActiveMQ可以轻松地放入这些Web服务堆栈中以提供可靠的消息传递
  8. 可以用作内存JMS提供程序,非常适合单元测试JMS
  9. 支持可插拔传输协议,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA传输
  10. 使用JDBC和高性能日志支持非常快速的持久性

运行测试:

windows安装

官网下载:https://activemq.apache.org/activemq-5014005-release

解压

账号密码都是:admin

Linux运行:

#下载Linux版本的解压包:https://activemq.apache.org/components/classic/download/

#解压:
`tar -zxvf xxxx.gz`
#进入bin目录下:
./activemq start #启动 

#查看进程
ps -ef|grep activemq

./activemq stop #停止

docker 安装 activemq

docker search activemq

docker pull rmohr/activemq

docker run --name activemq -d -p 61616:61616 -p 8161:8161 rmohr/activemq

访问:http://localhost:8161/admin/

JMS代码演示

创建Maven项目:

引入依赖:

    
     
            org.apache.activemq
            activemq-all
            5.9.1
      

队列模式

创建生产者:



import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/*
    创建生产着
 */
public class AppProducer {
    private static final String URL = "tcp://localhost:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // 2.创建连接
        Connection connection = connectionFactory.createConnection();
        // 3. 启动链接
        connection.start();

        // 4.创建会话
        // 是否启动事务,会话方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建发送消息的目标
        Destination destination = session.createQueue(queueName);

        // 6.创建生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {
            // 7. 创建消息
            TextMessage textMessage = session.createTextMessage("test->>>"+i);
            // 8.发送消息
            producer.send(textMessage);
            System.out.println("发送消息["+ textMessage.getText()+"]成功");
        }

        // 8.关闭连接
        connection.close();

    }
}

运行访问:

http://localhost:8161/admin/queues.jsp;jsessionid=1sqybj5hdfzgr6kjhctd3xjbg

创建消费者:


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppConsumer {
    private static final String URL = "tcp://localhost:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // 2.创建连接
        Connection connection = connectionFactory.createConnection();
        // 3. 启动链接
        connection.start();

        // 4.创建会话
        // 是否启动事务,会话方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建发送消息的目标
        Destination destination = session.createQueue(queueName);

        // 6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.创建监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;

                try {
                    System.out.println("接收到的消息["+textMessage.getText()+"]");
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });

        // 8.关闭连接 消费者一直会监听,不需要关闭连接
        //connection.close();
    }
}

创建两个消费者并同时运行监听,生产者之后发送消息

运行:自动负载均衡

主题模式

在没有订阅主题的情况下,订阅者是不会接受到任何消息的

创建主题队列:



import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/*
    创建主题
 */
public class AppProducer {

    private static final String URL = "tcp://localhost:61616";
    private static final String topicName = "queue-topic";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // 2.创建连接
        Connection connection = connectionFactory.createConnection();
        // 3. 启动链接
        connection.start();

        // 4.创建会话
        // 是否启动事务,会话方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建发送消息的目标
        Destination destination = session.createTopic(topicName);//-------之前是创建队列,现在是创建主题

        // 6.创建生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {
            // 7. 创建消息
            TextMessage textMessage = session.createTextMessage("test->>>"+i);
            // 8.发送消息
            producer.send(textMessage);
            System.out.println("发送消息["+ textMessage.getText()+"]成功");
        }

        // 8.关闭连接
        connection.close();

    }
}

创建订阅者:



import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppConsumer2 {
    private static final String URL = "tcp://localhost:61616";
    private static final String topicName = "queue-topic";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // 2.创建连接
        Connection connection = connectionFactory.createConnection();
        // 3. 启动链接
        connection.start();

        // 4.创建会话
        // 是否启动事务,会话方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建发送消息的目标
        Destination destination = session.createTopic(topicName);//-------之前是创建队列,现在是创建主题

        // 6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.创建监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;

                try {
                    System.out.println("接收到的消息["+textMessage.getText()+"]");
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });

        // 8.关闭连接 消费者一直会监听,不需要关闭连接
        //connection.close();
    }
}

运行两个订阅者,之后发送消息:

SpringBoot集成JMS连接ActiveMQ

SingleConnectionFactory 连接工厂:使用它,只是用一个连接操作 。 CachingConnectionFactory连接工厂:继承上面的工厂,并提供了缓存,会话,消费生产功能

代码具体实现推荐博客: