一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

本篇文章給大家?guī)砹岁P(guān)于thinkphp的相關(guān)知識,其中主要整理了使用think-queue實現(xiàn)redis消息隊列的相關(guān)問題,下面一起來看一下,希望對大家有幫助。

一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

推薦學(xué)習(xí):《thinkphp

簡單介紹:

消息隊列中間件是大型系統(tǒng)中的重要組件,已經(jīng)逐漸成為企業(yè)系統(tǒng)內(nèi)部通信的核心手段。它具有松耦合、異步消息、流量削峰、可靠投遞、廣播、流量控制、最終一致性等一系列功能,已經(jīng)成為異步rpc的主要手段之一。

大白話:

消息隊列有兩個角色和一個容器,角色分別為生產(chǎn)者(負(fù)責(zé)發(fā)布任務(wù))和消費者(負(fù)責(zé)執(zhí)行任務(wù)),容器這是用來存放/積生產(chǎn)者發(fā)布的任務(wù),將發(fā)布和執(zhí)行兩個步驟分開且互不影響。

立即學(xué)習(xí)PHP免費學(xué)習(xí)筆記(深入)”;

消息隊列的大致流程為:

生產(chǎn)者發(fā)布任務(wù)存放/堆積在消息隊列中,由消費者主動去消息隊列中取出任務(wù)并執(zhí)行,先發(fā)布的先執(zhí)行(隊列:先進(jìn)先出),在沒有消費者的情況下任務(wù)會堆積在隊列中等待被取出執(zhí)行。

優(yōu)點:

消息隊列適用于大并發(fā)或者處理時間長并需要批量操作的第三方接口,可用于但不僅限于短信發(fā)送、郵件發(fā)送、APP推送等,支持跨系統(tǒng),即本系統(tǒng)發(fā)布的消息隊列可以由自己或者給其他系統(tǒng)執(zhí)行任務(wù),同理本系統(tǒng)也可以作為消費者執(zhí)行自己或者其他系統(tǒng)發(fā)布的消息隊列任務(wù)。


接下來主要介紹一下 think-queue 的使用

Thinkphp的Queue內(nèi)置了 Redis、database、Topthink、Sync四種驅(qū)動,這里使用的是 Redis,也推薦使用 Redis

think-queue 隊列消息可以進(jìn)行任務(wù)的發(fā)布、獲取、執(zhí)行、刪除、重新發(fā)布、延遲發(fā)布、超時控制等操作

消息隊列基本配置

在 extra 目錄下創(chuàng)建 queue.php 配置文件

<?phpreturn [     'connector'  => 'Redis',     'expire'     => null,   // 任務(wù)過期時間,默認(rèn)為60秒,若要禁用,則設(shè)置為 null     'default'    => 'REDIS_QUEUE',  // 默認(rèn)的隊列名     'host'       => '127.0.0.1',   // redis 主機ip     'port'       => 6379,   // redis 端口     'password'   => '',   // redis 密碼     'select'     => 0,   // 使用哪里一個 db,默認(rèn)為 db0     'timeout'    => 0,   // redis 連接的超時時間     'persistent' => false,   // 是否是長連接];

至于為什么放在這里,是因為 Queue 源代碼默認(rèn)從 extra 讀取 queue 文件獲取配置信息,如果想要將配置文件放置其他地方,則需要對應(yīng)去修改源代碼中的默認(rèn)獲取配置,如下圖所示

一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

生產(chǎn)者

創(chuàng)建一個測試類,寫入生產(chǎn)者方法

<?phpnamespace appapicontroller;use thinkController;use thinkQueue;class Test extends Controller{     // 生產(chǎn)者,添加消息隊列     public function addQueue()     {         // 參數(shù)         $data = [             'id' => rand(0, 99),             'userName' => '一起摸魚'         ];          // 消息隊列名         $queueName = 'testQueue';          // 推入消息隊列,注意這里的 ::class 是PHP5.5才有的寫法         $isPushed = Queue::push(TestQueue::class, $data, $queueName); 		// PHP5.5以下的可以直接寫命名空間         // $isPushed = Queue::push('appcommonqueueTestQueue', $data, $queueName);                  if ($isPushed !== false) {             // 成功之后的業(yè)務(wù)             echo '隊列加入成功';         } else {             // 失敗之后的業(yè)務(wù)             echo '隊列加入失敗';         }     }}

消費者

創(chuàng)建一個 TestQueue 類,用做消費者,執(zhí)行消息隊列中的任務(wù)

<?phpnamespace appcommonqueue;use thinkLog;use thinkqueueJob;class TestQueue{     // 消費者執(zhí)行入口     public function fire(Job $job, $data)     {         // 具體執(zhí)行業(yè)務(wù)         $isJobDone = $this->doJob($data);                  if ($isJobDone) {             // 消息隊列執(zhí)行成功,刪除隊列,否則會一直執(zhí)行             $job->delete();         } else {             // 消息隊列執(zhí)行失敗             // 獲取消息隊列已經(jīng)重試了幾遍             $attempts = $job->attempts();             if ($attempts == 0 || $attempts == 1) {                 // 重新發(fā)布,參數(shù) delay 是延時發(fā)布的時間                 $job->release(2);             }         }     }      // 消息隊列執(zhí)行失敗后會自動執(zhí)行該方法     public function failed($data)     {         Log::error('消息隊列達(dá)到最大重復(fù)執(zhí)行次數(shù)后失敗:' . json_encode($data));     }      // 消息隊列執(zhí)行方法     public function doJob($data)     {         // 具體執(zhí)行業(yè)務(wù)                           $data = json_encode($data);         echo '消息隊列:' . $data;         // 這里的判斷條件以具體業(yè)務(wù)是否執(zhí)行成功進(jìn)行判斷         if ($data) {             echo "執(zhí)行成功";             return true;         } else {             echo "執(zhí)行失敗";             return false;         }     }}

運行結(jié)果

請求接口,生產(chǎn)者發(fā)布任務(wù)

一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

redis 隊列存放任務(wù)

一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

接下來就是啟用隊列的監(jiān)聽模式了,因為不可能每次一有任務(wù)加進(jìn)來就去手動執(zhí)行一次隊列。隊列的監(jiān)聽模式有兩種,配置參數(shù)如下:

一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

項目根目錄執(zhí)行

php think queue:work –queue 隊列名

開啟消費者,執(zhí)行任務(wù)
一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

redis 隊列中的任務(wù)執(zhí)行后也被刪除

一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

但是由于需要,我們還要將消費者掛起守護(hù)進(jìn)程執(zhí)行,以確保關(guān)掉終端還能夠啟動隊列。

nohup php think queue:listen –queue 隊列名 &

一文教你ThinkPHP使用think-queue實現(xiàn)redis消息隊列

PS:shell中輸入exit來退出終端
PS:shell中輸入exit來退出終端
PS:shell中輸入exit來退出終端

因為在nohup執(zhí)行成功后直接點關(guān)閉程序按鈕關(guān)閉終端時會斷掉該命令所對應(yīng)的Session,導(dǎo)致 nohup 對應(yīng)的進(jìn)程被通知需要一起關(guān)掉。

至此,整個消息隊列流程就結(jié)束了。

推薦學(xué)習(xí):《thinkphp

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點贊14 分享