19.docker 安装 RocketMq 并配置log4j2来收集日志


一.安装docker RocketMq

1.创建 nameSrv 数据挂载文件夹

mkdir -p /data/rocketMQ/data/namesrv/logs
mkdir -p /data/rocketMQ/data/namesrv/store

 2.查看RocketMq镜像来源

docker search rocketmq

 3.找个star 最多的,我选择的是rocketmqinc/rocketmq

docker pull rocketmqinc/rocketmq

 4.启动 namesrv 服务

docker run -d -p 9876:9876 -v /data/rocketMQ/data/namesrv/logs:/root/logs -v /data/rocketMQ/data/namesrv/store:/root/store --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" docker.io/rocketmqinc/rocketmq sh mqnamesrv

 

 5.安装 broker

创建 broker 配置文件挂载文件夹

mkdir -p /data/rocketMQ/data/broker/logs
mkdir -p /data/rocketMQ/data/broker/store
mkdir -p /data/rocketMQ/data/broker/conf/

 6.新增 broker 配置文件,在 /data/rocketMQ/data/broker/conf 目录下创建 broker.conf 文件,内容如下:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 自己的ip(宿主机 IP)
brokerIP1 = 192.168.88.50

 7.运行 broker 容器

docker run -d -p 10911:10911 -p 10909:10909 -v /data/rocketMQ/data/broker/logs:/root/logs -v /data/rocketMQ/data/broker/store:/root/store -v /data/rocketMQ/data/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name rmqbroker --link rmqnamesrv:rmqnamesrv -e "NAMESRV_ADDR=192.168.88.50:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

 8.安装控制台

docker search rocketmq-console

  同样选择一个,我选择styletang/rocketmq-console-ng 

 9.获取控制台

docker pull styletang/rocketmq-console-ng

10.启动控制台镜像

#注意 -Drocketmq.config.namesrvAddr 中的地址采用的是上面 nameSrv 配置的地址(宿主机 IP:9876)
docker run -d -p 8080:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=192.168.88.50:9876 -Drocketmq.config.isVIPChannel=false" -t styletang/rocketmq-console-ng

 浏览器登录

http://192.168.88.50:8080

二.配置springboot log4j2配置文件

 1.在RocketMq平台新增主题 Log4j2ToRocketMq

 2.spring-boot 项目 log4j2.xml 配置

由于log4j2暂时没有支持RocketMq,决定扩展AbstractAppender来实现自己的RocketMqAppender

首先maven部分

      
        
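        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>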
                

  RocketMqAppender 实现,后续优化

package com.company.project.appenders;

import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.io.Serializable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


/**
 * 自定义Appender,继承 AbstractAppender 只需要覆盖自已想要的方法即可
* 类上面的注解是用来设置配置文件中的标签。 */ @Plugin(name = "RocketMqAppender", category = "Core", elementType = "appender", printObject = true) public final class RocketMqAppender extends AbstractAppender implements Serializable { private static final long serialVersionUID = -5120699436731845929L; private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private static DefaultMQProducer producer; private static String rocketMqNameserver; private static String rocketMqProducerGroup; private static String rocketMqMsgTopic; private static String rocketMqMsgTag; //需要实现的构造方法,直接使用父类就行 private RocketMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, final boolean ignoreExceptions, String server, String group, String topic, String tag) { super(name, filter, layout, ignoreExceptions); initRocketMq(server, group, topic, tag); } private void initRocketMq(String server, String group, String topic, String tag) { if(this.producer==null){ this.rocketMqNameserver = server; this.rocketMqProducerGroup = group; this.rocketMqMsgTopic = topic; this.rocketMqMsgTag = tag; this.producer = new DefaultMQProducer(this.rocketMqProducerGroup); producer.setNamesrvAddr(this.rocketMqNameserver); try { producer.shutdown(); producer.start(); } catch (Exception e) { System.out.println(e); } } } @Override public void append(LogEvent logEvent) { readLock.lock(); try { final byte[] bytes = getLayout().toByteArray(logEvent);//日志二进制文件,输出到指定位置就行 //下面这个是要实现的自定义逻辑 //String strLog = new String(bytes); //System.out.println(strLog); if (producer != null) { Message msg = new Message(this.rocketMqMsgTopic, this.rocketMqMsgTag, bytes); producer.sendOneway(msg); msg = null; } } catch (Exception ex) { if (!ignoreExceptions()) { throw new AppenderLoggingException(ex); } } finally { readLock.unlock(); } } // 下面这个方法可以接收配置文件中的参数信息 @PluginFactory public static RocketMqAppender createAppender(@PluginAttribute("name") String name, 
@PluginAttribute("server") String server, @PluginAttribute("group") String group, @PluginAttribute("topic") String topic, @PluginAttribute("tag") String tag, @PluginElement("Filter") final Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout, @PluginAttribute("ignoreExceptions") boolean ignoreExceptions) { if (name == null || server == null || group == null || topic == null) { LOGGER.error("PluginAttribute error"); return null; } if (layout == null) { layout = PatternLayout.createDefaultLayout(); } return new RocketMqAppender(name, filter, layout, ignoreExceptions, server, group, topic, tag); } }

  log4j2.xml 配置文件





    
        
        
        
        
        
        
        
        
        
        
        
        
    
    
    
        
        
            
            
            
            
        

        
        

        
        
            
            
            
            
            
                
                
            
            
            
        
        
            
            
            
                
                
            
        
        
            
            
            
                
                
            
        

        
        
            
        
    
    
    
        
        
        
            
            
        
        
            
            
            
            
            
        
    

三.消费MQ队列消息

package com.company.project.mq;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Consumer for the {@code Log4j2ToRocketMq} topic, registered in the consumer group
 * {@code SpringBootWeb-ConsumerGroup}.
 *
 * <p>The listener is parameterised with {@code String}: with the raw
 * {@code RocketMQListener} type, {@code onMessage(String)} would not override the
 * interface method and the class would not compile.
 */
@Service
@RocketMQMessageListener(consumerGroup = "SpringBootWeb-ConsumerGroup", topic = "Log4j2ToRocketMq")
public class RocketConsumer implements RocketMQListener<String> {

    /**
     * Called for each message received from the topic.
     *
     * @param message the message body as text
     */
    @Override
    public void onMessage(String message) {
        // Intentionally a no-op placeholder: add the handling of the received log line here.
    }
}

参考:

https://www.cnblogs.com/moxiaodan/p/13800016.html