Appearance
异步队列
MineShop 使用 Hyperf 的异步队列组件处理耗时任务,如订单创建、消息发送、数据导出等。
🎯 队列架构
┌─────────────────────────────────────────────────────────────┐
│ 生产者 (Producer) │
│ Controller / Service / Crontab │
├─────────────────────────────────────────────────────────────┤
│ Redis 队列 │
│ default / export / notification / ... │
├─────────────────────────────────────────────────────────────┤
│ 消费者 (Consumer) │
│ AsyncQueueConsumer Process │
├─────────────────────────────────────────────────────────────┤
│ Job 处理器 │
│ OrderCreateJob / ExportJob / SendMessageJob / ... │
└─────────────────────────────────────────────────────────────┘⚙️ 队列配置
默认队列
php
// config/autoload/async_queue.php
return [
'default' => [
'driver' => RedisDriver::class,
'redis' => [
'pool' => 'default',
],
'channel' => '{queue}',
'timeout' => 2,
'retry_seconds' => 5,
'handle_timeout' => 10,
'processes' => 1,
'concurrent' => [
'limit' => 10,
],
'max_messages' => 0,
],
];导出队列(插件配置)
php
// plugins/export-center/src/ConfigProvider.php
return [
'async_queue' => [
'export' => [
'driver' => RedisDriver::class,
'channel' => '{export-queue}',
'timeout' => 2,
'retry_seconds' => 10,
'handle_timeout' => 600, // 10分钟超时
'processes' => 1,
'concurrent' => [
'limit' => 5,
],
],
],
];📦 Job 定义
订单创建 Job
php
// OrderCreateJob.php
class OrderCreateJob implements JobInterface
{
public function __construct(
public readonly string $tradeNo,
public readonly array $entitySnapshot,
public readonly array $itemsPayload,
public readonly array $addressPayload,
public readonly array $couponUserIds,
public readonly string $orderType,
public readonly string $stockHashKey,
) {}
public function handle(): void
{
try {
// 1. 重建订单实体
$entity = $this->rebuildEntity();
// 2. 创建订单记录
$order = $this->orderRepository->createFromEntity($entity);
// 3. 创建订单商品
$this->createOrderItems($order, $this->itemsPayload);
// 4. 扣减优惠券
$this->deductCoupons($this->couponUserIds);
// 5. 更新缓存状态为成功
$this->pendingCacheService->markCreated($this->tradeNo);
} catch (\Throwable $e) {
// 回滚库存
$this->stockService->rollback($this->itemsPayload, $this->stockHashKey);
// 更新缓存状态为失败
$this->pendingCacheService->markFailed($this->tradeNo, $e->getMessage());
throw $e;
}
}
}消息发送 Job
php
// SendMessageJob.php
class SendMessageJob implements JobInterface
{
public function __construct(
public readonly int $messageId,
public readonly int $userId,
public readonly string $channel,
) {}
public function handle(): void
{
$message = Message::find($this->messageId);
if (!$message) {
return;
}
$this->notificationService->send($message, $this->userId, $this->channel);
}
}秒杀场次启动 Job
php
// SeckillSessionStartJob.php
class SeckillSessionStartJob implements JobInterface
{
public function __construct(
public readonly int $sessionId,
public readonly int $activityId,
) {}
public function handle(): void
{
// 1. 激活场次
$this->sessionService->start($this->sessionId);
// 2. 预热缓存
$this->cacheService->warmSession($this->sessionId);
// 3. 联动激活活动
if ($this->activityService->isPending($this->activityId)) {
$this->activityService->start($this->activityId);
}
}
}🔧 投递任务
立即投递
php
// 获取队列驱动
$driver = $this->driverFactory->get('default');
// 投递任务
$driver->push(new OrderCreateJob(...));延迟投递
php
// 延迟 60 秒执行
$driver->push(new SeckillSessionStartJob($sessionId, $activityId), 60);指定队列
php
// 投递到 export 队列
$driver = $this->driverFactory->get('export');
$driver->push(new ExportJob(...));📊 队列监控
查看队列状态
bash
# 查看 Redis 队列长度
redis-cli LLEN {queue}:waiting
redis-cli LLEN {queue}:delayed
redis-cli LLEN {queue}:failed日志记录
php
// Job 中记录日志
public function handle(): void
{
logger()->info('Job started', ['job' => static::class]);
try {
// 处理逻辑
logger()->info('Job completed', ['job' => static::class]);
} catch (\Throwable $e) {
logger()->error('Job failed', [
'job' => static::class,
'error' => $e->getMessage(),
]);
throw $e;
}
}🔄 失败重试
配置重试
php
'retry_seconds' => [5, 10, 30, 60], // 重试间隔
'max_attempts' => 3, // 最大重试次数手动重试
php
// 获取失败任务
$failed = $redis->lrange('{queue}:failed', 0, -1);
// 重新投递
foreach ($failed as $job) {
$driver->push(unserialize($job));
}⚠️ 注意事项
- 幂等性: Job 应设计为幂等,支持重试
- 超时设置: 根据任务复杂度设置合理的
handle_timeout - 并发控制: 通过
concurrent.limit控制并发数 - 错误处理: 捕获异常并记录日志,必要时回滚操作
- 监控告警: 监控队列积压情况,及时处理