How to run multiple jobs at the same time in Cake/Queue Plugin

Hello everyone,

I am implementing the Cake/Queue plugin and I wanted to know if it was possible to launch a worker with a --max-jobs 10 without running the same Job several times.

I have to run thousands of tasks that could take months if done one after the other, so I’m looking for a way to run several of them at the same time.

To give an example, each task must translate a title into 12 languages; the task must not be executed several times on the same title.

Thanks for your help.

if I understand the nature your question, the example code in the documentation for Queuing Jobs might be modified to prevent duplicates from being submitted.

You would just have to figure out how you wanted to maintain a list of movies and language translations that had been done, and check to insure they were never submitted again.

$data = ['id' => 7, 'title' => 'It', 'date' => '2013', 'language' => 'fr'];
$options = ['config' => 'default'];

if ($this->CompletedTranslationManager->translationHasNotBeenDone($data) {
    QueueManager::push(ExampleJob::class, $data, $options);
}
else {
   SomeLogger::logTheDuplicateAttempt($data);
}

Hello @dreamingmind,

Thank you for your answer.

This isn’t exactly what I’m looking for, it’s quite the opposite.
Here is a small code snippet for enqueue jobs.

/**
 * We are only looking for untranslated videos.
 */
while ($videos = $this->whileQuery($query, $args, $identifier, ['limit' => $limit])) {
    foreach ($videos as $video) {
        $data = ['id' => $video->id];
        $options = ['config' => 'default'];
        
        QueueManager::push(VideosTranslateJob::class, $data, $options);
    }
}

Now that all the jobs are in the queue, I want to process them but several at a time.

bin/cake queue worker

When I run this command, I have a problem, the jobs are executed one by one.

Can we define several workers at the same time but without executing the same job several times ?

Thanks for your help.

So this is what you are asking?

If so, I wonder if Cake/Queue is actually designed for that. Since a queue is defined as a first-in-first-out sequence, I’m not sure if it allows simultaneous processing.

Possibly your looking for something based on the PHP Thread Class as discussed in this section of the PHP Manual.

I don’t see any discussion of this kind of behavior in this system or in the underlying php-queue project. Possibly I’m misunderstanding the scope and capabilities of this tool.

I suppose it may be the case that many service providers can be grabbing jobs off the queue for processing. You don’t mention that you have things set up this way. If you do, that may be where the problem lies.

Hello,

Thank you for your help.

I developed a solution that allows me to run several jobs at the same time.

Here is a little extract for those who are interested.

1- I create several configs using a loop to have multiple config

// In bootstrap.php
if (Configure::check('Queue.default')) {
    Util::multipleJobs(Configure::read('Queue.default'));
}

// In Util.php
public static function multipleJobs(array $queue): void
{
    if (PHP_SAPI === 'cli') {
        $configs = [
            'videos' => [
                'name' => 'videos',
                'count' => 10,
            ],
            'tags' => [
                'name' => 'tags',
                'count' => 5,
                'storeFailedJobs' => false,
            ],
        ];

        foreach ($configs as $config) {
            foreach (range(1, $config['count']) as $number) {
                $name = sprintf('%s%d', $config['name'], $number);
                $queue['queue'] = $name;

                if (isset($config['storeFailedJobs'])) {
                    $queue['storeFailedJobs'] = $config['storeFailedJobs'];
                }

                Configure::write(sprintf('Queue.%s', $name), $queue);
            }
        }
    }
}

1- I created the workers command to launch several workers at the same time

bin/cake queue workers --config videos


public function execute(Arguments $args, ConsoleIo $io): int
{
    $config = (string)$args->getOption('config');

    /**
     * Queue::getConfigs('videos') returns an array of configs for videos
     * ['videos1', 'videos2', 'videos3', 'videos4', ...]
     */
    foreach (Queue::getConfigs($config) as $config) {
        $name = sprintf('queue worker --config %s', $config);
        $command = sprintf('bin/cake %s', $name);

        $io->out(sprintf('Run Command: %s', $command));

        $PIDs = $this->getPIDs($name);
        if ($PIDs) {
            $io->warning('Worker Already Running!');

            while (count($PIDs) > 1) {
                $pid = array_shift($PIDs);

                $this->killProcess($pid);
            }

            continue;
        }

        $process = Process::fromShellCommandline($command);
        $process->setTimeout(null);
        $process->start();

        sleep(1);
    }
}

3- Save each job in the queue with a different config.

public static function nextConfig(): string
{
    $queue = self::getInstance();

    $next = (string)array_shift($queue->configs);

    if ($next) {
        $queue->configs[] = $next;
    }

    return $next;
}

while ($videos = $this->whileQuery($query, $args, $identifier, ['limit' => $limit])) {
    foreach ($videos as $video) {
        $data = ['id' => $video->id];
        $options = ['config' => Queue::nextConfig()];
        
        QueueManager::push(VideosTranslateJob::class, $data, $options);
    }
}

This works for me, if anyone has any ideas to improve the system please don’t hesitate.