docker 创建 rocketMQ 记录 +springboot集成mq


环境搭建参考

一、查看镜像的版本

docker search rocketmq   

NAME                               DESCRIPTION                                     STARS               OFFICIAL            AUTOMATED
rocketmqinc/rocketmq               Image repository for Apache RocketMQ            35                                      
styletang/rocketmq-console-ng      rocketmq-console-ng                             27                                      
foxiswho/rocketmq                  rocketmq                                        23                                  

二、检索具体版本

这次选了foxiswho/rocketmq,以下是一个查看当前镜像所有的版本shell命令:

curl https://registry.hub.docker.com/v1/repositories/foxiswho/rocketmq/tags | tr -d '[\[\]" ]' | tr '}' '\n' | awk -F: -v image='foxiswho/rocketmq' '{if(NR!=NF && $3 != ""){printf("%s:%s\n",image,$3)}}'

结果:

% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   934    0   934    0     0    310      0 --:--:--  0:00:03 --:--:--   310
foxiswho/rocketmq:4.7.0
foxiswho/rocketmq:base
foxiswho/rocketmq:base-4.3.2
foxiswho/rocketmq:base-4.4.0
foxiswho/rocketmq:base-4.5.0
foxiswho/rocketmq:base-4.5.1
foxiswho/rocketmq:base-4.5.2
foxiswho/rocketmq:base-4.6.1
foxiswho/rocketmq:broker
foxiswho/rocketmq:broker-4.3.2
foxiswho/rocketmq:broker-4.4.0
foxiswho/rocketmq:broker-4.5.0
foxiswho/rocketmq:broker-4.5.1
foxiswho/rocketmq:broker-4.5.2
foxiswho/rocketmq:broker-4.6.1
foxiswho/rocketmq:broker-4.7.0
foxiswho/rocketmq:server
foxiswho/rocketmq:server-4.3.2
foxiswho/rocketmq:server-4.4.0
foxiswho/rocketmq:server-4.5.0
foxiswho/rocketmq:server-4.5.1
foxiswho/rocketmq:server-4.5.2
foxiswho/rocketmq:server-4.6.1
foxiswho/rocketmq:server-4.7.0

三、启动NameServer

docker run -d -p 9876:9876 --name rmqserver foxiswho/rocketmq:server-4.5.1

四、启动broker

docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqserver:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m" foxiswho/rocketmq:broker-4.5.1

五、修改NameServer容器 和 broker 容器 的配置参数   ----这步很重要!

NameServer容器 和 broker 容器 默认的配置文件的路径为:

/etc/rocketmq/broker.conf

broker.conf的配置内容如下:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

在最后一行后面新增

brokerIP1 = 192.168.3.27

ip改成你的宿主机的ip

六、安装 rocketmq console

如果一切正常,NameServer和Broker一会儿就会安装好,为了管理上的方便,rocketmq console也是必不可少的工具了,通过上面查询的方式找到需要启动的版本,启动方式如下:

docker run -d --name rmqconsole -p 8180:8080 --link rmqserver:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -t styletang/rocketmq-console-ng

七、重启容器

重启  NameServer 容器  和   broker容器

然后通过如下命令检查一下启动情况:

docker ps|grep rocketmq

614a3d0ed22b        styletang/rocketmq-console-ng        "sh -c 'java $JAVA_O…"   2 hours ago         Up 2 hours          0.0.0.0:8180->8080/tcp                                                                                                                                       rmqconsole
a76891fbfd35        foxiswho/rocketmq:broker-4.5.1       "/bin/sh -c 'cd ${RO…"   2 hours ago         Up 16 minutes       0.0.0.0:10909->10909/tcp, 0.0.0.0:10911->10911/tcp                                                                                                           rmqbroker
01d29f8f68c8        foxiswho/rocketmq:server-4.5.1       "/bin/sh -c 'cd ${RO…"   2 hours ago         Up 2 hours          0.0.0.0:9876->9876/tcp 

最后访问  http://localhost:8180/ 进入 rocketmq console 控制面板

环境搭建参考: https://www.cnblogs.com/gmhappy/p/13457026.html

监控页面讲解: https://blog.csdn.net/so_geili/article/details/90142461

springboot集成 rocketmq   

maven依赖


    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.2.0

pom配置文件

rocketmq:
  name-server: 192.168.1.101:9876     //mq的server地址
  producer:
    group: lucax                      
    send-message-timeout: 10000       //发送mq超时时间

若rocketmq是在阿里云平台买的,使用以下配置即可

rocketmq:
  name-server: http://xxxxx.mq-internet-access.mq-internet.aliyuncs.com:80  #阿里云rocketmq连接地址
  producer:
    group: rocketmq_test #自定义的组名称
    access-key: your aliyun accessKey
    secret-key: your aliyun secretKey
    send-message-timeout: 3000  #消息发送超时时长

发送mq方法代码

package com.control.test;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class SedMQ {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @GetMapping("/sendmq")
    public String aaa(){
//        发送普通消息没有返回的
        rocketMQTemplate.convertAndSend("test-topic-1:tag", "深圳:123");

        /**
         * 发送带tag的消息,直接在topic后面加上":tag"
         */
        Object aa=rocketMQTemplate.syncSend("test-topic-1:tag", MessageBuilder.withPayload("什么东东").build());
        //拿到messageid 和 发送的状态
        System.out.println(aa);
        System.out.println("发送带tag");

        /**
         * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时,单位秒)
         */
        Object bb=rocketMQTemplate.syncSend("test-topic-1:tag", MessageBuilder.withPayload("延迟mq").build(),3, 30);
        //拿到messageid 和 发送的状态
        System.out.println(bb);
        System.out.println("发送延迟消息");


        return "123";

    }
}

接口mq方代码 ---写了就会自动监听又没mq消息了(注意要先去rocketmq console 控制面板 把消费订阅组广播打开,不然也是接收不到mq消息的)

package com.control.test;


import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
//consumerGroup--订阅组 在mq系统上添加
@RocketMQMessageListener(
        topic="test-topic-1",
        consumerGroup="lucax",
        selectorExpression="tag"
)
class getMQ implements RocketMQListener {

    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到消息:{}", msg);
        //能拿到messaggeid
        System.out.println(message);
}

}

科普

RocketMQ支持集群消费和广播消费。

集群消费
默认是集群消费模式,对于同一个消费组里的消费者,会分摊消息,如A消费了B就不能消费。
广播消费
对于同一个消费组里的消费者,每个消费者都能收到每一份消息,AB能消费同一个消息,相当于广播。
记得将consumer的consumeBroadcastEnable设为true,不然导致无法消费集群消息。

参考:

springboot 调用: https://zhuanlan.zhihu.com/p/137477999

https://blog.csdn.net/javahongxi/article/details/86160085---这个比较详细

https://blog.csdn.net/qq_38306688/article/details/107716046

https://www.jianshu.com/p/985a40295e1b

其中发送tag的方法是 在topic后加冒号

rocketMQTemplate.convertAndSend("test-topic-1:这个是tag", "xxxxxhah");

见 https://blog.csdn.net/LYM0721/article/details/105702801

这个环境搭建貌似也不错 https://blog.csdn.net/qq_26154077/article/details/110847184