RocketMQ Study Notes
Why choose RocketMQ?
Compared with other message middleware:
The deciding factor is that RocketMQ is written in Java, which makes it easy to learn.
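- Get it running first, then read the documentation, then read the source.
- Official documentation (link): the official docs are of course required reading.
- GitHub (link): reading the source is a must, so nothing more needs saying.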
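Install RocketMQ
- Download: http://rocketmq.apache.org/release_notes/release-notes-4.7.0
- Configure the environment variables (the startup scripts expect ROCKETMQ_HOME to point at the unpacked directory)
- Start the NameServer
> mqnamesrv
- Start the Broker
> mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true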
- Install the visual console plugin (optional)
Download
GitHub: https://github.com/apache/rocketmq-externals
Gitee: https://gitee.com/mirrors/RocketMQ-Externals
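Configure application.properties
Port for the console's web UI. Pick a free port: 9876 is already used by the NameServer started above, which the console reaches through rocketmq.config.namesrvAddr.
> server.port=8080
> rocketmq.config.namesrvAddr=127.0.0.1:9876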
Maven
Build and run the jar
Go into the console project folder (Maven writes the jar to target/)
> mvn clean package -Dmaven.test.skip=true
> java -jar target/*.jar
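Before moving on to the Spring integration, it can help to confirm that something is actually listening on the NameServer address passed to the broker (127.0.0.1:9876). The following is a minimal sketch using only the JDK; `PortCheck` and `isListening` are hypothetical names for illustration and are not part of RocketMQ. It only checks that a TCP connection can be opened, not that the process behind the port is a healthy NameServer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not part of RocketMQ): checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections there.
            return false;
        }
    }

    public static void main(String[] args) {
        // 127.0.0.1:9876 is the NameServer address given to mqbroker via -n.
        System.out.println("NameServer reachable: " + isListening("127.0.0.1", 9876, 1000));
    }
}
```

If this prints `false` after `mqnamesrv` has been started, the environment variables or the port are the first things to re-check.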
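Add the pom dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>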
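The starter only creates a `RocketMQTemplate` bean once it knows where the NameServer is and which producer group to use. A minimal sketch of the Spring Boot application's own `application.properties` (not the console's) might look like the following; the group name `my-producer-group` is a placeholder chosen for illustration.

```properties
# NameServer address; same value that was passed to mqbroker via -n
rocketmq.name-server=127.0.0.1:9876
# Producer group used by RocketMQTemplate (placeholder name, pick your own)
rocketmq.producer.group=my-producer-group
```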
MQService
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RocketMQService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Send a message.
     *
     * @param topic target topic
     * @param body  message body
     * @throws Exception if the send fails
     */
    public void sendMessage(String topic, String body) throws Exception {
        // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset
        Message message = new Message(topic, body.getBytes(StandardCharsets.UTF_8));
        rocketMQTemplate.getProducer().send(message);
    }

    /**
     * Send a delayed message.
     *
     * @param topic          target topic
     * @param body           message body
     * @param delayTimeLevel delay level (an index into the broker's configured delay levels, not a duration)
     * @throws Exception if the send fails
     */
    public void sendDelayMessage(String topic, String body, int delayTimeLevel) throws Exception {
        Message message = new Message(topic, body.getBytes(StandardCharsets.UTF_8));
        message.setDelayTimeLevel(delayTimeLevel);
        rocketMQTemplate.getProducer().send(message);
    }
}
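Note that `sendDelayMessage` takes a delay level, not a duration. Assuming the broker's `messageDelayLevel` setting is left at its 4.x default (`1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`), level n selects the n-th entry of that list. The sketch below makes the mapping explicit; `DelayLevels` and `delayFor` are hypothetical names for illustration, not RocketMQ API, and the table is wrong if the broker has been configured with custom levels.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical lookup table (not RocketMQ API) for the broker's default delay levels. */
final class DelayLevels {

    // Assumed: the broker's default messageDelayLevel value, split on spaces.
    private static final List<String> DEFAULT_LEVELS = Arrays.asList(
            "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
            "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h");

    /** Returns the delay that a given level (1-based) selects under the default configuration. */
    static String delayFor(int level) {
        if (level < 1 || level > DEFAULT_LEVELS.size()) {
            throw new IllegalArgumentException(
                    "level must be between 1 and " + DEFAULT_LEVELS.size() + ", got " + level);
        }
        return DEFAULT_LEVELS.get(level - 1);
    }

    public static void main(String[] args) {
        // Print the whole table so a level can be picked by eye.
        for (int level = 1; level <= DEFAULT_LEVELS.size(); level++) {
            System.out.println("level " + level + " -> " + delayFor(level));
        }
    }
}
```

Under that assumption, `sendDelayMessage("test-mq", "hello", 3)` would ask the broker to hold the message for about 10 seconds before delivering it.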
Test listener
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "test-mq", consumerGroup = "conmuserGrop-mq")
public class ConsumerListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode with the same charset the producer used when encoding the body
        String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        System.out.println("receive message:" + body);
    }
}
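The listener decodes the body as UTF-8, so the producer side has to encode with the same charset or non-ASCII text may come out garbled. A minimal JDK-only sketch of that round trip follows; `BodyCodec` is a hypothetical helper for illustration and is not part of RocketMQ or the starter.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not RocketMQ API): encodes and decodes message bodies as UTF-8. */
final class BodyCodec {

    /** Producer side: turn the text into bytes with an explicit charset. */
    static byte[] encode(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: turn the bytes back into text with the same charset. */
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u4f60\u597d" is a two-character non-ASCII greeting, written as escapes
        // so the example does not depend on the source file's encoding.
        String original = "Hello RocketMQ! \u4f60\u597d";
        byte[] wire = encode(original);
        System.out.println("round trip ok: " + original.equals(decode(wire)));
    }
}
```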
Test class
import com.jiuzhang.seckill.mq.RocketMQService;
import com.jiuzhang.seckill.service.SeckillActivityService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;

@SpringBootTest
public class MQTest {

    @Autowired
    RocketMQService rocketMQService;

    @Autowired
    SeckillActivityService seckillActivityService;

    @Test
    public void sendMQTest() throws Exception {
        // Sends to the same topic that ConsumerListener subscribes to
        rocketMQService.sendMessage("test-mq", "Hello RocketMQ!" + new Date());
    }
}