Co-operative PHP Multitasking

When is an array like an adventure?

Christopher Pitt
Async PHP

--

Last week I got the opportunity to share recent work with my colleagues at SilverStripe. I was going to present Async PHP today, but since I covered ReactPHP last week, I decided to talk about something slightly different. So here’s a post about cooperative multitasking.

I’m also planning on adding this stuff to the Async PHP book I’m writing. It’s going to include far more detail than you’ll read in this post, but I still think this post is a good overview of the topic!

Let’s go!

Cooperative multitasking is the point of everything we’re going to look at. But we’re going to start somewhere simpler and more familiar.

It All Starts With Arrays

We can use arrays for simple iteration:

$array = ["foo", "bar", "baz"];

foreach ($array as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}

for ($i = 0; $i < count($array); $i++) {
    print "item: " . $i . "|" . $array[$i] . "\n";
}

This is the kind of basic functionality that our daily code depends on: being able to traverse an array and determine the key and value of each item.

Naturally, we would want to be able to know when we’re working with an array. There’s a handy built-in function for just this purpose:

print is_array($array) ? "yes" : "no"; // yes

Array-like Stuff

There are times when we get hold of things that act in this way, but are not arrays. Consider working with the DOMDocument class:

$document = new DOMDocument();
$document->loadXML("<div></div>");

$elements = $document->getElementsByTagName("div");

print_r($elements); // DOMNodeList Object ( [length] => 1 )

This clearly isn’t an array, but it has a length property. Can we traverse this, in the same way we can traverse arrays? We can find out by checking whether it implements a special interface:

print ($elements instanceof Traversable) ? "yes" : "no"; // yes

That’s really helpful. We don’t have to trigger errors if the thing we want to traverse isn’t traversable. We can just check beforehand.
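Here’s a rough sketch of how the two checks could be combined. The canTraverse() helper is a name I made up for illustration, and ArrayIterator just stands in for any traversable object. PHP 7.1 and later also ship a built-in is_iterable() function that performs the same test.

```php
<?php

// Hypothetical helper: true for anything foreach can safely walk over.
function canTraverse($value)
{
    return is_array($value) || $value instanceof Traversable;
}

$checks = [
    "array" => canTraverse(["foo", "bar"]),
    "iterator" => canTraverse(new ArrayIterator(["foo", "bar"])),
    "string" => canTraverse("foo"),
];

foreach ($checks as $label => $result) {
    print $label . ": " . ($result ? "yes" : "no") . "\n";
}
```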

That leads to another question: could we make our own classes that behave in the same way? The answer is yes! Our first approach might resemble the following:

class MyTraversable implements Traversable
{
    // our nonsense here...
}

If we run that, we’ll see an error message:

PHP Fatal error: Class MyTraversable must implement interface Traversable as part of either Iterator or IteratorAggregate…

Iterator

So we can’t directly implement Traversable, but we can try one of the other two…

class MyIterator implements Iterator
{
    // our nonsense here...
}

This interface requires that we implement 5 methods. Let’s expand on our iterator:

class MyIterator implements Iterator
{
    protected $data;

    protected $index = 0;

    public function __construct(array $data)
    {
        $this->data = $data;
    }

    public function current()
    {
        return $this->data[$this->index];
    }

    public function next()
    {
        // only advance the pointer; foreach ignores any return value
        $this->index++;
    }

    public function key()
    {
        return $this->index;
    }

    public function rewind()
    {
        $this->index = 0;
    }

    public function valid()
    {
        return $this->index < count($this->data);
    }
}

Some important things to notice:

  1. We’re storing the constructor $data array so we can return items from it later.
  2. We also need an internal index (or pointer) to track things like current and next.
  3. rewind() just resets the index so that current() and next() will work as expected.
  4. Keys don’t have to be numeric! That’s just what I’m using here, to keep things simple.

We can run this code with:

$iterator = new MyIterator(["foo", "bar", "baz"]);

foreach ($iterator as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}

So this seems like a lot of work, but it’s a neat way to hook into the foreach functionality that arrays have.
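If you’re curious what foreach does with those five methods, here’s a rough sketch of the equivalent manual loop. I’m using the built-in ArrayIterator as a stand-in so the snippet runs on its own; any Iterator should behave the same way.

```php
<?php

// ArrayIterator stands in for our own class; both implement Iterator.
$iterator = new ArrayIterator(["foo", "bar", "baz"]);

$seen = [];

// Roughly what foreach does for us behind the scenes.
$iterator->rewind();

while ($iterator->valid()) {
    $key = $iterator->key();
    $value = $iterator->current();

    $seen[] = $key . "|" . $value;
    print "item: " . $key . "|" . $value . "\n";

    $iterator->next();
}
```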

IteratorAggregate

Remember the second interface that the Traversable error suggested we implement? It turns out to be a lot quicker to implement than Iterator:

class MyIteratorAggregate implements IteratorAggregate
{
    protected $data;

    public function __construct(array $data)
    {
        $this->data = $data;
    }

    public function getIterator()
    {
        return new ArrayIterator($this->data);
    }
}

We’re cheating a bit. Instead of implementing a whole new Iterator, we’re delegating to the built-in ArrayIterator. Still, this is much less code than writing every Iterator method ourselves.
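Here’s a small sketch of the same idea in use. The Playlist class and its contents are made up for illustration. The point is that anything implementing IteratorAggregate counts as Traversable, so it works with foreach and with helpers like iterator_to_array().

```php
<?php

// Hypothetical collection class, for illustration only.
class Playlist implements IteratorAggregate
{
    protected $songs;

    public function __construct(array $songs)
    {
        $this->songs = $songs;
    }

    public function getIterator(): Traversable
    {
        return new ArrayIterator($this->songs);
    }
}

$playlist = new Playlist(["intro", "verse", "chorus"]);

$isTraversable = $playlist instanceof Traversable;
$copy = iterator_to_array($playlist);

foreach ($playlist as $position => $song) {
    print $position . ". " . $song . "\n";
}
```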

Hold your horses! There’s another route to something traversable: generators. Let’s compare some code. First we’ll read each line in a file without a generator, and then with one:

$content = file_get_contents(__FILE__);

$lines = explode("\n", $content);

foreach ($lines as $i => $line) {
    print $i . ". " . $line . "\n";
}

This reads itself, and for each line it prints the line number followed by a line of code. So let’s make it a generator, because WHY NOT!

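function lines($file) {
    $handle = fopen($file, "r");

    try {
        // fgets() returns false at the end of the file, which ends the loop
        while (($line = fgets($handle)) !== false) {
            yield trim($line);
        }
    } finally {
        // close the handle even if the caller stops iterating early
        fclose($handle);
    }
}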

foreach (lines(__FILE__) as $i => $line) {
    print $i . ". " . $line . "\n";
}

Now I know this looks more complicated. It is, but mostly because we’re not using file_get_contents(). A generator looks like a function, but it stops every time it gets to the yield keyword.

Generators look a little bit like iterators:

print_r(lines(__FILE__)); // Generator Object ( )

So it’s not one of our hand-written iterators; it’s an instance of the built-in Generator class (which itself implements Iterator). What methods does it have?

print_r(get_class_methods(lines(__FILE__)));

// Array
// (
//     [0] => rewind
//     [1] => valid
//     [2] => current
//     [3] => key
//     [4] => next
//     [5] => send
//     [6] => throw
//     [7] => __wakeup
// )

If you read a huge file, and use memory_get_peak_usage(), you’ll notice that the generator code uses a roughly fixed amount of memory, no matter how big the file is. It’s only reading a single line at a time. Reading the whole file with file_get_contents() uses more memory the bigger the file gets. This is one of the benefits of using generators for this kind of thing!
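The same laziness means a generator never has to finish. Here’s a sketch with a made-up naturals() function, which would be impossible to build as an array because it never ends. We only pay for the values we actually ask for:

```php
<?php

// Hypothetical generator: counts upwards forever.
function naturals()
{
    $n = 1;

    while (true) {
        yield $n++;
    }
}

$firstThree = [];

foreach (naturals() as $number) {
    if ($number > 3) {
        break; // we stop asking, so the generator never resumes
    }

    $firstThree[] = $number;
}

print implode(", ", $firstThree) . "\n";
```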

Send

It’s possible to send data into a generator. Consider the following generator:

$generator = call_user_func(function() {
    yield "foo";
});

print $generator->current() . "\n"; // foo

Notice how we’re wrapping the generator function within call_user_func()? That’s just a shortcut for defining the function and then immediately calling it to get a new generator instance…

We’ve already seen this kind of yield usage. We can extend this generator to accept data as well:

$generator = call_user_func(function() {
    $input = (yield "foo");
    print "inside: " . $input . "\n";
});

print $generator->current() . "\n";

$generator->send("bar");

Data enters and leaves through the yield keyword. To begin with, current() executes the code until it sees yield, and returns foo. send() pushes it past that yield to where the generator prints the input. This takes some getting used to…
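If the back-and-forth is hard to picture, this sketch records each step in a $log array (my own addition, purely for tracing). Each send() call hands a value to the paused yield, lets the generator run to its next yield, and returns whatever that next yield produces.

```php
<?php

$log = [];

$generator = call_user_func(function () use (&$log) {
    $log[] = "generator: started";

    $first = (yield "ready for first");
    $log[] = "generator: received " . $first;

    $second = (yield "ready for second");
    $log[] = "generator: received " . $second;
});

// Runs the generator up to the first yield.
$reply = $generator->current();
$log[] = "caller: " . $reply;

// Resumes at the first yield and runs to the second one.
$reply = $generator->send("one");
$log[] = "caller: " . $reply;

// Resumes at the second yield; the generator then finishes.
$generator->send("two");

print implode("\n", $log) . "\n";
```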

Throw

Since we’re jumping in and out of these functions, we might want to push exceptions into generators. That way they can handle the fallout in their own way.

Consider the following:

$multiply = function ($x, $y) {
    yield $x * $y;
};

print $multiply(5, 6)->current(); // 30

Now, let’s wrap this in another function:

$calculate = function ($op, $x, $y) use ($multiply) {
    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        return $generator->current();
    }
};

print $calculate("multiply", 5, 6); // 30

So here we have a normal closure, wrapping the multiply generator. Let’s protect against invalid arguments:

$calculate = function ($op, $x, $y) use ($multiply) {
    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            throw new InvalidArgumentException();
        }

        return $generator->current();
    }
};

print $calculate("multiply", 5, "foo"); // PHP Fatal error...

What if we want the generator to be able to handle the exception? We can send it through to the generator!

$multiply = function ($x, $y) {
    try {
        yield $x * $y;
    } catch (InvalidArgumentException $exception) {
        print "ERRORS!";
    }
};

$calculate = function ($op, $x, $y) use ($multiply) {
    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            $generator->throw(new InvalidArgumentException());
        }

        return $generator->current();
    }
};
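Here’s a self-contained sketch of the same mechanism, using a made-up worker generator. The exception surfaces at the yield where the generator is paused, so the generator can catch it and carry on to handle the next value.

```php
<?php

$messages = [];

// Hypothetical worker: receives jobs through yield, forever.
$worker = call_user_func(function () use (&$messages) {
    while (true) {
        try {
            $job = (yield);
            $messages[] = "processed " . $job;
        } catch (InvalidArgumentException $exception) {
            $messages[] = "rejected: " . $exception->getMessage();
        }
    }
});

$worker->current(); // run to the first yield

$worker->send("a");
$worker->throw(new InvalidArgumentException("bad job"));
$worker->send("b");

print implode("\n", $messages) . "\n";
```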

That’s pretty neat! So: we can use generators just like iterators. And we can also send data into them and throw exceptions through them. They’re interruptible and resumable functions. Some languages would call these kinds of functions coroutines.

Turns out we can use coroutines to model asynchronous code. Let’s make a simple task scheduler. First we’ll need a Task class:

class Task
{
    protected $generator;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        $this->generator->next();
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

Task is a decorator for ordinary generators. We store the generator for later use, and implement simple run() and finished() methods. run() makes the task tick, while finished() lets the scheduler know when to stop running the task.

Then we need a Scheduler class:

class Scheduler
{
    protected $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function enqueue(Task $task)
    {
        $this->queue->enqueue($task);
    }

    public function run()
    {
        while (!$this->queue->isEmpty()) {
            $task = $this->queue->dequeue();
            $task->run();

            if (!$task->finished()) {
                $this->enqueue($task);
            }
        }
    }
}

Scheduler maintains a queue of running tasks. run() loops until the queue is empty: it pulls a task off the queue and runs it once. If the task is still unfinished after that, we send it back to the end of the queue.

SplQueue is great for this case. It’s a first-in-first-out structure, which ensures each task will get a fair amount of processor time.

We can run this code, with:

$scheduler = new Scheduler();

$task1 = new Task(call_user_func(function() {
    for ($i = 0; $i < 3; $i++) {
        print "task 1: " . $i . "\n";
        yield;
    }
}));

$task2 = new Task(call_user_func(function() {
    for ($i = 0; $i < 6; $i++) {
        print "task 2: " . $i . "\n";
        yield;
    }
}));

$scheduler->enqueue($task1);
$scheduler->enqueue($task2);

$scheduler->run();

The first time we run this, we should see output resembling:

task 1: 0
task 1: 1
task 2: 0
task 2: 1
task 1: 2
task 2: 2
task 2: 3
task 2: 4
task 2: 5

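This is almost exactly what we want to happen. The trouble is that the first run of each task appears to happen twice: calling next() on a generator that hasn’t started yet first runs it to its first yield, and then advances it to the second. We can fix this with a small change to Task: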

class Task
{
    protected $generator;

    protected $run = false;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        if ($this->run) {
            $this->generator->next();
        } else {
            $this->generator->current();
        }

        $this->run = true;
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

We need to adjust the first invocation of run() so that it only reads the current generator value, which runs the generator up to its first yield. Subsequent invocations can advance the generator pointer…
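Here’s a tiny sketch of the behaviour behind that fix, using a made-up steps() generator. Calling next() on a generator that hasn’t started runs it to the first yield and then moves on again, while current() stops at the first yield:

```php
<?php

// Hypothetical helper that builds a fresh three-step generator.
function steps()
{
    yield "first";
    yield "second";
    yield "third";
}

// next() on a fresh generator: runs to "first", then advances past it.
$viaNext = steps();
$viaNext->next();
$afterNext = $viaNext->current();

// current() on a fresh generator: only runs to "first".
$viaCurrent = steps();
$afterCurrent = $viaCurrent->current();

print "after next(): " . $afterNext . "\n";
print "after current(): " . $afterCurrent . "\n";
```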

Some folks have built wonderful libraries on this idea. We’ll just look at two, for now…

RecoilPHP

RecoilPHP is a set of coroutine-based libraries, the most impressive of which is a kernel for ReactPHP. It’s possible to swap the event loop from ReactPHP with the one from RecoilPHP, without major changes to your application.

Let’s look at some ReactPHP-only, asynchronous DNS resolution:

function resolve($domain, $resolver)
{
    $resolver
        ->resolve($domain)
        ->then(
            function($ip) use ($domain) {
                print "domain: " . $domain . "\n";
                print "ip: " . $ip . "\n";
            },
            function($error) {
                print $error . "\n";
            }
        );
}

function run()
{
    $loop = React\EventLoop\Factory::create();

    $factory = new React\Dns\Resolver\Factory();

    $resolver = $factory->create("8.8.8.8", $loop);

    resolve("silverstripe.org", $resolver);
    resolve("wordpress.org", $resolver);
    resolve("wardrobecms.com", $resolver);
    resolve("pagekit.com", $resolver);

    $loop->run();
}

run();

resolve() accepts a domain name and DNS resolver, and performs a standard ReactPHP DNS lookup. Don’t get too caught up in the internals of resolve(). The important thing is that the function isn’t a generator. It’s just a function!

run() creates a ReactPHP event loop, a DNS resolver (by way of a factory) and resolves a few domain names. Again, not a generator.

Wonder what the RecoilPHP differences are? Wonder no more!

use Recoil\Recoil;

function resolve($domain, $resolver)
{
    try {
        $ip = (yield $resolver->resolve($domain));

        print "domain: " . $domain . "\n";
        print "ip: " . $ip . "\n";
    } catch (Exception $exception) {
        print $exception->getMessage() . "\n";
    }
}

function run()
{
    $loop = (yield Recoil::eventLoop());

    $factory = new React\Dns\Resolver\Factory();

    $resolver = $factory->create("8.8.8.8", $loop);

    yield [
        resolve("silverstripe.org", $resolver),
        resolve("wordpress.org", $resolver),
        resolve("wardrobecms.com", $resolver),
        resolve("pagekit.com", $resolver),
    ];
}

Recoil::run("run");

RecoilPHP is doing a few magical things to allow such tight integration with ReactPHP. Each time resolve() runs, RecoilPHP manages the promise returned from $resolver->resolve(), and sends the resolved data back into the generator. At that point we can use it as though we were writing synchronous code: just a list of instructions, unlike the callback code we might be accustomed to in other asynchronous models.

RecoilPHP also knows how to manage the array of generators we yield from run(), handling each one in the same way. Beyond this, RecoilPHP includes coroutine-based database (PDO) and logger libraries.
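To take some of the magic out of it, here’s a library-free sketch of the general idea. This is not RecoilPHP’s implementation: lookup(), drive() and the fake address table are all invented for illustration, and a plain closure stands in for a promise. The driver runs whatever the coroutine yields, then either sends the result back in or throws the failure into the coroutine.

```php
<?php

// Hypothetical stand-in for an asynchronous lookup. It returns a closure
// that produces the result when called; a real library would return a promise.
function lookup($domain)
{
    return function () use ($domain) {
        $fake = ["example.org" => "192.0.2.1"]; // made-up record

        if (!isset($fake[$domain])) {
            throw new RuntimeException("no record for " . $domain);
        }

        return $fake[$domain];
    };
}

// Hypothetical driver: feeds results (or exceptions) back into the coroutine.
function drive(Generator $coroutine)
{
    $pending = $coroutine->current();

    while ($coroutine->valid()) {
        try {
            $result = $pending();
        } catch (Exception $exception) {
            // the failure resurfaces at the yield inside the coroutine
            $pending = $coroutine->throw($exception);
            continue;
        }

        $pending = $coroutine->send($result);
    }
}

// Reads top to bottom, like the resolve() coroutine above.
function resolve($domain, ArrayObject $output)
{
    try {
        $ip = (yield lookup($domain));
        $output[] = $domain . " => " . $ip;
    } catch (Exception $exception) {
        $output[] = "error: " . $exception->getMessage();
    }
}

$output = new ArrayObject();

drive(resolve("example.org", $output));
drive(resolve("missing.invalid", $output));

print implode("\n", $output->getArrayCopy()) . "\n";
```

This driver runs one coroutine at a time, so nothing here is actually asynchronous. It only shows how yield, send() and throw() let the coroutine body read like synchronous code.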

IcicleIO

IcicleIO is a new attempt to achieve the goals of ReactPHP, using only coroutines. It has fewer secondary libraries than ReactPHP. Still, the core asynchronous stream/server/socket/loop features are there.

Let’s look at an example socket server:

use Icicle\Coroutine\Coroutine;
use Icicle\Loop\Loop;
use Icicle\Socket\Client\ClientInterface;
use Icicle\Socket\Server\ServerInterface;
use Icicle\Socket\Server\ServerFactory;

$factory = new ServerFactory();

$coroutine = Coroutine::call(function (ServerInterface $server) {
    $clients = new SplObjectStorage();

    $handler = Coroutine::async(
        function (ClientInterface $client) use (&$clients) {
            $clients->attach($client);

            $host = $client->getRemoteAddress();
            $port = $client->getRemotePort();

            $name = $host . ":" . $port;

            try {
                // tell everyone else that a new client has joined
                foreach ($clients as $stream) {
                    if ($client !== $stream) {
                        $stream->write($name . " connected.\n");
                    }
                }

                yield $client->write("Welcome " . $name . "!\n");

                while ($client->isReadable()) {
                    $data = trim(yield $client->read());

                    if ("/exit" === $data) {
                        yield $client->end("Goodbye!\n");
                    } else {
                        $message = $name . ":" . $data . "\n";

                        foreach ($clients as $stream) {
                            if ($client !== $stream) {
                                $stream->write($message);
                            }
                        }
                    }
                }
            } catch (Exception $exception) {
                $client->close($exception);
            } finally {
                // forget the client, then tell everyone else it has left
                $clients->detach($client);
                foreach ($clients as $stream) {
                    $stream->write($name . " disconnected.\n");
                }
            }
        }
    );

    while ($server->isOpen()) {
        $handler(yield $server->accept());
    }
}, $factory->create("127.0.0.1", 6000));

Loop::run();

As far as I can tell, this code is doing the following things:

  1. Creating a server instance, at host 127.0.0.1 and port 6000, and passing that to the outer generator.
  2. The outer generator runs, while the server is open to new connections. When the server accepts a connection it passes it into the inner generator.
  3. The inner generator writes a welcome message to the socket. It then runs while the socket is readable.
  4. Each time the socket sends a message to the server, the inner generator checks if the message is /exit. If so, the socket is sent a goodbye message and closed, and the other sockets are told it has disconnected. If not, the other sockets are sent the same message.

Open up a terminal and type nc localhost 6000 to see this in action!

The example uses SplObjectStorage to keep track of the socket connections. This is so that we can send messages to all the sockets.
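Here’s a small sketch of that bookkeeping on its own, without any sockets. FakeClient and broadcast() are made up for illustration; the fake client just remembers what was written to it.

```php
<?php

// Hypothetical stand-in for a connected socket client.
class FakeClient
{
    public $name;
    public $received = [];

    public function __construct($name)
    {
        $this->name = $name;
    }

    public function write($message)
    {
        $this->received[] = $message;
    }
}

// Hypothetical helper: send a message to everyone except the sender.
function broadcast(SplObjectStorage $clients, FakeClient $sender, $text)
{
    foreach ($clients as $client) {
        if ($client !== $sender) {
            $client->write($sender->name . ": " . $text);
        }
    }
}

$clients = new SplObjectStorage();

$alice = new FakeClient("alice");
$bob = new FakeClient("bob");
$carol = new FakeClient("carol");

$clients->attach($alice);
$clients->attach($bob);
$clients->attach($carol);

broadcast($clients, $alice, "hi");

$clients->detach($bob); // bob disconnects

broadcast($clients, $alice, "anyone?");

print count($clients) . " clients still connected\n";
```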

This topic can be a lot to take in. Hopefully you see where generators came from, and how they can help with iteration and asynchronous code.

If you’ve got questions, feel free to ask me.

I would like to thank Nikita Popov (especially for his illuminating post on co-operative multitasking), Anthony Ferrara and Joe Watkins. The work of these gifted developers and teachers inspired me to write this post. Give them a follow, will ya?!
