ActiveMQ 的入门使用
简介:
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
特点:
- 支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议
- 完全支持JMS客户端和Message Broker中的企业集成模式
- 支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
- 完全支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
- Spring支持,以便ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
- 专为高性能集群,客户端 - 服务器,基于对等的通信而设计
- CXF和Axis支持,以便ActiveMQ可以轻松地放入这些Web服务堆栈中以提供可靠的消息传递
- 可以用作内存JMS提供程序,非常适合单元测试JMS
- 支持可插拔传输协议,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA传输
- 使用JDBC和高性能日志支持非常快速的持久性
运行测试:
windows安装
官网下载:https://activemq.apache.org/activemq-5014005-release
解压
账号密码都是:admin
Linux运行:
#下载Linux版本的解压包:https://activemq.apache.org/components/classic/download/
#解压:
`tar -zxvf xxxx.gz`
#进入bin目录下:
./activemq start #启动
#查看进程
ps -ef|grep activemq
./activemq stop #停止
docker 安装 activemq
docker search activemq
docker pull rmohr/activemq
docker run --name activemq -d -p 61616:61616 -p 8161:8161 rmohr/activemq
访问:http://localhost:8161/admin/
JMS代码演示
创建Maven项目:
引入依赖:
org.apache.activemq
activemq-all
5.9.1
队列模式
创建生产者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/*
创建生产着
*/
public class AppProducer {
private static final String URL = "tcp://localhost:61616";
private static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 2.创建连接
Connection connection = connectionFactory.createConnection();
// 3. 启动链接
connection.start();
// 4.创建会话
// 是否启动事务,会话方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建发送消息的目标
Destination destination = session.createQueue(queueName);
// 6.创建生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 7. 创建消息
TextMessage textMessage = session.createTextMessage("test->>>"+i);
// 8.发送消息
producer.send(textMessage);
System.out.println("发送消息["+ textMessage.getText()+"]成功");
}
// 8.关闭连接
connection.close();
}
}
运行访问:
http://localhost:8161/admin/queues.jsp;jsessionid=1sqybj5hdfzgr6kjhctd3xjbg
创建消费者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class AppConsumer {
private static final String URL = "tcp://localhost:61616";
private static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 2.创建连接
Connection connection = connectionFactory.createConnection();
// 3. 启动链接
connection.start();
// 4.创建会话
// 是否启动事务,会话方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建发送消息的目标
Destination destination = session.createQueue(queueName);
// 6.创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 7.创建监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到的消息["+textMessage.getText()+"]");
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 8.关闭连接 消费者一直会监听,不需要关闭连接
//connection.close();
}
}
创建两个消费者并同时运行监听,生产者之后发送消息
运行:自动负载均衡
主题模式
在没有订阅主题的情况下,订阅者是不会接受到任何消息的
创建主题队列:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/*
创建主题
*/
public class AppProducer {
private static final String URL = "tcp://localhost:61616";
private static final String topicName = "queue-topic";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 2.创建连接
Connection connection = connectionFactory.createConnection();
// 3. 启动链接
connection.start();
// 4.创建会话
// 是否启动事务,会话方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建发送消息的目标
Destination destination = session.createTopic(topicName);//-------之前是创建队列,现在是创建主题
// 6.创建生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 7. 创建消息
TextMessage textMessage = session.createTextMessage("test->>>"+i);
// 8.发送消息
producer.send(textMessage);
System.out.println("发送消息["+ textMessage.getText()+"]成功");
}
// 8.关闭连接
connection.close();
}
}
创建订阅者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class AppConsumer2 {
private static final String URL = "tcp://localhost:61616";
private static final String topicName = "queue-topic";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 2.创建连接
Connection connection = connectionFactory.createConnection();
// 3. 启动链接
connection.start();
// 4.创建会话
// 是否启动事务,会话方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建发送消息的目标
Destination destination = session.createTopic(topicName);//-------之前是创建队列,现在是创建主题
// 6.创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 7.创建监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到的消息["+textMessage.getText()+"]");
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 8.关闭连接 消费者一直会监听,不需要关闭连接
//connection.close();
}
}
运行两个订阅者,之后发送消息:
SpringBoot集成JMS连接ActiveMQ
SingleConnectionFactory 连接工厂:使用它,只是用一个连接操作 。 CachingConnectionFactory连接工厂:继承上面的工厂,并提供了缓存,会话,消费生产功能
代码具体实现推荐博客: