优雅的Pulsar Consumer Listener


目前考虑的方式是使用 Spring AOP 实现:由切面统一处理消息的 ACK/NACK 与异常,Listener 只需关注业务逻辑。

--------------------切面类-----------------------
package com.hd.resource.aspect;

import com.hd.resource.pulsar.PulsarProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("ALL")
@Component
//标注这个类是切面
@Aspect
@Slf4j
public class PulsarConsumerListenerAspect {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

@Autowired
PulsarProperties pulsarProperties;

@Pointcut("@annotation(com.hd.resource.aspect.PulsarExceptionHandle)")
public void executeService() {
}

/**
* 环绕通知, 围绕着方法执行
*
* @param joinPoint
* @return
* @throws Throwable
*/
@Around("executeService()")
public void doAround(ProceedingJoinPoint joinPoint) {
log.info("收到消息:Around ====================================================");

long start = System.currentTimeMillis();
Object[] args = joinPoint.getArgs(); // 参数
Consumer consumer = (Consumer) args[0];
Message msg = (Message) args[1];
String data = new String(msg.getData());
String topicSub = "";
String[] controllerMethodDescription = null;

if (this.judgeIsOff()) {
consumer.negativeAcknowledge(msg);
log.error("当前开关为off 拒绝消费消息, MessageId {} data {}", msg.getMessageId(), data);
return;
}

try {
controllerMethodDescription = getControllerMethodDescription(joinPoint);
topicSub = controllerMethodDescription[0] + "===>" + controllerMethodDescription[1];

log.info("当前消费者和订阅:{}, 接收到消息ID为:{}, 消息体为:{}",
topicSub, msg.getMessageId(), data);
} catch (Exception e) {
log.error("执行解析注解方法时出现异常,检查BUG", e);
}

// 记录下请求内容
log.info("请求时间 : " + sdf.format(start));

// 执行业务逻辑
try {
Object result = joinPoint.proceed();
} catch (RuntimeException e) {
String processMethod = controllerMethodDescription[2];
log.error("执行订阅:{}的Listener时捕获RuntimeException,本方法运行时异常处理策略:{}", topicSub, PulsarConsumerListenerProcessMethodEnum.MAP.get(processMethod), e);

if (processMethod.equals(PulsarConsumerListenerProcessMethodEnum.DINGDING.getCode())) {
try {
//todo
consumer.acknowledge(msg.getMessageId());
} catch (PulsarClientException e2) {
log.error("向Pulsar Server端ACK时出现异常", e2);
}
} else if (processMethod.equals(PulsarConsumerListenerProcessMethodEnum.NACK.getCode())) {
consumer.negativeAcknowledge(msg.getMessageId());
} else if (processMethod.equals(PulsarConsumerListenerProcessMethodEnum.RE_CONSUME.getCode())) {
try {
consumer.reconsumeLater(msg, 1, TimeUnit.SECONDS);
} catch (PulsarClientException pulsarClientException) {
pulsarClientException.printStackTrace();
}
}

return;
} catch (Throwable throwable) {
log.error("捕获非运行时异常, 拒绝消费消息, MessageId {}, 消息体 {}", msg.getMessageId(), data, throwable);
consumer.negativeAcknowledge(msg.getMessageId());
return;
}

Long total = System.currentTimeMillis() - start;
// 处理完请求,返回内容

try {
consumer.acknowledge(msg.getMessageId());
} catch (PulsarClientException e) {
log.error("向Pulsar Server端ACK时出现异常", e);
}

log.info("Aop耗时 : " + total + " ms!");
log.info("执行完成 :==========================================================================================");
}

// 通过反射获取参入的参数
public static String[] getControllerMethodDescription(JoinPoint joinPoint) throws Exception {
String targetName = joinPoint.getTarget().getClass().getName();
String methodName = joinPoint.getSignature().getName();
Object[] arguments = joinPoint.getArgs();
Class targetClass = Class.forName(targetName);
String topic = "";
String sub = "";
String processingMethod = "";

Method[] methods = targetClass.getMethods();
for (Method method : methods) {
if (method.getName().equals(methodName)) {
Class[] clazzs = method.getParameterTypes();

if (clazzs.length == arguments.length) {
if (method.getAnnotation(com.hd.resource.aspect.PulsarExceptionHandle.class) != null) {
topic = method.getAnnotation(com.hd.resource.aspect.PulsarExceptionHandle.class).topic();
sub = method.getAnnotation(com.hd.resource.aspect.PulsarExceptionHandle.class).sub();
processingMethod = method.getAnnotation(com.hd.resource.aspect.PulsarExceptionHandle.class).processingMethod();
}
break;
}
}
}

return new String[]{topic, sub, processingMethod};
}

/**
* 判断开关
*
* @return is equals off
*/
public boolean judgeIsOff() {
return pulsarProperties.getOnOff().equals("off");
}
}
--------------------------注解类------------------------------------
import java.lang.annotation.*;

/**
 * Marks a Pulsar listener method whose invocation should be wrapped by
 * {@code PulsarConsumerListenerAspect}.
 *
 * <p>The annotation is retained at runtime so the aspect can read its
 * attributes reflectively.
 */
@Documented
@Target({ElementType.METHOD, ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarExceptionHandle {

    /** Topic name; the aspect includes it in its log lines. */
    String topic() default "无描述";

    /** Subscription name; the aspect includes it in its log lines. */
    String sub() default "无描述";

    /**
     * Code of the strategy applied when the listener throws a RuntimeException.
     * The aspect compares it with the codes of
     * {@code PulsarConsumerListenerProcessMethodEnum}; the default {@code "1"}
     * is the code of {@code DINGDING}.
     */
    String processingMethod() default "1";
}
-----------------------------枚举----------------------------------
import java.util.HashMap;
import java.util.Map;

/**
 * Strategies for handling a RuntimeException thrown by a Pulsar consumer listener.
 *
 * <p>Each constant carries a string {@code code} (the value used in
 * {@code PulsarExceptionHandle.processingMethod}) and a human-readable
 * description used in log output.
 */
public enum PulsarConsumerListenerProcessMethodEnum {
    /** Code {@code "1"}: push a message to a DingTalk group. */
    DINGDING("1", "钉钉群推送一条消息"),
    /** Code {@code "2"}: reject (negatively acknowledge) the message. */
    NACK("2", "拒绝消费"),
    /**
     * Code {@code "3"}: consume the message again later. Per the original
     * author's note, this presupposes that a retry queue and a dead-letter
     * queue exist.
     */
    RE_CONSUME("3", "1分钟之后重新消费");

    private final String code;
    private final String desc;

    /**
     * Read-only lookup table from strategy code to description. It is
     * unmodifiable so that no caller can corrupt the shared table.
     */
    public static final Map<String, String> MAP;

    static {
        Map<String, String> descriptionsByCode = new HashMap<>(values().length);
        for (PulsarConsumerListenerProcessMethodEnum strategy : values()) {
            descriptionsByCode.put(strategy.getCode(), strategy.getDesc());
        }
        MAP = java.util.Collections.unmodifiableMap(descriptionsByCode);
    }

    PulsarConsumerListenerProcessMethodEnum(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /** @return the strategy code, e.g. {@code "1"} */
    public String getCode() {
        return code;
    }

    /** @return the human-readable description of the strategy */
    public String getDesc() {
        return desc;
    }
}
---------------Listener实现类 只需关注业务逻辑-------------------
/**
 * Listener for comment-audit results.
 *
 * <p>The class contains business logic only: {@code received} is annotated with
 * {@code @PulsarExceptionHandle}, so acknowledging the message and handling
 * exceptions is left to the aspect bound to that annotation.
 */
@Component
public class AuditCommentResultListener extends AbstractListener {

    @Autowired
    CommentService commentService;

    /**
     * Parses the audit callback payload and forwards the mapped license status
     * to {@code CommentService.licenseCallback}.
     *
     * <p>A payload that is empty or lacks the license status or license id is
     * only logged; the method then returns normally.
     *
     * @param consumer the Pulsar consumer that delivered the message (unused here)
     * @param msg      the received message; its payload is read as a JSON string
     */
    @PulsarExceptionHandle(topic = "audit-comment-result-topic", sub = "resource-sub-audit-comment-result")
    @Override
    public void received(Consumer consumer, Message msg) {
        // Decode explicitly as UTF-8 rather than with the platform default
        // charset, which varies between hosts.
        java.lang.String data = new java.lang.String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8);

        CommentAuditCallBackDto dto = JSONObject.parseObject(data, CommentAuditCallBackDto.class);
        if (ObjectUtils.isEmpty(dto) || ObjectUtils.isEmpty(dto.getLicenseStatus())
                || ObjectUtils.isEmpty(dto.getLicenseId())) {
            log.error("参数为空, MessageId {} data {}", msg.getMessageId(), data);
            return;
        }

        // Map the audit system's status onto the local license status.
        if (AuditLicenseStatus.SUCCESS.getCode().equals(dto.getLicenseStatus())) {
            dto.setLicenseStatus(LicenseStatus.AUDIT_SUCCESS.getCode());
        } else {
            dto.setLicenseStatus(LicenseStatus.AUDIT_FAILED.getCode());
        }
        commentService.licenseCallback(dto.getLicenseStatus(), dto.getLicenseId());
    }
}