本篇文章帶大家了解一下redis進階用法-消息隊列,介紹一下redis中的延時隊列,希望對大家有所幫助!
說到消息隊列中間件,我們都會想到rabbitmq、rocketmq和kafka,來給應用實現異步消息傳遞的功能。這些都是專業的消息隊列中間件,其特性之多超出了我們的理解能力。
而這些消息中間件使用起來的是復雜的,例如RabbitMQ,發消息之前要創建Exchange,還要創建Queue,然后將Exchange和Queue通過某種規則綁定起來,發送消息的時候還要制定routing-key,還要 控制頭消息。這僅是生產者,消費者在消費消息之前也要將上面一系列的繁瑣步驟再操作一遍。
那么對于那些并不要求百分百可靠,并且希望實現簡單的消息隊列需求時,我們可以通過Redis將我們從消息隊列的中間件的繁瑣步驟中解脫出來。
Redis的消息隊列不是專業的消息隊列,他并沒有消息隊列中許多的高級特性,也沒有ack保證。如果對消息的可靠性有著極致的追求,請轉向專業的MQ中間件。【相關推薦:Redis視頻教程】
異步消息隊列
從最簡單的異步消息隊列開始,Redis的list數據結構常用來作為異步消息隊列,通過lrpush/lpush來操作入列,通過rpop/lpop來出列。
問題一:空隊列
對于pop操作來說,當消息隊列空了的時候,客戶端會陷入pop的死循環,造成大量的浪費生命的空輪詢,導致客戶端CPU拉高,同時Redis的QPS也被拉高。
對于以上問題的解決辦法就是通過list結構的blpop/brpop來操作出列,其中b前綴代表的就是blocking,阻塞讀。對于阻塞讀在隊列沒有數據的時候就會進入休眠狀態,一旦數據到來就會立刻醒來。完美的解決了上面這個問題。
問題二:空閑連接斷開
阻塞讀的方案看似完美,緊接著引出了另外一個問題:空閑連接。 如果線程一直阻塞在哪哪里,Redis的客戶端連接就變成了空閑連接。空閑時間過長,Redis服務器就會主動斷開連接,以減少閑置資源占用。這時候blpop/brpop就會拋出異常來。
所以,我們在編寫客戶端(應用程序)消費者的時候需要小心,注意捕獲異常,并進行重試。
應用一:延時隊列
在Redis的分布式鎖中一般有三種策略來處理加鎖失敗的情況:
-
直接拋出異常,前端提醒用戶是否要繼續操作;
-
sleep一會再重試;
-
將請求放到延時隊列中,一會再重試;
而Redis中延時隊列,我們可以通過zset(有序列表)數據結構來實現。我們將消息序列化作為一個字符串作為zse的value,而消息的到期處理時間(延時時間)作為score。然后通過輪詢zset獲取到期時間進行處理,通過zrem將key從zset移除代表成功消費,進而處理任務。
核心代碼如下:
//?生產 public?void?delay(T?msg)?{ ??TaskItem?task?=?new?TaskItem(); ??task.id?=?UUID.randomUUID().toString();?//?分配唯一的?uuid ??task.msg?=?msg; ??String?s?=?JSON.toJSONString(task);?//?fastjson?序列化 ??jedis.zadd(queueKey,?System.currentTimeMillis()?+?5000,?s);?//?塞入延時隊列?,5s?后再試 } //?消費 public?void?loop()?{ ??while?(!Thread.interrupted())?{ ???//?zrangeByScore參數中0,?System.currentTimeMills()代表從redis中去score范圍在0到系統當前時間的數據,?0,1表示從0開始取1個?拓展傳入的score為-inf,?+inf?分別表示zset中的最大值和最小值,當你不知道zset中的score最值時就可以使用inf作為參數變量 ???Set?values?=?jedis.zrangeByScore(queueKey,?0,?System.currentTimeMillis(),?0,?1); ???if?(values.isEmpty())?{ ?????try?{ ???????Thread.sleep(500);?//?歇會繼續 ????} ?????catch?(InterruptedException?e)?{ ???????break; ????} ?????continue; ??} ???String?s?=?values.iterator().next();??//消費隊列 ???if?(jedis.zrem(queueKey,?s)?>?0)?{?//?搶到了,要考慮到多線程下鎖爭搶的情況,只有rem成功代表成功的消費了一條消息。 ?????TaskItem?task?=?JSON.parseObject(s,?TaskType);?//?fastjson?反序列化 ?????this.handleMsg(task.msg); ??} } }
以上的代碼在多線程中對于同一個任務被多個線程爭搶的情況,雖然能夠通過zrem后在處理任務來避免一個任務被多次消費的情況。但是對于那些獲取到了任務但是沒有成功消費的線程來說,都是白白浪費時間取了一次任務。所以可以考慮通過lua scripting來優化這個邏輯。將zrangeByScore和zrem一同挪到服務器進行原子操作,就能夠完美解決了。
更多編程相關知識,請訪問:Redis視頻教程!!