I’ve been working on a side project which uses cakephp/queue for logging http requests to a database. The job is dispatched as part of a middleware, using redis as its store and I am currently just executing this using the bin/cake queue worker
command inside a docker container. Eventually I will setup as a systemd service or beanstalkd, haven’t decided which yet. This all works locally except for the following issues:
- Occasionally I encounter this exception when running the queue worker command:
2023-02-04 17:18:48 error: [LogicException] Processor was not found. processorName: "Cake\Queue\Queue\Processor63d9a9ce7b3dd" in /srv/app/vendor/enqueue/enqueue/ArrayProcessorRegistry.php on line 33
Stack Trace:
- /srv/app/vendor/enqueue/enqueue/Client/DelegateProcessor.php:38
- /srv/app/vendor/enqueue/enqueue/Consumption/QueueConsumer.php:197
- [internal]:??
- /srv/app/vendor/enqueue/redis/RedisSubscriptionConsumer.php:75
- /srv/app/vendor/enqueue/enqueue/Consumption/QueueConsumer.php:264
- /srv/app/vendor/enqueue/simple-client/SimpleClient.php:209
- /srv/app/vendor/cakephp/queue/src/Command/WorkerCommand.php:204
- /srv/app/vendor/cakephp/cakephp/src/Console/BaseCommand.php:189
- /srv/app/vendor/cakephp/cakephp/src/Console/CommandRunner.php:334
- /srv/app/vendor/cakephp/cakephp/src/Console/CommandRunner.php:172
- /srv/app/bin/cake.php:12
- I have implemented a
Queue.listener
which writes errors and exceptions to error log and everything else to info. I am getting double logs for all events. Example:
2023-02-04 17:25:26 info: Job Queue Processor.message.seen in App\Job\LogUserRequestJob
2023-02-04 17:25:26 info: Job Queue Processor.message.seen in App\Job\LogUserRequestJob
2023-02-04 17:25:26 info: Job Queue Processor.message.start in App\Job\LogUserRequestJob
2023-02-04 17:25:26 info: Job Queue Processor.message.start in App\Job\LogUserRequestJob
My event listener is nothing special with all methods looking similar to the snippet below:
<?php
declare(strict_types=1);
namespace App\Event;
use App\Service\Logger;
use Cake\Event\Event;
use Cake\Queue\Job\Message;
use Cake\Event\EventListenerInterface;
use Psr\Log\LoggerInterface;
class QueueListener implements EventListenerInterface
{
private LoggerInterface $logger;
public function __construct(?LoggerInterface $logger = null)
{
$this->logger = $logger ?? new Logger();
}
public function implementedEvents(): array
{
return [
'Processor.message.exception' => 'handleException',
'Processor.message.invalid' => 'handleError',
'Processor.message.reject' => 'handleError',
'Processor.message.success' => 'handleSuccess',
'Processor.maxIterations' => 'handleError',
'Processor.maxRuntime' => 'handleError',
'Processor.message.failure' => 'handleError',
'Processor.message.seen' => 'handleInfo',
'Processor.message.start' => 'handleInfo',
];
}
public function handleException(Event $event, Message $message, \Throwable $e): void
{
$this->logger->error(
sprintf(
"Job Queue Exception in %s: %s - %s \n%s",
$this->getJobFromMessage($message) ?? 'unknown job',
get_class($e),
$e->getMessage(),
$e->getTraceAsString()
)
);
}
public function handleError(Event $event, Message $message): void
{
$this->logger->error(
sprintf(
"Job Queue Error %s in %s with Message: \n%s",
$event->getName(),
$this->getJobFromMessage($message) ?? 'unknown job',
print_r($message->jsonSerialize(), true)
)
);
}
public function handleSuccess(Event $event, Message $message): void
{
$body = $message->getParsedBody();
$this->logger->info(
sprintf(
"Job Queue Success in %s with Message:\n%s",
$this->getJobFromMessage($message) ?? 'unknown job',
print_r($body['data'] ?? $body, true)
)
);
}
public function handleInfo(Event $event, $message): void
{
$this->logger->info(
sprintf(
"Job Queue %s in %s",
$event->getName(),
$this->getJobFromMessage($message) ?? 'unknown job'
)
);
}
private function getJobFromMessage(Object $message): ?string
{
if ($message instanceof \Interop\Queue\Message) {
$body = json_decode($message->getBody(), true);
} else if ($message instanceof Message) {
$body = $message->getParsedBody();
}
if (isset($body, $body['class'][0])) {
return $body['class'][0];
}
return null;
}
}
Using my debugger. I have verified the job is not being called twice and I am not seeing duplicate records.