19.docker 安装 RocketMq 并配置log4j2来收集日志
一.安装docker RocketMq
1.创建 nameSrv 数据挂载文件夹
mkdir -p /data/rocketMQ/data/namesrv/logs
mkdir -p /data/rocketMQ/data/namesrv/store
2.查看RocketMq镜像来源
docker search rocketmq
3.找个star 最多的,我选择的是rocketmqinc/rocketmq
docker pull rocketmqinc/rocketmq
4.启动 namesrv 服务
docker run -d -p 9876:9876 -v /data/rocketMQ/data/namesrv/logs:/root/logs -v /data/rocketMQ/data/namesrv/store:/root/store --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" docker.io/rocketmqinc/rocketmq sh mqnamesrv
5.安装 broker
创建 broker 配置文件挂载文件夹
mkdir -p /data/rocketMQ/data/broker/logs
mkdir -p /data/rocketMQ/data/broker/store
mkdir -p /data/rocketMQ/data/broker/conf/
6.新增 broker 配置文件,在 /data/rocketMQ/data/broker/conf 目录下创建 broker.conf 文件,内容如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 改成自己宿主机的 ip
brokerIP1 = 192.168.88.50
7.运行 broker 容器
docker run -d -p 10911:10911 -p 10909:10909 -v /data/rocketMQ/data/broker/logs:/root/logs -v /data/rocketMQ/data/broker/store:/root/store -v /data/rocketMQ/data/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name rmqbroker --link rmqnamesrv:rmqnamesrv -e "NAMESRV_ADDR=192.168.88.50:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
8.安装控制台
docker search rocketmq-console
同样选择一个,我选择styletang/rocketmq-console-ng
9.获取控制台
docker pull styletang/rocketmq-console-ng
10.启动控制台镜像
#注意:下面命令中 namesrvAddr 的地址(192.168.88.50:9876)要与上面 nameSrv 的地址和端口保持一致
docker run -d -p 8080:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=192.168.88.50:9876 -Drocketmq.config.isVIPChannel=false" -t styletang/rocketmq-console-ng
浏览器登录
http://192.168.88.50:8080
二.配置springboot log4j2配置文件
1.在RocketMq平台新增主题 Log4j2ToRocketMq
2.spring-boot 项目 log4j2.xml 配置
由于log4j2暂时没有支持RocketMq,决定扩展AbstractAppender来实现自己的RocketMqAppender
首先maven部分
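<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>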
RocketMqAppender 实现,后续优化
package com.company.project.appenders; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.appender.AppenderLoggingException; import org.apache.logging.log4j.core.config.plugins.Plugin; import org.apache.logging.log4j.core.config.plugins.PluginAttribute; import org.apache.logging.log4j.core.config.plugins.PluginElement; import org.apache.logging.log4j.core.config.plugins.PluginFactory; import org.apache.logging.log4j.core.layout.PatternLayout; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import java.io.Serializable; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 自定义Appender,继承 AbstractAppender 只需要覆盖自已想要的方法即可
* 类上面的注解是用来设置配置文件中的标签。 */ @Plugin(name = "RocketMqAppender", category = "Core", elementType = "appender", printObject = true) public final class RocketMqAppender extends AbstractAppender implements Serializable { private static final long serialVersionUID = -5120699436731845929L; private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private static DefaultMQProducer producer; private static String rocketMqNameserver; private static String rocketMqProducerGroup; private static String rocketMqMsgTopic; private static String rocketMqMsgTag; //需要实现的构造方法,直接使用父类就行 private RocketMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, final boolean ignoreExceptions, String server, String group, String topic, String tag) { super(name, filter, layout, ignoreExceptions); initRocketMq(server, group, topic, tag); } private void initRocketMq(String server, String group, String topic, String tag) { if(this.producer==null){ this.rocketMqNameserver = server; this.rocketMqProducerGroup = group; this.rocketMqMsgTopic = topic; this.rocketMqMsgTag = tag; this.producer = new DefaultMQProducer(this.rocketMqProducerGroup); producer.setNamesrvAddr(this.rocketMqNameserver); try { producer.shutdown(); producer.start(); } catch (Exception e) { System.out.println(e); } } } @Override public void append(LogEvent logEvent) { readLock.lock(); try { final byte[] bytes = getLayout().toByteArray(logEvent);//日志二进制文件,输出到指定位置就行 //下面这个是要实现的自定义逻辑 //String strLog = new String(bytes); //System.out.println(strLog); if (producer != null) { Message msg = new Message(this.rocketMqMsgTopic, this.rocketMqMsgTag, bytes); producer.sendOneway(msg); msg = null; } } catch (Exception ex) { if (!ignoreExceptions()) { throw new AppenderLoggingException(ex); } } finally { readLock.unlock(); } } // 下面这个方法可以接收配置文件中的参数信息 @PluginFactory public static RocketMqAppender createAppender(@PluginAttribute("name") String name, 
@PluginAttribute("server") String server, @PluginAttribute("group") String group, @PluginAttribute("topic") String topic, @PluginAttribute("tag") String tag, @PluginElement("Filter") final Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout, @PluginAttribute("ignoreExceptions") boolean ignoreExceptions) { if (name == null || server == null || group == null || topic == null) { LOGGER.error("PluginAttribute error"); return null; } if (layout == null) { layout = PatternLayout.createDefaultLayout(); } return new RocketMqAppender(name, filter, layout, ignoreExceptions, server, group, topic, tag); } }
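log4j2.xml 配置文件(示例,属性名对应 RocketMqAppender 中的 @PluginAttribute;group、tag 的取值请按实际情况填写)
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="com.company.project.appenders">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RocketMqAppender name="RocketMq" server="192.168.88.50:9876" group="log4j2-producer-group" topic="Log4j2ToRocketMq" tag="log">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </RocketMqAppender>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="RocketMq"/>
        </Root>
    </Loggers>
</Configuration>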
三.消费MQ队列消息
package com.company.project.mq; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Service @RocketMQMessageListener(consumerGroup = "SpringBootWeb-ConsumerGroup", topic = "Log4j2ToRocketMq") public class RocketConsumer implements RocketMQListener{ @Override public void onMessage(String message) { //System.err.println("接收到消息:" + message); //System.err.println("接收到消息一条" ); } }
参考:
https://www.cnblogs.com/moxiaodan/p/13800016.html