RocketMQ Environment Setup and a Simple Application


Installation

RocketMQ

RocketMQ is written in Java, so it needs a JDK to run and Maven to build.

1. Install the JDK

Not covered here.

2. Install Maven
# Download the Maven 3.5.4 package
wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz

# Extract the package
tar -zxvf apache-maven-3.5.4-bin.tar.gz

# Rename the directory
mv apache-maven-3.5.4 maven

# Enter the installation directory
cd maven/

# Print the full installation path
pwd

# Output: the full installation path
/home/bingo/soft/maven

------------------------------------------------------------------------------------
# Switch to the root account
su

# Open the environment variable configuration file
vim /etc/profile

# Add the Maven environment variables in the appropriate place
```
# Maven
export MAVEN_HOME=/home/bingo/soft/maven
export PATH=$MAVEN_HOME/bin:$PATH
```

# Reload the configuration file so the environment variables take effect
source /etc/profile

# Check the Maven version (if a regular account reports that the mvn command is not found, run source /etc/profile again in that shell)
mvn -version

```
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /home/bingo/soft/maven
Java version: 1.8.0_171, vendor: Oracle Corporation, runtime: /home/bingo/soft/java/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "4.18.0-240.15.1.el8_3.x86_64", arch: "amd64", family: "unix"
```

3. Install RocketMQ
# Download the RocketMQ source code

wget https://github.com/apache/rocketmq/archive/refs/tags/rocketmq-all-4.9.2.tar.gz

# Extract the archive
tar -zxvf rocketmq-all-4.9.2.tar.gz

# Rename the directory
mv rocketmq-rocketmq-all-4.9.2/ rocketmq-all

# Enter the source directory
cd rocketmq-all/

# Build the RocketMQ source with Maven
mvn -Prelease-all -DskipTests clean install -U

# The following output indicates that the build succeeded

```
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] Apache RocketMQ 4.9.2 4.9.2 ........................ SUCCESS [03:17 min]
[INFO] rocketmq-logging 4.9.2 ............................. SUCCESS [ 18.488 s]
[INFO] rocketmq-remoting 4.9.2 ............................ SUCCESS [  4.688 s]
[INFO] rocketmq-common 4.9.2 .............................. SUCCESS [  7.096 s]
[INFO] rocketmq-client 4.9.2 .............................. SUCCESS [  9.334 s]
[INFO] rocketmq-store 4.9.2 ............................... SUCCESS [  4.628 s]
[INFO] rocketmq-srvutil 4.9.2 ............................. SUCCESS [  0.320 s]
[INFO] rocketmq-filter 4.9.2 .............................. SUCCESS [  2.143 s]
[INFO] rocketmq-acl 4.9.2 ................................. SUCCESS [  2.286 s]
[INFO] rocketmq-broker 4.9.2 .............................. SUCCESS [  3.817 s]
[INFO] rocketmq-tools 4.9.2 ............................... SUCCESS [  2.196 s]
[INFO] rocketmq-namesrv 4.9.2 ............................. SUCCESS [  0.773 s]
[INFO] rocketmq-logappender 4.9.2 ......................... SUCCESS [  1.330 s]
[INFO] rocketmq-test 4.9.2 ................................ SUCCESS [  4.238 s]
[INFO] rocketmq-openmessaging 4.9.2 ....................... SUCCESS [  1.898 s]
[INFO] rocketmq-example 4.9.2 ............................. SUCCESS [ 10.156 s]
[INFO] rocketmq-distribution 4.9.2 4.9.2 .................. SUCCESS [01:40 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 06:13 min
[INFO] Finished at: 2021-11-15T14:09:47+08:00
[INFO] ------------------------------------------------------------------------
```

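# Enter the distribution directory
cd distribution/

# Start the RocketMQ NameServer
nohup sh bin/mqnamesrv &

# This fails: the NameServer main class cannot be found, because this is not the directory that holds the finished build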
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Error: Could not find or load main class org.apache.rocketmq.namesrv.NamesrvStartup

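# Enter the actual build output directory (full path: /home/bingo/soft/mq/rocketmq/rocketmq-all/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2)
# The shell is already in rocketmq-all/distribution, so the path is relative to that directory
cd target/rocketmq-4.9.2/rocketmq-4.9.2

# Start the RocketMQ NameServer
nohup sh bin/mqnamesrv &

# Started successfully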
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

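# Start the RocketMQ broker
nohup sh bin/mqbroker -n localhost:9876 &

# This fails because the server does not have enough memory: the NameServer and broker startup scripts both request several gigabytes of heap by default (the log below shows the broker JVM trying to commit 8589934592 bytes, i.e. 8 GiB). Lower the heap settings and restart.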
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/bingo/soft/mq/rocketmq/rocketmq-all/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2/hs_err_pid1046990.log

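# Stop the NameServer first
sh bin/mqshutdown namesrv

# Stop the broker
sh bin/mqshutdown broker

# Edit the NameServer startup script and lower the heap size (-Xms/-Xmx) from 4g to 1g or 512m
vim bin/runserver.sh

# Edit the broker startup script and lower the heap size (-Xms/-Xmx) to 1g or 512m
vim bin/runbroker.sh

# Create a log directory (to make the logs easier to find)
mkdir logs

# Start the NameServer again
nohup ./bin/mqnamesrv > logs/namesrv.log 2>&1 &

# Started successfully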
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

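# Start the broker again
nohup ./bin/mqbroker -n localhost:9876 > logs/broker.log 2>&1 &

# Started successfully: the broker listens on port 10911 and registers with the NameServer at localhost:9876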
The broker[iZbp1c3u9bhuo790ircffmZ, 172.28.177.11:10911] boot success. serializeType=JSON and name server is localhost:9876

# List the Java processes
jps

# Broker and NameServer processes
1111315 BrokerStartup
1090788 NamesrvStartup

# Check the listening ports
netstat -ntlp

# The NameServer is listening on port 9876
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name           
tcp6       0      0 :::9876                 :::*                    LISTEN      1090788/java 

4. Test cases

Run the producer example

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

The error output is as follows:

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
        at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
        at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:709)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:579)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1335)
        at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:336)
        at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
        at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1367)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1357)
        at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622)
        ... 7 more
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
        at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
        at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:709)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:579)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1335)
        at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:336)
        at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)

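This happens because the NameServer address has not been set as an environment variable, so the client has no address to connect to ("connect to null")

export NAMESRV_ADDR=localhost:9876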
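
The "connect to null" message makes more sense once you see where the client looks for the address. To my understanding, the RocketMQ client takes its default NameServer address from the `rocketmq.namesrv.addr` system property and falls back to the `NAMESRV_ADDR` environment variable; treat both names as assumptions if your version differs. The class below is only a JDK-level sketch that mimics that lookup order. It is not RocketMQ code, and `NamesrvAddrLookup` is a made-up name.

```java
import java.util.Map;
import java.util.Properties;

// Illustrative sketch only: mimics the assumed lookup order
// (system property first, environment variable second).
public class NamesrvAddrLookup {

    /** Returns the configured address, or null when neither source is set. */
    public static String resolve(Properties props, Map<String, String> env) {
        return props.getProperty("rocketmq.namesrv.addr", env.get("NAMESRV_ADDR"));
    }

    public static void main(String[] args) {
        String addr = resolve(System.getProperties(), System.getenv());
        // With nothing configured this prints "null", which is the value
        // that ends up in the "connect to null failed" error.
        System.out.println("NameServer address: " + addr);
    }
}
```

Running it before and after the `export` shows the value changing from `null` to `localhost:9876`.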

Run the producer example again

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

The messages are produced as follows:

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361840000, offsetMsgId=AC1CB10B00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361C10001, offsetMsgId=AC1CB10B00002A9F00000000000000BE, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361C60002, offsetMsgId=AC1CB10B00002A9F000000000000017C, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361D00003, offsetMsgId=AC1CB10B00002A9F000000000000023A, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361D30004, offsetMsgId=AC1CB10B00002A9F00000000000002F8, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=1], queueOffset=1]
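
The `offsetMsgId` values above are not opaque. As far as I know, for a broker with an IPv4 address the ID is 32 hex characters: 8 for the broker IP, 8 for the broker port, and 16 for the commit log offset. The sketch below decodes an ID under that assumption. `OffsetMsgIdDecoder` and its methods are illustrative names, not part of the RocketMQ API.

```java
// Illustrative sketch: decodes an offsetMsgId assuming the layout
// [4-byte IPv4 address][4-byte port][8-byte commit log offset], hex-encoded.
public class OffsetMsgIdDecoder {

    /** Returns "ip:port" of the broker that stored the message. */
    public static String brokerAddress(String offsetMsgId) {
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            // Each address byte is two hex characters.
            ip.append(Integer.parseInt(offsetMsgId.substring(i * 2, i * 2 + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        return ip + ":" + port;
    }

    /** Returns the position of the message in the broker's commit log. */
    public static long commitLogOffset(String offsetMsgId) {
        return Long.parseLong(offsetMsgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String id = "AC1CB10B00002A9F00000000000000BE"; // second SendResult above
        System.out.println(brokerAddress(id));   // prints 172.28.177.11:10911
        System.out.println(commitLogOffset(id)); // prints 190
    }
}
```

The decoded address matches the broker address in the startup log, and the offset 190 matches the `commitLogOffset=190` reported for the same message on the consumer side.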

Run the consumer example

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

The messages are consumed as follows:

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1636987018689, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018691, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000000BE, commitLogOffset=190, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042291, UNIQ_KEY=7F000001CE4F7D4991AD4CF361C10001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1636987018710, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018711, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000003B6, commitLogOffset=950, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042293, UNIQ_KEY=7F000001CE4F7D4991AD4CF361D60005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=2, sysFlag=0, bornTimestamp=1636987018719, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018722, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000006AE, commitLogOffset=1710, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042293, UNIQ_KEY=7F000001CE4F7D4991AD4CF361DF0009, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]] 
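
The `body=[72, 101, ...]` field in the consumer output is the raw message payload printed as byte values. A small sketch, assuming the payload is UTF-8 text, turns it back into a string. `BodyDecoder` is an illustrative helper, not something shipped with RocketMQ.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch: converts the byte values printed in the log
// back into text, assuming the body was encoded as UTF-8.
public class BodyDecoder {

    public static String decode(int... codes) {
        byte[] bytes = new byte[codes.length];
        for (int i = 0; i < codes.length; i++) {
            bytes[i] = (byte) codes[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first consumed message above.
        System.out.println(decode(72, 101, 108, 108, 111, 32, 82, 111,
                99, 107, 101, 116, 77, 81, 32, 49)); // prints Hello RocketMQ 1
    }
}
```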
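
5. Console

Many blog posts say that you can download the official extension package from GitHub, build it, and run it. In practice, neither the externals package nor the dashboard package would build for me.

After switching the externals package to the origin/release-rocketmq-console-1.0.0 branch, I found that the rocketmq-console project uses many deprecated methods.

In the end I only got the console running by downloading a package that other users had already compiled. I ran into plenty of pitfalls along the way, which are recorded here one by one.

The compiled package has been uploaded to a network drive as a backup: link: https://pan.baidu.com/s/1aQUzvjhdTNTEMVDpF3rZWg password: fe2t

Start the console with the following command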

nohup java -Xms512m -Xmx512m -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='127.0.0.1:9876' > ./logs/console.log 2>&1 &

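### If the NameServer is not specified through the external parameter --rocketmq.config.namesrvAddr='127.0.0.1:9876', the program starts successfully but is unusable.

Open the page to see the console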

http://localhost:8080/

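6. Application

Use rocketmq-spring-boot-starter with Spring Boot 2.x to integrate RocketMQ for producing and consuming messages.

Scenario: a user likes a video, and the producer sends the data (user ID + video ID) to the consumer through the message queue, which smooths traffic peaks and decouples the two services.

Producer

pom.xml dependencies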


		
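<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>

	<dependency>
		<groupId>org.apache.rocketmq</groupId>
		<artifactId>rocketmq-spring-boot-starter</artifactId>
		<version>2.2.1</version>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<optional>true</optional>
	</dependency>

	<dependency>
		<groupId>com.bingo</groupId>
		<artifactId>common</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</dependency>
</dependencies>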
		
	

application.yaml configuration file

server:
  port: 10000

spring:
  application:
    name: producer
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer-group-praise

Producer code

package com.bingo.producer.controller;

import com.bingo.common.constants.RocketMQConstants;
import com.bingo.common.request.PraiseRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

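/**
 * @program: PraiseController
 * @description: Endpoint for liking (praising) a video
 * @author: Bingo
 * @create: 2021-11-16 12:16
 **/
@Slf4j
@RestController
public class PraiseController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Like a video.
     *
     * RocketMQTemplate offers three main ways to send a message: syncSend() (synchronous),
     * asyncSend() (asynchronous) and sendOneWay() (one-way).
     * sendOneWay() also returns without blocking, but unlike asyncSend() it does not wait for any
     * acknowledgement from the broker, so messages may be lost; in exchange, throughput is higher.
     * Choose according to the business requirements.
     * Performance: sendOneWay > asyncSend > syncSend. RocketMQTemplate's send() method is synchronous (syncSend) by default.
     *
     * @param request the like request (user ID + video ID)
     * @return HTTP 200 with body true once the message has been handed to the client
     */
    @PostMapping(value = "praise")
    public ResponseEntity<Boolean> praise(@RequestBody PraiseRequest request) {
        // The log text means "praise request parameters"
        log.info("点赞请求参数: {}", request);
        rocketMQTemplate.sendOneWay(RocketMQConstants.PRAISE_TOPIC, MessageBuilder.withPayload(request).build());
        return ResponseEntity.ok(Boolean.TRUE);
    }
}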
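
The controller imports `PraiseRequest` and `RocketMQConstants` from the `common` module, which this post does not list. Below is a minimal sketch of what those two classes might look like, written in plain Java without Lombok so it compiles on its own. The field names and the topic value `topic-praise` are taken from the logs later in this post. The consumer group value, the `Long` field types (chosen because the logged video IDs exceed the `int` range), and the wrapper class `CommonSketch` are my assumptions.

```java
// Illustrative sketch of the shared "common" module; the real classes
// are not shown in this post and presumably use Lombok.
public final class CommonSketch {

    public static final class RocketMQConstants {
        /** Topic name as it appears in the producer error log. */
        public static final String PRAISE_TOPIC = "topic-praise";
        /** Hypothetical value: the post never shows the group name. */
        public static final String PRAISE_CONSUMER_GROUP = "consumer-group-praise";

        private RocketMQConstants() {
        }
    }

    public static class PraiseRequest {
        private Long videoId;
        private Long userId;

        // A no-arg constructor plus getters and setters lets a JSON
        // converter rebuild the object on the consumer side.
        public PraiseRequest() {
        }

        public Long getVideoId() { return videoId; }
        public void setVideoId(Long videoId) { this.videoId = videoId; }
        public Long getUserId() { return userId; }
        public void setUserId(Long userId) { this.userId = userId; }

        @Override
        public String toString() {
            // Same shape as the Lombok-style output seen in the logs.
            return "PraiseRequest(videoId=" + videoId + ", userId=" + userId + ")";
        }
    }

    private CommonSketch() {
    }
}
```

In the real project these would be two top-level classes in `com.bingo.common.request` and `com.bingo.common.constants`, matching the imports above.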

Consumer

pom.xml dependencies


		
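<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>

	<dependency>
		<groupId>org.apache.rocketmq</groupId>
		<artifactId>rocketmq-spring-boot-starter</artifactId>
		<version>2.2.1</version>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<optional>true</optional>
	</dependency>

	<dependency>
		<groupId>com.bingo</groupId>
		<artifactId>common</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</dependency>
</dependencies>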
		
	

application.yaml configuration file

server:
  port: 10003

spring:
  application:
    name: consumer3
rocketmq:
  name-server: 127.0.0.1:9876

Consumer code

package com.bingo.consumer.listener;

import com.bingo.common.constants.RocketMQConstants;
import com.bingo.common.request.PraiseRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @program: PraiseListener
 * @description: Listener that consumes like (praise) messages
 * @author: Bingo
 * @create: 2021-11-16 12:29
 **/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstants.PRAISE_TOPIC, consumerGroup = RocketMQConstants.PRAISE_CONSUMER_GROUP)
public class PraiseListener implements RocketMQListener<PraiseRequest> {

    @Override
    public void onMessage(PraiseRequest request) {
        // The log text means "consumer 1 consumed praise data"
        log.info("消费者1消费点赞数据: {}", request);
    }
}

Testing the application

1. Start the producer and call the praise endpoint several times

[exec-1] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=123, userId=456)
[exec-2] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=1234, userId=456)
[exec-3] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=12345, userId=456)
[exec-4] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=123456, userId=456)
[exec-5] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=1234567, userId=456)
[exec-6] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=12345678, userId=456)

!!! When the praise endpoint tries to send a message, the client cannot connect to the broker and reports the following error:

2021-11-16 14:46:03.659 ERROR 4722 --- [io-10000-exec-1] o.a.r.spring.core.RocketMQTemplate       : sendOneWay failed. destination:topic-praise, message:GenericMessage [payload=PraiseRequest(videoId=1234567891234567, userId=456), headers={id=eabf0a33-d4ca-0b03-8773-d3642c1570d0, timestamp=1637045160524}] 
2021-11-16 14:46:03.676 ERROR 4722 --- [io-10000-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessagingException: Send [1] times, still failed, cost [3085]ms, Topic: topic-praise, BrokersSent: [iZbp1c3u9bhuo790ircffmZ]
See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [3085]ms, Topic: topic-praise, BrokersSent: [iZbp1c3u9bhuo790ircffmZ]
See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 172.28.177.11:10911 failed
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeOneway(NettyRemotingClient.java:554) ~[rocketmq-remoting-4.9.1.jar:4.9.1]
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:471) ~[rocketmq-client-4.9.1.jar:4.9.1]
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:431) ~[rocketmq-client-4.9.1.jar:4.9.1]
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:877) ~[rocketmq-client-4.9.1.jar:4.9.1]
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:607) ~[rocketmq-client-4.9.1.jar:4.9.1]
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendOneway(DefaultMQProducerImpl.java:1028) ~[rocketmq-client-4.9.1.jar:4.9.1]
	at org.apache.rocketmq.client.producer.DefaultMQProducer.sendOneway(DefaultMQProducer.java:403) ~[rocketmq-client-4.9.1.jar:4.9.1]
	at org.apache.rocketmq.spring.core.RocketMQTemplate.sendOneWay(RocketMQTemplate.java:911) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
	at com.bingo.producer.controller.PraiseController.praise(PraiseController.java:40) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_301]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_301]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_301]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_301]
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.12.jar:5.3.12]
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.12.jar:5.3.12]
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:681) ~[tomcat-embed-core-9.0.54.jar:4.0.FR]
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.12.jar:5.3.12]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.54.jar:4.0.FR]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.12.jar:5.3.12]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.12.jar:5.3.12]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.12.jar:5.3.12]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1722) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.54.jar:9.0.54]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_301]

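After going through a number of blog posts, I found that the RocketMQ conf/broker.conf file has to be modified and passed to the broker at startup.
Source: https://blog.csdn.net/u014786083/article/details/118714139

The changes are as follows:

# Add this setting: the IP address the broker advertises to clients. Without it the broker registered its internal address 172.28.177.11, which the producer could not reach. Here it is set to the server's externally reachable address (the NameServer runs on the same host).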
brokerIP1=8.136.224.118
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

Restart the broker

nohup ./bin/mqbroker -n localhost:9876 -c conf/broker.conf > logs/broker.log 2>&1 &
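
Before retrying the producer, it can help to confirm from the client machine that the address the broker now advertises is actually reachable, since the earlier failure was a plain TCP connection error. The following is a minimal JDK-only probe. The class name `BrokerReachability` and the default host and port in `main` are illustrative assumptions; pass your own broker IP and port as arguments.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative sketch: checks whether a TCP connection to host:port
// can be opened within the given timeout.
public class BrokerReachability {

    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unroutable: treat all as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 10911;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If the probe fails for the public IP, the cause is more likely a firewall or security-group rule for port 10911 than the RocketMQ configuration itself.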

2. Start the consumer; the messages are consumed out of order

[MessageThread_5] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=123456, userId=456)
[MessageThread_3] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=1234, userId=456)
[MessageThread_1] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=12345, userId=456)
[MessageThread_6] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=12345678, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=123, userId=456)
[MessageThread_4] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=1234567, userId=456)

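3. Change the IDEA run configuration to start multiple consumers (Edit Configurations... -> uncheck "Single instance only" -> change the port in application.yaml; the same program can then be started on several ports)

4. Call the praise endpoint several more times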

[exec-8] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=123456789, userId=456)
[exec-9] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=1234567891, userId=456)
[exec-10] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=12345678912, userId=456)
[exec-1] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=123456789123, userId=456)
[exec-2] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=1234567891234, userId=456)
[exec-3] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=12345678912345, userId=456)
[exec-4] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=123456789123456, userId=456)
[exec-5] c.b.p.controller.PraiseController        : 点赞请求参数: PraiseRequest(videoId=1234567891234567, userId=456)

5. Messages consumed by consumer 10001 (10001, 10002 and 10003 are the ports)

[MessageThread_7] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=123456789123, userId=456)
[MessageThread_8] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=1234567891234, userId=456)
[MessageThread_9] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=12345678912345, userId=456)
[MessageThread_10] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=1234567891234567, userId=456)

6. Messages consumed by consumer 10002

[MessageThread_1] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=1234567891, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=123456789123456, userId=456)

7. Messages consumed by consumer 10003

[MessageThread_1] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=123456789, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener     : 消费者1消费点赞数据: PraiseRequest(videoId=12345678912, userId=456)
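
The three consumers share one consumer group, so each message goes to exactly one of them, and the split here is uneven (4, 2 and 2 messages). One plausible explanation, which I have not verified against the broker, is that the topic has 4 queues and the group divides them in contiguous, near-equal ranges, so one consumer owns two queues and the other two own one each. The sketch below models that kind of even allocation. It is a simplified illustration rather than RocketMQ's own code, and the queue count of 4 is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: splits queue IDs 0..queueCount-1 into contiguous,
// near-equal ranges, one range per consumer in the group.
public class QueueAllocationSketch {

    public static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;      // queues every consumer gets
        int remainder = queueCount % consumerCount; // first 'remainder' consumers get one extra
        int count = base + (consumerIndex < remainder ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int consumer = 0; consumer < 3; consumer++) {
            System.out.println("consumer " + consumer + " -> " + allocate(4, 3, consumer));
        }
        // prints:
        // consumer 0 -> [0, 1]
        // consumer 1 -> [2]
        // consumer 2 -> [3]
    }
}
```

Under this model a consumer that owns two queues will tend to receive more messages, but the exact counts also depend on which queue each send happens to pick, so the 4/2/2 split is consistent with the model without proving it.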

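The code for this chapter has been uploaded to Gitee: springboot-rocketmq-demo

This completes the single-master RocketMQ setup and a simple application. Next I will look into its other features, for example: cluster setup, message retry, duplicate-message handling, ordered messages, scheduled messages, batch sending, transactional messages, message backtracking, and the various message filtering modes.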