使用Redis的发布订阅模式实现即时通讯功能的消息队列中间件,以适应分布式模块中即时通讯服务的需要。
整合方式:SpringBoot + WebSocket + Redis
1. 发布订阅模式 Redis的发布订阅机制是一种消息通信模式,包括三个部分,发布者(pub)发送消息,订阅者(sub)接收信息和Channel
,这里的Channel
类似于Kafka
中的topic
的概念。
发布者和订阅者都是Redis
客户端,Channel
则为Redis
服务器端,可以理解为一种特殊的数据存储结构 。发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。
Redis客户端可以订阅任意数量的频道。
Redis
的这种发布订阅机制与基于主题的发布订阅类似,Channel
相当于主题。
下图展示频道channel1,以及订阅这个频道的三个客户端–client2,client5和client1之间的关系
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端
1.1. 相关命令 1.1.1. 订阅者/等待接收消息 首先打开 Redis 客户端,然后订阅了一个名为“bbx”的 channel,使用如下命令:
127.0.0.1:6379> SUBSCRIBE bbx Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "bbx" 3) (integer ) 1
使用SUBSCRIBE命令订阅了名为 bbx 的 channel。命令执行后该客户端会出处于等待接收消息的阻塞状态。
1.1.2. 发布者/发送消息 下面再启动一个 Redis 客户端,输入如下命令:
127.0.0.1:6379> PUBLISH bbx hello (integer ) 1 127.0.0.1:6379> PUBLISH bbx world (integer ) 1 127.0.0.1:6379>
1.1.3. 订阅者/成功接收消息 127.0.0.1:6379> SUBSCRIBE bbx Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "bbx" 3) (integer ) 1 1) "message" 2) "bbx" 3) "hello" 1) "message" 2) "bbx" 3) "world"
1.2. 原理
源码:pubsub.c
Redis通过PUBLISH
,SUBSCRIBE
和PSUBSCRIBE
等命令实现发布和订阅功能
通过SUBSCRIBE
命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。SUBSCRIBE
命令的关键,就是将客户端添加到给定频道的订阅链表中。
通过PUBLISH
命令向订阅者发送消息,redis-server会使用给定频道作为键,在它维护的频道字典中查找记录了订阅这个频道的所有客户端的链表,将消息发布给所有订阅者
Pub和Sub从字面上理解就是发布(Publish)和订阅(Subscribe),在redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的信息,这一功能最明显的用法就是实时消息系统,比如普通的即时聊天,群聊等功能。
1.3. 缺点
消息无法持久化,存在丢失风险,即消息一经发布,即使没有任何订阅方处理,该条消息就会丢失
没有类似ACK的机制,即发布方不会确保订阅方成功接收
广播机制,下游消费能力取决于消费方本身。广播机制无法通过添加多个消费方增强消费能力, 因为这和发布/订阅模型本身的目的是不符的。广播机制的目的是一个一个发布者被多个订阅进行不同的处理
解决方法:
添加持久化层,加入redis缓存和MySQL
结合WebSocket,实现消息推送、握手、心跳检测和广播等机制
2. 和SpringBoot、WebSocket的整合 2.1. 依赖
spring-boot-starter-web :帮助我们启动一个Web服务器;
spring-boot-starter-data-redis :帮助我们集成Redis;
lombok :方便我们使用 @Slf4j /@Data 等,简化代码;
spring-boot-starter-websocket :帮助我们在SpringBoot客户端起WebSocket进程
2.2. 架构 2.2.1. 整体架构
Redis(消息队列)——> MySQL(持久化) ——> WebSockets(消息转发)——> FrontEnd(客户端)
2.2.2. 后端架构
Controller——(n,1)——>Redis——(1,n)———>Listener——(1,n)——>WebSockets
2.3. Redis配置 2.3.1. 自定义RedisTemplate @Configuration public class RedisTemp { @Bean @SuppressWarnings("all") public RedisTemplate<String, Object> redisTemplate (RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate <String, Object>(); template.setConnectionFactory(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer (Object.class); ObjectMapper om = new ObjectMapper (); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer (); template.setKeySerializer(stringRedisSerializer); template.setHashKeySerializer(stringRedisSerializer); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
2.3.2. 发布订阅配置 由于我们在发布者、订阅者的代码中均没有指定要订阅的channel,因此需要在其他地方(发布订阅配置类)指定channel。
此配置类主要实现了Redis消息监听器容器,这个容器加载了RedisConnectionFactory和消息监听器;可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器通过反射技术调用消息订阅处理器的相关方法进行一些业务处理。
@Configuration public class SubConfig { private final RedisConnectionFactory redisConnectionFactory; @Autowired @SuppressWarnings("all") public SubConfig (RedisConnectionFactory redisConnectionFactory) { this .redisConnectionFactory = redisConnectionFactory; } @Bean public MessageListener listener () { return ConnectionListener.getInstance(); } @Bean public ChannelTopic channelTopic () { return new ChannelTopic ("nxb message test" ); } @Bean public RedisMessageListenerContainer messageListenerContainer () { RedisMessageListenerContainer container = new RedisMessageListenerContainer (); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(listener(), channelTopic()); return container; } }
2.4. 消费者(sub) 这里需要实现MessageListener
接口。目的是利用onMessage
方法,监听channel
中的消息队列,实现接受消息。
消费者的核心方法是onMessage
,方法传入订阅到的消息,然后进行处理即可。
这里把redis的消费者方法和websocket进行了整合,redis消费者收到上游消息后,由websocket转发给对应用户,再转发给前端。
@Component public class ConnectionListener implements MessageListener { private ConcurrentHashMap<String, WebSocket> webSockets = new ConcurrentHashMap <>(); private static final ConnectionListener instance = new ConnectionListener (); private ConnectionListener () {} public static ConnectionListener getInstance () { return instance; } @Override public void onMessage (Message message, byte [] pattern) { String body = new String (message.getBody()); MessageVO messageVO = JSON.parseObject((String)JSON.parse(message.toString()), MessageVO.class); System.out.println("received message: " + messageVO); sendMessage(messageVO); } public void addConnection (String userId,WebSocket webSocket) { webSockets.put(userId,webSocket); } public boolean containConnection (String userId) { return webSockets.containsKey(userId); } public void removeConnection (String userId) { webSockets.remove(userId); } public int connectionCount () { return webSockets.size(); } public void sendMessage (MessageVO message) { if (containConnection(message.getUser())){ String key = message.getUser(); webSockets.get(key).sendMessage(message); System.out.println(message + " has been sent to " + key); } } }
2.5. 生产者(pub) 指定要发布到的channel和要发布的消息。注意这里的channel由配置类进行管理
@Service @Slf4j public class PublishService { @Autowired private RedisUtil redisUtil; @Autowired private ChannelTopic channelTopic; public void publish (Object message) { redisUtil.convertAndSend(channelTopic.getTopic(), message); } }
其中redisUtil
封装了转发消息方法:
public boolean convertAndSend (String channel, Object message) { if (!StringUtils.hasText(channel)) { return false ; } try { redisTemplate.convertAndSend(channel, message); log.info("发送消息成功,channel:{},message:{}" , channel, message); return true ; } catch (Exception e) { log.info("发送消息失败,channel:{},message:{}" , channel, message); e.printStackTrace(); } return false ; }
3. 测试 3.1. 测试类 @Slf4j @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class RedisSubPubTest { @Autowired private ChannelTopic topic; @Autowired private RedisUtil redisUtil; @Test public void test () { final String TOPIC = "TEST_TOPIC1" ; MessageVO messageVO = MessageVO.builder() .user("pengzna" ) .msg("hello world!" ) .build(); redisUtil.convertAndSend(topic.getTopic(), messageVO); } }
3.2. 测试结果
参考资料