Multi-Process PHP

Grouped Execution

A while ago I wrote about using exec and shell commands to manage multiple PHP processes. One of the questions that came up was; “how does this differ from what’s out there?”

There are already excellent PHP component libraries, like React and Icicle, which specialise in asynchronous PHP concepts. I would have loved to use these but for two main reasons: I had to support PHP 5.3 and I couldn’t install any extra extensions.

You can read that post for more details, but out of the whole experience came Doorman and Remit.

I’ve since had a lot of reminding that it can be tricky to integrate asynchronous concepts into systems designed under the assumption that deferred processes will happen in a predictable order. We’re used to thinking about synchronous flow. So used to it, in fact, that we assume it all the time!

This got me thinking about how I can allow for groups of child process tasks, in Doorman. Consider the following example:

$manager = new ProcessManager();

$mkdir = new ProcessCallbackTask(function () {
// make a temporary directory
});

$manager->addTask($mkdir);

while ($manager->tick()) {
usleep(25000);
}


foreach ($this->getSlides() as $slide) {
$capture = new ProcessCallbackTask(function () use ($slide) {
// capture this slide as an image
});

$manager->addTask($capture);
}

while ($manager->tick()) {
usleep(25000);
}


$slides = $this->getSlideImagePaths();

$stitch = new ProcessCallbackTask(function() use ($slides) {
// create a new PDF document

foreach ($slides as $slide) {
// create a new page containing the image for a slide
}

// export PDF
});

$manager->addTask($stitch);

while ($manager->tick()) {
usleep(25000);
}
Imagine $this->getSlides() returns an array of slide objects, and
$this->getSlideImagePaths() returns a simple array of image paths.

There are a few logical groups. We need a place to store the temporary slide images, so we have to do that first. Then we need to create images of all the slides. They can be created in any order, so it’s best just to add them all and wait until they’re done. Lastly, we need to stitch all the slide images together in one PDF file. So they all need to be finished before that happens.

In this example, we can process all tasks in one of the groups, in any order. The first task we create has to happen first every time, and the last task we create has to happen last every time.

We need to repeat the manager loop to clear all tasks out before beginning the next group. It’s a pain!

What if we had something that was able to work in the following way?

$mkdir = new ProcessCallbackTask(function () {
// make a temporary directory
});

$captures = array();

foreach ($this->getSlides() as $slide) {
$capture = new ProcessCallbackTask(function () use ($slide) {
// capture this slide as an image
});

$captures[] = $capture;
}

$slides = $this->getSlideImagePaths();

$stitch = new ProcessCallbackTask(function() use ($slides) {
// create a new PDF document

foreach ($slides as $slide) {
// create a new page containing the image for a slide
}

// export PDF
});

$manager = new ProcessManager();

$manager->addTask($mkdir);
$manager->addTaskGroup($captures);
$manager->addTask($stitch);

while ($manager->tick()) {
usleep(25000);
}

Much cleaner! Let’s make a decorator to house this functionality:

namespace AsyncPHP\Doorman\Manager;

use AsyncPHP\Doorman\Manager;
use AsyncPHP\Doorman\Task;

class GroupProcessManager implements Manager
{
/**
* @var Manager
*/
protected $manager;

/**
* @var array
*/
protected $waiting = array();

/**
* @var array
*/
protected $queuing = array();

/**
* @param Manager $manager
*/
public function __construct(Manager $manager)
{
$this->manager = $manager;
}

/**
* @inheritdoc
*
* @param Task $task
*
* @return $this
*/
public function addTask(Task $task)
{
array_push($this->waiting, array($task));

return $this;
}


/**
* @param array $tasks
*
* @return $this
*/
public function addTaskGroup(array $tasks) {
foreach ($tasks as $task) {
assert($task instanceof Task);
}

array_push($this->waiting, $tasks);

return $this;
}


/**
* @inheritdoc
*
* @return bool
*/
public function tick()
{
if (!empty($this->queuing)) {
$this->manager->addTask(array_shift($this->queuing));
}

if ($this->manager->tick()) {
return true;
}

if (empty($this->waiting)) {
return false;
}

$this->queuing = array_shift($this->waiting);

return true;

}
}

The only important thing to notice here is the order of stuff in the tick() method:

  1. We have a property in which we temporarily store the “current” group. If there are tasks in this temporary array then we add one of them to the decorated manager. This gives each task time to boot (depending on the sleep time) so that rules are applied in the same way as with the decorated manager.
  2. Next we run the decorated manager for a tick. If there is still work being done in that manager then it must be tasks of the same group. We can end this method early. This way we’ll keep adding tasks in the queued group, and letting the decorated manager manager them.
  3. When the manager is done with all the tasks in the queued group, we can start to look for more task groups. If there are none left in the waiting property then we’ve finished executing all the waiting and running tasks.
  4. If there are still waiting task groups then we get one of them and store it in the queued property, creating a new queued task group cycle. We know that there is still work to be done, at this point, so we can return true.

It’s kind difficult to test any parallel execution model, but the method I’ve come up with is:

namespace AsyncPHP\Doorman\Tests\Manager;

use AsyncPHP\Doorman\Manager\GroupProcessManager;
use AsyncPHP\Doorman\Manager\ProcessManager;
use AsyncPHP\Doorman\Task\ProcessCallbackTask;
use AsyncPHP\Doorman\Tests\Test;

/**
* @covers AsyncPHP\Doorman\Manager\GroupProcessManager
*/
class GroupProcessManagerTest extends Test
{
/**
* @var GroupProcessManager
*/
protected $manager;

/**
* @inheritdoc
*/
public function setUp()
{
parent::setUp();

$this->manager = new GroupProcessManager(
new ProcessManager()
);

}

/**
* @test
*/
public function groupsExecuteInPredictableOrder()
{
$this->unlink("task1");
$this->unlink("task2");
$this->unlink("task3");
$this->unlink("task4");

$task1 = new ProcessCallbackTask(function () {
GroupProcessManagerTest::dawdle("task1");
});

$task2 = new ProcessCallbackTask(function () {
GroupProcessManagerTest::dawdle("task2");
});

$task3 = new ProcessCallbackTask(function () {
GroupProcessManagerTest::dawdle("task3");
});

$task4 = new ProcessCallbackTask(function () {
GroupProcessManagerTest::dawdle("task4");
});

$this->manager->addTask($task1);
$this->manager->addTaskGroup(array($task2, $task3));
$this->manager->addTask($task4);

while ($this->manager->tick()) {
$exists1 = $this->exists("task1");
$exists2 = $this->exists("task2");
$exists3 = $this->exists("task3");
$exists4 = $this->exists("task4");

if ($exists1 && ($exists2 || $exists3)) {
$this->fail();
}

if ($exists4 && ($exists2 || $exists3)) {
$this->fail();
}

usleep(25000);
}

}

/**
* @param string $name
*/
protected function unlink($name)
{
if ($this->exists($name)) {
unlink(__DIR__ . "/{$name}.temp");
}
}

/**
* @param string $name
*
* @return bool
*/
protected function exists($name)
{
return file_exists(__DIR__ . "/{$name}.temp");
}

/**
* @param string $name
*/
public static function dawdle($name)
{
touch(__DIR__ . "/{$name}.temp");

for ($i = 0; $i < 5; $i++) {
usleep(25000);
}

unlink(__DIR__ . "/{$name}.temp");
}
}

This test adds three task groups. If the first or third task group runs at the same time as the second; the test fails. If you want to check this with your own eyes, you could increase the sleep duration to a second. That’s what I did while I was making the test for this…

Questions or comments? Find me on Twitter!

Like what you read? Give Christopher Pitt a round of applause.

From a quick cheer to a standing ovation, clap to show how much you enjoyed this story.