
riquemuramoto

AWS SQS Queued Job race condition with multiple workers

Hello!

I have a platform that creates game rooms with limited vacancies. I run into a problem when two or more users try to join a room while only one vacancy remains.

The scenario is as follows:

  • There is a games room with 16 vacancies;
  • 15 vacancies have already been filled by other users;
  • When 2 or more users register in the room at the same time, the work is handled by several workers in parallel, so the check I make to see if the room is full races and passes for more than one user.

The job performs a series of other checks and operations that add the user to the room and allocate other resources; because of the total execution time, this work has to run in a queued job.

What I need is for this specific job to be executed on a queue that processes jobs in order of arrival (FIFO).

My platform is running using Laravel Vapor, so the queues use AWS SQS and, according to the documentation, around 1000 workers are activated to perform the functions sent to the queue.

What I tried:

When I run the project with "queue-concurrency: 1" in my vapor.yaml, the jobs run as expected, but that limits concurrency for all of my queues, which I don't want; I need it only for this one job.
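For context, this is roughly what that looks like in vapor.yaml (the app id, name, and environment name are illustrative); the setting applies to every queue in the environment:

```yaml
id: 12345
name: my-app
environments:
  production:
    # Caps Lambda concurrency for ALL queues in this environment,
    # not just one job or one queue
    queue-concurrency: 1
    build:
      - 'composer install --no-dev'
```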

I checked the Laravel documentation and there is rate limiting for specific jobs (https://laravel.com/docs/6.x/queues#rate-limiting), but apparently it only works with Redis.
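For reference, the Redis-backed rate limiting from those docs looks roughly like this inside the job's handle() method (the funnel key 'join-room' is illustrative):

```php
use Illuminate\Support\Facades\Redis;

// Requires the Redis driver; Redis::funnel() limits how many
// workers may run this section concurrently
Redis::funnel('join-room')->limit(1)->then(function () {
    // Only one worker at a time gets here: job logic ...
}, function () {
    // Could not obtain the lock; put the job back on the queue
    // and retry in 10 seconds
    return $this->release(10);
});
```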

Has anyone had a similar problem and could help me?

Thanks.

kalemdzievski

Take a look at "mutex" functionality, also called locks.

At the company where I work we had a similar problem, where a PDF was generated based on a user action. The PDF was generated in a worker, and because it was a "heavy" task (PDF generation + upload), if two or more users triggered this action at the same time, there was a possibility that the generated PDF document would not be up to date.

We didn't want to run the workers synchronously or to have a dedicated queue for this job, so instead we implemented a mutex just inside that job. The mutex uses the UUID of the PDF as its key, so every time a PDF is generated we first check whether the same PDF is already being generated. Since the key is the PDF's UUID, other (different) PDF documents are not affected by the mutex. Additionally, we found a great mutex implementation that works with Redis, which allowed us to stay scalable.

I think you can solve your problem the same way. You would wrap the whole logic of joining a game room inside a mutex keyed by the room's ID. Other rooms would not be affected, and joining a given room would happen sequentially.

The PHP library for mutex that we use: https://github.com/php-lock/lock

Example with Redis: https://github.com/php-lock/lock#phpredismutex

If you already use Redis as a queue driver for the workers, you can use the same Redis host for your mutex.

Your code should probably look something like this (I recommend a dedicated Redis instance, not localhost):

use Predis\Client;
use malkusch\lock\mutex\PredisMutex;

// Connect to Redis (use a dedicated instance in production, not localhost)
$redis = new Client('redis://localhost');

// One mutex per room: jobs for the same room run one at a time,
// while jobs for other rooms are unaffected
$mutex = new PredisMutex([$redis], 'room-' . $room->id);
$mutex->synchronized(function () use ($room, $user) {
    // logic for joining a room ...
});
riquemuramoto

Thanks for the reply.

I tested your solution on my local machine and it indeed worked!

On my local machine I booted up around 10 workers and tested concurrent dispatches of the job. With your solution, the jobs executed sequentially, just as I needed.

The problem is that with the Laravel Vapor service I'm using DynamoDB as my cache service and AWS SQS as my queue driver. To use AWS ElastiCache (Redis) for my queue and cache, I would have to pay at least an extra $45, because it needs a private NAT gateway and other services to work.
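For anyone trying the same setup: the DynamoDB cache store that backs Laravel's atomic locks here is configured in config/cache.php roughly like this (the table name and env variable names shown are the stock Laravel defaults, adjust to your setup):

```php
// config/cache.php — 'stores' array
'dynamodb' => [
    'driver' => 'dynamodb',
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    // The DynamoDB table holding cache entries (and lock records)
    'table' => env('DYNAMODB_CACHE_TABLE', 'cache'),
],
```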

So, with your solution in mind, I read up on how a mutex implementation works, read the code of the repository you recommended to understand it, and came up with a solution for my case.

Also, Laravel Vapor support replied telling me to use the rate limiting feature for queues described in the Laravel documentation.

https://laravel.com/docs/6.x/queues#rate-limiting

That worked as well, but it also needs a Redis server.

For my case, I used the following Laravel features and came up with a solution:

First, I created a job middleware to hold the logic of the cache atomic lock, which works as a mutex-like implementation on top of the DynamoDB cache driver.

<?php

namespace App\Jobs\Middleware;

use Illuminate\Contracts\Cache\LockTimeoutException;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Str;

class RateLimited
{
    /**
     * Process the queued job.
     *
     * @param  mixed  $job
     * @param  callable  $next
     * @return mixed
     */
    public function handle($job, $next)
    {
        // One lock per job class: all instances of this job share the lock,
        // so they execute one at a time across all workers
        $lock = Cache::lock(Str::kebab(class_basename($job)), config('queue.rate_limited_middleware.timeout'));

        try {
            // Wait up to the configured timeout for the lock to become available
            $lock->block(config('queue.rate_limited_middleware.timeout'));

            $next($job);
        } catch (LockTimeoutException $e) {
            Log::error('Unable to acquire lock');
        } finally {
            optional($lock)->release();
        }
    }
}

The code uses the job's class name in kebab case as the lock key, with a configurable timeout. It waits for the cache lock to become available, then continues executing the job; after the job finishes, the lock is released.

With the job middleware created, I just need to use it on the job that I want to be executed sequentially, by adding this snippet to the job class:

/**
 * Get the middleware the job should be dispatched through.
 *
 * @return array
 */
public function middleware()
{
    return [new RateLimited];
}

And add the config variables to the config/queue.php file:

/*
|--------------------------------------------------------------------------
| Rate Limited Middleware
|--------------------------------------------------------------------------
|
| These options configure the behavior of rate limited middleware for Jobs.
|
*/

'rate_limited_middleware' => [
    'timeout' => env('RATE_LIMITED_MIDDLEWARE_TIMEOUT', env('QUEUE_TIMEOUT', 30)),
],

So far, after tests on my staging environment, it's also working fine in production!

Now I'm just checking the costs of this implementation, since it uses quite a few read/write request units on the DynamoDB service.

Once again, thanks for the reply, it helped me to come up with this idea!

kalemdzievski

I wasn't aware of Laravel's atomic locks, but that is much better than an additional library. I like this very much!

For future reference, I just want to note that your solution looks much better, and we would definitely try switching to it.

Thanks for sharing that! Glad I could help.

booni3

@riquemuramoto Do you not find that with this method you end up releasing and rerunning the job over and over again?

In my case, I may be pushing a few thousand jobs to the queue and I want to keep concurrency low to aid database performance.

I am considering creating multiple environments with different concurrency settings to get around this... but it does not feel nice. Vapor lets you define different queues, but it seems you cannot adjust concurrency for each one.

riquemuramoto

For my case, it's working fine. I believe I will not have more than 5-10 jobs that need to run without concurrency.

On my local machine, I tested my solution with 10 queue workers running 5 instances of the job that needs to execute sequentially. Multiple workers pick up the job, but with the cache atomic lock each one blocks until it's free to execute. So, from what I've checked, it doesn't rerun the job over and over; it waits until it can execute, or fails after the timeout.

Something strange happened just today that I'm looking into now. I think the job/queue failed; it hung and no jobs were executed. I had to redeploy my project to resolve it and normalize the queue.

Now I'm looking through the logs to find out what happened.

About Vapor, yeah, I didn't find anything to adjust concurrency for a specific queue. I think that would be a better and simpler solution, since I would not need to spend read/write request units on the DynamoDB service.
