swoole如何做消息通知

swoole如何做消息通知

基于swoole、redis做一個(gè)消息通知功能

利用swoole開啟常駐進(jìn)程,需要幾個(gè)按自己的情況來定,swoole進(jìn)程數(shù)最好是和服務(wù)器cpu核數(shù)相等? ? ?(推薦學(xué)習(xí): swoole視頻教程

利用swoole啟動(dòng)的常駐進(jìn)程不斷的去探測redis隊(duì)列里面的值,可以按鍵值來做一個(gè)快中慢這樣的權(quán)重處理,需要急需處理,數(shù)據(jù)量大的可以用多幾個(gè)進(jìn)程,一般的可以分配不同的進(jìn)程數(shù)來執(zhí)行。

下面上代碼:

swoole啟動(dòng)代碼

function?run() { ????try?{ ????????$swoole?=?new?swoole_server(127.0.0.1,?9999); ????????$swoole->set([ ????????????'daemonize'?=>?1,?//是否開啟守護(hù)進(jìn)程 ????????????'worker_num'?=>?8,?//實(shí)際需要去設(shè)定 ????????????'log_file'?=>?__APP_LOGS_PATH__?.?'/swoole.log' ????????]); ????????$swoole->on('WorkerStart',?'onWorkerStart'); ????????$swoole->on('Receive',?'onReceive'); ????????$swoole->start(); ????}?catch?(Exception?$e)?{ ????????logs(['err_code'?=>?$e->getCode(),?'err_msg'?=>?$e->getMessage()],?'error'); ????} }

swoole實(shí)時(shí)監(jiān)測redis隊(duì)列里的數(shù)據(jù),根據(jù)鍵值進(jìn)行權(quán)重排比

代碼

function?onWorkerStart(swoole_server?$swoole,?$worker_id) { ????$chQuick?=?[0,?1,?2,?3]; ????$chNormal?=?[4,?5]; ????$chSlow?=?[6]; ????for?($i?=?1;?$i?llen(QUEUE_QUICK)) ????????????????$keys[]?=?QUEUE_QUICK; ????????????if?($keys) ????????????????$queueData?=?$redis->brpop(QUEUE_QUICK,?5); ????????}?elseif?(in_array($worker_id,?$chNormal))?{ ????????????if?($redis->llen(QUEUE_NORMAL)) ????????????????$keys[]?=?QUEUE_NORMAL; ????????????if?($redis->llen(QUEUE_QUICK)) ????????????????$keys[]?=?QUEUE_QUICK; ????????????if?($keys) ????????????????$queueData?=?$redis->brpop(QUEUE_NORMAL,?QUEUE_QUICK,?5); ????????}?elseif?(in_array($worker_id,?$chSlow))?{ ????????????if?($redis->llen(QUEUE_SLOW)) ????????????????$keys[]?=?QUEUE_SLOW; ????????????if?($redis->llen(QUEUE_NORMAL)) ????????????????$keys[]?=?QUEUE_NORMAL; ????????????if?($redis->llen(QUEUE_QUICK)) ????????????????$keys[]?=?QUEUE_QUICK; ????????????if?($keys) ????????????????$queueData?=?$redis->brpop(QUEUE_SLOW,?QUEUE_QUICK,?QUEUE_NORMAL,?5); ????????}?else?{ ????????????if?($redis->llen(QUEUE_FAIL)) ????????????????$keys[]?=?QUEUE_FAIL; ????????????if?($redis->llen(QUEUE_SLOW)) ????????????????$keys[]?=?QUEUE_SLOW; ????????????if?($redis->llen(QUEUE_NORMAL)) ????????????????$keys[]?=?QUEUE_NORMAL; ????????????if?($redis->llen(QUEUE_QUICK)) ????????????????$keys[]?=?QUEUE_QUICK; ????????????if?($keys) ????????????????$queueData?=?$redis->brpop(QUEUE_FAIL,?QUEUE_QUICK,?QUEUE_NORMAL,?QUEUE_SLOW,?5); ????????} ????????logs('test'.$keys.'%%'.$queueData); ????????if?($queueData)?{ ????????????$queueName?=?$queueData[0]; ????????????$message?=?$queueData[1]; ????????????if?($worker_id?==?QUEUE_FAIL_WORKER_ID?&&?$queueName?==?QUEUE_FAIL)?{ ????????????????call_user_func_array('retryPostMessage',?[&$message,?&$redis]); ????????????}?else?{ ????????????????call_user_func_array('postMessage',?[&$message,?&$redis]); ????????????} ????????} ????????else ????????{ ????????????sleep(5); ????????} ????} sleep(10); ????$redis->close(); ????unset($redis); ????method_exists($swoole,?'stop')???$swoole->stop()?:?@exit; }

里面的for循環(huán)是為了配合sleep函數(shù)來使用,三次失敗的可以記入失敗,可以手動(dòng)去處理。

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點(diǎn)贊13 分享