MQ消息队列(二)- RabbitMQ
参考文章
- RabbitMQ四种Exchange类型
- RabbitMQ入门教程(九):首部交换机Headers
RabbitMQ
简介
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,为了兼容不同客户端/中间件产品的使用限制。
相关概念
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual Host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
Docker安装
- docker pull rabbitmq:management
- docker run -d --name my-rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 18800:15672 -p 18801:5672 rabbitmq:management
- 网页访问:
http://localhost:18800
,账号密码都为admin
,可在上面命令修改
六种工作模式
官网文档:https://www.rabbitmq.com/getstarted.html
先准备好工具代码,以下代码先运行消费者,再运行发布者
// 导包
com.rabbitmq
amqp-client
5.9.0
// 常量
public interface Constants {
String QUEUE_BASIC = "hello-world";
String QUEUE_WORK_QUEUE = "work-queue";
String EXCHANGE_FANOUT = "exchange-fanout";
String EXCHANGE_DIRECT = "exchange-direct";
String EXCHANGE_TOPIC = "exchange-topic";
String QUEUE_TOPIC_FIRST = "topic-first";
String QUEUE_TOPIC_SECOND = "topic-second";
String QUEUE_LOG_ALL = "log-all";
String QUEUE_LOG_ERROR = "log-error";
String QUEUE_NEWS_CITY_BJ = "news-city-bj";
String QUEUE_NEWS_DATE_GQ = "news-date-gq";
}
// 工具类
public class RabbitmqUtils {
public static void doWithConnection(RabbitmqConsumer consumer) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(18801);
factory.setUsername("admin");
factory.setPassword("admin");
// 如果没有创建vhost,则这一行可以不写
factory.setVirtualHost("/my");
try (Connection connection = factory.newConnection();) {
consumer.accept(connection);
}
}
}
// 工具类
@FunctionalInterface
public interface RabbitmqConsumer {
void accept(CONNECTION connection) throws Exception;
}
// 消息类
@Data
public class Msg {
private Integer num;
private String msg;
public Msg(Integer num) {
this.num = num;
}
public Msg(Integer num, String msg) {
this.num = num;
this.msg = msg;
}
}
-
简单模式
简单的生产者消费者模式。
点击查看代码
// 消费者
public class Consumer {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
// 每个新建队列都会自动绑定到默认交换机上,绑定的路由键名称与队列名称相同。
// 默认交换机实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机。
// 如果需要消息持久化,则把第二个参数设置为true。
channel.queueDeclare(Constants.QUEUE_BASIC, false, false, false, null);
channel.basicConsume(Constants.QUEUE_BASIC, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: " + consumerTag);
System.out.println("envelope: " + envelope);
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body, StandardCharsets.UTF_8));
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
// 防止消息接受过程中,连接被释放
latch.await();
});
}
}
// 发布者
public class Publisher {
public static void main(String[] args) throws Exception {
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
channel.basicPublish("", Constants.QUEUE_BASIC, null, "hello world".getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功");
});
}
}
-
Work queues
相比
简单模式
,同一队列多了一个或多个消费者。- 一条消息只会被一个消费端接收
- 队列采用轮询的方式将消息是平均发送给消费者的
- 消费者在处理完某条消息后,才会收到下一条消息
- 应用场景:减缓压力,扩容
点击查看代码
// 消费者
public class Consumer {
static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
Runnable runnable = () -> {
CountDownLatch latch = new CountDownLatch(1);
try {
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.QUEUE_WORK_QUEUE, false, false, false, null);
/*
* 该方法主要起到限流的作用,防止任务无限制积压在客户端,确保客户端最多处理N个任务。
* 个人理解,此前提是消息的异步处理及异步提交,任务被积压在线程池的队列中
*/
//channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_WORK_QUEUE, false, buildConsumer(channel));
latch.await();
});
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(runnable).start();
new Thread(runnable).start();
new Thread(runnable).start();
System.out.println("启动完毕!");
}
static DefaultConsumer buildConsumer(Channel channel) {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag + " : " + Thread.currentThread().getName() + " : " + new String(body, StandardCharsets.UTF_8));
EXECUTOR.execute(new AsyncBasicAck(getChannel(), envelope, consumerTag + " : " + Thread.currentThread().getName()));
}
};
}
static class AsyncBasicAck implements Runnable {
private final Channel channel;
private final Envelope envelope;
private final String name;
AsyncBasicAck(Channel channel, Envelope envelope, String name) {
this.channel = channel;
this.envelope = envelope;
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(name + " : 确认成功!");
try {
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
// 发布者
public class Publisher {
public static void main(String[] args) throws Exception {
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
for (int i = 0; i < 10; i++) {
channel.basicPublish("", Constants.QUEUE_WORK_QUEUE, null, JsonUtils.toJsonString(new Msg( i + 1)).getBytes(StandardCharsets.UTF_8));
}
System.out.println("发送成功!");
});
}
}
-
Publish / Subscribe
相比
Work queues模式
,同一交换机绑定队列多了一个或多个队列。- 每个消费者监听自己的队列
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
- 应用场景:同一消息,多次消费
点击查看代码
// 消费者
public class Consumer {
static class First {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
// 关键点:声明一个扇出模式的交换机,会将消息广播到所有绑定到该交换机的队列
channel.exchangeDeclare(Constants.EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);
channel.queueDeclare(Constants.QUEUE_TOPIC_FIRST, false, false, false, null);
channel.queueBind(Constants.QUEUE_TOPIC_FIRST, Constants.EXCHANGE_FANOUT, "");
channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_TOPIC_FIRST, false, buildConsumer(channel));
System.out.println("启动完毕!");
latch.await();
});
}
}
static class Second {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);
channel.queueDeclare(Constants.QUEUE_TOPIC_SECOND, false, false, false, null);
channel.queueBind(Constants.QUEUE_TOPIC_SECOND, Constants.EXCHANGE_FANOUT, "");
channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_TOPIC_SECOND, false, buildConsumer(channel));
System.out.println("启动完毕!");
latch.await();
});
}
}
static DefaultConsumer buildConsumer(Channel channel) {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag + " : " + Thread.currentThread().getName() + " : " + new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
}
}
// 发布者
public class Publisher {
public static void main(String[] args) throws Exception {
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
for (int i = 0;;i++) {
TimeUnit.SECONDS.sleep(2);
channel.basicPublish(Constants.EXCHANGE_FANOUT, "", null, JsonUtils.toJsonString(new Msg(i + 1)).getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功:" + (i + 1));
}
});
}
}
-
Routing
相比
Publish / Subscribe模式
,同一交换机绑定队列增加了不同的路由规则。
点击查看代码
// 消费者
public class Consumer {
public enum Log {
INFO,WARN,ERROR
}
static class First {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
// 关键点1:声明一个直连模式的交换机
channel.exchangeDeclare(Constants.EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);
channel.queueDeclare(Constants.QUEUE_LOG_ALL, false, false, false, null);
// 关键点2:队列绑定一个或多个路由键
for (Log log : Log.values()) {
channel.queueBind(Constants.QUEUE_LOG_ALL, Constants.EXCHANGE_DIRECT, log.name());
}
channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_LOG_ALL, false, buildConsumer(channel));
System.out.println("启动完毕!");
latch.await();
});
}
}
static class Second {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);
channel.queueDeclare(Constants.QUEUE_LOG_ERROR, false, false, false, null);
channel.queueBind(Constants.QUEUE_LOG_ERROR, Constants.EXCHANGE_DIRECT, Log.ERROR.name());
channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_LOG_ERROR, false, buildConsumer(channel));
System.out.println("启动完毕!");
latch.await();
});
}
}
static DefaultConsumer buildConsumer(Channel channel) {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag + " : " + Thread.currentThread().getName() + " : " + new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
}
}
// 发布者
public class Publisher {
public static void main(String[] args) throws Exception {
Consumer.Log[] logs = Consumer.Log.values();
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
for (int i = 0;;i++) {
TimeUnit.SECONDS.sleep(2);
String routerKey = logs[i % logs.length].name();
String allMsg = JsonUtils.toJsonString(new Msg(i + 1, routerKey));
channel.basicPublish(Constants.EXCHANGE_DIRECT, routerKey, null, allMsg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功:" + allMsg);
}
});
}
}
-
Topics
相比Routing 模式
,Topics 模式
的路由键可以通过字符匹配的方式转发队列。
点击查看代码
// 消费者
public class Consumer {
static class First {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
// 关键点1:声明一个主题模式的交换机
channel.exchangeDeclare(Constants.EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC);
channel.queueDeclare(Constants.QUEUE_NEWS_CITY_BJ, false, false, false, null);
// 关键点2:配置路由键规则
// 1. 对于发布者:它必须是一个单词列表,由点分隔,路由键中可以有任意多个单词,最多255个字节。例如:aa.bb.cc.dd 等等。
// 2. 对于消费者:除了1中规则,路由键可以动态匹配,*(星号)可以正好代替一个词,# (hash) 可以代替零个或多个单词。例如:*.*.cc.# 等等。
channel.queueBind(Constants.QUEUE_NEWS_CITY_BJ, Constants.EXCHANGE_TOPIC, "*.20201001.#");
channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_NEWS_CITY_BJ, false, buildConsumer(channel));
System.out.println("启动完毕!");
latch.await();
});
}
}
static class Second {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC);
channel.queueDeclare(Constants.QUEUE_NEWS_DATE_GQ, false, false, false, null);
channel.queueBind(Constants.QUEUE_NEWS_DATE_GQ, Constants.EXCHANGE_TOPIC, "beijing.*.#");
channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_NEWS_DATE_GQ, false, buildConsumer(channel));
System.out.println("启动完毕!");
latch.await();
});
}
}
static DefaultConsumer buildConsumer(Channel channel) {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag + " : " + Thread.currentThread().getName() + " : " + new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
}
}
// 发布者
public class Publisher {
static final String[] CITIES = {"beijing","shanghai","shenzhen","guangzhou"};
static final String[] DATES = {"20201001", "20201024", "20201212"};
public static void main(String[] args) throws Exception {
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
for (int i = 0;;i++) {
TimeUnit.SECONDS.sleep(2);
String routerKeyCity = CITIES[i % CITIES.length];
String routerKeyDate = DATES[i % DATES.length];
String routerKey = routerKeyCity + "." + routerKeyDate;
String allMsg = JsonUtils.toJsonString(new Msg(i + 1, routerKey));
channel.basicPublish(Constants.EXCHANGE_TOPIC, routerKey, null, allMsg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功:" + allMsg);
}
});
}
}
-
RPC
-
特殊的交换机模式:headers
首部交换机和扇形交换机都不需要路由键RoutingKey,交换机时通过Headers头部来将消息映射到队列的,有点像HTTP的Headers,Hash结构中要求携带一个键
x-match
,这个键的Value可以是any
或者all
,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)而是Object类型。- any: 只要在发布消息时携带的有一对键值对headers满足队列定义的多个参数arguments的其中一个就能匹配上,注意这里是键值对的完全匹配,只匹配到键了,值却不一样是不行的;
- all:在发布消息时携带的所有Entry必须和绑定在队列上的所有Entry完全匹配;
点击查看代码
// 消费者
public class Consumer {
static final Map HEADERS = Maps.newHashMapWithExpectedSize(5);
static {
HEADERS.put("first", "hello");
HEADERS.put("second", "world");
}
static class First {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
// 关键点1:声明一个首部模式的交换机
channel.exchangeDeclare(Constants.EXCHANGE_HEADERS, BuiltinExchangeType.HEADERS);
channel.queueDeclare(Constants.QUEUE_HEADERS_ALL, false, false, false, null);
// 关键点2:队列绑定时设置首部参数
HEADERS.put("x-match", "all");
channel.queueBind(Constants.QUEUE_HEADERS_ALL, Constants.EXCHANGE_HEADERS, "", HEADERS);
channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_HEADERS_ALL, false, buildConsumer(channel));
System.out.println("启动完毕!");
latch.await();
});
}
}
static class Second {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.EXCHANGE_HEADERS, BuiltinExchangeType.HEADERS);
channel.queueDeclare(Constants.QUEUE_HEADERS_ANY, false, false, false, null);
HEADERS.put("x-match", "any");
channel.queueBind(Constants.QUEUE_HEADERS_ANY, Constants.EXCHANGE_HEADERS, "", HEADERS);
channel.basicQos(1);
channel.basicConsume(Constants.QUEUE_HEADERS_ANY, false, buildConsumer(channel));
System.out.println("启动完毕!");
latch.await();
});
}
}
static DefaultConsumer buildConsumer(Channel channel) {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag + " : " + Thread.currentThread().getName() + " : " + new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
}
}
// 发布者
public class Publisher {
public static void main(String[] args) throws Exception {
Map all = Maps.newHashMapWithExpectedSize(5);
all.put("first", "hello");
all.put("second", "world");
Map any = Maps.newHashMapWithExpectedSize(5);
any.put("first", "hello");
any.put("second", "you");
Map[] maps = new Map[] {all, any};
RabbitmqUtils.doWithConnection(connection -> {
Channel channel = connection.createChannel();
for (int i = 0;;i++) {
TimeUnit.SECONDS.sleep(2);
Map headers = maps[i % maps.length];
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
String allMsg = JsonUtils.toJsonString(new Msg(i + 1, JsonUtils.toJsonString(headers)));
channel.basicPublish(Constants.EXCHANGE_HEADERS, "", props, allMsg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功:" + allMsg);
}
});
}
}