RabbitMQ消息队列的使用


RabbitMQ消息队列的使用

RabbitMQwin安装和配置

 RabbitMQ有成千上万的用户,是最受欢迎的开源消息代理之一。

T-MobileRuntastic,RabbitMQ在全球范围内的小型初创企业和大型企业中都得到使用。

RabbitMQ轻巧,易于在内部和云中部署。它支持多种消息传递协议。

RabbitMQ可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。

RabbitMQ可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具

安装RabbitMQ

RabbitMQ和Rrlang的版本是有要求的,如果不知道该安装那个版本直接全部安装最新版就好。

2020-09-03,RabbitMQ团队很高兴宣布RabbitMQ 3.8.9的发布。

https://www.rabbitmq.com/news.html#2020-09-28T08:00:00+00:00

这是一个维护版本,着重于错误修复和可用性改进。发行说明可以在变更日志中找到。

此版本放弃了对Erlang / OTP 21.3的支持。现在最低支持22.3版本

新版本的二进制版本和软件包可以在GitHubPackage CloudBintray找到

请参阅RabbitMQ安装指南以了解更多信息。

我们鼓励所有使用RabbitMQ早期版本的用户升级到此最新版本。

Erang安装配置

Erlang下载地址:https://www.erlang.org/downloads/23.0

 

 

以管理员的身份运行Erlang应用,然后一直next就行。

设置环境变量,新建ERLANG_HOME

 

 


然后就是在path里面添加%ERLANG_HOME%\bin; 一样的套路。

打开cmd命令框,输入erl

 

 

RabbitMQ安装配置

RabbitMQ下载地址:https://www.rabbitmq.com/download.html

 

 

安装过程与erlang的安装过程相同。

RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQ的sbin目录。

 

 

打开sbin目录,双击rabbitmq-server.bat

等几秒钟看到一个界面后,不用管它,访问http://localhost:15672

 

 

用户和密码都是guest

到此RabbitMQ已经安装完成。

七种工作模式

1.生产-消费者模式

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。

官方介绍:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

 

 

首先创建一个springboot工程,再导入maven依赖


            org.springframework.boot
            spring-boot-starter-amqp

在RabbitMQ Management页面中创建用户和虚拟主机

 

 

由于我们可能需要多次创建连接,为了减少代码量将它封装成一个工具类。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

    private static ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");
    }

    /**
     * 连接rabbtimq对象
     * @return
     */
     public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
         return null;
    }


    /**
     * 关闭连接
     * @param connection
     * @param channel
     */
    public static void closeConnection(Connection connection, Channel channel) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

弄了这么多来使用一下吧!

生产端的代码如下:

private static final String QUEUE_NAME = "hello";

String msg = "Hello Word!";
        //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 通道绑定队列
         * 声明队列,如果Rabbit中没有此队列将自动创建
         * param1:队列名称
         * param2:是否持久化
         * param3:队列是否独占此连接
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
         *
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**
         * 消息发布方法
         * param1:Exchange的名称,如果没有指定,则使用Default Exchange
         * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
         * param3:消息包含的属性
         * param4:消息体
         * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
         * 默认的交换机,routingKey等于队列名称
         */
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("utf-8"));
        System.out.println("发送成功");
        ConnectionUtil.closeConnection(connection, channel);

执行成功后我们来到管理页面,在Queues有一个消息队列hell显示有一个未被消费。

 

 

消费端的代码如下:

private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 通道绑定队列
         * 声明队列,如果Rabbit中没有此队列将自动创建
         * param1:队列名称
         * param2:是否持久化
         * param3:队列是否独占此连接
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
         *
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
             * 收到消息失败后是否需要重新发送)
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("得到消息" + new String(body));
            }
        });
        ConnectionUtil.closeConnection(connection, channel);
    }

运行成功控制台打印:

 

 

2.工作队列模式

工作队列(又名:任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。

相反,我们安排任务在以后完成。我们将任务封装 为消息并将其发送到队列。

在后台运行的工作进程将弹出任务并最终执行作业。

当您运行许多工作人员时,任务将在他们之间共享。

这个概念在Web应用程序中特别有用,因为在Web应用程序中,不可能在较短的HTTP请求窗口内处理复杂的任务。

消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?

C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)

 

 

生产端的代码如下:

@Test
    void testProviderMessages() throws Exception {
        String msg = "Hello Word!";
        //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 通道绑定队列
         * 声明队列,如果Rabbit中没有此队列将自动创建
         * param1:队列名称
         * param2:是否持久化
         * param3:队列是否独占此连接
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
         *
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**
         * 消息发布方法
         * param1:Exchange的名称,如果没有指定,则使用Default Exchange
         * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
         * param3:消息包含的属性
         * param4:消息体
         * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
         * 默认的交换机,routingKey等于队列名称
         */
        for (int i = 0; i < 10; i++) {
            String c = msg + i;
            channel.basicPublish("", QUEUE_NAME, null, c.getBytes("utf-8"));
        }
        System.out.println("发送成功");
        ConnectionUtil.closeConnection(connection, channel);
    }

消费端1,2的代码跟上面消费端代码一样,将两个消费端执行在运行生产端会发现两个消费端打印的内容都不一样

 

 

 

 

3.发布订阅模式

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。

实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

相反,生产者只能将消息发送到交换机

交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。

交易所必须确切知道如何处理收到的消息。

是否应将其附加到特定队列?

是否应该将其附加到许多队列中?

还是应该丢弃它。规则由交换类型定义 。

有几种交换类型可用:direct,topic,headers 和fanout。我们将集中讨论最后一个-fanout。

让我们创建该类型的交换,并将其称为扇出交换非常简单。

应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

Publish/subscribe 模式绑定两个消费端,因此需要有两个消费端,一个邮件消费端,一个短信消费端;

 

 

生产端:

1、声明队列,声明交换机

2、创建连接

3、创建通道

4、通道声明交换机

5、通道声明队列

6、通过通道使队列绑定到交换机

7、制定消息

8、发送消息

消费端:

1、声明队列,声明交换机

2、创建连接

3、创建通道

4、通道声明交换机

5、通道声明队列

6、通过通道使队列绑定到交换机

7、重写消息消费方法

8、执行消息方法

生产端的代码如下:

private static final String QUEUE_NAME = "hello1";

    // 生产消息
    @Test
    void testProviderMessages() throws Exception {
        String msg = "Hello Word!";
        //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 通道绑定交换机
         * 参数明细
         * 1交换机名称
         * 2交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.FANOUT);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueDeclare("QUEUE_NAME", false, false, false, null);
        /**
         * 交换机和队列绑定
         * 参数明细
         * 1队列名称
         * 2交换机名称
         * 3路由key
         */
        channel.queueBind("QUEUE_NAME", QUEUE_NAME, "");
        channel.queueBind(QUEUE_NAME, QUEUE_NAME, "");

        channel.basicPublish(QUEUE_NAME, "", null, msg.getBytes("utf-8"));
        System.out.println("发送成功");
        ConnectionUtil.closeConnection(connection, channel);
    }

消费端1代码如下:

private static final String QUEUE_NAME = "hello1";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.FANOUT);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, QUEUE_NAME, "");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("得到消息" + new String(body));
            }
        });
        ConnectionUtil.closeConnection(connection, channel);
    }

消费端2代码如下:

 private static final String QUEUE_NAME = "hello1";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.FANOUT);
        channel.queueDeclare("QUEUE_NAME", false, false, false, null);
        channel.queueBind("QUEUE_NAME", QUEUE_NAME, "");
        channel.basicConsume("QUEUE_NAME", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("得到消息" + new String(body));
            }
        });
        ConnectionUtil.closeConnection(connection, channel);
    }

4.Routing 路由模式

在此设置中,我们可以看到绑定了两个队列的直接交换X。

第一个队列由绑定键orange绑定,第二个队列有两个绑定,一个绑定键为black,另一个绑定为green。

在这样的设置中,通过路由键橙色发布到交换机的消息 将被路由到队列Q1。路由键为黑色 或绿色的消息将转到Q2。所有其他消息将被丢弃。

 

 

路由模式:

1、每个消费者监听自己的队列,并且设置routingkey;
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;

应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

生产端:

1、声明队列,声明交换机

2、创建连接

3、创建通道

4、通道声明交换机

5、通道声明队列

6、通过通道使队列绑定到交换机并指定该队列的routingkey

7、制定消息

8、发送消息并指定routingkey

消费端:

1、声明队列,声明交换机

2、创建连接

3、创建通道

4、通道声明交换机

5、通道声明队列

6、通过通道使队列绑定到交换机并指定routingkey

7、重写消息消费方法

8、执行消息方法

按照假设的应用场景,同样,Routing 路由模式也是一个生产端,两个消费端,所不同的是,声明交换机的类型不同,队列绑定交换机的时候需要指定Routing key,发送消息的时候也需要指定Routing key,这样根据Routing key就能把相应的消息发送到相应的队列中去。

生产端的代码如下:

private static final String QUEUE_NAME = "hello2";

    // 生产消息
    @Test
    void testProviderMessages() throws Exception {
        String msg = "Hello Word!";
        String msg2 = "Hello Word2!";
        //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueDeclare("QUEUE_NAME2", false, false, false, null);
        channel.queueBind("QUEUE_NAME2", QUEUE_NAME, "QUEUE_NAME2");
        channel.queueBind(QUEUE_NAME, QUEUE_NAME, QUEUE_NAME);
        channel.basicPublish(QUEUE_NAME, "QUEUE_NAME2", null, msg.getBytes("utf-8"));
        channel.basicPublish(QUEUE_NAME, QUEUE_NAME, null, msg2.getBytes("utf-8"));
        System.out.println("发送成功");
        ConnectionUtil.closeConnection(connection, channel);
    }

消费端1代码如下:

private static final String QUEUE_NAME = "hello2";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, QUEUE_NAME, QUEUE_NAME);
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("得到消息" + new String(body));
            }
        });
        ConnectionUtil.closeConnection(connection, channel);
    }

消费端2代码如下:

private static final String QUEUE_NAME = "hello2";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, QUEUE_NAME, "QUEUE_NAME2");
        channel.basicConsume("QUEUE_NAME2", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("得到消息" + new String(body));
            }
        });
        ConnectionUtil.closeConnection(connection, channel);
    }

5.Topics 模式

发送到主题交换的消息不能具有任意的 routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。

一些有效的路由关键示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。

路由关键字中可以包含任意多个单词,最多255个字节。

绑定密钥也必须采用相同的形式。

主题交换背后的逻辑类似于直接交换的逻辑 -使用特定路由密钥发送的消息将被传递到所有与匹配的绑定密钥绑定的队列。

但是,绑定键有两个重要的特殊情况:

 *(星号)可以代替一个单词。

 #(哈希)可以替代零个或多个单词。

Topics 模式和Routing 路由模式最大的区别就是,Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的。

 

 

按照假设的应用场景,Topics 模式也是一个生产端,两个消费端,生产端队列绑定交换机的时候,需要指定的routingkey是通配符,发送消息的时候绑定的routingkey也是通配符,消费端队列绑定交换机的时候routingkey也是通配符,这样就能根据通配符匹配到消息了。

生产端的代码如下:

private static final String QUEUE_NAME = "hello5";

    // 生产消息
    @Test
    void testProviderMessages() throws Exception {
        String msg = "Hello Word!";
        String msg2 = "Hello Word2!";
        //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.TOPIC);

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueDeclare("QUEUE_NAME5", false, false, false, null);

        channel.queueBind("QUEUE_NAME5", QUEUE_NAME, "a.");
        channel.queueBind(QUEUE_NAME, QUEUE_NAME, "b.");

        channel.basicPublish(QUEUE_NAME, "a.a", null, msg.getBytes("utf-8"));
        channel.basicPublish(QUEUE_NAME, "a.b", null, msg2.getBytes("utf-8"));
        channel.basicPublish("QUEUE_NAME5", "b.a", null, msg.getBytes("utf-8"));
        channel.basicPublish("QUEUE_NAME5", "b.b", null, msg2.getBytes("utf-8"));

        System.out.println("发送成功");
    }

消费端1代码如下:

private static final String QUEUE_NAME = "hello5";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.TOPIC);
        channel.queueDeclare("QUEUE_NAME5", false, false, false, null);
        channel.queueBind("QUEUE_NAME5", QUEUE_NAME, "b.#");
        channel.basicConsume("QUEUE_NAME5", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("得到消息" + new String(body));
            }
        });
        ConnectionUtil.closeConnection(connection, channel);
    }

消费端2代码如下:

private static final String QUEUE_NAME = "hello5";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(QUEUE_NAME, BuiltinExchangeType.TOPIC);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, QUEUE_NAME, "a.#");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("得到消息" + new String(body));
            }
        });
        ConnectionUtil.closeConnection(connection, channel);
    }

6.Header 模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

根据假设使用场景,需要一个生产端,两个消费端,不同的是,生产端声明交换机时,交换机的类型不同,是headers类型,生产端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,发送消息时也是使用header中的 key/value(键值对)匹配队列。

消费端同样是声明交换机时,交换机的类型不同,是headers类型,消费端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,消费消息时也是使用header中的 key/value(键值对)匹配队列。

生产端的代码如下:

public class Producer05 {
    //声明两个队列和一个交换机
    //Header 模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
   
    public static void main(String[] args) {
            Connection connection = ConnectionUtil.getConnection();
           Channel channel = connection.createChannel();
            //通道绑定交换机
            /**
              * 参数明细
              * 1交换机名称
              * 2交换机类型,fanout、topic、direct、headers
              */
            //Header 模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
            //通道绑定队列
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:是否持久化
             * param3:队列是否独占此连接
             * param4:队列不再使用时是否自动删除此队列
             * param5:队列参数
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定短信队列
            //交换机和队列绑定
            /**
             * 参数明细
             * 1队列名称
             * 2交换机名称
             * 3路由key
             * 4
             * String queue, String exchange, String routingKey, Map arguments
             */
            Map headers_email = new Hashtable();
            headers_email.put("inform_type","email");
            Map headers_sms = new Hashtable();
            headers_sms.put("inform_type","sms");
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
            channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_sms);
            //给email队列发消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 发送email消息。。。");
                Map headers = new Hashtable();
                headers.put("inform_type","email");//匹配email通知消费者绑定的header
                /**
                  * 消息发布方法
                  * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                  * param3:消息包含的属性
                  * param4:消息体
                  * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                  * 默认的交换机,routingKey等于队列名称
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
                properties.headers(headers);
                //Email通知
                channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());
                System.out.println("mq email 消息发送成功!");
            }
            //给sms队列发消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 发送sms消息。。。");
                Map headers = new Hashtable();
                headers.put("inform_type","sms");//匹配sms通知消费者绑定的header
                /**
                  * 消息发布方法
                  * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                  * param3:消息包含的属性
                  * param4:消息体
                  * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                  * 默认的交换机,routingKey等于队列名称
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
                properties.headers(headers);
                //sms通知
                channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());
                System.out.println("mq sms 消息发送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

邮件消费端的代码如下:

public class Consumer05 {
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
           Connection connection = ConnectionUtil.getConnection();
           Channel channel = connection.createChannel();
            //通道绑定交换机
            /**
                          * 参数明细
                          * 1交换机名称
                          * 2交换机类型,fanout、topic、direct、headers
                          */
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
            //通道绑定队列
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:是否持久化
             * param3:队列是否独占此连接
             * param4:队列不再使用时是否自动删除此队列
             * param5:队列参数
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
            //交换机和队列绑定
            /**
             * 参数明细
             * 1队列名称
             * 2交换机名称
             * 3路由key
             * 4
             * String queue, String exchange, String routingKey, Map arguments
             */
            Map headers_email = new Hashtable();
            headers_email.put("inform_email","email");
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消费者接收消息调用此方法
                  * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                  * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                    (收到消息失败后是否需要重新发送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消费者启动成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

短信消费端的代码如下:

public class Consumer05 {
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
           Connection connection = ConnectionUtil.getConnection();
           Channel channel = connection.createChannel();
            //通道绑定交换机
            /**
              * 参数明细
              * 1交换机名称
              * 2交换机类型,fanout、topic、direct、headers
              */
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
            //通道绑定队列
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:是否持久化
             * param3:队列是否独占此连接
             * param4:队列不再使用时是否自动删除此队列
             * param5:队列参数
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定邮件队列
            //交换机和队列绑定
            /**
             * 参数明细
             * 1队列名称
             * 2交换机名称
             * 3路由key
             * 4
             * String queue, String exchange, String routingKey, Map arguments
             */
            Map headers_email = new Hashtable();
            headers_email.put("inform_email","sms");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_email);
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消费者接收消息调用此方法
                  * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                  * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                    (收到消息失败后是否需要重新发送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消费者启动成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

7.RPC 模式

 RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3、服务端将RPC方法 的结果发送到RPC响应队列。

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

 

 

最后

 感觉您阅读了这篇文章,作者是一名新手如有问题请留言作者。

相关