Pengzna's blog 👋

Jul 26, 2022

基于 Redis 发布订阅实现IM消息队列

使用Redis的发布订阅模式实现即时通讯功能的消息队列中间件,以适应分布式模块中即时通讯服务的需要。

整合方式:SpringBoot + WebSocket + Redis

在这里插入图片描述

1. 发布订阅模式

Redis的发布订阅机制是一种消息通信模式,包括三个部分,发布者(pub)发送消息,订阅者(sub)接收信息和Channel,这里的Channel类似于Kafka中的topic的概念。

img

发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,可以理解为一种特殊的数据存储结构。发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。

Redis客户端可以订阅任意数量的频道。

Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。

下图展示频道channel1,以及订阅这个频道的三个客户端–client2,client5和client1之间的关系

img

当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端

在这里插入图片描述

1.1. 相关命令

1.1.1. 订阅者/等待接收消息

首先打开 Redis 客户端,然后订阅了一个名为“bbx”的 channel,使用如下命令:

127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1

使用SUBSCRIBE命令订阅了名为 bbx 的 channel。命令执行后该客户端会出处于等待接收消息的阻塞状态。

1.1.2. 发布者/发送消息

下面再启动一个 Redis 客户端,输入如下命令:

127.0.0.1:6379> PUBLISH bbx hello
(integer) 1
127.0.0.1:6379> PUBLISH bbx world
(integer) 1
127.0.0.1:6379>

1.1.3. 订阅者/成功接收消息

127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1
#等待读取推送消息
1) "message" #消息
2) "bbx" #频道
3) "hello" #消息具体内容
1) "message"
2) "bbx"
3) "world"

1.2. 原理

源码:pubsub.c

Redis通过PUBLISHSUBSCRIBEPSUBSCRIBE等命令实现发布和订阅功能

通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定频道的订阅链表中。

通过PUBLISH命令向订阅者发送消息,redis-server会使用给定频道作为键,在它维护的频道字典中查找记录了订阅这个频道的所有客户端的链表,将消息发布给所有订阅者

Pub和Sub从字面上理解就是发布(Publish)和订阅(Subscribe),在redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的信息,这一功能最明显的用法就是实时消息系统,比如普通的即时聊天,群聊等功能。

1.3. 缺点

  • 消息无法持久化,存在丢失风险,即消息一经发布,即使没有任何订阅方处理,该条消息就会丢失
  • 没有类似ACK的机制,即发布方不会确保订阅方成功接收
  • 广播机制,下游消费能力取决于消费方本身。广播机制无法通过添加多个消费方增强消费能力, 因为这和发布/订阅模型本身的目的是不符的。广播机制的目的是一个一个发布者被多个订阅进行不同的处理

解决方法:

  1. 添加持久化层,加入redis缓存和MySQL
  2. 结合WebSocket,实现消息推送、握手、心跳检测和广播等机制

2. 和SpringBoot、WebSocket的整合

2.1. 依赖

  • spring-boot-starter-web:帮助我们启动一个Web服务器;
  • spring-boot-starter-data-redis:帮助我们集成Redis;
  • lombok:方便我们使用 @Slf4j/@Data 等,简化代码;
  • spring-boot-starter-websocket:帮助我们在SpringBoot客户端起WebSocket进程

2.2. 架构

2.2.1. 整体架构

Redis(消息队列)——> MySQL(持久化) ——> WebSockets(消息转发)——> FrontEnd(客户端)

2.2.2. 后端架构

Controller——(n,1)——>Redis——(1,n)———>Listener——(1,n)——>WebSockets

2.3. Redis配置

2.3.1. 自定义RedisTemplate

@Configuration
public class RedisTemp {
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
// Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// String 的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}

2.3.2. 发布订阅配置

由于我们在发布者、订阅者的代码中均没有指定要订阅的channel,因此需要在其他地方(发布订阅配置类)指定channel。

此配置类主要实现了Redis消息监听器容器,这个容器加载了RedisConnectionFactory和消息监听器;可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器通过反射技术调用消息订阅处理器的相关方法进行一些业务处理。

@Configuration
public class SubConfig {

private final RedisConnectionFactory redisConnectionFactory;

@Autowired
@SuppressWarnings("all")
public SubConfig(RedisConnectionFactory redisConnectionFactory) {
this.redisConnectionFactory = redisConnectionFactory;
}

/**
* <h2>配置消息监听器</h2>
* */
@Bean
public MessageListener listener() {
return ConnectionListener.getInstance();
}

/**
* <h2>配置 发布/订阅 的 Topic</h2>
* */
@Bean
public ChannelTopic channelTopic() {
return new ChannelTopic("nxb message test");
}


/**
* <h2>将消息监听器绑定到消息容器</h2>
* 并进行容器的设置
* */
@Bean
public RedisMessageListenerContainer messageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
// 此处可以更换订阅主题
container.addMessageListener(listener(), channelTopic());
return container;
}
}

2.4. 消费者(sub)

这里需要实现MessageListener接口。目的是利用onMessage方法,监听channel中的消息队列,实现接受消息。

消费者的核心方法是onMessage,方法传入订阅到的消息,然后进行处理即可。

这里把redis的消费者方法和websocket进行了整合,redis消费者收到上游消息后,由websocket转发给对应用户,再转发给前端。

/**
* 负责订阅redis发布的接受器。
*/
@Component
public class ConnectionListener implements MessageListener {

private ConcurrentHashMap<String, WebSocket> webSockets = new ConcurrentHashMap<>();

private static final ConnectionListener instance = new ConnectionListener();

private ConnectionListener(){}

public static ConnectionListener getInstance(){
return instance;
}

/**
* 接收到消息
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String body = new String(message.getBody());
MessageVO messageVO = JSON.parseObject((String)JSON.parse(message.toString()), MessageVO.class);
System.out.println("received message: "+ messageVO);
sendMessage(messageVO);
}

public void addConnection(String userId,WebSocket webSocket){
webSockets.put(userId,webSocket);
}

public boolean containConnection(String userId){
return webSockets.containsKey(userId);
}

public void removeConnection(String userId){
webSockets.remove(userId);
}

public int connectionCount(){
return webSockets.size();
}

public void sendMessage(MessageVO message){
// 优化逻辑
if (containConnection(message.getUser())){
String key = message.getUser();
webSockets.get(key).sendMessage(message);
System.out.println(message + " has been sent to " + key);
}
}
}

2.5. 生产者(pub)

指定要发布到的channel和要发布的消息。注意这里的channel由配置类进行管理

/**
* 消息发布者服务
*/
@Service
@Slf4j
public class PublishService {

//注入自定义redisTemplate
@Autowired
private RedisUtil redisUtil;

@Autowired
private ChannelTopic channelTopic;

/**
* Publish.
* @param message the message
*/
public void publish(Object message) {
redisUtil.convertAndSend(channelTopic.getTopic(), message);
}
}

其中redisUtil封装了转发消息方法:

/**
* 向通道发布消息
*/
public boolean convertAndSend(String channel, Object message) {
if (!StringUtils.hasText(channel)) {
return false;
}
try {
redisTemplate.convertAndSend(channel, message);
log.info("发送消息成功,channel:{},message:{}", channel, message);
return true;
} catch (Exception e) {
log.info("发送消息失败,channel:{},message:{}", channel, message);
e.printStackTrace();
}
return false;
}

3. 测试

3.1. 测试类

@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class RedisSubPubTest {

@Autowired
private ChannelTopic topic;

@Autowired
private RedisUtil redisUtil;

@Test
public void test(){
final String TOPIC = "TEST_TOPIC1"; // 订阅主题
// 发布消息
MessageVO messageVO = MessageVO.builder()
.user("pengzna")
.msg("hello world!")
.build();
redisUtil.convertAndSend(topic.getTopic(), messageVO);
}
}

3.2. 测试结果

image-20220726182800342


参考资料

OLDER > < NEWER