Laravel 队列

老牛浏览 454评论 0发表于

1. 简介

Laravel 队列为不同的队列后台服务(驱动)提供了统一的 API,比如 Beanstalk,Amazon SQS,Redis,甚至是关系型数据库。队列可以让一些耗时的任务延迟处理,比如发送邮件。延迟这些耗时的任务会大幅度提高应用对 Web 请求的处理速度。

2. 连接 Vs. 队列

在使用 Laravel 的队列之前,尤其要搞懂「队列」和「连接」的区别。在 config/queue.php 配置中,有一个 connections 选项。该选项定义了一些特定的后台服务,例如 Amazon SQS,Beanstalk 或 Redis。任何一个连接都可能会有多个「队列」用于提供不同的任务堆栈。

注意「队列」配置文件中每一个连接配置中都包含一个 queue 属性。这是默认的队列,当任务被分发给选定的连接时就会进入该队列。也就是说,如果你没有明确定义任务将会被分发到哪个队列,它就会被分发到对应连接的 queue 属性配置中定义的队列。

php
// 这个任务将会被分发到默认队列
Job::dispatch();

// 这个任务将会被分发到 「emails」队列
Job::dispatch()->onQueue('emails');

一些应用也许不需要把任务推送到多个队列,只分发到一个简单的队列就可以了。然而,当要优先处理某些任务或是对其进行分类时队列就非常有用了,因为 Laravel 队列处理器可以指定队列的优先级。例如,如果将任务分发到 high 队列,队列处理器会优先处理这些任务。

bash
php artisan queue:work --queue=high,default

3. 驱动设置

3.1 Database

使用 database 队列驱动,需要创建一张数据表来存储任务。

bash
php artisan queue:table

php artisan migrate

3.2 Redis

使用 Redis 作为队列驱动时,需要在配置文件 config/database.php 中配置 Redis 的连接信息。

3.3 依赖的扩展包

在使用列表里的队列服务之前,必须安装以下依赖扩展包:

  • Amazon SQS : aws/aws-sdk-php ~3.0

  • Beanstalkd : pda/pheanstalk ~3.0

  • Redis : predis/predis ~1.0

4. 创建任务

4.1 生成任务类

在应用程序中,队列的任务类都默认放在 app/Jobs 目录下。创建一个新的队列任务:

bash
php artisan make:job ProcessPodcast

生成的类实现了 Illuminate\Contracts\Queue\ShouldQueue 接口,意味着该任务会被推送到队列中,而不是同步执行。

4.2 任务类结构

任务类的结构很简单,一般来说只有一个队列调用此任务时执行的 handle 方法。下面的示例中,假设我们管理着一个播客发布服务,在发布之前需要处理上传播客文件:

php
<?php

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的任务实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 运行任务。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }
}

上述示例,我们在任务类的构造函数中直接传递了一个 Eloquent 模型。因为任务处理类中引用了 SerializesModels trait,使得 Eloquent 模型在处理任务时可以被优雅地序列化和反序列化。如果队列任务类在构造器中接收了一个 Eloquent 模型,那么只有可识别出该模型的属性(比如主键 ID)会被实例化到队列里。当任务被实际运行时,队列系统便会自动从数据库中重新取回完整的模型。整个过程对于应用程序来说是完全透明的,这样可以避免序列化完整的 Eloquent 模型实例时带来的一些问题。

在队列处理任务时,会调用 handle 方法,这里我们也可以通过 handle 方法的参数提示,让 Laravel 的服务容器自动注入依赖对象。

像图片这样的二进制数据,在放入队列任务之前必须使用 base64_encode 方法转换一下。否则,当该任务被放置到队列中时,可能无法正确序列化为 JSON。

4.3 分发任务

一旦写好了任务类,就可以使用它自带的 dispatch 方法进行分发。传递给 dispatch 方法的参数将会被传递给任务的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客节目。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast);
    }
}

延迟分发

如果想延迟执行队列任务,可以在分发任务时使用 delay 方法。例如,指定一个十分钟后才会执行的任务:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客节目。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)
                ->delay(now()->addMinutes(10));
    }
}

Amazon SQS 队列服务最大延迟 15 分钟的时间。

工作链

工作链可以定义一个按顺序执行的队列任务列表。一旦列表中的任务失败了,剩余的任务将不会执行。

php
ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch();
工作链连接和队列
php
ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

自定义连接和队列

分发任务到指定队列
php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客节目。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}
分发任务到指定连接
php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新播客节目。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

也可以链式调用同时指定连接和队列:

php
ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

指定最大任务尝试次数/超时时间

最大尝试次数

通过 --tries 选项指定:

bash
php artisan queue:word --tries=3

如果最大尝试次数是在任务类中定义的,它将优先于命令行中的值:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务可以尝试的最大次数。
     *
     * @var int
     */
    public $tries = 5;
}
基于时间的尝试

在给定的时间范围内,任务可以无限次尝试。定义任务的超时时间:

php
/**
 * 定义任务超时时间
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}

也可以在队列事件监听器中使用 retryUntil 方法。

超时

任务执行最大秒数可以通过 --timeout 选项指定:

bash
php artisan queue:work --timeout=30

同样,也可以在任务类中指定,优先级会高于命令行:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 超时时间。
     *
     * @var int
     */
    public $timeout = 120;
}

频率限制

如果应用使用了 Redis,可以通过时间或并发限制任务队列。特别对于有速率限制的 API 使用时,会很有用。例如,使用 throttle 方法,可以限制一个指定类型的任务每 60 秒只执行 10 次。如果没有获得锁,一般情况下应该将任务放回队列使其可以在稍后尝试。

php
Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // 任务逻辑...
}, function () {
    // 无法获得锁...

    return $this->release(10);
});

key 可以是任何你想要限制的任务类型的唯一标识符。例如,基于任务类名或者 Eloquent 模型的 ID。

或者,指定一个任务可以同时执行的最大数量。

php
Redis::funnel('key')->limit(1)->then(function () {
    // 任务逻辑...
}, function () {
    // 无法获得锁...

    return $this->release(10);
});

错误处理

如果任务执行过程中出现异常,任务将会被自动释放到队列中等待再次尝试,直到达到允许的最大重试次数。

5. 运行队列处理器

Laravel 包含了一个队列处理器,它用来执行推送到队列中的任务。可以使用 queue:work 运行处理器。一旦开始执行,会一直运行直到被手动停止或终端被关闭。

bash
php artisan queue:work

要使队列处理器一直在后台运行,可以使用进程管理器(如 Supervisor)来确保队列处理器不会停止运行。

队列处理器是一个常驻的进程,并在内存中保存着已经启动的应用状态。因此,不会在启动后注意到代码的更改。所以,在重新部署过程中,需要重启队列处理器。

5.1 执行单一任务

--once 选项用于使队列处理器只处理队列中的单个任务。

php
php artisan queue:work --once

5.2 指定连接和队列

可以具体指定队列处理器应该使用哪个队列连接。传递给 work 的连接名称应该和 config/queue.php 配置文件中定义的连接符之一相符。

bash
php artisan queue:work redis

还可以指定队列处理器使其只执行连接中指定的队列。例如,如果所有邮件都由 redis 连接的 emails
队列处理,可以使用如下命令启动一个仅执行此队列的处理器:

bash
php artisan queue:work redis --queue=emails

5.3 资源注意事项

后台驻留的队列处理器不会在执行完每个任务后「重启」框架。因此,应该在每个任务完成后释放任何占用过大的资源。例如,如果正在使用 GD 库执行图像处理,应该在完成之后使用 imagedestroy 释放内存。

5.4 队列优先级

可以在配置文件中修改队列执行的优先顺序。例如在 config/queue.php 中将 redis 连接的 queue 队列的优先级从 default 设置成 low。也可以将一个任务推送到 high 队列:

bash
dispatch((new Job)->onQueue('high'));

要运行一个处理器来确保 low 队列中的任务在 high 队列中所有任务完成后才继续执行,可以传递一个逗号分隔的队列名称列表作为 work 命令的参数。

bash
php artisan queue:work --queue=high,low

5.5 队列处理器及部署

因为队列处理器是常驻进程,它们在重启前不会应用代码的更改。因此,部署应用最简单的方法是在部署进程中重启队列处理器。可以通过下面的命令,平滑地重启所有队列处理器:

bash
php artisan queue:restart

该命令会引导所有的队列处理器在完成当前任务后平滑「中止」,这样不会有丢失的任务。同时还应该运行一个进程管理器(如 Supervisor)来自动重启队列处理器。

队列使用缓存来保存重启信号,所以应该事先配置好缓存驱动。

6. Supervisor 配置

6.1 安装 Supervisor

Supervisor 是一个 Linux 下的进程管理器,它会在 queue:work 进程关闭后自动重启。要在 CentOS 下安装 Supervisor,可以使用以下命令:

bash
yum install supervisor

6.2 配置 Supervisor

一般 Supervisor 的配置文件存储在 /etc/supervisord.d 目录。在这个目录下,你可以创建任意数量的配置文件控制 Supervisor 对于进程的管理。例如,我们创建一个 laravel-worker.conf 文件开始管理 queue:work 进程:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

在这个示例中,numprocs 告诉 Supervisor 运行 8 个 queue:work 进程并且管理它们,当它们关闭时会将其自动重启。当然,你应该将 command 选项中的 queue:work sqs 部分修改为你的队列连接。

6.3 启动 Supervisor

当配置文件创建好后,可以使用下面的命令更新 Supervisor 配置文件并且启动进程:

bash
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

更多关于 Supervisor 的信息,可以查询 Supervisor 文档

7. 处理失败的任务

有时候队列任务会失败。Laravel 包含一个便捷的方式指定任务会被最多尝试的次数。在一个任务到达了它的最大尝试次数后,就会被放入 failed_jobs 表。可以使用下面的命令创建:

bash
php artisan queue:failed-table

php artisan migrate

然后运行队列处理器,这时应该使用 --tries 选项。如果没有指定 --tries 值,任务将会被尝试无数次。

bash
php artisan queue:work redis --tries=3

7.1 清除失败的任务

可以在任务类中定义一个 failed 方法,它允许你在一个任务失败后清除它。这是一个提醒用户或撤回任何任务做出的修改的绝佳时机。导致任务失败的 Exception 会被传入到 failed 方法:

php
<?php

namespace App\Jobs;

use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的任务实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行任务。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 执行上传播客...
    }

    /**
     * 执行失败的任务。
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // 给用户发送失败的通知等等...
    }
}

7.2 任务失败事件

如果想注册一个在任务失败时调用的事件,可以使用 Queue:failing 方法。这是通过 emails 或 Stride 通知团队的绝佳时机。例如,我们可以从 Laravel 中的 AppServiceProvider 注册一个此事件的回调。

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 启动任意服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * 注册服务提供者。
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

7.3 重试失败的任务

要查看所有被放入 failed_jobs 数据表中的任务,可以使用 queue:failed 命令:

bash
php artisan queue:failed

queue:failed 命令会列出任务 ID,队列,以及失败的事件。任务 ID 可能会被用于重试失败的任务。例如,要重试一个任务 ID 为 5 的任务,使用如下命令:

bash
php artisan queue:retry 5

要重试所有失败的任务,执行 queue:retry 命令,将 all 作为 ID 传入:

bash
php artisan queue:retry all

如果想删除一个失败的任务,使用 queue:forget 命令:

bash
php artisan queue:forget 5

要清空所有失败的任务,使用命令:

bash
php artisan queue:flush

8. 任务事件

通过在队列 facade 中使用 beforeafter 方法,指定一个队列任务被执行前后的回调。这些回调是添加额外的日志或增加统计的绝好时机。通常,你应该在服务提供者中调用这些方法。例如,我们可以使用 Laravel 的 AppServiceProvider

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 引导启动任意应用服务
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     *  注册服务提供者
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

在队列 facade 使用 looping 方法可以在处理器尝试获取任务之前执行回调。例如,使用一个闭包来回滚之前失败的任务尚未关闭的事务。

php
Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});
点赞
收藏
暂无评论,快来发表评论吧~