bjugan's avatar

Maximum one job waiting in queue

Hi,

I have a job that synchronizes data from system A to system B by finding all data in system A that has changed since the last job run. This job is dispatched to the queue whenever data in system A changes, which can sometimes happen multiple times per second.

I can only run one of these jobs at a time to prevent a race condition, and I use the WithoutOverlapping middleware for that. At the same time, there is no need to pile up lots of these jobs in the queue: I need at most one job waiting to run when the previous one finishes.

The ShouldBeUniqueUntilProcessing contract could help me prevent a lot of unnecessary jobs waiting in the queue. But the problem is that the next job then starts while the previous one is still running (a race condition), not after the previous one finishes.

What I need is a ShouldBeUniqueUntilProcessingButDoNotOverlap solution.
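For reference, the two building blocks mentioned above look roughly like this in Laravel (a sketch; the class name `SyncJob` and the lock key `sync-a-to-b` are my own placeholders):

```php
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class SyncJob implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ShouldBeUniqueUntilProcessing: at most one copy of this job may *wait*
    // in the queue; the unique lock is released when processing begins.

    public function middleware(): array
    {
        // WithoutOverlapping: at most one copy may *run* at a time;
        // an overlapping job is released back onto the queue.
        return [(new WithoutOverlapping('sync-a-to-b'))->releaseAfter(10)];
    }

    public function handle(): void
    {
        // find and sync everything changed in system A since the last run
    }
}
```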

Hope someone can help. I have a feeling I'm missing an obvious solution here :-)

Thanks!

Marius

1 like
14 replies
Glukinho's avatar

the problem is that the next job starts while the previous is running (=race condition) and not after the previous one finishes

How many queue workers do you have? Maybe leave only one; then jobs will be executed strictly one after another.

Make the synchronize job set a global flag (using the cache, for example) synchronized = 1 right after a successful synchronization. A job should check this flag before execution and proceed only if synchronized = 0, otherwise exit. System A, when its data changes, sets this flag to 0 along with dispatching a job.

With this approach only the first job actually syncs data; the others end quickly without doing anything, no matter how many jobs are queued.
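A sketch of that flag idea with Laravel's cache (the key name `synchronized` and the job class are assumptions):

```php
use Illuminate\Support\Facades\Cache;

// In system A, whenever data changes:
Cache::put('synchronized', 0);
SyncJob::dispatch();

// At the top of SyncJob::handle():
if (Cache::get('synchronized', 0) === 1) {
    return; // an earlier job already synced everything; nothing to do
}

// ...fetch and sync all changes since the last run...

Cache::put('synchronized', 1); // only after a successful sync
```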

1 like
bjugan's avatar

Thanks for your suggestions!

How many queue workers do you have?

300 :)

The problem is not executing the jobs one after the other; the WithoutOverlapping middleware handles that. The problem is how to prevent more than one job from "waiting in line" to be executed.

But your suggestion of setting a global flag inspired me to the following solution which I think should work:

  • When I dispatch the job I set the global flag "queued" to 1
  • When the job is executed (starting) the flag is reset to 0
  • Before I dispatch a new job I check this flag and skip dispatching if the flag is set

In combination with WithoutOverlapping middleware I think this should ensure that there is maximum one job in queue at a time, and that they are executed one after the other.
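A sketch of that dispatch guard (the key name `sync-job-queued` is mine; `Cache::add` is handy here because it only writes when the key is absent, which avoids a check-then-set race between two dispatchers):

```php
use Illuminate\Support\Facades\Cache;

// Wherever system A reports a change:
if (Cache::add('sync-job-queued', 1)) {
    // the flag was not set: no job is waiting yet, so queue one
    SyncJob::dispatch();
}
// otherwise a job is already waiting and will pick this change up

// In SyncJob::handle(), before doing any work:
Cache::forget('sync-job-queued'); // job has started; allow one new job to queue
```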

Thanks again!

Glukinho's avatar

Why do you care how many jobs are "waiting in line"? What is wrong when there is more than one?

Maybe you'd be better off making the job idempotent (so that no matter how many jobs are executed, 1 or 100, the result is the same) instead of trying to limit the number of jobs in the queue with workarounds?

bjugan's avatar

It's not "wrong", but every job involves multiple database queries, and I want to prevent unnecessary queries to save resources and speed things up. The system is sending live data to end users, and speed is important. And resources cost money :-) (I'm using Laravel Vapor)

Example:

  • There is a change in system A and the job is dispatched to sync these changes to system B (one small change can actually trigger a series of database changes, so the job must do a db query to fetch all changed db rows since the last job execution)
  • During the execution of this job there are five new changes in system A. This would dispatch 5 new jobs to the queue, each waiting for the previous one to finish. But this is four jobs more than I actually need, because I only need one job and one query to fetch and sync all changes resulting from these five changes.
Glukinho's avatar

Thanks for the explanation. But I still think my first suggestion of having a flag synced = 1/0 fits your situation:

  1. when a change occurs, system A sets the flag synced = 0 and dispatches a job.
  2. at the very beginning a job checks the flag; if synced = 1 the job exits immediately. If synced = 0 the job proceeds with execution.
  3. after a successful synchronization the job sets synced = 1.

So, the first job does all the work. Even if 5 more changes occurred while the first job was running, all these changes will be processed by the first job. Subsequent jobs will see synced = 1 set by the first job and will skip execution.

Glukinho's avatar

There is even more precise implementation.

Have two timestamp flags:

  • last_change_at
  • last_sync_at

System A sets last_change_at to the current timestamp at the moment a change occurs and a job is created.

A job gets the current timestamp at the beginning of execution (or at the moment the latest data was "taken" from system A) and sets last_sync_at to that timestamp upon successful sync.

The next job compares the two timestamps and proceeds if last_sync_at < last_change_at (in other words, when there are changes not yet synced), otherwise it exits.

bjugan's avatar

Thanks for the follow-ups! :-)

Let's split the job into four steps:

  1. Check synced flag (or compare timestamps). Continue if 0.
  2. Fetch changed rows from database
  3. Sync rows to system B
  4. Set synced = 1 (or set last_sync_at)

Even if 5 more changes occurred while the first job was running, all these changes will be processed by the first job.

What if the 5 changes occur during step 3? Then they won't be processed by the running job (no more fetching from the database), and I guess 5 new jobs will be dispatched to the queue because synced is still 0 (or last_sync_at is not updated)?

Glukinho's avatar
  4. Set synced = 1 (or set last_sync_at)

It is important that last_sync_at should be set to the timestamp of step 2 (fetch changed rows from database). So the meaning of last_sync_at is "timestamp of last synced change", not "timestamp of last job end".

What if the 5 changes occur during step 3?

If changes occurred while Job1 was running, then Job2 will see that last_sync_at < last_change_at and will sync the remaining changes. After that it sets last_sync_at to the new value.

If no new changes occurred, subsequent Job3, Job4, Job5 will exit immediately, as they will see last_sync_at >= last_change_at.

If new changes occurred while Job2 was running, they will be synced in Job3; otherwise Job3 just exits. And so on with Job4, Job5, etc.

What a job may look like, in pseudo code:

```php
// job start

if (Cache::get('last_sync_at') >= Cache::get('last_change_at')) {
    // no pending changes, exit immediately
    return;
}

// capture the timestamp *before* fetching, so changes that arrive
// during the sync still count as "not yet synced" for the next job
$last_synced_rows_timestamp = now();
$changes = fetch_changed_rows();
sync_changes($changes);

Cache::put('last_sync_at', $last_synced_rows_timestamp);

// job end
```
Glukinho's avatar

See the diagram (better open it in a new tab):

[jobs diagram]

bjugan's avatar

Wow! :-)

I understand everything now. For some reason I didn't think about your solution in combination with the WithoutOverlapping middleware. Thanks to your awesome diagram, I figured it out.
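For anyone landing here later, the combined solution might look roughly like this (a sketch; the lock key, cache keys, and the `fetchChangedRows`/`syncToSystemB` helpers are placeholders of mine):

```php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Support\Facades\Cache;

class SyncJob implements ShouldQueue
{
    public function middleware(): array
    {
        // run strictly one at a time; overlapping jobs are released back
        return [(new WithoutOverlapping('sync-a-to-b'))->releaseAfter(5)];
    }

    public function handle(): void
    {
        if (Cache::get('last_sync_at') >= Cache::get('last_change_at')) {
            return; // nothing new since the last sync
        }

        $fetchedAt = now();   // timestamp of the fetch, not of the job end
        $changes = $this->fetchChangedRows();
        $this->syncToSystemB($changes);

        Cache::put('last_sync_at', $fetchedAt);
    }
}
```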

Thanks for taking the time to help me! Really appreciate it!

Marius

1 like
Snapey's avatar

I have a very similar scenario.

I listen to events from a third party. There are multiple types of event and for some events I need to make sure I have the latest copy of an order.

A naive implementation collects the order by API call and upserts it into the database. This is done in a job triggered by the receipt of the event. However, sometimes 3 or 4 events are received at the same time, jobs are created to handle each, and all need the latest order.

I'm facing a similar challenge in that I want only one API call, but potentially several jobs need the result.

I should also say that other jobs are simultaneously collecting other orders (different ids) and that's fine, so limiting the API globally is not an option.

I'm wondering about some sort of proxy layer that internally knows which orders are being collected and makes threads wait until it has serviced previous requests for the same data?
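That proxy layer could perhaps be sketched with Laravel's atomic locks (`Cache::lock` and `block` are real Laravel APIs; the lock key and the `orderIsFresh`/`fetchOrderFromApi`/`upsertOrder` helpers are assumptions): the first job for an order takes the lock and does the API call; later jobs for the same order block until it is done, then read the fresh copy locally.

```php
use Illuminate\Support\Facades\Cache;

// In each job that needs the latest copy of $orderId:
$lock = Cache::lock("fetch-order-{$orderId}", 30);

// Wait up to 15 seconds for any in-flight fetch of this order to finish.
$lock->block(15, function () use ($orderId) {
    // Only one job per order gets here at a time.
    if (! $this->orderIsFresh($orderId)) {
        $order = $this->fetchOrderFromApi($orderId); // the single API call
        $this->upsertOrder($order);
    }
});

// Every job then does its own work with the locally stored, up-to-date order.
$order = Order::find($orderId);
```

Jobs for different order ids use different lock keys, so they still run in parallel.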

Would this serve your needs also?

bjugan's avatar

If I understand you correctly, it seems like a similar scenario, yes! :-)

But for me it is not important to keep track of which data changes, but to make sure there is always a job running after the latest data change, and at the same time prevent more than one job waiting in the queue to fetch those changes.

I guess you should use the order id as part of the job's unique id to make sure the jobs don't overlap for the same order, but you will still get 4 jobs/API calls if you get 4 events at the same time.

Snapey's avatar

my challenge is that each of the jobs need to do their own work with the order

Something needs to say, just hang fire, we're just fetching that order...

bjugan's avatar

Ah, I think I understand. Not an easy task.

Just thinking out loud here. What if each event dispatches two jobs:

  1. Job A: Fetching the latest order from API.
  2. Job B: Do the work with the order.
  • Job B can be different types of jobs based on the type of the event
  • All jobs involved (A+B) must use the WithoutOverlapping middleware with the order id as the lock key to make sure jobs for the same order do not overlap.
  • In addition, Job A should implement the ShouldBeUniqueUntilProcessing contract to prevent unnecessary API calls: "If there is already a Job A waiting in the queue, do not dispatch another Job A." The unique id must be something unique for this job.
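Job A from the list above might look something like this (a sketch; `FetchLatestOrder` and the lock key format are my own names):

```php
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class FetchLatestOrder implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    public function __construct(public int $orderId)
    {
    }

    // Uniqueness is per order: at most one FetchLatestOrder waits per order id.
    public function uniqueId(): string
    {
        return (string) $this->orderId;
    }

    public function middleware(): array
    {
        // Jobs for the same order never overlap; other orders run in parallel.
        return [new WithoutOverlapping("order-{$this->orderId}")];
    }

    public function handle(): void
    {
        // fetch the latest order from the API and upsert it locally
    }
}
```

The Job B classes would use the same `WithoutOverlapping("order-{$this->orderId}")` key so they queue up behind the fetch for their order.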
