你知道為什么Laravel會重復(fù)執(zhí)行同一個隊列任務(wù)嗎?

下面由laravel教程欄目給大家介紹為什么 laravel 會重復(fù)執(zhí)行同一個隊列任務(wù),希望對需要的朋友有所幫助!

你知道為什么Laravel會重復(fù)執(zhí)行同一個隊列任務(wù)嗎?

在 Laravel 中使用 redis 處理隊列任務(wù),框架提供的功能非常強(qiáng)大,但是最近遇到一個問題,就是發(fā)現(xiàn)一個任務(wù)被多次執(zhí)行,這是為什么呢?

先說原因:因為在 Laravel 中如果一個隊列(任務(wù))執(zhí)行時間大于 60 秒,就會被認(rèn)為執(zhí)行失敗并重新加入隊列中,這樣就會導(dǎo)致重復(fù)執(zhí)行同一個任務(wù)。

這個任務(wù)的邏輯就是給用戶推送內(nèi)容,需要根據(jù)隊列內(nèi)容取出用戶并遍歷,通過請求后端 HTTP 接口發(fā)送。比如有 10000 個用戶,在用戶數(shù)量多或接口處理速度沒那么快的情況下,執(zhí)行時間肯定會大于 60 秒,于是這個任務(wù)就被重新加入隊列。情況更糟糕一點(diǎn),前面的任務(wù)如果都沒有在 60 秒執(zhí)行完,就都會重新加入隊列,這樣同一個任務(wù)就不止重復(fù)執(zhí)行一次了,而是多次。

下面從 Laravel 源代碼找一下罪魁禍?zhǔn)住?/p>

源代碼文件:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php

/**  * The expiration time of a job.  *  * @var int|null  */ protected $expire = 60;

這個 $expire 成員變量是一個固定的值,Laravel 認(rèn)為一個隊列再怎么 60 秒也該執(zhí)行完了吧。取隊列方法:

public function pop($queue = null) {     $original = $queue ?: $this->default;       $queue = $this->getQueue($queue);       $this->migrateExpiredJobs($queue.':delayed', $queue);       if (! is_null($this->expire)) {         $this->migrateExpiredJobs($queue.':reserved', $queue);     }       list($job, $reserved) = $this->getConnection()->eval(         LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire     );       if ($reserved) {         return new RedisJob($this->container, $this, $job, $reserved, $original);     } }

取隊列有幾步操作,因為隊列執(zhí)行失敗,或執(zhí)行超時等都會放入另外的集合保存起來,以便重試,過程如下:

1.把因執(zhí)行失敗的隊列從 delayed 集合重新 rpush 到當(dāng)前執(zhí)行的隊列中。

2.把因執(zhí)行超時的隊列從 reserved 集合重新 rpush 到當(dāng)前執(zhí)行的隊列中。

3.然后才是從隊列中取任務(wù)開始執(zhí)行,同時把隊列放入 reserved 的有序集合。

這里使用了 eval 命令執(zhí)行這個過程,用到了幾個 lua 腳本。

從要執(zhí)行的隊列中取任務(wù):

local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then     reserved = cjson.decode(job)     reserved['attempts'] = reserved['attempts'] + 1     reserved = cjson.encode(reserved)     redis.call('zadd', KEYS[2], ARGV[1], reserved) end return {job, reserved}

可以看到 Laravel 在取 Redis 要執(zhí)行的隊列的時候,同時會放一份到一個有序集合中,并使用過期時間戳作為分值。

只有當(dāng)這個任務(wù)完成后,再把有序集合中這個任務(wù)移除。從這個有序集合移除隊列的代碼就省略,我們看一下 Laravel 如何處理執(zhí)行時間大于 60 秒的隊列。

也就是這段 lua 腳本執(zhí)行的操作:

local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) if(next(val) ~= nil) then     redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)     for i = 1, #val, 100 do         redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))     end end return true

這里 zrangebyscore 找出分值從無限小到當(dāng)前時間戳的元素,也就是 60 秒之前加入到集合的任務(wù),然后通過 zremrangebyrank 從集合移除這些元素并 rpush 到隊列中。

看到這里應(yīng)該就恍然大悟了。

如果一個隊列 60 秒沒執(zhí)行完,那么進(jìn)程在取隊列的時候從 reserved 集合中把這些任務(wù)又重新 rpush 到隊列中。

相關(guān)推薦:最新的五個Laravel視頻教程

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點(diǎn)贊7 分享