The Queue plugin provides an easy-to-use interface for the php-queue project, which abstracts dozens of queuing backends for use within your application. Queues can be used to increase the performance of your application by deferring long-running processes - such as email or notification sending - until a later time.
You can install this plugin into your CakePHP application using composer.
The recommended way to install composer packages is:
composer require cakephp/queue
Install the transport you wish to use. For a list of available transports, see this page. The example below is for pure-php redis:
composer require enqueue/redis predis/predis:^1
Ensure that the plugin is loaded in your src/Application.php
file, within
the Application::bootstrap()
function:
$this->addPlugin('Cake/Queue');
The following configuration should be present in the config array of your config/app.php:
// ...
'Queue' => [
'default' => [
// A DSN for your configured backend. default: null
// Can contain protocol/port/username/password or be null if the backend defaults to localhost
'url' => 'redis://myusername:[email protected]:1000',
// The queue that will be used for sending messages. default: default
// This can be overridden when queuing or processing messages
'queue' => 'default',
// The name of a configured logger, default: null
'logger' => 'stdout',
// The name of an event listener class to associate with the worker
'listener' => \App\Listener\WorkerListener::class,
// The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000
'receiveTimeout' => 10000,
// Whether to store failed jobs in the queue_failed_jobs table. default: false
'storeFailedJobs' => true,
// (optional) The cache configuration for storing unique job ids. `duration`
// should be greater than the maximum length of time any job can be expected
// to remain on the queue. Otherwise, duplicate jobs may be
// possible. Defaults to +24 hours. Note that `File` engine is only suitable
// for local development.
// See https://book.cakephp.org/4/en/core-libraries/caching.html#configuring-cache-engines.
'uniqueCache' => [
'engine' => 'File',
],
]
],
// ...
The Queue
config key can contain one or more queue configurations. Each of
these is used for interacting with a different queuing backend.
If storeFailedJobs
is set to true
, make sure to run the plugin migrations to create the queue_failed_jobs
table.
Install the migrations plugin:
composer require cakephp/migrations:"^3.1"
Run the migrations:
bin/cake migrations migrate --plugin Cake/Queue
Workloads are defined as ‘jobs’. Job classes can recieve dependencies from your application’s dependency injection container in their constructor just like Controllers or Commands. Jobs are responsible for processing queue messages. A simple job that logs received messages would look like:
<?php
// src/Job/ExampleJob.php
declare(strict_types=1);
namespace App\Job;
use Cake\Log\LogTrait;
use Cake\Queue\Job\Message;
use Cake\Queue\Job\JobInterface;
use Interop\Queue\Processor;
class ExampleJob implements JobInterface
{
use LogTrait;
/**
* The maximum number of times the job may be attempted. (optional property)
*
* @var int|null
*/
public static $maxAttempts = 3;
/**
* Whether there should be only one instance of a job on the queue at a time. (optional property)
*
* @var bool
*/
public static $shouldBeUnique = false;
public function execute(Message $message): ?string
{
$id = $message->getArgument('id');
$data = $message->getArgument('data');
$this->log(sprintf('%d %s', $id, $data));
return Processor::ACK;
}
}
The passed Message
object has the following methods:
getArgument($key = null, $default = null)
: Can return the entire passed
dataset or a value based on a Hash::get()
notation key.
getContext()
: Returns the original context object.
getOriginalMessage()
: Returns the original queue message object.
getParsedBody()
: Returns the parsed queue message body.
A job may return any of the following values:
Processor::ACK
: Use this constant when the message is processed
successfully. The message will be removed from the queue.
Processor::REJECT
: Use this constant when the message could not be
processed. The message will be removed from the queue.
Processor::REQUEUE
: Use this constant when the message is not valid or
could not be processed right now but we can try again later. The original
message is removed from the queue but a copy is published to the queue again.
The job may also return a null value, which is interpreted as
Processor::ACK
. Failure to respond with a valid type will result in an
interpreted message failure and requeue of the message.
Job Properties:
maxAttempts
: The maximum number of times the job may be requeued as a result
of an exception or by explicitly returning Processor::REQUEUE
. If
provided, this value will override the value provided in the worker command
line option --max-attempts
. If a value is not provided by the job or by
the command line option, the job may be requeued an infinite number of times.
shouldBeUnique
: If true
, only one instance of the job, identified by
it’s class, method, and data, will be allowed to be present on the queue at a
time. Subsequent pushes will be silently dropped. This is useful for
idempotent operations where consecutive job executions have no benefit. For
example, refreshing calculated data. If true
, the uniqueCache
configuration must be set.
You can enqueue jobs using Cake\Queue\QueueManager
:
use App\Job\ExampleJob;
use Cake\Queue\QueueManager;
$data = ['id' => 7, 'is_premium' => true];
$options = ['config' => 'default'];
QueueManager::push(ExampleJob::class, $data, $options);
Arguments:
$className
: The class that will have it’s execute method invoked when the
job is processed.
$data
(optional): A json-serializable array of data that will be
passed to your job as a message. It should be key-value pairs.
$options
(optional): An array of optional data for message queueing.
The following keys are valid for use within the options
array:
config
:
default: default
description: A queue config name
type: string
delay
:
default: null
description: Time - in integer seconds - to delay message, after which it will be processed. Not all message brokers accept this.
type: integer
expires
:
default: null
description: Time - in integer seconds - after which the message expires. The message will be removed from the queue if this time is exceeded and it has not been consumed.
type: integer
priority
:
default: null
type: constant
valid values:
\Enqueue\Client\MessagePriority::VERY_LOW
\Enqueue\Client\MessagePriority::LOW
\Enqueue\Client\MessagePriority::NORMAL
\Enqueue\Client\MessagePriority::HIGH
\Enqueue\Client\MessagePriority::VERY_HIGH
queue
:
default: from queue config
array or string default
if empty
description: The name of a queue to use
type: string
Mailer actions can be queued by adding the Queue\Mailer\QueueTrait
to the
mailer class. The following example shows how to setup the trait within a mailer
class:
<?php
declare(strict_types=1);
namespace App\Mailer;
use Cake\Mailer\Mailer;
use Cake\Queue\Mailer\QueueTrait;
class UserMailer extends Mailer
{
use QueueTrait;
public function welcome(string $emailAddress, string $username): void
{
$this
->setTo($emailAddress)
->setSubject(sprintf('Welcome %s', $username));
}
// ... other actions here ...
}
It is now possible to use the UserMailer
to send out user-related emails in
a delayed fashion from anywhere in our application. To queue the mailer action,
use the push()
method on a mailer instance:
$this->getMailer('User')->push('welcome', ['[email protected]', 'josegonzalez']);
This QueueTrait::push()
call will generate an intermediate MailerJob
that handles processing of the email message. If the MailerJob is unable to
instantiate the Email or Mailer instances, it is interpreted as
a Processor::REJECT
. An invalid action
is also interpreted as
a Processor::REJECT
, as will the action throwing
a BadMethodCallException
. Any non-exception result will be seen as
a Processor:ACK
.
The exposed QueueTrait::push()
method has a similar signature to
Mailer::send()
, and also supports an $options
array argument. The
options this array holds are the same options as those available for
QueueManager::push()
.
If your application isn’t using Mailers but you still want to deliver email via
queue jobs, you can use the QueueTransport
. In your application’s
EmailTransport
configuration add a transport:
// in app/config.php
use Cake\Queue\Mailer\Transport\QueueTransport;
return [
// ... other configuration
'EmailTransport' => [
'default' => [
'className' => MailTransport::class,
// Configuration for MailTransport.
]
'queue' => [
'className' => QueueTransport::class,
// The transport name to use inside the queue job.
'transport' => 'default',
]
],
'Email' => [
'default' => [
// Connect the default email profile to deliver
// by queue jobs.
'transport' => 'queue',
]
]
];
With this configuration in place, any time you send an email with the default
email profile CakePHP will generate a queue message. Once that queue message is
processed the default MailTransport
will be used to deliver the email messages.
Once a message is queued, you may run a worker via the included queue worker
shell:
bin/cake queue worker
This shell can take a few different options:
--config
(default: default): Name of a queue config to use
--queue
(default: default): Name of queue to bind to
--processor
(default: null
): Name of processor to bind to
--logger
(default: stdout
): Name of a configured logger
--max-jobs
(default: null
): Maximum number of jobs to process. Worker will exit after limit is reached.
--max-runtime
(default: null
): Maximum number of seconds to run. Worker will exit after limit is reached.
--max-attempts
(default: null
): Maximum number of times each job will be attempted. Maximum attempts defined on a job will override this value.
--verbose
or -v
(default: null
): Provide verbose output, displaying the current values for:
Max Iterations
Max Runtime
Runtime: Time since the worker started, the worker will finish when Runtime is over Max Runtime value
By default, jobs that throw an exception are requeued indefinitely. However, if
maxAttempts
is configured on the job class or via a command line argument, a
job will be considered failed if a Processor::REQUEUE
response is received
after processing (typically due to an exception being thrown) and there are no
remaining attempts. The job will then be rejected and added to the
queue_failed_jobs
table and can be requeued manually.
Your chosen transport may offer a dead-letter queue feature. While Failed Jobs
has a similar purpose, it specifically captures jobs that return a
Processor::REQUEUE
response and does not handle other failure cases. It is
agnostic of transport and only supports database persistence.
The following options passed when originally queueing the job will be preserved:
config
, queue
, and priority
.
Push jobs back onto the queue and remove them from the queue_failed_jobs
table. If a job fails to requeue it is not guaranteed that the job was not run.
bin/cake queue requeue
Optional filters:
--id
: Requeue job by the ID of the FailedJob
--class
: Requeue jobs by the job class
--queue
: Requeue jobs by the queue the job was received on
--config
: Requeue jobs by the config used to queue the job
If no filters are provided then all failed jobs will be requeued.
Delete jobs from the queue_failed_jobs
table.
bin/cake queue purge_failed
Optional filters:
--id
: Purge job by the ID of the FailedJob
--class
: Purge jobs by the job class
--queue
: Purge jobs by the queue the job was received on
--config
: Purge jobs by the config used to queue the job
If no filters are provided then all failed jobs will be purged.
The worker shell may invoke the events during normal execution. These events may
be listened to by the associated listener
in the Queue config.
Processor.message.exception
:
description: Dispatched when a message throws an exception.
arguments: message
and exception
Processor.message.invalid
:
description: Dispatched when a message has an invalid callable.
arguments: message
Processor.message.reject
:
description: Dispatched when a message completes and is to be rejected.
arguments: message
Processor.message.success
:
description: Dispatched when a message completes and is to be acknowledged.
arguments: message
Processor.message.failure
:
description: Dispatched when a message completes and is to be requeued.
arguments: message
Processor.message.seen
:
description: Dispatched when a message is seen.
arguments: message
Processor.message.start
:
description: Dispatched before a message is started.
arguments: message