Dispatch job from another job
Hello everyone,
I am facing a hair-pulling situation and could use some help...
I am working on an application that interacts with an external data provider (Apache Druid). I have three types of jobs to manage my data:
- IngestHour: sends an hour of data points to Druid (from my PostgreSQL database, which acts as a buffer)
- AggregateHour: asks Druid to aggregate the data points of an hour and stores the result
- AggregateDay: asks Druid to aggregate the data points of a day and stores the result
I have two ways to acquire the raw data points: API endpoints (external services can send me data points) and scheduled jobs that fetch them from JSON files. Each of them triggers the IngestHour job for a specific hour. Once IngestHour is done, it triggers AggregateHour for the touched hour, and once AggregateHour is done, it triggers AggregateDay for the touched day.
Each job is on a separate queue of descending priority (IngestHour is highest, AggregateDay is lowest) in my Horizon configuration.
To avoid computing multiple times for the same source, I have developed a DebouncedJob class: when a job is placed on the queue several times, every attempt to run it is ignored except the last one (as opposed to the ShouldBeUnique interface, which would only keep the first one). This way, when I aggregate all the hours of the same day, the day is only computed once every hour has been computed.
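To make the intended "last dispatch wins" behaviour concrete, here is a minimal framework-free sketch. It is only an illustration under assumptions: the arrays stand in for the cache and the queue, and the function names, the sequence counter and the debounce IDs are hypothetical, not Laravel APIs.

```php
<?php
// Hypothetical in-memory stand-ins for the cache and the queue (not Laravel APIs).
$cache = [];
$queue = [];
$counter = 0;

// Queue a job and remember, per debounce ID, the sequence number of the latest dispatch.
function dispatchDebounced(string $debounceId, array &$cache, array &$queue, int &$counter): void
{
    $counter++;
    $cache[$debounceId] = $counter;
    $queue[] = ['id' => $debounceId, 'seq' => $counter];
}

// Run queued jobs in order, skipping every job that was superseded by a later dispatch.
function runQueue(array $cache, array $queue): array
{
    $executed = [];
    foreach ($queue as $job) {
        if (($cache[$job['id']] ?? null) !== $job['seq']) {
            continue; // a more recent dispatch exists for this debounce ID
        }
        $executed[] = $job['id'] . '#' . $job['seq'];
    }
    return $executed;
}

// Two dispatches for the same day, one for another day (illustrative IDs).
dispatchDebounced('aggregate-day:2024-01-01', $cache, $queue, $counter);
dispatchDebounced('aggregate-day:2024-01-01', $cache, $queue, $counter);
dispatchDebounced('aggregate-day:2024-01-02', $cache, $queue, $counter);

$executed = runQueue($cache, $queue);
print_r($executed);
```

In this sketch only the second dispatch for the first day and the single dispatch for the second day are executed; the first dispatch is skipped because a later one replaced its marker in the cache.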
That class contains an afterHandle method where I want to dispatch the next job (IngestHour dispatches AggregateHour, and AggregateHour dispatches AggregateDay). Everything works except that method: the job dispatched from it runs synchronously and does not appear in Horizon.
Here is the code for the DebouncedJob class :
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Foundation\Bus\PendingDispatch;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Cache;

abstract class DebouncedJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Time (in microseconds) at which this instance was dispatched through
     * dispatchDebounced(). It is serialized with the job, so the worker can
     * compare it with the latest dispatch time stored in the cache.
     */
    public ?int $dispatchedAt = null;

    /**
     * Get the unique ID for the job.
     */
    abstract public function debounceId(): string;

    protected function cacheKey(): string
    {
        return $this->debounceId() . '-last-dispatched';
    }

    // This method should be overridden in the child class with specific job logic
    abstract protected function handleJob();

    public function handle()
    {
        // Check if a more recent job has been dispatched.
        // The cached time is compared with this instance's own dispatch time:
        // comparing it with now() can never succeed, because the cached value
        // is always in the past by the time the worker runs the job.
        if ($this->shouldReschedule()) {
            // If a more recent job exists, exit without processing
            return;
        }

        // Proceed with the job logic implemented in the child class
        $this->handleJob();

        // Clear the cache key after the job is done
        Cache::forget($this->cacheKey());

        // Optionally dispatch the next job (e.g., AggregateHourJob) if needed
        $this->afterHandle();
    }

    // Optionally override this method in child classes to chain next jobs
    protected function afterHandle()
    {
        // Default implementation is empty
    }

    // True when a job with the same debounce ID was dispatched after this one
    protected function shouldReschedule()
    {
        $lastDispatchTime = Cache::get($this->cacheKey());

        return $this->dispatchedAt !== null
            && $lastDispatchTime
            && $lastDispatchTime > $this->dispatchedAt;
    }

    // Static method for dispatching jobs with tracking
    public static function dispatchDebounced(...$arguments)
    {
        $instance = new static(...$arguments);

        // Record the dispatch time both on the instance and in the cache
        $instance->dispatchedAt = (int) (microtime(true) * 1000000);
        Cache::put($instance->cacheKey(), $instance->dispatchedAt);

        return new PendingDispatch($instance);
    }
}
If anyone has any idea of what I'm doing wrong, I'd love to hear it. Even if you have feedback on my current "debounced" implementation, it would be great. It's the first time I'm doing something so complex with queues, so I'm not sure I went in the right direction...
Thanks !