队列

简介

Laravel 现在为 Redis 驱动的队列提供了一个具有漂亮的后台面板和配置系统的 Horizon。查看完整的 Horizon 文档 了解更多信息。

Laravel 队列为不同的队列后端提供了统一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至是关系型数据库。队列可以让一些耗时的任务(比如发送邮件)延迟处理。延迟这些耗时的任务会大幅度提高应用对 Web 请求的响应速度。

队列配置文件存储在 config/queue.php。在此文件中,我们可以找到框架包含的每个队列驱动的连接配置,包括数据库,BeanstalkdAmazon SQSRedis 和一个会立即执行任务的同步驱动(用于本地使用)。还包含一个丢弃所有队列任务的 null 队列驱动。

连接 Vs. 队列

在使用 Laravel 的队列之前,弄清楚「队列」和「连接」的区别是很重要的。在 config/queue.php 配置中,有一个 connections 选项。该选项定义了一些特定的后端服务,例如 Amazon SQS,Beanstalk 或 Redis。但是,任何给定连接都可能会有多个「队列」,队列可以认为是队列任务不同的栈或堆。

要注意的是,queue 配置文件中每个连接的示例配置都包含一个 queue 属性。这是任务被发送到给定连接时分发的默认队列。换句话说,如果您没有明确定义任务应该被分发到哪个队列,任务会被分发到对应连接配置中 queue 属性所定义的队列:

// 此任务会被发送到默认队列
Job::dispatch();

// 此任务会被发送到「emails」队列
Job::dispatch()->onQueue('emails');

有些应用可能不需要把任务添加到多个队列,而是添加到一个简单队列就可以了。但是,当要优先处理或分段处理某些任务时,添加到多个队列就非常有用了,因为 Laravel 队列处理器可以指定队列的优先级。例如,如果将任务添加到 high 队列,可以运行一个指定处理优先级的队列处理器:

php artisan queue:work --queue=high,default

驱动前提 & 注意事项

数据库

使用 database 队列驱动,需要创建一张数据表来存储任务。要生成创建此表的数据库迁移,可以运行 Artisan 命令 queue:table。创建迁移后,就可以使用 migrate 命令运行迁移了:

php artisan queue:table

php artisan migrate

Redis

使用 redis 队列驱动,需要在配置文件 config/database.php 中配置一个 Redis 的数据库连接。

Redis 集群

如果 Redis 队列连接使用 Redis 集群,队列名必须包含一个 key hash tag。这是为了确保给定队列的所有 Redis 键都放在同一哈希中:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],
阻塞

当使用 Redis 队列时,可以使用 block_for 配置项指定驱动在遍历完队列处理器循环并轮询 Redis 数据库之前应该阻塞多久任务才变得可用。

根据队列负载调整此值会比持续轮询 Redis 数据库获取新任务更高效。例如,可以将此值设置为 5 指示驱动应在等待任务可用时阻塞 5 秒:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],

阻塞是一个实验性功能。如果在 Redis 服务器或处理器崩溃的同时获取任务,有很小的可能丢失一个队列任务。

其它驱动前提

列出的队列驱动需要以下依赖:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • Redis: predis/predis ~1.0

创建任务

生成任务类

默认情况下,应用的所有队列任务都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,它会在运行 Artisan 命令 make:job 时创建。可以使用 Artisan CLI 生成一个新的队列任务:

php artisan make:job ProcessPodcast

生成的类会实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 应该将此任务添加队列中异步运行。

类结构

任务类很简单,通常只包含一个队列在处理任务时调用的 handle 方法。首先,我们看一个任务类的示例。在此示例中,假设我们管理一个播客发布服务,在发布之前需要处理上传的播客文件:

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建新的任务实例
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行任务
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客
    }
}

在上述示例中,请注意我们可以在队列任务的构造函数中直接传递一个 Eloquent 模型。由于任务使用了 SerializesModels Trait,处理任务时 Eloquent 模型会被优雅地序列化和反序列化。如果队列任务的构造函数中了接收一个 Eloquent 模型,只有模型的标识符会被序列化到队列中。当任务实际处理时,队列系统会从数据库中自动获取完整的模型实例。这对应用是完全透明的,可以避免序列化完整的 Eloquent 模型实例带来的问题。

在队列处理任务时会调用 handle 方法。要注意的是,可以在任务的 handle 方法对依赖使用类型提示。Laravel 服务容器 会自动注入这些依赖。

二进制数据(例如原始图片内容)在传递到队列任务之前应该使用 base64_encode 函数编码。否则,当放入队列时,任务无法正确序列化为 JSON。

分发任务

编写任务类后,就可以使用任务自身的 dispatch 方法分发它了。传递给 dispatch 方法的参数会传递给任务的构造函数:

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储新的播客
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客

        ProcessPodcast::dispatch($podcast);
    }
}

延迟分发

如果要延迟执行一个队列任务,可以在分发任务时使用 delay 方法。例如,我们指定一个在分发后的 10 分钟后才处理的任务:

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储新的播客
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客

        ProcessPodcast::dispatch($podcast)
                ->delay(now()->addMinutes(10));
    }
}

Amazon SQS 队列服务最大延迟时间为 15 分钟。

任务链

任务链可以定义一个顺序执行的队列任务。如果其中一个任务失败了,剩下的任务将不会运行。要执行一个队列任务链,可以在任何可分发的任务上使用 withChain 方法:

ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch();

任务链连接 & 队列

如果要为队列任务指定使用的默认连接和队列,可以使用 allOnConnectionallOnQueue 方法。除非队列任务被显示指定了不同的连接/队列,否则应使用上述方法指定队列连接和队列名称:

ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

自定义队列 & 连接

分发到指定队列

可以通过把任务添加到不同的队列,对队列任务进行「分类」,甚至按优先级为各种队列分配不同数量的队列处理器。请记住,这不是把任务添加到队列配置文件中定义的不同队列「连接」,而只是在单个连接中指定队列。要指定队列,在分发任务时使用 onQueue 方法:

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储新的播客
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}

分发到指定连接

如果使用了多个队列连接,可以指定任务添加到哪个连接。要指定连接,可以在分发任务时使用 onConnection 方法:

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储新的播客
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

当然,还可以链式调用 onConnectiononQueue 方法为任务指定连接和队列:

ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

指定最大尝试次数/超时时间

最大尝试次数

一个指定任务的最大尝试次数的方法,是可以通过 Artisan 命令行中的 --tries 选项指定:

php artisan queue:work --tries=3

但是,还有一个更细腻的方法是在任务类中定义最大尝试次数。如果任务中指定了最大尝试次数,此值会优先于命令行中提供的值:

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务的最大尝试次数
     *
     * @var int
     */
    public $tries = 5;
}

基于时间的尝试

除了定义任务失败前尝试多少次,还可以定义任务的尝试时间。它允许任务在给定时间限制内尝试任意次数。要定义任务的尝试时间,可以在任务类中添加一个 retryUntil 方法:

/**
 * 决定任务的尝试时间
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}

也可以在队列事件监听器中定义 retryUntil 方法。

超时

timeout 功能为 PHP 7.1+ 和 pcntl PHP 扩展进行了优化。

同样的,任务可以运行的最大秒数可以通过 Artisan 命令行的 --timeout 选项指定:

php artisan queue:work --timeout=30

但是,可能还想在任务中定义任务允许运行的最大秒数。如果任务中指定了超时时间,此值会优先于命令行中提供的值:

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务在超时前可以运行的最大秒数
     *
     * @var int
     */
    public $timeout = 120;
}

速率限制

此功能要求应用可以和 Redis 服务器 交互。

如果应用可以和 Redis 交互,可以通过时间或并发数对队列任务进行节流处理。当队列任务和同样也有速率限制的 API 交互时此功能会很有用。例如,使用 throttle 方法,可以限制给定类型的任务每 60 秒只能运行 10 次。如果不能获得锁,通常应该将任务放回队列以便可以稍后重试:

Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // 任务逻辑
}, function () {
    // 不能获取锁

    return $this->release(10);
});

在上述示例中,key 可以是能唯一标识要进行速率限制的任务类型的任何字符串。例如,可能希望基于任务的类名和操作的 Eloquent 模型的 ID 构造它。

或者,还可以指定可以同时处理给定任务的队列处理器的最多数量。这在队列任务修改一个一次只能被一个任务修改的资源时很有用。例如,使用 funnel 方法,可以限制给定任务一次只能被一个队列处理器处理:

Redis::funnel('key')->limit(1)->then(function () {
    // 任务逻辑
}, function () {
    // 不能获取锁

    return $this->release(10);
});

当使用速率限制时,任务运行成功需要尝试的次数很难决定。因此,将速率限制和 基于时间的尝试 结合使用会很有用。

错误处理

如果任务处理时抛出了一个异常,任务会自动被释放回队列以便再次尝试。任务会不断地被释放直到达到应用允许的最大尝试时间。最大尝试次数可以在 Artisan 命令 queue:work 上通过 --tries 选项定义。或者,在任务类中定义最大尝试次数。有关运行队列处理器的更多信息,可以在 下方 查看。

运行队列处理器

Laravel 包括一个队列处理器,会在新任务添加到队列时自动处理。可以使用 Artisan 命令 queue:work 启动队列处理器。要注意的是,queue:work 命令启动后,会持续运行,直到手动停止或关闭终端:

php artisan queue:work

要保持 queue:work 进程在后台长期运行,应使用进程管理器(如 Supervisor)确保队列处理器不会停止运行。

注意,队列处理器是长期运行的,并在内存中存储已启动的应用状态。所以,队列处理器启动后不会注意到代码的更改。因此,在部署过程中,确保 重启队列处理器

指定连接 & 队列

还可以指定队列处理器要使用的队列连接。传递给 work 命令的链接名称应该与在 config/queue.php 配置文件中定义的连接之一相对应:

php artisan queue:work redis

甚至还可以指定队列处理器只处理给定连接的特殊队列。例如,如果所有邮件都在 redis 队列连接的 emails 队列中处理,可以使用如下命令启动只处理此队列的队列处理器:

php artisan queue:work redis --queue=emails

处理单个任务

--once 选项可用于指示队列处理器只处理队列中的单个任务:

php artisan queue:work --once

处理所有队列任务 & 然后退出

--stop-when-empty 选项可用于指示队列处理器处理完所有任务后优雅地退出。当在 Docker 容器中使用 Laravel 并希望队列为空时关闭容器时此选项很有用:

php artisan queue:work --stop-when-empty

资源注意事项

队列处理器守护进程不会在处理每个任务前「重启」框架。因此,应该在每个任务完成后释放任何占用较大的资源。例如,如果使用 GD 库进行图片处理,应该在处理完成后使用 imagedestroy 释放内存。

队列优先级

有时可能希望为队列处理指定优先级。例如,在 config/queue.php 文件中可以将 redis 连接默认的 queue 设置为 low。然而,偶尔希望将任务添加到 high 优先级的队列,像这样:

dispatch((new Job)->onQueue('high'));

要启动一个队列处理器证明所有 high 队列的任务在接着处理任何 low 队列的任务之前处理,可以传递一个逗号分隔的队列名称列表给 work 命令:

php artisan queue:work --queue=high,low

队列处理器 & 部署

由于队列处理器长期处理,如果不重启就不会应用代码的更改。因此,部署使用队列处理器的应用最简单的方法就是在部署时重启队列处理器。可以使用 queue:restart 命令优雅地重启所有队列处理器:

php artisan queue:restart

此命令会指示所有队列处理器在处理完当前任务后优雅地「终止」,因此不会丢失已有任务。由于队列处理器会在运行 queue:restart 命令时终止,因此应该运行进程管理器(如 Supervisor)来自动重启队列处理器。

队列使用 缓存 存储重启信号,因此在使用此功能前要确保为应用配置了一个正确的缓存驱动。

任务过期 & 超时

任务过期

config/queue.php 配置文件中,每个队列连接都定义了一个 retry_after 选项。此选项指定了重试处理的任务前队列连接应该等待的秒数。例如,如果 retry_after 值设置为 90,那么任务在处理了 90 秒后将会被释放回队列而不是删除。通常情况下,应该将 retry_after 值设置为理论上处理所有任务所需的最多秒数。

唯一一个不包含 retry_after 值的队列连接是 Amazon SQS。SQS 会基于 默认可见超时时间 进行重试,可以在 AWS 控制台中管理。

队列处理器超时

Artisan 命令 queue:work 提供了一个 --timeout 选项。--timeout 指定了在杀死处理队列的队列处理器子进程前,Laravel 队列主进程等待的时间。有时一个队列子进程由于各种原因会「冻结」,例如一个外部 HTTP 调用没有响应。--timeout 选项会结束那些运行超过指定时间而被冻结的进程:

php artisan queue:work --timeout=60

retry_after 配置项和 --timeout CLI 选项是不同的,但是一起工作确保任务不会丢失并且任务只会被成功执行一次。

--timeout 值应该总是比 retry_after 配置值至少少几秒。以确保队列处理器处理给定任务时总在任务重试前被结束掉。如果 --timeout 选项比 retry_after 配置值还长,任务可能执行两次。

队列处理器休眠周期

当队列中有可用任务时,队列处理器会无间隔地持续处理任务。但是,sleep 选项决定了没有可用的新任务时队列处理器会「休眠」多长时间(秒)。当休眠时,队列处理器不会处理任何新任务 —— 任务会在队列处理器再次醒来后处理。

php artisan queue:work --sleep=3

Supervisor 配置

安装 Supervisor

Supervisor 是一个 Linux 下的进程管理器,它会在 queue:work 进程运行失败后自动重启进程。要在 Ubuntu 上安装 Supervisor,可以使用以下命令:

sudo apt-get install supervisor

如果自己配置 Supervisor 听起来有点难以应付,可以考虑使用 Laravel Forge,它会为 Laravel 项目自动安装并配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录。 在此目录下,可以创建任意数量的配置文件指示 Supervisor 应该如何管理进程。例如,我们创建一个 laravel-worker.conf 文件启动并管理 queue:work 进程:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

在此示例中,numprocs 指令会告诉 Supervisor 运行 8 个 queue:work 进程并管理它们,当它们进程运行失败时自动重启它们。当然,应该将 command 选项中的 queue:work sqs 部分更改为您希望的队列连接。

启动 Supervisor

配置文件创建后,可以使用下面的命令更新 Supervisor 配置文件并启动进程:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

有关 Supervisor 的更多信息,可以查阅 Supervisor 文档

处理失败任务

有时候队列任务会失败。别担心,事情不总是按计划进行!Laravel 包含一个便捷的方式指定任务应被尝试的最多次数。当任务运行达到了最大尝试次数后,它就会被插入到 failed_jobs 数据表中。要为 failed_jobs 表创建迁移文件,可以使用 queue:failed-table 命令:

php artisan queue:failed-table

php artisan migrate

然后,运行 队列处理器 时,应该在 queue:work 上使用 --tries 选项指定任务应该尝试的最大次数。如果没有为 --tries 指定值,任务将会被无限尝试:

php artisan queue:work redis --tries=3

清除失败任务

可以在任务类中定义一个 failed 方法,允许任务失败后执行指定的清除操作。这是一个给用户发送提醒或撤回任何任务执行的操作的完美位置。导致任务失败的 Exception 会被传递到 failed 方法:

namespace App\Jobs;

use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建新的任务实例
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行任务
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客
    }

    /**
     * 任务处理失败
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // 给用户发送失败通知等等
    }
}

失败任务事件

如果想注册一个在任务失败时调用的事件,可以使用 Queue:failing 方法。此事件是通过邮件或 Stride 通知团队的绝好时机。例如,我们可以在 Laravel 的 AppServiceProvider 中添加一个回调到此事件中:

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 启动任何应用服务
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * 注册服务提供者
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

重试失败任务

要查看所有被插入到 failed_jobs 数据表的任务,可以使用 Artisan 命令 queue:failed

php artisan queue:failed

queue:failed 命令会列出任务 ID,连接,队列和失败时间。任务 ID 可用于重试失败的任务。例如,要重试一个任务 ID 为 5 的任务,使用如下命令:

php artisan queue:retry 5

要重试所有失败的任务,可以执行 queue:retry 命令并将 all 作为 ID 传入:

php artisan queue:retry all

如果想删除一个失败的任务,可以使用 queue:forget 命令:

php artisan queue:forget 5

要清空所有失败的任务,可以使用 queue:flush 命令:

php artisan queue:flush

任务事件

使用 Queue Facadebeforeafter 方法,可以指定队列任务被执行前后执行的回调。这些回调是记录额外的日志或增加后台统计的绝好时机。通常情况下,应该在 服务提供者 中调用这些方法。例如,我们可以使用 Laravel 的 AppServiceProvider

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 启动任何应用服务
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     * 注册服务提供者
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

使用 Queue Facadelooping 方法,可以指定队列处理器尝试从队列中获取任务之前执行的回调。例如,可以注册一个闭包回滚任何之前失败的任务开启的事务:

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});