SpringBoot整合Redis实现发布订阅功能实践


一、项目结构

我首先用 SpringBoot Initializer 创建一个简单的 Demo,然后在 Demo 上进行修改,这样更便捷。项目结构如下图所示:

项目结构也很简单

  • PrintMessageListener 负责处理订阅消息,我仅仅是打印了收到的Redis信息;
  • AdminController 负责从浏览器输入url,实现动态订阅/取消订阅以及发布;
  • RedisConfiguration 可能是最重要的,需要负责向 Spring容器注入以下 Bean:
    • RedisTemplate :可以通过调用它的 convertAndSend(channel, Object message) 方法 发布消息;
    • RedisMessageListenerContainer ,可以通过调用它的 addMessageListener(MessageListener listener, Topic topic) 方法 订阅消息;相反地,也可以调用它的 removeMessageListener(MessageListener listener, Topic topic) 方法 取消订阅消息;
  • PubsubApplication 是 SpringBoot 的启动类;
  • logback.xml 配置内容可以参考

PS:作为 Maven 项目,肯定还要有 pom.xml,图片中没有反映出来,所以我补充一下。

二、Maven 依赖

项目需要引入的依赖包括:

  • spring-boot-starter-web:帮助我们启动一个Web服务器;
  • spring-boot-starter-data-redis:帮助我们集成Redis;
  • lombok:方便我们使用 @Slf4j/@Data 等,简化代码;
  • slf4j-api:让我们能够使用 LoggerLoggerFactory 等类;
  • logback-classic:让我们能够真正打印出日志。

完整的 pom.xml 文件:

<?xml version="1.0" encoding="UTF-8"?>

  4.0.0
  
    org.springframework.boot
    spring-boot-starter-parent
    2.5.3
     
  

  com.example.demo
  pubsub
  0.0.1-SNAPSHOT
  pubsub
  Demo project for Spring Boot

  
    1.8
    1.7.32
    1.2.6
  
  
    
      org.springframework.boot
      spring-boot-starter-web
    

    
      org.springframework.boot
      spring-boot-starter-data-redis
    

    
      org.projectlombok
      lombok
      true
    
    
      org.slf4j
      slf4j-api
      ${slf4j.version}
    
    
      ch.qos.logback
      logback-classic
      ${logback.version}
    

    
      org.springframework.boot
      spring-boot-starter-test
      test
    
  

  
    
      
        org.springframework.boot
        spring-boot-maven-plugin
	
	  
	    
	      org.projectlombok
	      lombok
            
	  
        
      
    
  


三、消息监听

我们收到发布的消息后,需要处理逻辑,这部分逻辑写在 PrintMessageListener 中:

package com.example.demo.pubsub.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 功能描述:打印收到的Redis信息
 *
 * @author geekziyu
 * @version 1.0.0
 */
@Slf4j
public class PrintMessageListener implements MessageListener {

    private StringRedisSerializer stringRedisSerializer;

    public PrintMessageListener(StringRedisSerializer stringRedisSerializer) {
        this.stringRedisSerializer = stringRedisSerializer;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = stringRedisSerializer.deserialize(message.getChannel());
        String body = stringRedisSerializer.deserialize(message.getBody());
        handleMessage(channel, body);
    }

    private void handleMessage(String channel, String body) {
        log.info("消费Redis消息\n channel:{}\n body:{}", channel, body);
    }
}

四、Redis配置

前面也说过了,我们要使用 spring-boot-starter-data-redis 中提供的API实现Redis发布和订阅消息,就需要用到 RedisTemplateRedisMessageListenerContainer,现在就来把他们注入Spring容器:

package com.example.demo.pubsub.config;

import com.example.demo.pubsub.listener.PrintMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 功能描述:Redis 配置
 *
 * @author geekziyu
 * @version 1.0.0
 */
@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer result = new RedisMessageListenerContainer();
        result.setConnectionFactory(redisConnectionFactory);

        return result;
    }

    @Bean("redisTemplate")
    public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate result = new RedisTemplate<>();
        result.setConnectionFactory(factory);

        result.setKeySerializer(stringRedisSerializer());
        result.setHashKeySerializer(stringRedisSerializer());

        result.setValueSerializer(stringRedisSerializer());
        result.setHashValueSerializer(stringRedisSerializer());
        return result;
    }

    @Bean
    public PrintMessageListener printMessageListener() {
        return new PrintMessageListener(stringRedisSerializer());
    }

    @Bean
    public StringRedisSerializer stringRedisSerializer() {
        return new StringRedisSerializer();
    }
}

需要注意的有以下几点:
第一、如果不调用 setConnectionFactory(RedisConnectionFactory),给 RedisMessageListenerContainer 设置连接工厂,在调用 addMessageListener 执行订阅时,会出现空指针异常,具体发生异常的位置如下图:

第二、如果不调用 RedisTemplatesetConnectionFactory 方法设置Redis连接工厂,会在启动时就发生异常,如下图所示:

// 说明 RedisConnectionFactory 对于 RedisTemplate 而言是必需的!
Caused by: java.lang.IllegalStateException: RedisConnectionFactory is required
	at org.springframework.util.Assert.state(Assert.java:76)
	at org.springframework.data.redis.core.RedisAccessor.afterPropertiesSet(RedisAccessor.java:38)
	at org.springframework.data.redis.core.RedisTemplate.afterPropertiesSet(RedisTemplate.java:128)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)

五、通过HTTP请求订阅发布

我这里用 AdminController 来接受发布和订阅/取消订阅的请求,源代码如下:

package com.example.demo.pubsub.controller;

import com.example.demo.pubsub.listener.PrintMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * 功能描述:后台控制器
 *
 * @author geekziyu
 * @version 1.0.0
 */
@RestController
@RequestMapping("/admin")
public class AdminController {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private RedisMessageListenerContainer container;

    private Map registeredListener = new HashMap<>();

    @Autowired
    private StringRedisSerializer stringRedisSerializer;


    @GetMapping("/pub")
    public String publish(String channel, String body) {
        redisTemplate.convertAndSend(channel, body);
        return "ok";
    }

    @GetMapping("/sub")
    public String subscribe(String channel) {
        MessageListener listener = registeredListener.computeIfAbsent(channel, ch -> new PrintMessageListener(stringRedisSerializer));
        container.addMessageListener(listener, new ChannelTopic(channel));
        return "ok";
    }

    @GetMapping("/unsub")
    public String unsubscribe(String channel) {
        MessageListener messageListener = registeredListener.get(channel);
        if (messageListener != null) {
            container.removeMessageListener(messageListener, new ChannelTopic(channel));
        }
        return "ok";
    }

}

六、打印日志

为了顺利的在控制台输出日志,你可能需要 logback.xml 的完整代码:

<?xml version="1.0" encoding="UTF-8"?>

  
    
      %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
    
  

  
    
  

小节

这样,我们就已经可以实现发布订阅了。

首先订阅一下:

http://localhost:8080/admin/sub?channel=dream

再发布一下:

http://localhost:8080/admin/pub?channel=dream&body=engineer

检查控制台,Redis消息消费成功:

需要注意,你的 application.properties 中Redis的连接默认为 localhost:6379

spring.redis.host=localhost
spring.redis.port=6379

你需要确保本地已经启动了Redis,且服务端口是6379。如果你不熟悉如何搭建Redis,那么你需要修改 Redis 连接到一个可用的 Redis 服务上去。

参考文档

SpringBoot整合Redis实现消息发布订阅