Skip to content

异步队列

MineShop 使用 Hyperf 的异步队列组件处理耗时任务,如订单创建、消息发送、数据导出等。

🎯 队列架构

┌─────────────────────────────────────────────────────────────┐
│                      生产者 (Producer)                       │
│              Controller / Service / Crontab                  │
├─────────────────────────────────────────────────────────────┤
│                      Redis 队列                              │
│         default / export / notification / ...               │
├─────────────────────────────────────────────────────────────┤
│                      消费者 (Consumer)                       │
│              AsyncQueueConsumer Process                      │
├─────────────────────────────────────────────────────────────┤
│                      Job 处理器                              │
│     OrderCreateJob / ExportJob / SendMessageJob / ...       │
└─────────────────────────────────────────────────────────────┘

⚙️ 队列配置

默认队列

php
// config/autoload/async_queue.php
return [
    'default' => [
        'driver' => RedisDriver::class,
        'redis' => [
            'pool' => 'default',
        ],
        'channel' => '{queue}',
        'timeout' => 2,
        'retry_seconds' => 5,
        'handle_timeout' => 10,
        'processes' => 1,
        'concurrent' => [
            'limit' => 10,
        ],
        'max_messages' => 0,
    ],
];

导出队列(插件配置)

php
// plugins/export-center/src/ConfigProvider.php
return [
    'async_queue' => [
        'export' => [
            'driver' => RedisDriver::class,
            'channel' => '{export-queue}',
            'timeout' => 2,
            'retry_seconds' => 10,
            'handle_timeout' => 600,  // 10分钟超时
            'processes' => 1,
            'concurrent' => [
                'limit' => 5,
            ],
        ],
    ],
];

📦 Job 定义

订单创建 Job

php
// OrderCreateJob.php
class OrderCreateJob implements JobInterface
{
    public function __construct(
        public readonly string $tradeNo,
        public readonly array $entitySnapshot,
        public readonly array $itemsPayload,
        public readonly array $addressPayload,
        public readonly array $couponUserIds,
        public readonly string $orderType,
        public readonly string $stockHashKey,
    ) {}

    public function handle(): void
    {
        try {
            // 1. 重建订单实体
            $entity = $this->rebuildEntity();
            
            // 2. 创建订单记录
            $order = $this->orderRepository->createFromEntity($entity);
            
            // 3. 创建订单商品
            $this->createOrderItems($order, $this->itemsPayload);
            
            // 4. 扣减优惠券
            $this->deductCoupons($this->couponUserIds);
            
            // 5. 更新缓存状态为成功
            $this->pendingCacheService->markCreated($this->tradeNo);
            
        } catch (\Throwable $e) {
            // 回滚库存
            $this->stockService->rollback($this->itemsPayload, $this->stockHashKey);
            
            // 更新缓存状态为失败
            $this->pendingCacheService->markFailed($this->tradeNo, $e->getMessage());
            
            throw $e;
        }
    }
}

消息发送 Job

php
// SendMessageJob.php
class SendMessageJob implements JobInterface
{
    public function __construct(
        public readonly int $messageId,
        public readonly int $userId,
        public readonly string $channel,
    ) {}

    public function handle(): void
    {
        $message = Message::find($this->messageId);
        if (!$message) {
            return;
        }
        
        $this->notificationService->send($message, $this->userId, $this->channel);
    }
}

秒杀场次启动 Job

php
// SeckillSessionStartJob.php
class SeckillSessionStartJob implements JobInterface
{
    public function __construct(
        public readonly int $sessionId,
        public readonly int $activityId,
    ) {}

    public function handle(): void
    {
        // 1. 激活场次
        $this->sessionService->start($this->sessionId);
        
        // 2. 预热缓存
        $this->cacheService->warmSession($this->sessionId);
        
        // 3. 联动激活活动
        if ($this->activityService->isPending($this->activityId)) {
            $this->activityService->start($this->activityId);
        }
    }
}

🔧 投递任务

立即投递

php
// 获取队列驱动
$driver = $this->driverFactory->get('default');

// 投递任务
$driver->push(new OrderCreateJob(...));

延迟投递

php
// 延迟 60 秒执行
$driver->push(new SeckillSessionStartJob($sessionId, $activityId), 60);

指定队列

php
// 投递到 export 队列
$driver = $this->driverFactory->get('export');
$driver->push(new ExportJob(...));

📊 队列监控

查看队列状态

bash
# 查看 Redis 队列长度
redis-cli LLEN {queue}:waiting
redis-cli LLEN {queue}:delayed
redis-cli LLEN {queue}:failed

日志记录

php
// Job 中记录日志
public function handle(): void
{
    logger()->info('Job started', ['job' => static::class]);
    
    try {
        // 处理逻辑
        logger()->info('Job completed', ['job' => static::class]);
    } catch (\Throwable $e) {
        logger()->error('Job failed', [
            'job' => static::class,
            'error' => $e->getMessage(),
        ]);
        throw $e;
    }
}

🔄 失败重试

配置重试

php
'retry_seconds' => [5, 10, 30, 60],  // 重试间隔
'max_attempts' => 3,                  // 最大重试次数

手动重试

php
// 获取失败任务
$failed = $redis->lrange('{queue}:failed', 0, -1);

// 重新投递
foreach ($failed as $job) {
    $driver->push(unserialize($job));
}

⚠️ 注意事项

  1. 幂等性: Job 应设计为幂等,支持重试
  2. 超时设置: 根据任务复杂度设置合理的 handle_timeout
  3. 并发控制: 通过 concurrent.limit 控制并发数
  4. 错误处理: 捕获异常并记录日志,必要时回滚操作
  5. 监控告警: 监控队列积压情况,及时处理

📚 相关文档

基于 Apache-2.0 许可发布 | 感谢 MineAdmin 提供的优秀基础框架