Laravel Job Queue: Peeking Behind the Curtain (Part 2)

Sayantan Das · Published in codelogicx · Jun 24, 2022

In part 1, we talked about how jobs are sent to the queue. In this part, we'll find out how jobs are picked up from the queue and processed. We'll also look into what happens when a job throws an exception and how timeouts are implemented. So let's get started.

Job Processing

Up until now, we have only dispatched jobs to the queue; if nothing is processing them, they will just keep accumulating in the queue storage. This is where the queue worker comes into play. Worker processes are long-running processes that are started separately, usually monitored by a process monitor like supervisor so that they can be restarted if any of them stop. We start a worker by running php artisan queue:work, so this command is the first thing we'll look into.

The queue:work command is defined in the Illuminate\Queue\Console\WorkCommand class; let's look at the handle() method of this class:

WorkCommand::handle()

The first part checks whether the application is in maintenance mode and the --once flag (which processes a single job and exits) was provided; if so, the process sleeps for the configured duration and exits. The listenForEvents() method registers handlers for job-related events and prints their output to the console. The next part determines which connection the worker should fetch jobs from: if a connection name was provided, that connection is used; otherwise, the default connection is used. Similarly, if a queue was provided it is used; otherwise, jobs are fetched from the default queue. It is possible to provide multiple queues separated by commas (e.g. --queue=emails,default), where the queue that comes first has higher priority. We'll see how that works a little later. Finally, it runs the runWorker() method, passing in the connection name and queue name as arguments.
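Paraphrased, the method looks roughly like this (a condensed sketch based on the framework source; exact details vary between Laravel versions):

public function handle()
{
    // If the app is in maintenance mode and --once was passed,
    // just sleep for the configured duration and bail out.
    if ($this->downForMaintenance() && $this->option('once')) {
        return $this->worker->sleep($this->option('sleep'));
    }

    // Print job events (processing, processed, failed) to the console.
    $this->listenForEvents();

    // Fall back to the default connection when none was given.
    $connection = $this->argument('connection')
        ?: $this->laravel['config']['queue.default'];

    // Resolve the queue name(s) to read from, e.g. "emails,default".
    $queue = $this->getQueue($connection);

    $this->runWorker($connection, $queue);
}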

WorkCommand::runWorker()

Processing of jobs is offloaded to the Illuminate\Queue\Worker class. Depending on whether the --once flag was provided, the runWorker() method calls either runNextJob() or daemon() on the underlying worker instance. runNextJob() takes one job off the queue, processes it, and exits. The daemon() method, on the other hand, starts an infinite loop that keeps pulling jobs from the queue and processing them. We'll look into the daemon() method more closely.

Worker::daemon()

The supportsAsyncSignals() method checks whether the pcntl extension is loaded, and if it is, handlers are set up for the SIGTERM, SIGUSR2, and SIGCONT signals. SIGTERM is used to shut the worker down gracefully, while SIGUSR2 and SIGCONT pause and resume it.
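A sketch of the signal setup (paraphrased from the Worker class):

protected function listenForSignals()
{
    // Let PHP dispatch signal handlers without requiring ticks.
    pcntl_async_signals(true);

    // SIGTERM: finish the current job, then exit gracefully.
    pcntl_signal(SIGTERM, function () {
        $this->shouldQuit = true;
    });

    // SIGUSR2: pause the worker loop.
    pcntl_signal(SIGUSR2, function () {
        $this->paused = true;
    });

    // SIGCONT: resume a paused worker.
    pcntl_signal(SIGCONT, function () {
        $this->paused = false;
    });
}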

Worker::daemonShouldRun()

The first thing that happens inside the infinite loop is a check on whether the current iteration should proceed. The daemonShouldRun() method checks whether the application is in maintenance mode (which you can override with the --force flag), whether the worker is paused, and whether any event listener is preventing the loop from continuing. The line

$this->events->until(new Events\Looping($connectionName, $queue)) === false

triggers the Events\Looping event, and if any listener for this event returns false, the loop does not proceed further. So if you want to temporarily stop the worker from processing jobs, you can use this event, as the example below shows.
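For example, a listener like the following (a hypothetical example; the queue:paused cache key is made up for illustration) would let you pause and resume all workers by flipping a cache flag:

use Illuminate\Queue\Events\Looping;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Event;

// Somewhere in a service provider's boot() method. Returning false
// makes daemonShouldRun() return false, so the worker sleeps through
// this iteration instead of fetching a job.
Event::listen(Looping::class, function (Looping $event) {
    if (Cache::get('queue:paused')) {
        return false;
    }
});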
If daemonShouldRun() returns false, the worker calls the pauseWorker() method, which makes the process sleep for the configured duration and then checks whether execution should continue at all. If the worker process is consuming more memory than configured, or you have set the --stop-when-empty flag and there is no job left to process, or you have run the queue:restart command, the current process exits.

Another interesting thing here is that the queue:restart command doesn't actually restart the process; it just causes the current process to exit. You need a separate process monitor (such as supervisor) to start the worker back up. The way it works is this: when the daemon process starts, it fetches the time the queue was last restarted from the cache and stores it in a local variable. If at any point the value in the cache differs from the value in the local variable, the worker exits. All the queue:restart command does is update the last-restart timestamp in the cache with the current timestamp. This causes all running workers to exit, and if you have supervisor set up, it will start those workers again.
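Boiled down, the restart check compares two timestamps (a simplified sketch; illuminate:queue:restart is the cache key the framework uses):

// Cached once, when the daemon boots.
$lastRestart = $this->cache->get('illuminate:queue:restart');

// Checked after every job. queue:restart simply overwrites the cached
// timestamp, so this comparison fails and the worker stops; a process
// monitor such as supervisor then starts a fresh worker.
if ($this->cache->get('illuminate:queue:restart') != $lastRestart) {
    $this->stop();
}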

Anyway, at this point, it will call the getNextJob() method to get the next job to be processed from the queue. Before we go into details about the getNextJob() method, I want to first discuss how timeouts are implemented.

Worker::registerTimeoutHandler()

The registerTimeoutHandler() method registers a handler for the SIGALRM signal; the handler kills the current process when the alarm fires. In the next lines, an alarm is scheduled for the job's timeout duration: if you have set a timeout on your job class, that is used; otherwise, the worker's default timeout applies. Before the process is killed, markJobAsFailedIfWillExceedMaxAttempts() marks the current job as failed if this attempt exhausts its allowed retries or its retry-until timestamp has passed. If the job has not exceeded its max attempts, only the worker process exits, and the job will be picked up again by another worker process and retried. We'll see what happens when a job fails later on. Now let's dig into how jobs are fetched from the queue.
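A condensed sketch of the handler (paraphrased from the framework source; signatures vary by version):

protected function registerTimeoutHandler($job, WorkerOptions $options)
{
    // When the alarm fires, the job has run past its timeout: mark it
    // as failed if this attempt exhausts its retries, then kill the
    // worker process so the stuck job cannot block the loop.
    pcntl_signal(SIGALRM, function () use ($job, $options) {
        $this->markJobAsFailedIfWillExceedMaxAttempts(
            $job->getConnectionName(),
            $job,
            (int) $options->maxTries,
            $this->maxAttemptsExceededException($job)
        );

        $this->kill(1);
    });

    // Prefer the timeout defined on the job class, falling back to
    // the worker's --timeout option.
    pcntl_alarm(max($this->timeoutForJob($job, $options), 0));
}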

Worker::getNextJob()

Remember that to prioritize a particular queue you pass something like --queue=high,low when you start the worker? Jobs from the high queue will always be processed before jobs from the low queue. Here is how that happens: after exploding the queue string on commas, the queue names are looped over, and the driver tries to retrieve a job from the first non-empty queue with $connection->pop($queue). That means as long as there are jobs in the high-priority queue, jobs from the lower-priority queues won't be processed at all; only after the high-priority queue is empty does the worker fetch jobs from the lower-priority ones. I am not sure how often that becomes a problem in practice, but it is something to keep in mind.
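The priority behaviour falls out of a simple loop (a sketch of getNextJob() with error handling omitted):

protected function getNextJob($connection, $queue)
{
    // "high,low" becomes ['high', 'low']; the first queue that yields
    // a job wins, so earlier names are always drained first.
    foreach (explode(',', $queue) as $name) {
        if (! is_null($job = $connection->pop($name))) {
            return $job;
        }
    }
}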

RedisQueue::pop()

The pop() method is implemented in the Illuminate\Queue\RedisQueue class. The first thing it does is migrate expired jobs from the delayed queue. Also, if you have configured retry_after and a job has been stuck in processing for longer than that duration, it is added back to the main queue to be processed again.

RedisQueue::migrate() and RedisQueue::migrateExpiredJobs()

Remember when we talked about dispatching delayed jobs, how they are added to a sorted set with the time they should become available for processing as the score? This is where those jobs are pulled back into the main queue. All the jobs whose score has expired by the current time are pulled from the delayed queue and pushed back onto the main queue. If you remember the ZRANGEBYSCORE command, the Lua script should make things clear. The pulled items are also removed from the source queue with the ZREMRANGEBYSCORE command. You'll find out about the reserved queue a bit later.

LuaScripts::migrateExpiredJobs()
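Condensed, the script looks something like this, stored as a PHP string the way the LuaScripts class keeps it (simplified; the real script also pushes notifications for the migrated jobs):

public static function migrateExpiredJobs()
{
    return <<<'LUA'
-- KEYS[1] is the source sorted set (delayed or reserved queue),
-- KEYS[2] is the destination list (the main queue),
-- ARGV[1] is the current timestamp.

-- Grab every job whose availability score has passed.
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

if next(val) ~= nil then
    -- Remove them from the source set...
    redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1])

    -- ...and push them onto the main queue in chunks of 100.
    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i + 99, #val)))
    end
end

return val
LUA;
}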

Once all eligible jobs have been moved back to the main queue, the worker retrieves the next job from it.

RedisQueue::retrieveNextJob()

To pop a job off the queue, a Lua script is run. It basically does three tasks (sketched below):
1) Calls LPOP on the main queue list.
2) If LPOP returned a job, it is added to the reserved queue. If you remember from earlier, reserved queues are also sorted sets; here, the expiration time for a reserved job is the current timestamp plus the retry_after duration. As we saw in the earlier section, reserved jobs are moved back to the main queue for retrying once they expire. Every time a job is added to the reserved queue, its attempt count is incremented.
3) An item is removed from the notify list.
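Put together, the pop script looks roughly like this (a condensed sketch of the framework's Lua, again in the PHP-string form the LuaScripts class uses):

public static function pop()
{
    return <<<'LUA'
-- KEYS[1] = main queue, KEYS[2] = reserved queue, KEYS[3] = notify list,
-- ARGV[1] = reservation expiry (now + retry_after).

-- 1) Pop the first job off of the main queue.
local job = redis.call('lpop', KEYS[1])
local reserved = false

if job ~= false then
    -- 2) Increment the attempt count and park the job in the
    --    reserved sorted set with its expiry as the score.
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)

    -- 3) Consume one notification.
    redis.call('lpop', KEYS[3])
end

return {job, reserved}
LUA;
}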

So far you've seen this notify queue everywhere; to understand its purpose, let's look at the following snippet from the retrieveNextJob() method:

notify queue

Laravel provides a block_for config option for the Redis queue; from the docs:

When using the Redis queue, you may use the block_for configuration option to specify how long the driver should wait for a job to become available before iterating through the worker loop and re-polling the Redis database.

This is where the notify queue comes into play. If you have specified a block_for option in your config, the worker calls BLPOP on the notify queue, and this call blocks for the specified duration. As we saw during job dispatching, when a new job is queued, an element is pushed onto the notify queue as well. When that happens, the BLPOP call returns and the method recursively calls itself to pick up the new job from the actual queue. Nifty, huh?
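The relevant part of retrieveNextJob() looks roughly like this (a simplified excerpt-style sketch):

// If the main queue was empty and block_for is configured, wait on
// the notify list. BLPOP returns as soon as a dispatched job pushes
// an element onto it (or the timeout elapses), after which we re-poll
// the real queue, passing false to avoid blocking twice.
if (! $job && ! is_null($this->blockFor) && $block &&
    $this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
    return $this->retrieveNextJob($queue, false);
}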
If no job is returned from the queue, the method just returns. Otherwise, an instance of Illuminate\Queue\Jobs\RedisJob is created and returned to the worker process. Now it's time to process the job.

Once a job is fetched from the queue, the worker daemon calls the runJob() method, which in turn calls the process() method and catches any exception or error that occurs during processing.

Worker::process()

It first fires the "before job" (JobProcessing) event and then checks whether the job has already exceeded the number of times it can be attempted.

Worker::markJobAsFailedIfAlreadyExceedsMaxAttempts()

First, it retrieves the max tries option, which is configured either on your job class or when starting the worker process. You can also configure a retry-until timestamp (retryUntil()) instead of max tries. If a retry-until timestamp is configured, it takes precedence: as long as the current time has not passed it, the job is considered eligible for processing no matter how many times it has been attempted before; once it has passed, the job fails. We'll look at what happens when a job fails a bit later; for now, just know that it is marked as deleted, and process() from the previous step checks the deleted status and skips it. If all is well, the job is eligible for processing and is fired off with the $job->fire() call. Before we see what happens when a job is fired, let me first show you what happens if an exception is thrown from inside the job.
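A sketch of the check, paraphrased from the framework source (Carbon here is Illuminate\Support\Carbon):

protected function markJobAsFailedIfAlreadyExceedsMaxAttempts(
    $connectionName, $job, $maxTries)
{
    // A maxTries set on the job class overrides the worker option.
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

    $retryUntil = $job->retryUntil();

    // retryUntil takes precedence: keep processing until it passes.
    if ($retryUntil && Carbon::now()->getTimestamp() <= $retryUntil) {
        return;
    }

    // Without retryUntil, the job is fine while attempts are in budget.
    if (! $retryUntil && ($maxTries === 0 || $job->attempts() <= $maxTries)) {
        return;
    }

    // Otherwise mark it failed and abort processing of this job.
    $this->failJob($job, $e = $this->maxAttemptsExceededException($job));

    throw $e;
}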

Worker::handleJobException()

When an exception occurs while processing a job, whether thrown directly from the job or from somewhere else during processing, the worker first checks whether the job has reached its maximum number of retries, and if it has, it causes the job to fail. The fail() method can be found in the Illuminate\Queue\Jobs\Job class.

Job::fail()

When a job is considered failed, it means all the available retry options are exhausted and no further attempts will be made to process it. Failing a job involves marking it as failed and deleting it. The $this->delete() method is overridden in the child class RedisJob, which calls the deleteReserved() method on the RedisQueue instance. As we saw earlier, when a job is picked up for processing it is added to the reserved queue; when it fails, it is removed from there, and thus from the queue altogether.

RedisQueue::deleteReserved()

As the last step of exception handling, if the job can be retried later, it is released back onto the delayed queue. The $job->release() call, implemented in RedisJob::release(), calls the deleteAndRelease() method on the RedisQueue driver.

RedisQueue::deleteAndRelease()

If you have configured a delay for failed attempts, the job becomes available for processing again after that delay; otherwise, it is available immediately to be retried on the next iteration of the worker loop. The script essentially removes the job from the reserved queue and adds it to the delayed queue with the current time plus the delay as its score.

LuaScripts::release()
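The release script is tiny (condensed, in the same PHP-string form as before):

public static function release()
{
    return <<<'LUA'
-- KEYS[1] = delayed queue (sorted set), KEYS[2] = reserved queue,
-- ARGV[1] = raw job payload, ARGV[2] = availability timestamp.

-- Drop the job from the reserved queue...
redis.call('zrem', KEYS[2], ARGV[1])

-- ...and schedule it on the delayed queue with its new score.
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])

return true
LUA;
}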

Now that we've seen how failures are handled, the only thing left to explore is how the logic in the job class is run. That process is triggered by the $job->fire() method, which is defined in the Illuminate\Queue\Jobs\Job class.

Job::fire()

The payload here is the raw payload stored in Redis. If you remember our earlier discussion of payload generation during job dispatching, you'll know that $payload['job'] was set to Illuminate\Queue\CallQueuedHandler@call and $payload['data'] was an array containing the fully qualified class name of the job and a serialized version of the job instance. This is where the actual processing begins. The fire() method simply instantiates the Illuminate\Queue\CallQueuedHandler class and calls the call() method on it. Let's see what happens there.
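Paraphrased, fire() is only a few lines:

public function fire()
{
    $payload = $this->payload();

    // "Illuminate\Queue\CallQueuedHandler@call" is split into a class
    // name and a method name.
    [$class, $method] = JobName::parse($payload['job']);

    // Resolve CallQueuedHandler from the container and invoke call(),
    // passing this job and the serialized payload data.
    ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}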

CallQueuedHandler::call()

First, it unserializes the job instance, and if the job uses the Illuminate\Queue\InteractsWithQueue trait, it sets the RedisJob instance on the unserialized job. Then it calls the dispatchNow() method on the Dispatcher. Now we're back to the Dispatcher: this is the same dispatchNow() method that is called when you dispatch a job synchronously.

Dispatcher::dispatchNow()

This method essentially creates a callback that executes the handle() method on the job instance and sends the job through the pipeline. The Laravel pipeline is beyond the scope of this post, but what it ultimately does is invoke that callback, which in turn executes the handle() method. Thus our journey from dispatching a job to its execution is complete. If any exception occurs here, it is handled by the exception handling we discussed earlier. And if the job runs successfully, control goes back to the daemon() method on the worker, which iterates again, looking for a new job to process.
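A simplified sketch of dispatchNow() (the real method also supports dedicated handler classes and self-handling commands):

public function dispatchNow($command, $handler = null)
{
    // The callback that ultimately runs the job's handle() method,
    // with its dependencies injected by the container.
    $callback = function ($command) {
        return $this->container->call([$command, 'handle']);
    };

    // Send the job through the pipeline; the pipes run first, then
    // the callback above executes the job itself.
    return $this->pipeline
        ->send($command)
        ->through($this->pipes)
        ->then($callback);
}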

Parting Words

I hope you have enjoyed taking this journey with me, and I sincerely hope I could explain all these details as well as I wanted to. If something is not clear, you can ask in the comments, or you could roll up your sleeves and dig through the source code just like I did; it's definitely a rewarding journey in the end. To see the Redis commands run by the queue system, you can run redis-cli monitor and watch all the commands processed by the Redis server. Do let me know what you think about this article in the comments section, and any feedback is welcome too. Until then, sayonara.
