Implementing a simple job queue with MySQL 8.0 and PHP PDO

Denys Golotiuk
Published in DataDenys, Aug 3, 2022

There are plenty of queue services out there, from simple and fast to feature-rich and powerful. But sometimes, especially when working on prototypes, we need to get a feature working as fast as possible, and adding new tech to the stack takes time. This is the moment to consider a lightweight implementation on top of tools already in your stack. Since MySQL is so popular, let’s implement a job queue based on it. We’ll use PHP for the scripting examples.

How job queue should work

A job queue is a simple first-in, first-out (FIFO) storage. It has two basic features:

  • Adding data objects (or, like smart-looking people say, sending jobs/tasks/events/messages),
  • Getting objects in the order of addition so that each object is returned only once.
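
To make these two features concrete, here is a minimal in-memory sketch using PHP's built-in SplQueue class. It only illustrates the first-in, first-out behaviour and is not part of the MySQL implementation; the job payloads are made up for the example.

```php
<?php
# in-memory illustration of queue semantics (not the MySQL implementation)
$queue = new SplQueue();

# adding jobs
$queue->enqueue(['do' => 'something', 'with' => 'data']);
$queue->enqueue(['do' => 'something_else', 'with' => 'other_data']);

# getting jobs: each one is returned once, in the order of addition
$first = $queue->dequeue();
$second = $queue->dequeue();

echo $first['do'], "\n";     # something
echo $second['do'], "\n";    # something_else
var_dump($queue->isEmpty()); # bool(true)
```

Once both jobs have been taken, the queue is empty: no job can be handed out twice.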

A job queue is a popular way to let multiple clients (multiple apps or multiple app components) hand tasks (jobs) to each other. There can be any number of clients that add jobs and any number of clients that get jobs.

The MySQL InnoDB engine is a good platform for implementing simple job queues.

Job queue table structure

Let’s build the simplest job queue one can imagine. We will need a table with two columns, id and data.

The id column identifies each stored job and preserves insertion order (it relies on AUTO_INCREMENT; SERIAL in the statement below is a MySQL alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE). The data column stores the job data. As this can be anything, we're going to serialize it and store it as a string, so the TEXT type (up to 65,535 bytes) should be fine; use LONGTEXT if you have large job data objects.

Let’s create the table:

CREATE TABLE queue (
id SERIAL PRIMARY KEY,
data TEXT
);

Writing to job queue

To write data to our queue table, we’ll use a standard INSERT query. We'll use the json_encode() function to serialize the data into a string, as JSON is a widely supported format:

<?php
# connect to MySQL
$pdo = new PDO('mysql:host=localhost;dbname=db', 'usr', 'pwd');
# convert job data to JSON
$job = json_encode(['do' => 'something', 'with' => 'data']);
# insert job into queue
$statement = $pdo->prepare('INSERT INTO queue SET data = :data');
$statement->execute([':data' => $job]); # using prepared statements

After a couple of inserts, our table will look like this:

> SELECT * FROM queue;
+----+---------------------------------------------+
| id | data                                        |
+----+---------------------------------------------+
|  1 | {"do":"something","with":"data"}            |
|  2 | {"do":"something_else","with":"other_data"} |
+----+---------------------------------------------+

InnoDB uses row-level locking and handles many simultaneous writes well, so we don’t need to do anything extra to allow multiple clients to write to our queue.
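
The insert above can be wrapped in a small helper. The sketch below is one possible shape, assuming PHP 7.3 or later (for JSON_THROW_ON_ERROR) and the queue table created earlier. The names encode_job() and enqueue() are hypothetical, chosen only for illustration.

```php
<?php
# hypothetical helper: serialize job data, failing loudly on unencodable input
function encode_job(array $payload): string {
    # JSON_THROW_ON_ERROR makes json_encode() throw JsonException instead of returning false
    return json_encode($payload, JSON_THROW_ON_ERROR);
}

# hypothetical helper: add a job to the queue table and return its id
function enqueue(PDO $pdo, array $payload): int {
    $statement = $pdo->prepare('INSERT INTO queue SET data = :data');
    $statement->execute([':data' => encode_job($payload)]);
    return (int) $pdo->lastInsertId(); # AUTO_INCREMENT id of the new job
}
```

With a connected $pdo, calling enqueue($pdo, ['do' => 'something', 'with' => 'data']) returns the id of the new row. Failing at encode time keeps a broken payload out of the table.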

Reading from job queue

Now the fun part. Why? Because every read from the queue takes two operations:

  • actually read job data,
  • remove fetched job from queue.

This is easy if we have only one client:

<?php
# select one job in the order of insertion
$statement = $pdo->prepare('SELECT * FROM queue ORDER BY id LIMIT 1');
$statement->execute(); # execute select statement
$job = $statement->fetch(PDO::FETCH_ASSOC); # false if the queue is empty
if ($job !== false) {
    # remove fetched job from queue
    $delete = $pdo->prepare('DELETE FROM queue WHERE id = :id');
    $delete->execute([':id' => $job['id']]);
    $job_data = json_decode($job['data'], true); # unserialize JSON into an array
    print_r($job_data); # do something with job data
}

If we have any jobs in the queue we get:

Array
(
    [do] => something
    [with] => data
)

But if we have multiple simultaneous clients, we can end up giving the same job to several of them if they request it in parallel: two clients can both run the SELECT before either one has run its DELETE.

In this case we’ll want to use the InnoDB locking mechanism to make sure each job goes to exactly one client. Here is what we do:

  1. Begin transaction.
  2. Select the next unlocked job and lock it right away with SELECT ... FOR UPDATE SKIP LOCKED (the SKIP LOCKED option is available from MySQL 8.0).
  3. Remove returned job from queue table.
  4. Commit transaction.

In this scenario the selected job stays locked until the transaction commits, and other clients skip it and pick the next unlocked job.

Let’s code that now:

<?php
$pdo->beginTransaction(); # begin transaction
# get the next unlocked job and lock it
$statement = $pdo->prepare('SELECT * FROM queue ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED');
$statement->execute(); # execute select statement
$job = $statement->fetch(PDO::FETCH_ASSOC); # false if no unlocked job is available
if ($job !== false) {
    # remove fetched job from queue
    $delete = $pdo->prepare('DELETE FROM queue WHERE id = :id');
    $delete->execute([':id' => $job['id']]);
}
$pdo->commit(); # commit transaction and release the lock
if ($job !== false) {
    $job_data = json_decode($job['data'], true); # unserialize JSON data
    print_r($job_data); # do something with job data
}
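
The same steps can be packaged as a reusable function. This is a sketch under a few assumptions: the name dequeue() is hypothetical, $pdo is expected to throw exceptions on errors (the default since PHP 8.0; on older versions set PDO::ATTR_ERRMODE to PDO::ERRMODE_EXCEPTION), and an empty queue is reported as null.

```php
<?php
# hypothetical helper: take the next job from the queue, or null if none is available
function dequeue(PDO $pdo): ?array {
    $pdo->beginTransaction();
    try {
        # get the next unlocked job and lock it
        $statement = $pdo->query('SELECT * FROM queue ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED');
        $job = $statement->fetch(PDO::FETCH_ASSOC);
        if ($job === false) {
            $pdo->commit(); # nothing to take, just end the transaction
            return null;
        }
        # remove fetched job from queue
        $delete = $pdo->prepare('DELETE FROM queue WHERE id = :id');
        $delete->execute([':id' => $job['id']]);
        $pdo->commit(); # commit transaction and release the lock
    } catch (Throwable $e) {
        if ($pdo->inTransaction()) {
            $pdo->rollBack(); # undo the delete so the job stays in the queue
        }
        throw $e;
    }
    # return the job id together with the unserialized JSON data
    return ['id' => (int) $job['id'], 'data' => json_decode($job['data'], true)];
}
```

A worker could then poll with something like while (($job = dequeue($pdo)) !== null) { ... }. Note that the job is deleted before it is processed, so a worker that crashes mid-processing loses that job; the code above makes the same trade-off.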

Summary

At some point you might choose a quick job queue implementation on top of MySQL instead of plugging in a separate solution. The described implementation can serve a fairly large number of simultaneous clients, as MySQL has proven performance and stability. This approach is used for the queue component of the Mysqly data framework.
