DigitalViking wrote a reply
4mos ago
How did you know that? Are there any errors/exceptions in storage/logs/laravel.log? Do jobs appear in the failed_jobs table?
I’ve added logging whenever a job is dispatched. The number of dispatched jobs (200) matches the number of orders to be imported, and I can also see exactly 200 entries in our job_logs table.
As a test, I added a log entry as the very first line in the handle() method. When using multiple workers, that log entry only appears around 6 times. However, when using only one worker, I get all 200 log entries — so it seems like everything is working correctly in that case.
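For reference, the two probes look roughly like this (simplified sketch; the exact log messages and the job_logs writing differ slightly in our code):

// right where the job is dispatched (uses Illuminate\Support\Facades\Log)
Log::info('Dispatching ProcessDeliveryNoteFile', ['file' => $file['file']->path]);

// very first line of handle()
public function handle(): void
{
    Log::info('handle() reached for ProcessDeliveryNoteFile');

    $this->fileProcessor->process($this->file);
}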
From this, I'm assuming that the handle() method is not being executed for most of the jobs when multiple workers are running. The fact that it works with a single worker tells me that the job logic itself is valid and executable.
Also:
• There are no errors or exceptions in the log files
• The failed_jobs table is empty; nothing appears there
DigitalViking wrote a reply
4mos ago
Hi,
First of all, thank you so much for your response!
This is actually my first post here — and also my first time working on a project of this size and complexity.
To be honest, I’m not quite sure where to start in terms of sharing code, since there are a lot of classes involved in this process. Could you help me understand which parts would be the most relevant? Would it be best to start with the Job class itself, or the place where the job gets dispatched?
I’m happy to share whatever information you need.
I’ve already added some logging inside the Job class, and I can confirm that the job is being dispatched — however, the handle() method is never executed.
To get things started, I’ll post the Job class and its parent classes below.
Actual Job Class
<?php

namespace App\Domain\ERP\Xentral\Jobs;

use App\Common\FileStorage\FileStorageItem;
use App\Common\Jobs\ImportJob;
use App\Common\Models\DeliveryNote;
use App\Common\Models\ERP;
use App\Domain\ERP\Xentral\Import\DeliveryNoteFileProcessor;
use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessDeliveryNoteFile extends ImportJob
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    public int $uniqueFor = 3600;

    public function __construct(
        private ERP $erp,
        private DeliveryNoteFileProcessor $fileProcessor,
        private array $file,
    ) {
    }

    /**
     * @throws Exception
     */
    public function handle(): void
    {
        $this->fileProcessor->process($this->file);
    }

    protected function getSourceType(): string
    {
        return ERP::class;
    }

    protected function getSourceId(): string
    {
        return $this->erp->id;
    }

    protected function getReferenceType(): ?string
    {
        return DeliveryNote::class;
    }

    protected function getReferenceId(): ?string
    {
        return null;
    }

    protected function getMessage(): ?string
    {
        return 'DeliveryNote File could not be processed.';
    }

    protected function setUniqueId(): string
    {
        /** @var FileStorageItem $file */
        $file = $this->file['file'];

        return $file->path;
    }
}
First Layer Parent Class
<?php

namespace App\Common\Jobs;

use App\Common\Events\ImportFailed;
use App\Common\Support\Types\TeamSlug;
use App\Exceptions\UnrecoverableErrorException;
use Throwable;

abstract class ImportJob extends OperationJob
{
    public function failed(UnrecoverableErrorException | Throwable $exception): void
    {
        $teamSlug = TeamSlug::General;

        if ($exception instanceof UnrecoverableErrorException) {
            $teamSlug = $exception->getTeamSlug();
        }

        ImportFailed::dispatch(
            $this->getSourceType(),
            $this->getSourceId(),
            $this->getReferenceType(),
            $this->getReferenceId(),
            $this->getFullClientMessage($this->getMessage(), $exception),
            $this->getInternalMessage($exception),
            $this->job ? $this->job->uuid() : '',
            $teamSlug,
        );
    }
}
Second Layer Parent Class
<?php

namespace App\Common\Jobs;

use App\Common\Jobs\Middleware\PreventRetryOnUnrecoverableError;
use App\Common\Jobs\Middleware\RetryOnRecoverableError;
use App\Common\Support\Traits\ConvertsErrorMessages;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\App;
use Illuminate\Support\Str;

abstract class OperationJob implements ShouldQueue, ShouldBeUnique
{
    use ConvertsErrorMessages;
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    public int $maxExceptions = 3;

    abstract protected function getSourceType(): string;

    abstract protected function getSourceId(): string;

    abstract protected function getReferenceType(): ?string;

    abstract protected function getReferenceId(): ?string;

    abstract protected function getMessage(): ?string;

    abstract protected function setUniqueId(): string;

    public function getIntegrationType(): string
    {
        return $this->getSourceType();
    }

    public function getIntegrationId(): string
    {
        return $this->getSourceId();
    }

    protected function shouldNotOverlap(): bool
    {
        return false;
    }

    protected function getUniqueKey(): string
    {
        return $this->getSourceId() . '-' . ($this->getReferenceId() ?? '');
    }

    public function middleware(): array
    {
        $middleware = [
            PreventRetryOnUnrecoverableError::class,
            RetryOnRecoverableError::class,
        ];

        if (!App::environment(['local']) && $this->shouldNotOverlap()) {
            array_unshift(
                $middleware,
                (new WithoutOverlapping($this->getUniqueKey()))
                    ->dontRelease()
                    ->expireAfter(1800)
            );
        }

        return $middleware;
    }

    public function uniqueId(): string
    {
        if (config('queue.default') === 'sync') {
            return Str::uuid()->toString();
        }

        return $this->setUniqueId();
    }
}
Job Dispatching (Runs inside another Job)
public function run(): void
{
    $deliveryNoteFiles = $this->importClient->getDeliveryNoteFiles();

    foreach ($deliveryNoteFiles as $file) {
        dispatch(
            new ProcessDeliveryNoteFile(
                $this->erp,
                $this->fileProcessor,
                $file,
            )
        );
    }
}
Thanks for your support!
DigitalViking started a new conversation
4mos ago
Hi Laracasts community,
I’m currently facing an issue in our application for which I haven’t yet found a fully satisfying solution. Let me give you some context:
I’m working on a multi-integration setup in the e-commerce space. The application connects our ERP system with multiple fulfillment partners and handles various data flows (product data, stock levels, orders, and shipping information). Communication happens either in an ERP domain or a Fulfiller domain, each containing its own integrations. All import/export operations run as separate jobs.
Currently, we’re still using the database queue driver, but planning to switch to Redis soon. The jobs are dispatched at scheduled intervals via Laravel’s scheduler. Each job is unique and based on an identifier generated from the resource it’s importing. We also log all processed jobs into a custom job_logs table, which is filled using events in a service provider.
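The logging roughly works like the sketch below (heavily simplified; the JobLog model and its columns are placeholders for our actual implementation):

use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Queue;

// in a service provider's boot() method
Queue::before(function (JobProcessing $event) {
    JobLog::create([
        'job_uuid' => $event->job->uuid(),
        'name'     => $event->job->resolveName(),
    ]);
});

Queue::after(function (JobProcessed $event) {
    JobLog::where('job_uuid', $event->job->uuid())
        ->update(['completed_at' => now()]);
});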
⸻
❗ The Problem:
We’re experiencing race conditions whenever more than one queue worker is active. For example, around 200 orders come in during a single import. But once more than one worker is running, only about 6 orders actually get imported. The jobs appear in the jobs table and are even logged in the job_logs table with a completed timestamp, but in the end only about 6 records make it into the database.
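To be precise about "more than one worker": several queue:work processes consume the same connection and queue (managed by a process supervisor); the command below is only illustrative of how they are started.

// php artisan queue:work --sleep=3 --tries=3   (started as multiple processes)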
⸻
🔍 What I’ve tried:
I’ve done quite a bit of research and discussed the issue in depth with ChatGPT. I’ve also tried several things, including:
• Switching to Redis (on the test server)
• Using ShouldBeUnique and cache-based locking mechanisms
• Using withoutOverlapping()
• Building a custom cache lock per job
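Roughly, those attempts looked like this (simplified sketches, not the exact code; lock names and durations are examples):

// 1) ShouldBeUnique with an explicit unique id per imported file
public function uniqueId(): string
{
    return $this->file['file']->path;
}

// 2) WithoutOverlapping job middleware
public function middleware(): array
{
    return [
        (new WithoutOverlapping($this->uniqueId()))
            ->dontRelease()
            ->expireAfter(1800),
    ];
}

// 3) custom cache lock around the actual work
Cache::lock('import:' . $this->uniqueId(), 600)->get(function () {
    $this->fileProcessor->process($this->file);
});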
Unfortunately, none of these solved the issue.
Side note: At the moment, the orders are read from XML files on an SFTP server and then processed in individual jobs. This will soon be replaced by a webhook/API solution.
⸻
💡 Current idea:
I’m considering creating dedicated queues per job type, so that only one worker is responsible for each queue. But this feels like a workaround, since Laravel queues are designed to allow multiple workers on the same queue.
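Something along these lines (the queue name is only an example):

// dispatch onto a dedicated queue
ProcessDeliveryNoteFile::dispatch($this->erp, $this->fileProcessor, $file)
    ->onQueue('delivery-note-import');

// and run exactly one worker for that queue
// php artisan queue:work --queue=delivery-note-import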
I also started looking into Laravel Horizon to better manage these queues.
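As far as I understand, a Horizon supervisor could pin such a queue to a single process, roughly like this (sketch based on the docs; supervisor and queue names are examples, and Horizon requires the Redis driver we are planning to switch to):

// config/horizon.php
'environments' => [
    'production' => [
        'supervisor-delivery-notes' => [
            'connection' => 'redis',
            'queue' => ['delivery-note-import'],
            'balance' => 'simple',
            'maxProcesses' => 1,
        ],
    ],
],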
⸻
🧠 My questions:
• Has anyone experienced similar race conditions and has an idea of what might be causing it here?
• What would be the best practice for this type of scenario?
• Is Horizon a worthwhile solution in this case?
• Is separating queues a good idea or more of a band-aid?
⸻
Thanks a lot for reading — I’m really looking forward to your insights!