下面由laravel教程欄目給大家介紹laravel-swoole消息隊列,希望對需要的朋友有所幫助!
這段時間用laravel8+laravel-swoole做項目,可發(fā)現(xiàn)laravel-swoole的擴展不兼容消息隊列;
思來想去這咋辦呢,這咋辦呢.咋辦那就自己寫咯!還好thinkphp-swoole擴展已經(jīng)兼容了,那不就嘿嘿嘿!
直接上修改的思路和代碼!開干!
一種是增加另外啟動的命令或者在swoole啟動的時候一起啟動消息隊列進行消費,我這么懶的人一個命令能解決的,絕不寫兩命令.
首先重寫swoole啟動命令
<?php namespace crmebswoolecommand; use IlluminateSupportArr; use SwooleProcess; use SwooleTWhttpServerFacadesServer; use SwooleTWHttpServerManager; use crmebswooleserverInteractsWithQueue; use crmebswooleserverFileWatcher; use SwooleRuntime; class HttpServerCommand extends SwooleTWHttpCommandsHttpServerCommand { use InteractsWithQueue; /** * The name and signature of the console command. * * @var string */ protected $signature = 'crmeb:http {action : start|stop|restart|reload|infos}'; /** * Run swoole_http_server. */ protected function start() { if ($this->isRunning())?{ ????????????$this->error('Failed!?swoole_http_server?process?is?already?running.'); ????????????return; ????????} ????????$host?????????????=?Arr::get($this->config,?'server.host'); ????????$port?????????????=?Arr::get($this->config,?'server.port'); ????????$hotReloadEnabled?=?Arr::get($this->config,?'hot_reload.enabled'); ????????$queueEnabled?????=?Arr::get($this->config,?'queue.enabled'); ????????$accessLogEnabled?=?Arr::get($this->config,?'server.access_log'); ????????$coroutineEnable??=?Arr::get($this->config,?'coroutine.enable'); ????????$this->info('Starting?swoole?http?server...'); ????????$this->info("Swoole?http?server?started:?<http:>"); ????????if?($this->isDaemon())?{ ????????????$this->info( ????????????????'>?(You?can?run?this?command?to?ensure?the?'?. ????????????????'swoole_http_server?process?is?running:?ps?aux|grep?"swoole")' ????????????); ????????} ????????$manager?=?$this->laravel->make(Manager::class); ????????$server??=?$this->laravel->make(Server::class); ????????if?($accessLogEnabled)?{ ????????????$this->registerAccessLog(); ????????} ????????//熱更新重寫 ????????if?($hotReloadEnabled)?{ ????????????$manager->addProcess($this->getHotReloadProcessNow($server)); ????????} ????????//啟動消息隊列進行消費 ????????if?($queueEnabled)?{ ????????????$this->prepareQueue($manager); ????????} ????????if?($coroutineEnable)?{ ????????????Runtime::enableCoroutine(true,?Arr::get($this->config,?'coroutine.flags',?SWOOLE_HOOK_ALL)); ????????} ????????$manager->run(); ????} ????/** ?????*?@param?Server?$server ?????*?@return?Process|void ?????*/ ????protected?function?getHotReloadProcessNow($server) ????{ ????????return?new?Process(function?()?use?($server)?{ ????????????$watcher?=?new?FileWatcher( ????????????????Arr::get($this->config,?'hot_reload.include',?[]), ????????????????Arr::get($this->config,?'hot_reload.exclude',?[]), ????????????????Arr::get($this->config,?'hot_reload.name',?[]) ????????????); ????????????$watcher->watch(function?()?use?($server)?{ ????????????????$server->reload(); ????????????}); ????????},?false,?0,?true); ????} }</http:>
InteractsWithQueue 類
<?php namespace crmebswooleserver; use crmebswoolequeueManager as QueueManager; use SwooleTWHttpServerManager; /** * Trait InteractsWithQueue * @package crmebswooleserver */ trait InteractsWithQueue { public function prepareQueue(Manager $manager) { /** @var QueueManager $queueManager */ $queueManager = $this->laravel->make(QueueManager::class); ????????$queueManager->attachToServer($manager,?$this->output); ????} }
Manager類
<?php namespace crmebswoolequeue; use IlluminateContractsContainerContainer; use SwooleConstant; use SwooleProcess; use SwooleProcessPool; use SwooleTimer; use IlluminateSupportArr; use IlluminateQueueEventsJobFailed; use IlluminateQueueWorker; use crmebswooleserverWithContainer; use IlluminateQueueJobsJob; use function SwooleCoroutinerun; use IlluminateQueueWorkerOptions; use SwooleTWHttpServerManager as ServerManager; use IlluminateConsoleOutputStyle; class Manager { use WithContainer; /** * Container. * * @var IlluminateContractsContainerContainer */ protected $container; /** * @var OutputStyle */ protected $output; /** * @var Closure[] */ protected $workers = []; /** * Manager constructor. * @param Container $container */ public function __construct(Container $container) { $this->container?=?$container; ????} ????/** ?????*?@param?ServerManager?$server ?????*/ ????public?function?attachToServer(ServerManager?$server,?OutputStyle?$output) ????{ ????????$this->output?=?$output; ????????$this->listenForEvents(); ????????$this->createWorkers(); ????????foreach?($this->workers?as?$worker)?{ ????????????$server->addProcess(new?Process($worker,?false,?0,?true)); ????????} ????} ????/** ?????*?運行消息隊列命令 ?????*/ ????public?function?run():?void ????{ ????????@cli_set_process_title("swoole?queue:?manager?process"); ????????$this->listenForEvents(); ????????$this->createWorkers(); ????????$pool?=?new?Pool(count($this->workers)); ????????$pool->on(Constant::EVENT_WORKER_START,?function?(Pool?$pool,?int?$workerId)?{ ????????????$process?=?$pool->getProcess($workerId); ????????????run($this->workers[$workerId],?$process); ????????}); ????????$pool->start(); ????} ????/** ?????*?創(chuàng)建執(zhí)行任務(wù) ?????*/ ????protected?function?createWorkers() ????{ ????????$workers?=?$this->getConfig('queue.workers',?[]); ????????foreach?($workers?as?$queue?=>?$options)?{ ????????????if?(strpos($queue,?'@')?!==?false)?{ ????????????????[$queue,?$connection]?=?explode('@',?$queue); ????????????}?else?{ ????????????????$connection?=?null; ????????????} ????????????$this->workers[]?=?function?(Process?$process)?use?($options,?$connection,?$queue)?{ ????????????????@cli_set_process_title("swoole?queue:?worker?process"); ????????????????/**?@var?Worker?$worker?*/ ????????????????$worker?=?$this->container->make('queue.worker'); ????????????????/**?@var?WorkerOptions?$option?*/ ????????????????$option?=?$this->container->make(WorkerOptions::class); ????????????????$option->sleep?=?Arr::get($options,?"sleep",?3); ????????????????$option->maxTries?=?Arr::get($options,?"tries",?0); ????????????????$option->timeout?=?Arr::get($options,?"timeout",?60); ????????????????$timer?=?Timer::after($option->timeout?*?1000,?function?()?use?($process)?{ ????????????????????$process->exit(); ????????????????}); ????????????????$worker->runNextJob($connection,?$queue,?$option); ????????????????Timer::clear($timer); ????????????}; ????????} ????} ????/** ?????*?注冊事件 ?????*/ ????protected?function?listenForEvents() ????{ ????????$this->container->make('events')->listen(JobFailed::class,?function?(JobFailed?$event)?{ ????????????$this->writeOutput($event->job); ????????????$this->logFailedJob($event); ????????}); ????} ????/** ?????*?記錄失敗任務(wù) ?????*?@param?JobFailed?$event ?????*/ ????protected?function?logFailedJob(JobFailed?$event) ????{ ????????$this->container['queue.failer']->log( ????????????$event->connection, ????????????$event->job->getQueue(), ????????????$event->job->getRawBody(), ????????????$event->exception ????????); ????} ????/** ?????*?Write?the?status?output?for?the?queue?worker. ?????* ?????*?@param?Job?$job ?????*?@param?????$status ?????*/ ????protected?function?writeOutput(Job?$job,?$status) ????{ ????????switch?($status)?{ ????????????case?'starting': ????????????????$this->writeStatus($job,?'Processing',?'comment'); ????????????????break; ????????????case?'success': ????????????????$this->writeStatus($job,?'Processed',?'info'); ????????????????break; ????????????case?'failed': ????????????????$this->writeStatus($job,?'Failed',?'error'); ????????????????break; ????????} ????} ????/** ?????*?Format?the?status?output?for?the?queue?worker. ?????* ?????*?@param?Job?$job ?????*?@param?string?$status ?????*?@param?string?$type ?????*?@return?void ?????*/ ????protected?function?writeStatus(Job?$job,?$status,?$type) ????{ ????????$this->output->writeln(sprintf( ????????????"[%s][%s]?%s{$type}>?%s", ????????????date('Y-m-d?H:i:s'), ????????????$job->getJobId(), ????????????str_pad("{$status}:",?11),?$job->getName() ????????)); ????} }
增加CrmebServiceProvider類
<?php namespace crmebswoole; use IlluminateContractsDebugExceptionHandler; use IlluminateContractsHttpKernel; use crmebswoolecommandHttpServerCommand; use IlluminateQueueWorker; use SwooleTWHttpHttpServiceProvider; use SwooleTWHttpMiddlewareAccessLog; use SwooleTWHttpServerManager; /** * Class CrmebServiceProvider * @package crmebswoole */ class CrmebServiceProvider extends HttpServiceProvider { /** * Register manager. * * @return void */ protected function registerManager() { $this->app->singleton(Manager::class,?function?($app)?{ ????????????return?new?Manager($app,?'laravel'); ????????}); ????????$this->app->alias(Manager::class,?'swoole.manager'); ????????$this->app->singleton('queue.worker',?function?($app)?{ ????????????$isDownForMaintenance?=?function?()?{ ????????????????return?$this->app->isDownForMaintenance(); ????????????}; ????????????return?new?Worker( ????????????????$app['queue'], ????????????????$app['events'], ????????????????$app[ExceptionHandler::class], ????????????????$isDownForMaintenance ????????????); ????????}); ????} ????/** ?????*?Boot?websocket?routes. ?????* ?????*?@return?void ?????*/ ????protected?function?bootWebsocketRoutes() ????{ ????????require?base_path('vendor/swooletw/laravel-swoole')?.?'/routes/laravel_routes.php'; ????} ????/** ?????*?Register?access?log?middleware?to?container. ?????* ?????*?@return?void ?????*/ ????protected?function?pushAccessLogMiddleware() ????{ ????????$this->app->make(Kernel::class)->pushMiddleware(AccessLog::class); ????} ????/** ?????*?Register?commands. ?????*/ ????protected?function?registerCommands() ????{ ????????$this->commands([ ????????????HttpServerCommand::class, ????????]); ????} ????/** ?????*?Merge?configurations. ?????*/ ????protected?function?mergeConfigs() ????{ ????????$this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole')?.?'/config/swoole_http.php',?'swoole_http'); ????????$this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole')?.?'/config/swoole_websocket.php',?'swoole_websocket'); ????} ????/** ?????*?Publish?files?of?this?package. ?????*/ ????protected?function?publishFiles() ????{ ????????$this->publishes([ ????????????base_path('vendor/swooletw/laravel-swoole')?.?'/config/swoole_http.php'?=>?base_path('config/swoole_http.php'), ????????????base_path('vendor/swooletw/laravel-swoole')?.?'/config/swoole_websocket.php'?=>?base_path('config/swoole_websocket.php'), ????????????base_path('vendor/swooletw/laravel-swoole')?.?'/routes/websocket.php'?=>?base_path('routes/websocket.php'), ????????],?'laravel-swoole'); ????} }
然后再把crmebswooleCrmebServiceProvider::class放入config/app.php中的providers中加載重寫了swoole的命令啟動方式
配置config/swoole_http.php
return [ 'queue' => [ //是否開啟自動消費隊列 'enabled' => true, 'workers' => [ //隊列名稱 'CRMEB' => [] ] ],];
輸入命令:
php artisan crmeb:http restart
swoole啟動后就可以自動消費隊列了。
相關(guān)推薦:最新的五個Laravel視頻教程
? 版權(quán)聲明
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載。
THE END
喜歡就支持一下吧
相關(guān)推薦