laravel8中laravel-swoole的擴展不兼容消息隊列怎么辦?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

下面由laravel教程欄目給大家介紹laravel-swoole消息隊列,希望對需要的朋友有所幫助!

這段時間用laravel8+laravel-swoole做項目,可發現laravel-swoole的擴展不兼容消息隊列;
laravel8中laravel-swoole的擴展不兼容消息隊列怎么辦?

思來想去這咋辦呢,這咋辦呢.咋辦那就自己寫咯!還好thinkphp-swoole擴展已經兼容了,那不就嘿嘿嘿!
laravel8中laravel-swoole的擴展不兼容消息隊列怎么辦?

直接上修改的思路和代碼!開干!

一種是增加另外啟動的命令或者在swoole啟動的時候一起啟動消息隊列進行消費,我這么懶的人一個命令能解決的,絕不寫兩命令.

首先重寫swoole啟動命令

<?php

namespace crmeb\swoole\command;

use Illuminate\Support\Arr;
use Swoole\Process;
use SwooleTW\Http\Server\Facades\Server;
use SwooleTW\Http\Server\Manager;
use crmeb\swoole\server\InteractsWithQueue;
use crmeb\swoole\server\FileWatcher;
use Swoole\Runtime;

/**
 * Replacement for laravel-swoole's HTTP server command.
 *
 * On `start` it additionally attaches a hot-reload watcher process and the
 * queue worker processes to the server manager, so a single command both
 * serves HTTP and consumes queued jobs.
 */
class HttpServerCommand extends \SwooleTW\Http\Commands\HttpServerCommand
{
    use InteractsWithQueue;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'crmeb:http {action : start|stop|restart|reload|infos}';

    /**
     * Run swoole_http_server.
     *
     * Reads its switches from $this->config: `server.host`, `server.port`,
     * `server.access_log`, `hot_reload.enabled`, `queue.enabled`,
     * `coroutine.enable` and `coroutine.flags`.
     *
     * @return void
     */
    protected function start()
    {
        if ($this->isRunning()) {
            $this->error('Failed! swoole_http_server process is already running.');

            return;
        }

        $host             = Arr::get($this->config, 'server.host');
        $port             = Arr::get($this->config, 'server.port');
        $hotReloadEnabled = Arr::get($this->config, 'hot_reload.enabled');
        $queueEnabled     = Arr::get($this->config, 'queue.enabled');
        $accessLogEnabled = Arr::get($this->config, 'server.access_log');
        $coroutineEnable  = Arr::get($this->config, 'coroutine.enable');

        $this->info('Starting swoole http server...');
        $this->info("Swoole http server started: <http://{$host}:{$port}>");
        if ($this->isDaemon()) {
            $this->info(
                '> (You can run this command to ensure the ' .
                'swoole_http_server process is running: ps aux|grep "swoole")'
            );
        }

        $manager = $this->laravel->make(Manager::class);
        $server  = $this->laravel->make(Server::class);

        if ($accessLogEnabled) {
            $this->registerAccessLog();
        }

        // Hot reload, using the overridden watcher process defined below.
        if ($hotReloadEnabled) {
            $manager->addProcess($this->getHotReloadProcessNow($server));
        }

        // Attach the queue worker processes so jobs are consumed alongside HTTP.
        if ($queueEnabled) {
            $this->prepareQueue($manager);
        }

        if ($coroutineEnable) {
            Runtime::enableCoroutine(true, Arr::get($this->config, 'coroutine.flags', SWOOLE_HOOK_ALL));
        }

        $manager->run();
    }

    /**
     * Build the process that watches files and reloads the server on change.
     *
     * The watcher is configured from `hot_reload.include`, `hot_reload.exclude`
     * and `hot_reload.name`, each defaulting to an empty array.
     *
     * @param Server $server Server instance whose reload() is called by the watcher callback.
     * @return Process
     */
    protected function getHotReloadProcessNow($server)
    {
        return new Process(function () use ($server) {
            $watcher = new FileWatcher(
                Arr::get($this->config, 'hot_reload.include', []),
                Arr::get($this->config, 'hot_reload.exclude', []),
                Arr::get($this->config, 'hot_reload.name', [])
            );

            $watcher->watch(function () use ($server) {
                $server->reload();
            });
        }, false, 0, true);
    }
}

InteractsWithQueue 類

<?php

namespace crmeb\swoole\server;

use crmeb\swoole\queue\Manager as QueueManager;
use SwooleTW\Http\Server\Manager;

/**
 * Trait InteractsWithQueue
 *
 * Lets a console command attach queue worker processes to the swoole server
 * manager. The using class must expose `$this->laravel` (the container) and
 * `$this->output` (the console output), both of which are read here.
 *
 * @package crmeb\swoole\server
 */
trait InteractsWithQueue
{
    /**
     * Resolve the queue manager from the container and attach it to the server.
     *
     * @param Manager $manager Server manager that receives the worker processes.
     * @return void
     */
    public function prepareQueue(Manager $manager)
    {
        /** @var QueueManager $queueManager */
        $queueManager = $this->laravel->make(QueueManager::class);

        $queueManager->attachToServer($manager, $this->output);
    }
}

Manager類

<?php

namespace crmeb\swoole\queue;

use Illuminate\Contracts\Container\Container;
use Swoole\Constant;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Timer;
use Illuminate\Support\Arr;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Worker;
use crmeb\swoole\server\WithContainer;
use Illuminate\Queue\Jobs\Job;
use Illuminate\Queue\WorkerOptions;
use SwooleTW\Http\Server\Manager as ServerManager;
use Illuminate\Console\OutputStyle;
use function Swoole\Coroutine\run;

/**
 * Builds queue worker callbacks from the `queue.workers` configuration and
 * runs them either as extra processes on the swoole server or in a standalone
 * process pool.
 */
class Manager
{
    use WithContainer;

    /**
     * Container.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * Console output used for job status lines.
     *
     * @var OutputStyle
     */
    protected $output;

    /**
     * Worker callbacks, one per configured queue.
     *
     * @var \Closure[]
     */
    protected $workers = [];

    /**
     * Manager constructor.
     *
     * @param Container $container
     */
    public function __construct(Container $container)
    {
        $this->container = $container;
    }

    /**
     * Register every configured worker as an additional process on the server.
     *
     * @param ServerManager $server Server manager that receives the processes.
     * @param OutputStyle   $output Console output for job status lines.
     * @return void
     */
    public function attachToServer(ServerManager $server, OutputStyle $output)
    {
        $this->output = $output;
        $this->listenForEvents();
        $this->createWorkers();
        foreach ($this->workers as $worker) {
            $server->addProcess(new Process($worker, false, 0, true));
        }
    }

    /**
     * Run the queue workers in a standalone process pool.
     *
     * @return void
     */
    public function run(): void
    {
        @cli_set_process_title("swoole queue: manager process");

        $this->listenForEvents();
        $this->createWorkers();

        $pool = new Pool(count($this->workers));

        $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) {
            $process = $pool->getProcess($workerId);
            run($this->workers[$workerId], $process);
        });

        $pool->start();
    }

    /**
     * Create one worker callback per entry in the `queue.workers` config.
     *
     * A config key may be written as "queue@connection"; without "@" the
     * connection passed to the worker is null. Per-queue options read here are
     * `sleep` (default 3), `tries` (default 0) and `timeout` (default 60).
     *
     * @return void
     */
    protected function createWorkers()
    {
        // NOTE(review): getConfig() is not defined in this class; presumably it
        // comes from the WithContainer trait - confirm.
        $workers = $this->getConfig('queue.workers', []);

        foreach ($workers as $queue => $options) {
            if (strpos($queue, '@') !== false) {
                [$queue, $connection] = explode('@', $queue);
            } else {
                $connection = null;
            }

            $this->workers[] = function (Process $process) use ($options, $connection, $queue) {
                @cli_set_process_title("swoole queue: worker process");

                /** @var Worker $worker */
                $worker = $this->container->make('queue.worker');
                /** @var WorkerOptions $option */
                $option = $this->container->make(WorkerOptions::class);

                $option->sleep    = Arr::get($options, "sleep", 3);
                $option->maxTries = Arr::get($options, "tries", 0);
                $option->timeout  = Arr::get($options, "timeout", 60);

                // Watchdog: exit the process if the job runs longer than the timeout.
                $timer = Timer::after($option->timeout * 1000, function () use ($process) {
                    $process->exit();
                });

                // Each invocation of this callback processes a single job.
                $worker->runNextJob($connection, $queue, $option);

                Timer::clear($timer);
            };
        }
    }

    /**
     * Register the listener that reports and records failed jobs.
     *
     * @return void
     */
    protected function listenForEvents()
    {
        $this->container->make('events')->listen(JobFailed::class, function (JobFailed $event) {
            // writeOutput() requires a status; this listener only handles failures.
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }

    /**
     * Record a failed job through the `queue.failer` service.
     *
     * @param JobFailed $event
     * @return void
     */
    protected function logFailedJob(JobFailed $event)
    {
        $this->container['queue.failer']->log(
            // Laravel's JobFailed exposes the connection as `connectionName`.
            $event->connectionName,
            $event->job->getQueue(),
            $event->job->getRawBody(),
            $event->exception
        );
    }

    /**
     * Write the status output for the queue worker.
     *
     * Statuses other than 'starting', 'success' and 'failed' are ignored.
     *
     * @param Job    $job
     * @param string $status One of 'starting', 'success' or 'failed'.
     * @return void
     */
    protected function writeOutput(Job $job, $status)
    {
        switch ($status) {
            case 'starting':
                $this->writeStatus($job, 'Processing', 'comment');
                break;
            case 'success':
                $this->writeStatus($job, 'Processed', 'info');
                break;
            case 'failed':
                $this->writeStatus($job, 'Failed', 'error');
                break;
        }
    }

    /**
     * Format the status output for the queue worker.
     *
     * @param Job    $job
     * @param string $status Label printed before the job name.
     * @param string $type   Console style tag wrapped around the prefix (e.g. 'info', 'error').
     * @return void
     */
    protected function writeStatus(Job $job, $status, $type)
    {
        $this->output->writeln(sprintf(
            "<{$type}>[%s][%s] %s</{$type}> %s",
            date('Y-m-d H:i:s'),
            $job->getJobId(),
            str_pad("{$status}:", 11),
            $job->getName()
        ));
    }
}

增加CrmebServiceProvider類

<?php

namespace crmeb\swoole;

use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Http\Kernel;
use crmeb\swoole\command\HttpServerCommand;
use Illuminate\Queue\Worker;
use SwooleTW\Http\HttpServiceProvider;
use SwooleTW\Http\Middleware\AccessLog;
use SwooleTW\Http\Server\Manager;

/**
 * Class CrmebServiceProvider
 *
 * Service provider that swaps in the crmeb HTTP server command and registers
 * the `queue.worker` binding. Config, route and publish paths point at the
 * installed swooletw/laravel-swoole package under vendor/.
 *
 * @package crmeb\swoole
 */
class CrmebServiceProvider extends HttpServiceProvider
{
    /**
     * Register the server manager and the queue worker singletons.
     *
     * @return void
     */
    protected function registerManager()
    {
        $this->app->singleton(Manager::class, function ($app) {
            return new Manager($app, 'laravel');
        });

        $this->app->alias(Manager::class, 'swoole.manager');

        $this->app->singleton('queue.worker', function ($app) {
            $isDownForMaintenance = function () {
                return $this->app->isDownForMaintenance();
            };

            return new Worker(
                $app['queue'],
                $app['events'],
                $app[ExceptionHandler::class],
                $isDownForMaintenance
            );
        });
    }

    /**
     * Boot websocket routes.
     *
     * @return void
     */
    protected function bootWebsocketRoutes()
    {
        require base_path('vendor/swooletw/laravel-swoole') . '/routes/laravel_routes.php';
    }

    /**
     * Register access log middleware to container.
     *
     * @return void
     */
    protected function pushAccessLogMiddleware()
    {
        $this->app->make(Kernel::class)->pushMiddleware(AccessLog::class);
    }

    /**
     * Register commands.
     *
     * @return void
     */
    protected function registerCommands()
    {
        $this->commands([
            HttpServerCommand::class,
        ]);
    }

    /**
     * Merge configurations.
     *
     * @return void
     */
    protected function mergeConfigs()
    {
        $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php', 'swoole_http');
        $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php', 'swoole_websocket');
    }

    /**
     * Publish files of this package.
     *
     * @return void
     */
    protected function publishFiles()
    {
        $this->publishes([
            base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php' => base_path('config/swoole_http.php'),
            base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php' => base_path('config/swoole_websocket.php'),
            base_path('vendor/swooletw/laravel-swoole') . '/routes/websocket.php' => base_path('routes/websocket.php'),
        ], 'laravel-swoole');
    }
}

然后再把crmeb\swoole\CrmebServiceProvider::class放入config/app.php中的providers中,加載重寫了swoole的命令啟動方式

配置config/swoole_http.php

return [
    'queue' => [
        // Whether to start queue consumers automatically with the server.
        'enabled' => true,
        'workers' => [
            // Key is the queue name; the value is that queue's options array.
            'CRMEB' => [],
        ],
    ],
];

輸入命令:
php artisan crmeb:http restart

swoole啟動后就可以自動消費隊列了。

相關(guān)推薦:最新的五個Laravel視頻教程

以上就是laravel8中laravel-

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點贊12 分享