本篇文章給大家分享一個swoole高并發(fā)聚合請求實例,介紹在高并發(fā)場景下如何通過聚合請求,充分利用數(shù)據(jù)庫的批量處理更高效地實現(xiàn)業(yè)務功能。此示例僅用作拋磚引玉,希望能激發(fā)大家更深入的思考。
相關視頻課程推薦:《千萬級數(shù)據(jù)并發(fā)解決方案(理論+實戰(zhàn))》分享一些高并發(fā)面試題:15個PHP關于高并發(fā)的面試題(總結(jié))
本示例選取的背景是并發(fā)下單業(yè)務。常規(guī)情況下,后端創(chuàng)建訂單是逐條 insert 的操作。在并發(fā)較低的時候,數(shù)據(jù)庫的 insert 操作的確能保持不錯的效率,但是當遇到請求數(shù)量增多,數(shù)據(jù)庫 頻繁地單次 insert 就會讓下單業(yè)務整體效率變低 (本文簡單地假設1次下單=1個 insert)。
通過上面的描述,其實已經(jīng)很容易想到需要優(yōu)化的地方了。類比現(xiàn)實生活中乘坐電梯的場景:一架電梯 裝滿后再上行,可以最快地緩解人流壓力。
下面我們就來用代碼簡單實現(xiàn)一下我們思路:
<?php swooleRuntime::enableCoroutine($flags = SWOOLE_HOOK_ALL); // 最大等待次數(shù) const MAX_TIMES = 10; // 按批處理時, 每一批的最大請求暫留數(shù)量 const MAX_REQUEST = 3; // 服務端最大超時時間, 避免客戶端一直等待 const MAX_TIMEOUT = 5; Corun(function () { // 請求傳輸?shù)腸hannel, 原因是不要在swoole的協(xié)程環(huán)境中, 使用多個協(xié)程修改同一個全局變量 // 如果是golang, 當然是可以不定義這里的$rqChannel // 只需要簡單的將下面的$rqQueue和$times定義為全局變量即可達到一樣的效果 // 但是最好的方式任然是是通過channel共享內(nèi)存 $rqChannel = new SwooleCoroutineChannel(MAX_REQUEST); // 模擬創(chuàng)建訂單 $createOrder = function () use ($rqChannel) { // 使用數(shù)組模擬請求暫留隊列 $rqQueue = []; // 使用等待次數(shù)模擬tick效果 $times = MAX_TIMES; while (true) { $times--; // 必須帶上timeout參數(shù), 否則channel是阻塞的 $rq = $rqChannel->pop(1); ????????????//?保存1個正常的請求數(shù)據(jù) ????????????if?(!empty($rq))?{ ????????????????$rqQueue[]?=?$rq; ????????????} ????????????//?請求數(shù)量未達上限或者還有等待次數(shù)時,?提前進入下一次循環(huán) ????????????if?($times?>?0?&&?count($rqQueue)??$rq)?{ ????????????????list($data,?$chan)?=?$rq; ????????????????//?這里可以考慮后置執(zhí)行,?原因是后面可以有一些補救邏輯 ????????????????unset($rqQueue[$index]); ????????????????//?判斷$chan是否關閉? ????????????????if?($chan->errCode?===?SWOOLE_CHANNEL_CLOSED)?{ ????????????????????$data?=?null; ????????????????????continue; ????????????????} ????????????????$bool?=?$validator($data); ????????????????if?($bool)?{ ????????????????????$inserts[]?=?"({$data['user_name']},?{$data['amount']},?{$data['mobile']})"; ????????????????????$chan->push(['state'?=>?1]); ????????????????}?else?{ ????????????????????$chan->push(['state'?=>?0]); ????????????????} ????????????????//?unset($rqQueue[$index]); ????????????} ????????????$sql?.=?(implode(',',?$inserts)?.?';'); ????????????//?模擬創(chuàng)建訂單落庫的邏輯 ????????????echo?$sql; ????????} ????}; ????//?新手要注意這一句代碼的位置,?原因是?$server->start()?之后的代碼不會執(zhí)行 ????go($createOrder); ????//?路由處理器 ????$orderHandler?=?function?($rq,?$res)?use?($rqChannel)?{ ????????$chan?=?new?SwooleCoroutineChannel(1); ????????//?使用timeout參數(shù)模擬超時 ????????$bool?=?$rqChannel->push([$rq->post,?$chan],?MAX_TIMEOUT); ????????if?(!$bool)?{ ????????????//?關閉$chan ????????????$chan->close(); ????????????$res->end('timeout'); ????????} ????????if?(!empty($data?=?$chan->pop()))?{ ????????????//?關閉$chan ????????????$chan->close(); ????????????//?區(qū)分成功或失敗狀態(tài)再輸出響應 ????????????if?($data['state']?===?1)?{ ????????????????$res->end(microtime()); ????????????}?else?{ ????????????????$res->end('error'); ????????????} ????????} ????}; ????$server?=?new?CoHttpServer("0.0.0.0",?9502,?false); ????$server->handle('/order/create',?$orderHandler); ????//?當前協(xié)程容器的終點 ????$server->start(); });
代碼整體上還是很容易理解的,變量 $rqQueue 就是類比電梯,暫留請求等待一定時間的次數(shù) $times 就是類比電梯需要等待人流依次進入。當然最在希望讀者注意的一點是:在協(xié)程環(huán)境下,不要使用共享內(nèi)存而通信,應該使用通信來共享內(nèi)存。
推薦學習:?swoole教程
? 版權聲明
文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載。
THE END