tomcodes's avatar

How to avoid Allowed memory size of xxx bytes exhausted when passing a big batch to Bus::chain?

Hello!

I am working on a bank reconciliation system where I have to upload a .csv file that is quite big (current file is around 6700 lines).

I use Laravel Bus::chain to have the import done in chunks. It works really well for smaller files, but when the file gets big I get the following error:

Allowed memory size of 268435456 bytes exhausted (tried to allocate 42505800 bytes) {"exception":"[object] (Symfony\\Component\\ErrorHandler\\Error\\FatalError(code: 0): Allowed memory size of 268435456 bytes exhausted (tried to allocate 42505800 bytes) at /var/www/html/vendor/predis/predis/src/Connection/StreamConnection.php:364)

I understand that the memory to handle is too big to hand to redis.

Here is my current code:

$filePath = $$file->store('reconciliation');
$reader = Reader::createFromPath(storage_path('app/' . $filePath), 'r');
$reader->addStreamFilter('convert.iconv.ISO-8859-15/UTF-8');
$reader->setDelimiter(',');
$reader->setHeaderOffset(0);

$records = Statement::create()->process($reader);

$batches = collect(
        (new ChunkIterator($records->getRecords(), 100))
            ->get()
    )->map(function ($chunk) {
        return new ImportCsvLines($chunk);
    })->toArray();

Bus::chain($batches)->dispatch();

Any idea on how to fix this? Thank you ^_^

0 likes
4 replies
rodrigo.pedra's avatar

You could consider a similar implementation to the one described in the docs:

https://laravel.com/docs/9.x/queues#dispatching-batches

Where you dispatch several jobs with the line bounds you want each process.

For example:

new ImportCsvLines($filename, 1, 100);

Would process a CSV file named $filename from lines 1 to 100.

It seems you are using The PHP League CSV package. If so, you can get the line count of a file using PHP's count() function. See how to do it on its docs:

https://csv.thephpleague.com/9.0/reader/#records-count

I tested with this code:

<?php

use App\Jobs\ImportCsvLines;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\LazyCollection;
use League\Csv\Reader;

Artisan::command('test', function () {
    // $filePath = $file->store('reconciliation');
    $filePath = 'reconciliation.csv'; // manually created a file with 6735 lines
    $reader = Reader::createFromPath(storage_path('app/' . $filePath), 'r');
    $reader->setHeaderOffset(0);
    $count = count($reader);

    $batches = LazyCollection::make(function () use ($count) {
        for ($offset = 0; $offset < $count; $offset += 100) {
            yield [$offset, min($count, $offset + 99)];
        }
    })->map(function ($chunk) use ($filePath) {
        return new ImportCsvLines($filePath, $chunk[0], $chunk[1]);
    });

    Bus::chain($batches->all())->dispatch();

    $this->info(memory_get_peak_usage(true) / 1024 / 1024);
});

And got a memory usage of 22 MB.

For reference, my test job is:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use League\Csv\Reader;
use League\Csv\Statement;

class ImportCsvLines implements ShouldQueue
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    public function __construct(
        public string $filePath,
        public int $from,
        public int $to,
    ) {}

    public function handle()
    {
        $reader = Reader::createFromPath(storage_path('app/' . $this->filePath), 'r');
        $reader->addStreamFilter('convert.iconv.ISO-8859-15/UTF-8');
        $reader->setDelimiter(',');
        $reader->setHeaderOffset(0);

        $records = Statement::create()
            ->offset($this->from)
            ->limit(100)
            ->process($reader);

        foreach ($records as $offset => $record) {
            logger()->info($offset, $record);
        }
    }
}
rodrigo.pedra's avatar

As a note: on your implementation, you are still loading the whole file in memory, as you build the jobs passing down the records.

So to build up the whole chain, you end up loading all the records on memory before dispatching the chain.

Snapey's avatar

You are chunking the reads, but just adding the data into the batches collection.

really you should process each batch of rows in their entirety before moving on to the next chunk.

Tippin's avatar

As mentioned above, you are loading all jobs for the batch into memory before dispatching it. You can overcome this by using a generator to add more jobs to an already dispatched batch, which I implemented myself using an article from Spatie/Freek in how they solved this:

https://freek.dev/1734-how-to-group-queued-jobs-using-laravel-8s-new-batch-class

One "issue' with the above is the fact you are dispatching an empty batch to start, thus your finally method on the batch may be called more than once. But that would be the case for any batch that has a chance to finish to only then have jobs added after the fact.

  • Some things in the article are outdated now, such as the ability to add a queued closure to the batch directly vs needing a separate job, eg:
$batch->add(function () use ($model) {
    $model->update([
        'is_dispatching' => false,
    ]);
});

Please or to participate in this conversation.