如何基于redis實現消息隊列

如何基于redis實現消息隊列

消息隊列,Message Queue,常用于解決并發系統中的資源一致性問題,提升峰值的處理能力,同時保證消息的順序性、可恢復性、必送達性,對應用進行解耦,或者實現異步通訊等。 ? (推薦學習:Redis視頻教程

市面上的 MQ應用有很多(例如:kafkarabbitmq,Disque),同時也可以基于 redis 來實現,比較典型的方案有:

基于List的 LPUSH+BRPOP 的實現

PUB/SUB,訂閱/發布模式

基于Sorted-Set的實現

基于Stream類型的實現

在消息隊列使用中,有生產者producter和消費者consumer。生產者負責生成消息,消費者負責使用處理消息。

生產,指的是將消息放入消息隊列。

消費,指的是讀取并處理消息。通常一個消息再被消費后,就應該從消息隊列中刪除。

如何基于redis實現消息隊列

基于List的 LPUSH+BRPOP 的實現

典型的命令為:

LPUSH,將消息隊列 BRPOP,從隊列中取出消息,阻塞模式

就是一個典型的基于FIFL隊列的解決方案。其中LPUSH是生產者做的事,而BRPOP是消費者做的事。

該模式有很多優點:

實現簡單

Reids支持持久化消息,意味著消息不會丟失,可以重復查看(注意不是消費,只看不用,LRANGE類的指令)。

可以保證順序,保證使用LPUSH命令,可以保證消息的順序性

使用RPUSH,可以將消息放在隊列的開頭,達到優先消息的目的,可以實現簡易的消息優先隊列。

同時也有些劣勢:

做消費確認ACK比較麻煩,就是不能保證消費者在讀取之后,未處理后的宕機問題。導致消息意外丟失。通常需要自己維護一個Pending列表,保證消息的處理確認。

不能做廣播模式,例如典型的Pub/Discribe模式。

不能重復消費,一旦消費就會被刪除

不支持分組消費,需要自己在業務邏輯層解決

PUB/SUB,訂閱/發布模式

SUBSCRIBE,用于訂閱信道 PUBLISH,向信道發送消息 UNSUBSCRIBE,取消訂閱

生產者和消費者通過相同的一個信道(channel)進行交互。信道其實也就是隊列。通常會有多個消費者。多個消費者訂閱同一個信道,當生產者向信道發布消息時,該信道會立即將消息逐一發布給每個消費者。可見,該信道對于消費者是發散的信道,每個消費者都可以得到相同的消息。典型的對多的關系。

典型的優點是:

典型的廣播模式,一個消息可以發布到多個消費者

多信道訂閱,消費者可以同時訂閱多個信道,從而接收多類消息

消息即時發送,消息不用等待消費者讀取,消費者會自動接收到信道發布的消息

也有些缺點:

消息一旦發布,不能接收。換句話就是發布時若客戶端不在線,則消息丟失,不能尋回

不能保證每個消費者接收的時間是一致的

若消費者客戶端出現消息積壓,到一定程度,會被強制斷開,導致消息意外丟失。通常發生在消息的生產遠大于消費速度時

可見,Pub/Sub 模式不適合做消息存儲,消息積壓類的業務,而是擅長處理廣播,即時通訊,即時反饋的業務。

基于SortedSet有序集合的實現

ZADD?KEY?score?member,壓入集合 ZRANGEBYSCORE,依據score獲取成員

有序集合的方案是在自己確定消息順ID時比較常用,使用集合成員的Score來作為消息ID,保證順序,還可以保證消息ID的單調遞增。通常可以使用時間戳+序號的方案。確保了消息ID的單調遞增,利用SortedSet的依據Score排序的特征,就可以制作一個有序的消息隊列了。

和上面的方案相比,優點就是可以自定義消息ID,在消息ID有意義時,比較重要。缺點也明顯,不允許重復消息(以為是集合),同時消息ID確定有錯誤會導致消息的順序出錯。

所以,若不是需要自定義消息ID,這個方案好像有點雞肋…

基于 Stream 類型的實現

這個Stream類型redis就是為了實現消息隊列的。支持自動生成消息ID,分組消費,ACK,消息轉移,隊列監控等核心消息隊列功能

更多Redis相關技術文章,請訪問Redis視頻教程欄目進行學習!

? 版權聲明
THE END
喜歡就支持一下吧
點贊6 分享