docker 创建 rocketMQ 记录 +springboot集成mq
环境搭建参考
一、查看镜像的版本
docker search rocketmq
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
rocketmqinc/rocketmq Image repository for Apache RocketMQ 35
styletang/rocketmq-console-ng rocketmq-console-ng 27
foxiswho/rocketmq rocketmq 23
二、检索具体版本
这次选了foxiswho/rocketmq,以下是一个查看当前镜像所有的版本shell命令:
curl https://registry.hub.docker.com/v1/repositories/foxiswho/rocketmq/tags | tr -d '[\[\]" ]' | tr '}' '\n' | awk -F: -v image='foxiswho/rocketmq' '{if(NR!=NF && $3 != ""){printf("%s:%s\n",image,$3)}}'
结果:
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 934 0 934 0 0 310 0 --:--:-- 0:00:03 --:--:-- 310
foxiswho/rocketmq:4.7.0
foxiswho/rocketmq:base
foxiswho/rocketmq:base-4.3.2
foxiswho/rocketmq:base-4.4.0
foxiswho/rocketmq:base-4.5.0
foxiswho/rocketmq:base-4.5.1
foxiswho/rocketmq:base-4.5.2
foxiswho/rocketmq:base-4.6.1
foxiswho/rocketmq:broker
foxiswho/rocketmq:broker-4.3.2
foxiswho/rocketmq:broker-4.4.0
foxiswho/rocketmq:broker-4.5.0
foxiswho/rocketmq:broker-4.5.1
foxiswho/rocketmq:broker-4.5.2
foxiswho/rocketmq:broker-4.6.1
foxiswho/rocketmq:broker-4.7.0
foxiswho/rocketmq:server
foxiswho/rocketmq:server-4.3.2
foxiswho/rocketmq:server-4.4.0
foxiswho/rocketmq:server-4.5.0
foxiswho/rocketmq:server-4.5.1
foxiswho/rocketmq:server-4.5.2
foxiswho/rocketmq:server-4.6.1
foxiswho/rocketmq:server-4.7.0
三、启动NameServer
docker run -d -p 9876:9876 --name rmqserver foxiswho/rocketmq:server-4.5.1
四、启动broker
docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqserver:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m" foxiswho/rocketmq:broker-4.5.1
五、修改NameServer容器 和 broker 容器 的配置参数 ----这步很重要!
NameServer容器 和 broker 容器 默认的配置文件的路径为:
/etc/rocketmq/broker.conf
broker.conf的配置内容如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
在最后一行后面新增
brokerIP1 = 192.168.3.27
ip改成你的宿主机的ip
六、安装 rocketmq console
如果一切正常,NameServer和Broker一会儿就会安装好,为了管理上的方便,rocketmq console也是必不可少的工具了,通过上面查询的方式找到需要启动的版本,启动方式如下:
docker run -d --name rmqconsole -p 8180:8080 --link rmqserver:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -t styletang/rocketmq-console-ng
七、重启容器
重启 NameServer 容器 和 broker容器
然后通过如下命令检查一下启动情况:
docker ps|grep rocketmq
614a3d0ed22b styletang/rocketmq-console-ng "sh -c 'java $JAVA_O…" 2 hours ago Up 2 hours 0.0.0.0:8180->8080/tcp rmqconsole
a76891fbfd35 foxiswho/rocketmq:broker-4.5.1 "/bin/sh -c 'cd ${RO…" 2 hours ago Up 16 minutes 0.0.0.0:10909->10909/tcp, 0.0.0.0:10911->10911/tcp rmqbroker
01d29f8f68c8 foxiswho/rocketmq:server-4.5.1 "/bin/sh -c 'cd ${RO…" 2 hours ago Up 2 hours 0.0.0.0:9876->9876/tcp
最后访问 http://localhost:8180/ 进入 rocketmq console 控制面板
环境搭建参考: https://www.cnblogs.com/gmhappy/p/13457026.html
监控页面讲解: https://blog.csdn.net/so_geili/article/details/90142461
springboot集成 rocketmq
maven依赖
org.apache.rocketmq
rocketmq-spring-boot-starter
2.2.0
pom配置文件
rocketmq:
name-server: 192.168.1.101:9876 //mq的server地址
producer:
group: lucax
send-message-timeout: 10000 //发送mq超时时间
若rocketmq是在阿里云平台买的,使用以下配置即可
rocketmq:
name-server: http://xxxxx.mq-internet-access.mq-internet.aliyuncs.com:80 #阿里云rocketmq连接地址
producer:
group: rocketmq_test #自定义的组名称
access-key: your aliyun accessKey
secret-key: your aliyun secretKey
send-message-timeout: 3000 #消息发送超时时长
发送mq方法代码
package com.control.test;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SedMQ {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendmq")
public String aaa(){
// 发送普通消息没有返回的
rocketMQTemplate.convertAndSend("test-topic-1:tag", "深圳:123");
/**
* 发送带tag的消息,直接在topic后面加上":tag"
*/
Object aa=rocketMQTemplate.syncSend("test-topic-1:tag", MessageBuilder.withPayload("什么东东").build());
//拿到messageid 和 发送的状态
System.out.println(aa);
System.out.println("发送带tag");
/**
* 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时,单位秒)
*/
Object bb=rocketMQTemplate.syncSend("test-topic-1:tag", MessageBuilder.withPayload("延迟mq").build(),3, 30);
//拿到messageid 和 发送的状态
System.out.println(bb);
System.out.println("发送延迟消息");
return "123";
}
}
接口mq方代码 ---写了就会自动监听又没mq消息了(注意要先去rocketmq console 控制面板 把消费订阅组广播打开,不然也是接收不到mq消息的)
package com.control.test;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
//consumerGroup--订阅组 在mq系统上添加
@RocketMQMessageListener(
topic="test-topic-1",
consumerGroup="lucax",
selectorExpression="tag"
)
class getMQ implements RocketMQListener {
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到消息:{}", msg);
//能拿到messaggeid
System.out.println(message);
}
}
科普
RocketMQ支持集群消费和广播消费。
集群消费
默认是集群消费模式,对于同一个消费组里的消费者,会分摊消息,如A消费了B就不能消费。
广播消费
对于同一个消费组里的消费者,每个消费者都能收到每一份消息,AB能消费同一个消息,相当于广播。
记得将consumer的consumeBroadcastEnable设为true,不然导致无法消费集群消息。
参考:
springboot 调用: https://zhuanlan.zhihu.com/p/137477999
https://blog.csdn.net/javahongxi/article/details/86160085---这个比较详细
https://blog.csdn.net/qq_38306688/article/details/107716046
https://www.jianshu.com/p/985a40295e1b
其中发送tag的方法是 在topic后加冒号
rocketMQTemplate.convertAndSend("test-topic-1:这个是tag", "xxxxxhah");
见 https://blog.csdn.net/LYM0721/article/details/105702801
这个环境搭建貌似也不错 https://blog.csdn.net/qq_26154077/article/details/110847184