一文详解消息队列中为什么不用redis作为队列
目录
- 1 引言
- 1.1 Redis中List队列
- 1.1.1 简单使用
- 1.1.2 解决cpu空转问题
- 1.1.3 Redis阻塞式拉取
- 1.2 Redis发布订阅
- 1.2.1 简单使用
- 1.2.2 发布订阅的缺点
- 1.3 Redis中的Stream
- 1.3.1 简单使用
- 1.3.2 stream阻塞拉取
- 1.3.3 Stream支持发布 / 订阅模式
- 1.3.4 stream不丢消息
- 1.3.5 stream持久化处理
- 1.3.6 stream消息堆积
- 1.4 与专业消息对比
- 1.4.1 生产者会不会丢消息
- 1.4.2 消费者会不会丢消息
- 1.4.3 队列中间件会不会丢消息
- 1.4.4 消息积压怎么办
- 总结
1 引言
我经常听到很多人讨论,关于把 Redis 当作队列来用是否合适的问题。
有些人表示赞成,他们认为
Redis
很轻量,用作队列很方便。也些人则反对,认为
Redis
会丢数据,最好还是用专业的队列中间件更稳妥究竟哪种方案更好呢?
这篇文章,就聊一聊把
Redis
当作队列,究竟是否合适这个问题。从简单到复杂,一步步梳理其中的细节,把这个问题真正的讲清楚。
看完这篇文章后对这个问题你会有全新的认识。
在文章的最后,还会告诉你关于技术选型的思路,文章有点长,希望你可以耐心读完
1.1 Redis中List队列
1.1.1 简单使用
从最简单的开始:
List
队列首先,我们先从最简单的场景开始讲起
如果你的业务需求足够简单,想把
Redis
当作队列来使用,肯定最先想到的就是使用 List
这个数据类型因为
List
底层的实现就是一个链表,在头部和尾部操作元素,时间复杂度都是 O(1)
,这意味着它非常符合消息队列的模型。如果把
List
当作队列,你可以这么来用。生产者使用
LPUSH
发布消息:127.0.0.1:6379> LPUSH queue msg1 (integer) 1 127.0.0.1:6379> LPUSH queue msg2 (integer) 2
消费者这一侧,使用
RPOP
拉取消息:127.0.0.1:6379> RPOP queue "msg1" 127.0.0.1:6379> RPOP queue "msg2"
这个模型非常简单,也很容易理解。

但这里有个小问题,当队列中已经没有消息了,消费者在执行
RPOP
时,会返回 NULL
127.0.0.1:6379> RPOP queue (nil) // 没消息了
而我们在编写消费者逻辑时,一般是一个
死循环
,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:while true: msg = redis.rpop("queue") // 没有消息,继续循环 if msg == null: continue // 处理消息 handle(msg)
如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成
CPU 空转
,不仅浪费 CPU
资源,还会对 Redis
造成压力。1.1.2 解决cpu空转问题
怎么解决这个问题呢?
也很简单,当队列为空时,我们可以休眠一会,再去尝试拉取消息。代码可以修改成这样:
while true: msg = redis.rpop("queue") // 没有消息,休眠2s if msg == null: sleep(2) continue // 处理消息 handle(msg)
这就解决了
CPU
空转问题这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在延迟假设设置的休眠时间是
2s
,那新消息最多存在 2s
的延迟。1.1.3 Redis阻塞式拉取
要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发
CPU
空转问题。鱼和熊掌不可兼得
那如何做,既能及时处理新消息,还能避免
CPU
空转呢?Redis
是否存在这样一种机制:如果队列为空,消费者在拉取消息时就阻塞等待,一旦有新消息过来,就通知我的消费者立即处理新消息呢?幸运的是,
Redis
确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP
,这里的 B
指的是阻塞Block

现在,你可以这样来拉取消息了:
while true: // 没消息阻塞等待,0表示不设置超时时间 msg = redis.brpop("queue", 0) if msg == null: continue // 处理消息 handle(msg)
使用
BRPOP
这种阻塞式方式拉取消息时,还支持传入一个超时时间,如果设置为 0
,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL
这个方案不错,既兼顾了效率,还避免了 CPU
空转问题,一举两得注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被
Redis Server
判定为无效连接,之后 Redis Server
会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?
- 不支持重复消费:消费者拉取消息后,这条消息就从
List
中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据 - 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了
第一个问题是功能上的,使用
List
做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景第二个问题就比较棘手了,因为从
List
中 POP
一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。1.2 Redis发布订阅
1.2.1 简单使用
发布/订阅模型:
Pub/Sub
从名字就能看出来,这个模块是 Redis
专门是针对发布/订阅这种队列模型设计的它正好可以解决前面提到的第一个问题:重复消费。
即多组生产者、消费者的场景,我们来看它是如何做的。
Redis
提供了 PUBLISH / SUBSCRIBE
命令,来完成发布、订阅的操作。
假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。
首先,使用
SUBSCRIBE
命令,启动 2 个消费者,并订阅同一个队列。// 2个消费者 都订阅一个队列 127.0.0.1:6379> SUBSCRIBE queue Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "queue" 3) (integer) 1
此时,2 个消费者都会被阻塞住,等待新消息的到来。
之后,再启动一个生产者,发布一条消息。
127.0.0.1:6379> PUBLISH queue msg1 (integer) 1
这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。
127.0.0.1:6379> SUBSCRIBE queue // 收到新消息 1) "message" 2) "queue" 3) "msg1"
看到了么,使用
Pub/Sub
这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。除此之外,
Pub/Sub
还提供了匹配订阅模式,允许消费者根据一定规则,订阅多个自己感兴趣的队列// 订阅符合规则的队列 127.0.0.1:6379> PSUBSCRIBE queue.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "queue.*" 3) (integer) 1
这里的消费者,订阅了
queue.*
相关的队列消息。之后,生产者分别向
queue.p1
和 queue.p2
发布消息。127.0.0.1:6379> PUBLISH queue.p1 msg1 (integer) 1 127.0.0.1:6379> PUBLISH queue.p2 msg2 (integer) 1
这时再看消费者,它就可以接收到这 2 个生产者的消息了。
127.0.0.1:6379> PSUBSCRIBE queue.* Reading messages... (press Ctrl-C to quit) ... // 来自queue.p1的消息 1) "pmessage" 2) "queue.*" 3) "queue.p1" 4) "msg1" // 来自queue.p2的消息 1) "pmessage" 2) "queue.*" 3) "queue.p2" 4) "msg2"

我们可以看到,
Pub/Sub
最大的优势就是,支持多组生产者、消费者处理消息。1.2.2 发布订阅的缺点
讲完了它的优点,那它有什么缺点呢?
其实,
Pub/Sub
最大问题是:丢数据如果发生以下场景,就有可能导致数据丢失:
- 消费者下线
- Redis 宕机
- 消息堆积
究竟是怎么回事?
这其实与
Pub/Sub
的实现方式有很大关系。Pub/Sub
在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立数据转发通道,把符合规则的数据,从一端转发到另一端。一个完整的发布、订阅消息处理流程是这样的:
- 消费者订阅指定队列,
Redis
就会记录一个映射关系:队列->消费者 - 生产者向这个队列发布消息,那
Redis
就从映射关系中找出对应的消费者,把消息转发给它

看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。
这种设计方案,就导致了上面提到的那些问题。
例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。
如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部丢弃所以,当你在使用
Pub/Sub
时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。另外,因为
Pub/Sub
没有基于任何数据类型实现,所以它也不具备数据持久化的能力。也就是说,
Pub/Sub
的相关操作,不会写入到 RDB
和 AOF
中,当 Redis
宕机重启,Pub/Sub
的数据也会全部丢失。最后,我们来看
Pub/Sub
在处理消息积压
时,为什么也会丢数据?当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。
如果采用
List
当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis
内存会持续增长,直到消费者把所有数据都从链表中取出。但
Pub/Sub
的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!这是怎么回事?
还是回到
Pub/Sub
的实现细节上来说。每个消费者订阅一个队列时,
Redis
都会在 Server
上给这个消费者在分配一个缓冲区,这个缓冲区其实就是一块内存。当生产者发布消息时,
Redis
先把消息写到对应消费者的缓冲区中。之后,消费者不断地从缓冲区读取消息,处理消息。

但是,问题就出在这个缓冲区上
因为这个缓冲区其实是有上限的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。
如果超过了缓冲区配置的上限,此时,
Redis
就会强制把这个消费者踢下线这时消费者就会消费失败,也会丢失数据。
如果你有看过
Redis
的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60
它的参数含义如下:32mb
:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线8mb + 60
:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线
Pub/Sub
的这一点特点,是与 List
作队列差异比较大的从这里你应该可以看出,
List
其实是属于拉模型,而 Pub/Sub
其实属于推模型。List
中的数据可以一直积压在内存中,消费者什么时候来拉都可以。但
Pub/Sub
是把消息先推到消费者在 Redis Server
上的缓冲区中,然后等消费者再来取。当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,
Redis
为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。好了,现在我们总结一下
Pub/Sub
的优缺点:- 支持发布 / 订阅,支持多组生产者、消费者处理消息
- 消费者下线,数据会丢失
- 不支持数据持久化,
Redis
宕机,数据也会丢失 - 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失
有没有发现,除了第一个是优点之外,剩下的都是缺点。
所以,很多人看到
Pub/Sub
的特点后,觉得这个功能很鸡肋也正是以上原因,
Pub/Sub
在实际的应用场景中用得并不多目前只有哨兵集群和
Redis
实例通信时,采用了 Pub/Sub
的方案,因为哨兵正好符合即时通讯的业务场景。我们再来看一下,
Pub/Sub
有没有解决,消息处理时异常宕机,无法再次消费的问题呢?其实也不行,
Pub/Sub
从缓冲区取走数据之后,数据就从 Redis
缓冲区删除了,消费者发生异常,自然也无法再次重新消费。好,现在我们重新梳理一下,我们在使用消息队列时的需求。
当我们在使用一个消息队列时,希望它的功能如下:
- 支持阻塞等待拉取消息
- 支持发布 / 订阅模式
- 消费失败,可重新消费,消息不丢失
- 实例宕机,消息不丢失,数据可持久化
- 消息可堆积
Redis
除了 List
和 Pub/Sub
之外,还有符合这些要求的数据类型吗?其实,
Redis
的作者也看到了以上这些问题,也一直在朝着这些方向努力着。Redis
作者在开发 Redis
期间,还另外开发了一个开源项目 disque
这个项目的定位,就是一个基于内存的分布式消息队列中间件。但由于种种原因,这个项目一直不温不火。
终于,在
Redis 5.0
版本,作者把 disque
功能移植到了 Redis
中,并给它定义了一个新的数据类型:Stream
下面我们就来看看,它能符合上面提到的这些要求吗?1.3 Redis中的Stream
1.3.1 简单使用
趋于成熟的队列:
Stream
我们来看 Stream
是如何解决上面这些问题的我们依旧从简单到复杂,依次来看
Stream
在做消息队列时,是如何处理的?首先,
Stream
通过 XADD
和 XREAD
完成最简单的生产、消费模型:- XADD:发布消息
- XREAD:读取消息
生产者发布 2 条消息:
// *表示让Redis自动生成消息ID 127.0.0.1:6379> XADD queue * name zhangsan "1618469123380-0" 127.0.0.1:6379> XADD queue * name lisi "1618469127777-0"
使用
XADD
命令发布消息,其中的*
表示让 Redis
自动生成唯一的消息 ID
这个消息 ID
的格式是时间戳-自增序号
消费者拉取消息:
// 从开头读取5条消息,0-0表示从开头读取 127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0 1) 1) "queue" 2) 1) 1) "1618469123380-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618469127777-0" 2) 1) "name" 2) "lisi"
如果想继续拉取消息,需要传入上一条消息的 ID:
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0 (nil)
没有消息,
Redis
会返回 NULL
