Issues with cakephp/queue

I’ve been working on a side project which uses cakephp/queue for logging http requests to a database. The job is dispatched as part of a middleware, using redis as its store and I am currently just executing this using the bin/cake queue worker command inside a docker container. Eventually I will setup as a systemd service or beanstalkd, haven’t decided which yet. This all works locally except for the following issues:

  1. Occasionally I encounter this exception when running the queue worker command:
2023-02-04 17:18:48 error: [LogicException] Processor was not found. processorName: "Cake\Queue\Queue\Processor63d9a9ce7b3dd" in /srv/app/vendor/enqueue/enqueue/ArrayProcessorRegistry.php on line 33
Stack Trace:
- /srv/app/vendor/enqueue/enqueue/Client/DelegateProcessor.php:38
- /srv/app/vendor/enqueue/enqueue/Consumption/QueueConsumer.php:197
- [internal]:??
- /srv/app/vendor/enqueue/redis/RedisSubscriptionConsumer.php:75
- /srv/app/vendor/enqueue/enqueue/Consumption/QueueConsumer.php:264
- /srv/app/vendor/enqueue/simple-client/SimpleClient.php:209
- /srv/app/vendor/cakephp/queue/src/Command/WorkerCommand.php:204
- /srv/app/vendor/cakephp/cakephp/src/Console/BaseCommand.php:189
- /srv/app/vendor/cakephp/cakephp/src/Console/CommandRunner.php:334
- /srv/app/vendor/cakephp/cakephp/src/Console/CommandRunner.php:172
- /srv/app/bin/cake.php:12
  1. I have implemented a Queue.listener which writes errors and exceptions to error log and everything else to info. I am getting double logs for all events. Example:
2023-02-04 17:25:26 info: Job Queue Processor.message.seen in App\Job\LogUserRequestJob
2023-02-04 17:25:26 info: Job Queue Processor.message.seen in App\Job\LogUserRequestJob
2023-02-04 17:25:26 info: Job Queue Processor.message.start in App\Job\LogUserRequestJob
2023-02-04 17:25:26 info: Job Queue Processor.message.start in App\Job\LogUserRequestJob

My event listener is nothing special with all methods looking similar to the snippet below:

<?php
declare(strict_types=1);

namespace App\Event;

use App\Service\Logger;
use Cake\Event\Event;
use Cake\Queue\Job\Message;
use Cake\Event\EventListenerInterface;
use Psr\Log\LoggerInterface;

class QueueListener implements EventListenerInterface
{
    private LoggerInterface $logger;

    public function __construct(?LoggerInterface $logger = null)
    {
        $this->logger = $logger ?? new Logger();
    }

    public function implementedEvents(): array
    {
        return [
            'Processor.message.exception' => 'handleException',
            'Processor.message.invalid' => 'handleError',
            'Processor.message.reject' => 'handleError',
            'Processor.message.success' => 'handleSuccess',
            'Processor.maxIterations' => 'handleError',
            'Processor.maxRuntime' => 'handleError',
            'Processor.message.failure' => 'handleError',
            'Processor.message.seen' => 'handleInfo',
            'Processor.message.start' => 'handleInfo',
        ];
    }

    public function handleException(Event $event, Message $message, \Throwable $e): void
    {
        $this->logger->error(
            sprintf(
                "Job Queue Exception in %s: %s - %s \n%s",
                $this->getJobFromMessage($message) ?? 'unknown job',
                get_class($e),
                $e->getMessage(),
                $e->getTraceAsString()
            )
        );
    }

    public function handleError(Event $event, Message $message): void
    {
        $this->logger->error(
            sprintf(
                "Job Queue Error %s in %s with Message: \n%s",
                $event->getName(),
                $this->getJobFromMessage($message) ?? 'unknown job',
                print_r($message->jsonSerialize(), true)
            )
        );
    }

    public function handleSuccess(Event $event, Message $message): void
    {
        $body = $message->getParsedBody();
        $this->logger->info(
            sprintf(
                "Job Queue Success in %s with Message:\n%s",
                $this->getJobFromMessage($message) ?? 'unknown job',
                print_r($body['data'] ?? $body, true)
            )
        );
    }

    public function handleInfo(Event $event, $message): void
    {
        $this->logger->info(
            sprintf(
                "Job Queue %s in %s",
                $event->getName(),
                $this->getJobFromMessage($message) ?? 'unknown job'
            )
        );
    }

    private function getJobFromMessage(Object $message): ?string
    {
        if ($message instanceof \Interop\Queue\Message) {
            $body = json_decode($message->getBody(), true);
        } else if ($message instanceof Message) {
            $body = $message->getParsedBody();
        }
        if (isset($body, $body['class'][0])) {
            return $body['class'][0];
        }
        return null;
    }
}

Using my debugger. I have verified the job is not being called twice and I am not seeing duplicate records.

Your first issue sounds to me like Error: [LogicException] Processor was not found. processorName: "Cake\Queue\Queue\Processor602cdd43961f4" · Issue #50 · cakephp/queue · GitHub

which should be resolvable by using the --processor CLI option for the worker.

About your second issue: Does the event maybe get dispatched twice? Can you check the stacktrace if the event maybe gets emitted somewhere else as well?

Regarding --processor I am not sure I understand the usage or the context of its importance. Can any arbitrary name be used here?

For the second issue, I have code which will automatically load all classes implementing EventListenerInterface. Since the plugin loads this as well, thats why. Finally placed a debugger on the constructor to suss that out.

It should be fine if you use any name

bin/cake queue worker --processor myprocessor

It basically just gets passed to the enqueue/simple-client library as you can see here:

queue/WorkerCommand.php at master · cakephp/queue · GitHub

And that library just falls back to a auto generated processor name if none is given (which for some reason creates problems - can’t really tell why)

But looking at Processor was not found. processorName: "Enqueue\Consumption\CallbackProcessor5c9cae7d01f6a" · Issue #807 · php-enqueue/enqueue-dev · GitHub it seems to be a general problem with the PHP Enqueue library.

Maybe completely resetting your queue also helps (so like remove all jobs which have already been pushed to the queue)

Odd behavior… I’ll run with that for a bit and see if the problem goes away.