詳解Redis和隊列

下面由redis教程欄目給大家詳解redis和隊列,希望對需要的朋友有所幫助!

詳解Redis和隊列

概要

Redis不僅可作為緩存服務器,還可用作消息隊列。它的列表類型天生支持用作消息隊列。如下圖所示:

詳解Redis和隊列

由于Redis的列表是使用雙向鏈表實現的,保存了頭尾節點,所以在列表頭尾兩邊插取元素都是非常快的。

普通隊列實現

所以可以直接使用Redis的List實現消息隊列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。簡單示例如下:

存放消息端(消息生產者):

package?org.yamikaze.redis.messsage.queue;? import?org.yamikaze.redis.test.MyJedisFactory;import?redis.clients.jedis.Jedis;? import?Java.util.concurrent.TimeUnit;? /** ?*?消息生產者 ?*?@author?yamikaze?*/public?class?Producer?extends?Thread?{? ????public?static?final?String?MESSAGE_KEY?=?"message:queue";????private?Jedis?jedis;????private?String?producerName;????private?volatile?int?count;? ????public?Producer(String?name)?{????????this.producerName?=?name; ????????init(); ????}? ????private?void?init()?{ ????????jedis?=?MyJedisFactory.getLocalJedis(); ????}? ????public?void?putMessage(String?message)?{ ????????Long?size?=?jedis.lpush(MESSAGE_KEY,?message); ????????System.out.println(producerName?+?":?當前未被處理消息條數為:"?+?size); ????????count++; ????}? ????public?int?getCount()?{????????return?count; ????} ? ????@Override????public?void?run()?{????????try?{????????????while?(true)?{ ????????????????putMessage(StringUtils.generate32Str()); ????????????????TimeUnit.SECONDS.sleep(1); ????????????} ????????}?catch?(InterruptedException?e)?{ ? ????????}?catch?(Exception?e)?{ ????????????e.printStackTrace(); ????????} ????}? ????public?static?void?main(String[]?args)?throws?InterruptedException{ ????????Producer?producer?=?new?Producer("myProducer"); ????????producer.start();? ????????for(;?;)?{ ????????????System.out.println("main?:?已存儲消息條數:"?+?producer.getCount()); ????????????TimeUnit.SECONDS.sleep(10); ????????} ????} }

消息處理端(消息消費者):

package?org.yamikaze.redis.messsage.queue;? import?org.yamikaze.redis.test.MyJedisFactory;import?redis.clients.jedis.Jedis;? /** ?*?消息消費者 ?*?@author?yamikaze?*/public?class?Customer?extends?Thread{? ????private?String?customerName;????private?volatile?int?count;????private?Jedis?jedis;? ????public?Customer(String?name)?{????????this.customerName?=?name; ????????init(); ????}? ????private?void?init()?{ ????????jedis?=?MyJedisFactory.getLocalJedis(); ????}? ????public?void?processMessage()?{ ????????String?message?=?jedis.rpop(Producer.MESSAGE_KEY);????????if(message?!=?NULL)?{ ????????????count++; ????????????handle(message); ????????} ????}? ????public?void?handle(String?message)?{ ????????System.out.println(customerName?+?"?正在處理消息,消息內容是:?"?+?message?+?"?這是第"?+?count?+?"條"); ????} ? ????@Override????public?void?run()?{????????while?(true)?{ ????????????processMessage(); ????????} ????}? ????public?static?void?main(String[]?args)?{ ????????Customer?customer?=?new?Customer("yamikaze"); ????????customer.start(); ????} }

貌似還不錯,但上述例子中消息消費者有一個問題存在,即需要不停的調用rpop方法查看List中是否有待處理消息。每調用一次都會發起一次連接,這會造成不必要的浪費。也許你會使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,但這樣做有兩個問題:

1)、如果生產者速度大于消費者消費速度,消息隊列長度會一直增大,時間久了會占用大量內存空間。

2)、如果睡眠時間過長,這樣不能處理一些時效性的消息,睡眠時間過短,也會在連接上造成比較大的開銷。

所以可以使用brpop指令,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,于是消費端可以將processMessage可以改為這樣:

public?void?processMessage()?{????/** ?????*?brpop支持多個列表(隊列) ?????*?brpop指令是支持隊列優先級的,比如這個例子中MESSAGE_KEY的優先級大于testKey(順序決定)。 ?????*?如果兩個列表中都有元素,會優先返回優先級高的列表中的元素,所以這兒優先返回MESSAGE_KEY ?????*?0表示不限制等待,會一直阻塞在這兒?????*/ ????List<string>?messages?=?jedis.brpop(0,?Producer.MESSAGE_KEY,?"testKey");????if(messages.size()?!=?0)?{????????//由于該指令可以監聽多個Key,所以返回的是一個列表????????//列表由2項組成,1)?列表名,2)數據 ????????String?keyName?=?messages.get(0);????????//如果返回的是MESSAGE_KEY的消息 ????????if(Producer.MESSAGE_KEY.equals(keyName))?{ ????????????String?message?=?messages.get(1); ????????????handle(message); ????????} ? ????} ????System.out.println("======================="); }</string>

然后可以運行Customer,清空控制臺,可以看到程序沒有任何輸出,阻塞在了brpop這兒。然后在打開Redis的客戶端,輸入指令client list,可以查看當前有兩個連接。

一次生產多次消費的隊列

Redis除了對消息隊列提供支持外,還提供了一組命令用于支持發布/訂閱模式。利用Redis的pub/sub模式可以實現一次生產多次消費的隊列。

1)發布
? ? PUBLISH指令可用于發布一條消息,格式 PUBLISH channel message

? ? 返回值表示訂閱了該消息的數量。
? ? 2)訂閱
? ? SUBSCRIBE指令用于接收一條消息,格式 SUBSCRIBE channel

? ? 可以看到使用SUBSCRIBE指令后進入了訂閱模式,但沒有接收到publish發送的消息,這是因為只有在消息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回復。回復分為三種類型:
? ? 1、如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是第幾個訂閱的頻道?(理解成序號?)?
? ? 2、如果為message(消息),第二個值為產生該消息的頻道,第三個值為消息
? ? 3、如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示當前客戶端的訂閱數量。

? ? 可以使用指令UNSUBSCRIBE退訂,如果不加參數,則會退訂所有由SUBSCRIBE指令訂閱的頻道。
? ?
? ? Redis還支持基于通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

? ?再試試推送消息會得到以下結果:

? ?可以看到publish指令返回的是2,而訂閱端這邊接收了兩次消息。這是因為PSUBSCRIBE指令可以重復訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。同時PUNSUBSCRIBE指令通配符不會展開。
例如:PUNSUBSCRIBE * 不會匹配到 channel.*, 所以要取消訂閱channel.*就要這樣寫PUBSUBSCRIBE channel.*。

代碼示范如下:

package?org.yamikaze.redis.messsage.subscribe;? import?org.yamikaze.redis.messsage.queue.StringUtils;import?org.yamikaze.redis.test.MyJedisFactory;import?redis.clients.jedis.Jedis;? /** ?*?消息發布方 ?*?@author?yamikaze?*/public?class?Publisher?{? ????public?static?final?String?CHANNEL_KEY?=?"channel:message";????private?Jedis?jedis;? ????public?Publisher()?{ ????????jedis?=?MyJedisFactory.getLocalJedis(); ????}? ????public?void?publishMessage(String?message)?{????????if(StringUtils.isBlank(message))?{????????????return; ????????} ????????jedis.publish(CHANNEL_KEY,?message); ????}? ????public?static?void?main(String[]?args)?{ ????????Publisher?publisher?=?new?Publisher(); ????????publisher.publishMessage("Hello?Redis!"); ????} }

簡單的發送一個消息。

消息訂閱方:

package?org.yamikaze.redis.messsage.subscribe;? import?org.yamikaze.redis.test.MyJedisFactory;import?redis.clients.jedis.Jedis;import?redis.clients.jedis.JedisPubSub;? import?java.util.concurrent.TimeUnit;? /** ?*?消息訂閱方客戶端 ?*?@author?yamikaze?*/public?class?SubscribeClient?{? ????private?Jedis?jedis;????private?static?final?String?EXIT_COMMAND?=?"exit";? ????public?SubscribeClient()?{ ????????jedis?=?MyJedisFactory.getLocalJedis(); ????}? ????public?void?subscribe(String?...channel)?{????????if(channel?==?null?||?channel.length?<p>先運行client,再運行Publisher進行消息發送,輸出結果:</p><p><img src="https://img.php.cn/upload/article/000/000/020/17a7d7dbee3e472ecb977109a62f0631-1.png" alt=""></p><p><span   style="max-width:90%">Redis的pub/sub也有其缺點,那就是如果消費者下線,生產者的消息會丟失。</span></p><h2>延時隊列</h2><h3>背景</h3><p>在業務發展過程中,會出現一些需要延時處理的場景,比如:</p><p>a.訂單下單之后超過30分鐘用戶未支付,需要取消訂單<br>b.訂單一些評論,如果48h用戶未對商家評論,系統會自動產生一條默認評論<br>c.點我達訂單下單后,超過一定時間訂單未派出,需要超時取消訂單等。。。<br>處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在數據量不大的場景下是完全沒問題,但是當數據量大的時候高頻的輪訓數據庫就會比較的耗資源,導致數據庫的慢查或者查詢超時。所以在處理這類需求時候,采用了延時隊列來完成。</p><h3>幾種延時隊列</h3><p>延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列:</p><h4>1.Java中java.util.concurrent.DelayQueue</h4><p>優點:JDK自身實現,使用方便,量小適用<br>缺點:隊列消息處于jvm內存,不支持分布式運行和消息持久化</p><h4>2.rocketmq延時隊列</h4><p>優點:消息持久化,分布式<br>缺點:不支持任意時間精度,只支持特定level的延時消息</p><h4>3.rabbitmq延時隊列(TTL+DLX實現)</h4><p>優點:消息持久化,分布式<br>缺點:延時相同的消息必須扔在同一個隊列</p><h3>Redis實現的延時消息隊列適合的項目特點:</h3>
  • spring框架管理對象
  • 有消息需求,但不想維護mq中間件
  • 有使用redis
  • 對消息持久化并沒有很苛刻的要求

Redis實現的延時消息隊列思路

Redis由于其自身的Zset數據結構,本質就是Set結構上加了個排序的功能,除了添加數據value之外,還提供另一屬性score,這一屬性在添加修改元素時候可以指定,每次指定后,Zset會自動重新按新的值調整順序。可以理解為有兩列字段的數據表,一列存value,一列存順序編號。操作中key理解為zset的名字,那么對延時隊列又有何用呢?

試想如果score代表的是想要執行時間的時間戳,在某個時間將它插入Zset集合中,它變會按照時間戳大小進行排序,也就是對執行時間前后進行排序,這樣的話,起一個死循環線程不斷地進行取第一個key值,如果當前時間戳大于等于該key值的socre就將它取出來進行消費刪除,就可以達到延時執行的目的, 注意不需要遍歷整個Zset集合,以免造成性能浪費。

Zset的排列效果如下圖:

詳解Redis和隊列

java代碼實現如下:

package?cn.chinotan.service.delayQueueRedis;import?org.apache.commons.lang3.StringUtils;import?redis.clients.jedis.Jedis;import?redis.clients.jedis.JedisPool;import?redis.clients.jedis.Tuple;import?java.text.SimpleDateFormat;import?java.util.Calendar;import?java.util.Date;import?java.util.Set;import?java.util.concurrent.CountDownLatch;import?java.util.concurrent.TimeUnit;/** ?*?@program:?test ?*?@description:?redis實現延時隊列 ?*?@author:?xingcheng ?*?@create:?2018-08-19 ?**/public?class?AppTest?{????private?static?final?String?ADDR?=?"127.0.0.1";????private?static?final?int?PORT?=?6379;????private?static?JedisPool?jedisPool?=?new?JedisPool(ADDR,?PORT);????private?static?CountDownLatch?cdl?=?new?CountDownLatch(10);????public?static?Jedis?getJedis()?{????????return?jedisPool.getResource(); ????}????/** ?????*?生產者,生成5個訂單?????*/ ????public?void?productionDelayMessage()?{????????for?(int?i?=?0;?i??order?=?jedis.zrangeWithScores("orderId",?0,?0);????????????if?(order?==?null?||?order.isEmpty())?{ ????????????????System.out.println("當前沒有等待的任務");????????????????try?{ ????????????????????TimeUnit.MICROSECONDS.sleep(500); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????}????????????????continue; ????????????} ????????????Tuple?tuple?=?(Tuple)?order.toArray()[0];????????????double?score?=?tuple.getScore(); ????????????Calendar?instance?=?Calendar.getInstance();????????????long?nowTime?=?instance.getTimeInMillis()?/?1000;????????????if?(nowTime?&gt;=?score)?{ ????????????????String?element?=?tuple.getElement(); ????????????????Long?orderId?=?jedis.zrem("orderId",?element);????????????????if?(orderId?&gt;?0)?{ ????????????????????System.out.println(new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss").format(new?Date())?+?":redis消費了一個任務:消費的訂單OrderId為"?+?element); ????????????????} ????????????} ????????} ????}????static?class?DelayMessage?implements?Runnable{ ????????@Override????????public?void?run()?{????????????try?{ ????????????????cdl.await(); ????????????????consumerDelayMessage(); ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????} ????}???? ????public?static?void?main(String[]?args)?{ ????????AppTest?appTest?=?new?AppTest(); ????????appTest.productionDelayMessage();????????for?(int?i?=?0;?i?<p>實現效果如下:</p><p><img src="https://img.php.cn/upload/article/000/000/020/513545df87e39a5f0af8877d9b3550f2-3.jpg" alt=""    style="max-width:90%"  style="max-width:90%"></p>

? 版權聲明
THE END
喜歡就支持一下吧
點贊9 分享