Serg63rus

Problems with async jobs (or events) in a foreach loop

Hi guys!

I have problems with async queue jobs in a foreach loop (with events it's the same problem). Let me explain:

I have a large array of items and I want to run calculations on them, but processing them synchronously takes too much time. How can I run async jobs in Laravel?

What I want is to split this array into chunks, process all of them at the same time, wait until all of those jobs have finished, and then move on to the next operations.

For now, what I do is:

  1. define a main_key for this job
  2. split the array into chunks
  3. for every chunk, create a new job and dispatch it on $uniqueQueueName
  4. for every unique queue, run a worker bound to exactly that queue

Currently I'm running the workers this way:

exec("php artisan queue:work --queue=$uniqueQueueName --once --stop-when-empty .... >> asyncWorker.log &");
  1. when job processing starts, it increments the value for main_key in the cache (in the async job)
  2. when job processing finishes or fails with an error, it decrements the value for main_key and removes main_key from the cache when the value reaches 0 (in the async job; a sketch of this counter logic follows the code below)
  3. while main_key exists in the cache, wait 1 second
  4. move on...
$syncStatsKey = 'MainJobKey';

$chunks = Collection::make($items)->chunk(100);

foreach ($chunks as $i => $chunk) {
    $queueName = $syncStatsKey.'_'.$i.'_'.now()->getTimestamp();

    dispatch(new SyncStats($chunk, $syncStatsKey))->onQueue($queueName);

    $artisanPath = base_path('artisan');
    $logPath = storage_path('logs/AsyncWorkers.log');
    $commandString = "php $artisanPath queue:work --queue=$queueName --memory=512 --sleep=0 --timeout=600 --tries=1 --once --stop-when-empty >> $logPath &";

    exec($commandString);
}

while (\Cache::has($syncStatsKey)) {
    sleep(1);
}
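For reference, the counter bookkeeping from steps 1-2 lives inside the job itself. A minimal sketch of what that could look like, assuming atomic Cache::increment() / Cache::decrement() on the Redis cache store (an illustration of the scheme described above, not the actual SyncStats code):

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Cache;

class SyncStats implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $chunk;
    public $statsKey;

    public function __construct($chunk, $statsKey)
    {
        $this->chunk = $chunk;
        $this->statsKey = $statsKey;
    }

    public function handle()
    {
        // Step 1: mark this chunk as in-flight.
        Cache::increment($this->statsKey);

        try {
            // ... process $this->chunk ...
        } finally {
            // Step 2: finished or failed - decrement, and drop the key at zero
            // so the while (Cache::has(...)) poll above can exit.
            if (Cache::decrement($this->statsKey) <= 0) {
                Cache::forget($this->statsKey);
            }
        }
    }
}

Note that, as described, this scheme has a race: if the dispatcher reaches the while loop before any worker has incremented main_key, the key doesn't exist yet and the wait ends immediately.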

Why not use Artisan::call('queue:work', [$params])? Because it only returns once the queue worker has finished, unlike exec('php artisan ... &').

Why not send all these jobs to one default queue with workers supervised by Horizon? Because that scheme consumes too many resources. Sometimes the array is split into 200 chunks, and all of them should be processed at the same time; for that, at least 200 workers would have to monitor the default queue. But Horizon doesn't create workers dynamically - it keeps all of them running all the time.

I have a strong feeling that what I'm doing is not the right way to solve my problem. Are there other ways to do what I want?

Maybe there is some way to run jobs in a foreach loop without waiting for each one to complete before moving on to the next iteration?
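For what it's worth, job batching (Laravel 8+) covers exactly this "dispatch everything, then wait for all of it" pattern without per-chunk queues or exec() calls. A minimal sketch, assuming SyncStats also uses the Batchable trait:

use App\Jobs\SyncStats;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

$jobs = collect($items)
    ->chunk(100)
    ->map(function ($chunk) use ($syncStatsKey) {
        return new SyncStats($chunk, $syncStatsKey);
    })
    ->all();

// All jobs share one queue drained by a fixed worker pool;
// then() fires once every job in the batch has finished.
Bus::batch($jobs)
    ->then(function (Batch $batch) {
        // move on to the next operations
    })
    ->dispatch();

A fixed pool of workers processes the chunks in parallel up to the pool size, so 200 chunks don't require 200 dedicated workers.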

Serg63rus

First I did everything the "jobs way".

Then I found out that the loop only continues when the job is done (so if a job runs for a minute, for example, the next iteration starts a minute later!).

Then I thought the "events way" might help, but it has the same problem: if there's a foreach loop anywhere, everything runs as if it were synchronous.

I don't have any more ideas.

Serg63rus

If I use the async driver, the server runs out of memory after 5-6 days (I have a few commands that run every minute, so it's running all the time...).

devfrey

You shouldn't run exec() calls from your backend code. Keep the queue worker active using a supervisor or daemon, simply dispatch the chunked parts of your items, and let the worker handle the jobs.

michalurva

By default, Laravel runs jobs on the sync connection if you haven't changed it in the env file and don't specify ->onConnection('redis') (or some other queue connection) on dispatch.
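For example, the .env line that controls the default connection (matching the config posted further down) is:

QUEUE_CONNECTION=redis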

Serg63rus

@DEVFREY - The problem is that at different moments I could need 10-20-50-100-200 workers running simultaneously, and keeping that many workers alive all the time is not right - some of them could be busy. I need every worker to handle one job: not a queue with a few jobs, but one worker per job!

michalurva

@SERG63RUS - I had a similar problem, and for me setting the right connection fixed it. In your posted code you are not setting the connection, just dispatch(new SyncStats($chunk,$syncStatsKey))->onQueue($queueName); - that's why I asked.

Just to be sure, does your job look like this?

class SyncUser implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

...

I am using it in a foreach loop and it works perfectly (asynchronously).

Can you post your database.php and queue.php configs?

Serg63rus

@MICHALURVA - Yes, the job header is the same.

The connection is set to redis by default, but where I need to, I specify the one I want (async or sync).

Do you use an async connection in foreach loops?

Serg63rus

If I use the async connection - yes, foreach loops work fine, but then I have the memory overload problem.

My server with 128 GB of RAM gets overloaded in 5-6 days.

That's why I decided not to use the async driver and go with Redis.

michalurva

@SERG63RUS - In my queue.php config I have 'default' => env('QUEUE_CONNECTION', 'redis'),

But to be sure I am dispatching my jobs like this:

foreach ($users as $user) {
    $this->info('Syncing user ' . $user->email);
    \App\Jobs\SyncUser::dispatch($user)->onConnection('redis')->onQueue('default');
}
Serg63rus

@MICHALURVA - If you make the SyncUser job run for 1 minute (for example), your loop will stop until that minute ends.

My problem is that the loop stops and waits until the job is completed, and only after the job is done does the loop continue.

michalurva

@SERG63RUS - No, my loop is fast. It doesn't wait. It fills the queue in a few seconds; then the workers process it for 1-2 hours, in my case.

You have that problem because your jobs are not running asynchronously - they are executed inside the loop.

Serg63rus

If I make the job re-dispatch itself at the start, then the loop does continue, because the first job (the one dispatched from the loop) has ended.

Serg63rus

@MICHALURVA - The problem is that the loop waits for the job to complete.

I can't figure out why this happens.

I understand that your loop is fast - that's why you don't notice this bug - but if you make a job run for a minute, for example (you can try it with sleep(60)), you will see the bug.

michalurva

@SERG63RUS - No, I am sure your jobs are running synchronously. That's why your foreach takes forever to finish.

Even if you do sleep(60), it's the worker processing that job that sleeps, not your loop.
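One way to settle this is to timestamp the markers on both sides of the dispatch. A rough sketch, reusing the variables from the original post:

use Illuminate\Support\Facades\Log;

foreach ($chunks as $i => $chunk) {
    Log::info("dispatching chunk $i at " . microtime(true));

    dispatch(new SyncStats($chunk, $syncStatsKey))
        ->onConnection('redis')   // explicit, to rule out the sync connection
        ->onQueue('default');
}

Log::info('loop done at ' . microtime(true));

If the jobs really run asynchronously, "loop done" is logged within seconds even when each job sleeps for 60 seconds; on the sync connection it appears only after every job has run.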

Serg63rus

No!

It's exactly the loop that stops.

I've added markers (an info log on every iteration).

Serg63rus

@MICHALURVA - The jobs can't be running synchronously, because each job has a UNIQUE queue name and a worker for that UNIQUE queue name.

michalurva

@SERG63RUS - Can you try it with

dispatch(new SyncStats($chunk,$syncStatsKey))->onConnection('redis')->onQueue($queueName);

And post your configs please.

Serg63rus

@MICHALURVA - The config is the default one:

<?php

return [

    /*
    |--------------------------------------------------------------------------
    | Default Queue Connection Name
    |--------------------------------------------------------------------------
    |
    | Laravel's queue API supports an assortment of back-ends via a single
    | API, giving you convenient access to each back-end using the same
    | syntax for every one. Here you may define a default connection.
    |
    */

    'default' => env('QUEUE_CONNECTION', 'redis'),

    /*
    |--------------------------------------------------------------------------
    | Queue Connections
    |--------------------------------------------------------------------------
    |
    | Here you may configure the connection information for each server that
    | is used by your application. A default configuration has been added
    | for each back-end shipped with Laravel. You are free to add more.
    |
    | Drivers: "sync", "database", "beanstalkd", "sqs", "redis", "null"
    |
    */

    'connections' => [

        'sync' => [
            'driver' => 'sync',
        ],

        'async' => [
            'driver' => 'async',
            'table' => 'jobs',
            'queue' => 'default',
            'expire' => 60,
            'binary' => '/opt/plesk/php/7.3/bin/php',
        ],

        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'retry_after' => 90,
        ],

        'beanstalkd' => [
            'driver' => 'beanstalkd',
            'host' => 'localhost',
            'queue' => 'default',
            'retry_after' => 90,
            'block_for' => 0,
        ],

        'sqs' => [
            'driver' => 'sqs',
            'key' => env('AWS_ACCESS_KEY_ID'),
            'secret' => env('AWS_SECRET_ACCESS_KEY'),
            'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
            'queue' => env('SQS_QUEUE', 'your-queue-name'),
            'region' => env('AWS_REGION', 'us-east-1'),
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => env('REDIS_QUEUE', 'default'),
            'retry_after' => 90,
            'block_for' => null,
        ],

    ],

    /*
    |--------------------------------------------------------------------------
    | Failed Queue Jobs
    |--------------------------------------------------------------------------
    |
    | These options configure the behavior of failed queue job logging so you
    | can control which database and table are used to store the jobs that
    | have failed. You may change them to any database / table you wish.
    |
    */

    'failed' => [
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => 'failed_jobs',
    ],

];

michalurva

@SERG63RUS - Looks good. Just to be sure, try clearing the config cache (php artisan config:clear, or php artisan config:cache to rebuild it) if you recently changed something...
