文章詳情頁
PHP簡易延時隊列的實現流程詳解
瀏覽:268日期:2022-06-05 17:00:41
目錄
- 需求說明
- 設計思路
- 實現
- 最后說點
需求說明
- 當用戶申請售后,商家未在n小時內處理,系統自動進行退款。
- 商家拒絕后,用戶可申請客服介入,客服x天內超時未處理,系統自動退款。
- 用戶收到貨物,x天自動確認收貨
- 等等需要延時操作的流程……
設計思路
- 設計一張隊列表,記錄所有隊列的參數,執行狀態,重試次數
- 將創建隊列的
id存于redis中,使用zset有序集合。按照時間戳進行排序 - 使用
croontab定時任務每分鐘執行一次
實現
新建隊列表
CREATE TABLE `delay_queue` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `params` varchar(512) DEFAULT NULL, `message` varchar(255) DEFAULT "" COMMENT "執行結果", `ext_string` varchar(255) DEFAULT "" COMMENT "擴展字符串,可用于快速檢索。取消該隊列", `retry_times` int(2) DEFAULT "0" COMMENT "重試次數", `status` int(2) NOT NULL DEFAULT "1" COMMENT "1 待執行, 10 執行成功, 20 執行失敗,30取消執行", `created` datetime DEFAULT NULL, `modified` datetime DEFAULT NULL, PRIMARY KEY (`id`), KEY `ext_idx` (`ext_string`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
部分隊列的操作方法,新增隊列、取消隊列、隊列執行成功、隊列執行失敗、隊列重試【重試時間間隔抄的微信支付的異步通知時間】
class DelayQueueService
{
// 重試時間,最大重試次數 15
private static $retryTimes = [
15, 15, 30, 3 * 60, 10 * 60, 20 * 60, 30 * 60, 30 * 60, 30 * 60, 60 * 60,
3 * 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 6 * 60 * 60, 6 * 60 * 60,
];
/**
* @description 增加隊列至redis
* @param $queueId
* @param int $delay 需要延遲執行的時間。單位秒
* @return void
*/
public function addDelayQueue($queueId, int $delay)
{
$time = time() + $delay;
$redis = RedisService::getInstance();
$redis->zAdd("delay_queue_job", $time, $queueId);
}
// 取消redis 隊列
public function cancelDelayQueue($ext)
{
$row = $query->getRow(); // 使用ext_string 快速檢索到相應的記錄
if ($row) {
$redis = RedisService::getInstance();
$redis->zRem("delay_queue_job", $row->id);
$row->status = DelayQueueTable::STATUS_CANCEL;
$table->save($row);
}
}
/**
* @description 執行成功
* @return void
*/
public static function success($id, $message = null)
{
$table->update([
"status" => DelayQueueTable::STATUS_SUCCESS,
"message" => $message ?? "",
"modified" => date("Y-m-d H:i:s"),
], [
"id" => $id,
]);
}
/**
* @description 執行失敗
* @return void
*/
public static function failed($id, $message = null)
{
$table->updateAll([
"status" => DelayQueueTable::STATUS_FAILED,
"message" => $message ?? "",
"modified" => date("Y-m-d H:i:s"),
], [
"id" => $id,
]);
}
/**
* @description 失敗隊列重試,最大重試15次
* @param $id
* @return void
*/
public static function retry($id)
{
$info = self::getById($id);
if (!$info) {
return;
}
$retryTimes = ++$info["retry_times"];
if ($retryTimes > 15) {
return;
}
$entity = [
"params" => $info["params"],
"ext_string" => $info["ext_string"],
"retry_times" => $retryTimes,
];
$queueId = $table->save($entity);
self::addDelayQueue($queueId, self::$retryTimes[$retryTimes - 1]);
}
}
在命令行進行任務的運行
public function execute(Arguments $args, ConsoleIo $io)
{
$startTimestamp = strtotime("-1 days");
$now = time();
$redis = RedisService::getInstance();
$queueIds = $redis->zRangeByScore("delay_queue_job", $startTimestamp, $now);
if ($queueIds) {
foreach ($queueIds as $id) {
$info = // 按照隊列id 獲取相應的信息
if ($info["status"] === DelayQueueTable::STATUS_PADDING) {
$params = unserialize($info["params"]); // 創建記錄的時候,需要試用serialize 將類名,方法,參數序列化
$class = $params["class"];
$method = $params["method"];
$data = $params["data"];
try {
call_user_func_array([$class, $method], [$data]);
$redis->zRem("delay_queue_job", $id);
$msg = date("Y-m-d H:i:s") . " [info] success: $id";
DelayQueueService::success($id, $msg);
$io->success($msg);
} catch (Exception $e) {
$msg = date("Y-m-d H:i:s") . " [error] {$e->getMessage()}";
DelayQueueService::failed($id, $msg);
// 自定義異常code,不進行隊列重試
if (10000 != $e->getCode()) {
DelayQueueService::retry($id);
}
$io->error($msg);
}
}
}
}
}
最后說點
- 我這邊的系統對實時性要求不高,所以直接使用的是
linux的crond服務,每分鐘運行一次。如需精確到秒級,可寫一個shell,一分鐘循環執行<=60次 - 因為目前的數據較少,延時隊列加入的只有小部分。所以就在
command里面直接執行更新操作了,后期如果隊列多,且有比較耗時的操作,可考慮把耗時操作單獨放置一個隊列中。本方法只用于將數據塞進隊列。
附上 shell 腳本 一分鐘執行60次
#!/bin/bash step=2 #間隔的秒數,不能大于60 for (( i = 0; i < 60; i=(i+step) )); do echo $i # do something sleep $step done
到此這篇關于PHP簡易延時隊列的實現流程詳解的文章就介紹到這了,更多相關PHP延時隊列內容請搜索以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持!
標簽:
PHP
上一條:ASP替換、保存遠程圖片實現代碼下一條:PHP中流的定義及作用詳解
排行榜

網公網安備