Symfony: PHP asynchronous or enqueue bundle, How to create a queue and progress bar

Azri Bilel
6 min read · Jun 22, 2019

What is Asynchronous Programming?

Asynchronous programs rely on non-blocking code to continuously process available tasks within a single thread of execution. Operations are only made with data that is immediately available. Since all data required by most programs cannot be immediately available, requests need to be made for data outside the program. Asynchronous programs must then use a different strategy for external operations that would normally cause the program to block. To avoid blocking, asynchronous programs use functions that also accept a callback function that is executed once the external request has been completed. Instead of blocking until the request is completed, the program is able to continue execution even though the result of the request is not available.

PHP scripts are generally written using blocking requests. For example, calling the function file_get_contents() to fetch the contents of a file will block the execution of the script until the operation has completed. While the process is blocked, no other code can be run within that PHP process.
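
The callback style described above can be sketched in plain PHP. This is only a toy, single-threaded loop for illustration: the class name TinyLoop and its methods are hypothetical, and it does not perform real non-blocking I/O (async libraries do that by polling streams).

```php
<?php

// Toy event loop (hypothetical, for illustration): tasks are registered
// together with a callback and executed later, when the loop runs.
final class TinyLoop
{
    /** @var SplQueue pending [task, callback] pairs */
    private $pending;

    public function __construct()
    {
        $this->pending = new SplQueue();
    }

    // Register a task; $onDone receives its result once the task has run.
    public function defer(callable $task, callable $onDone): void
    {
        $this->pending->enqueue([$task, $onDone]);
    }

    // Run every pending task in FIFO order and pass each result to its callback.
    public function run(): void
    {
        while (!$this->pending->isEmpty()) {
            list($task, $onDone) = $this->pending->dequeue();
            $onDone($task());
        }
    }
}

$log = [];
$loop = new TinyLoop();

$loop->defer(
    function () { return strlen('hello'); },
    function ($result) use (&$log) { $log[] = "result: $result"; }
);

// The caller continues immediately; the result is not available yet.
$log[] = 'continuing';

$loop->run();
```

The caller's own line is logged before the callback fires, which is the ordering an asynchronous program relies on.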

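To develop asynchronous PHP you may have to build PHP from source, because the stock packages (php-dev, php-common, ...) do not always provide what you need. The build below enables pcntl, which calls such as pcntl_async_signals(true) require, and thread safety (ZTS), which the pthreads extension requires.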

Build PHP on Ubuntu

#!/usr/bin/env bash
INSTALL_PKGS="libzip-dev bison autoconf build-essential pkg-config git-core libltdl-dev libbz2-dev libxml2-dev libxslt1-dev libssl-dev libicu-dev libpspell-dev libenchant-dev libmcrypt-dev libpng-dev libjpeg8-dev libfreetype6-dev libmysqlclient-dev libreadline-dev libcurl4-openssl-dev mysql-server apache2 libxslt-dev libpcre3 libpcre3-dev curl libenchant-dev libcurl4-gnutls-dev"
for i in $INSTALL_PKGS; do
sudo apt-get install -y $i
done
ln -s x86_64-linux-gnu/curl /usr/include
wget https://www.php.net/distributions/php-7.2.16.tar.gz -O /tmp/php7.2-lucky.tar.gz
tar -C /tmp -xf /tmp/php7.2-lucky.tar.gz
cd /tmp/php-7.2.16
/tmp/php-7.2.16/buildconf --force
mkdir -p ~/bin/php7.2-lucky/
mkdir -p /etc/php7.2-lucky/conf
CONFIGURE_STRING="--prefix=/etc/php7.2-lucky --sysconfdir=/etc --localstatedir=/var \
--datadir=/usr/share/php --mandir=/usr/share/man \
--with-bz2 --with-zlib --enable-zip --disable-cgi \
--enable-soap --enable-intl --with-openssl --with-readline --with-curl \
--enable-ftp --enable-mysqlnd --with-mysqli=mysqlnd --with-pdo-mysql=mysqlnd \
--enable-sockets --enable-pcntl --with-pspell --with-enchant --with-gettext \
--with-gd --enable-exif --with-jpeg-dir --with-png-dir --with-freetype-dir --with-xsl \
--enable-bcmath --enable-mbstring --enable-calendar --enable-simplexml --enable-json \
--enable-hash --enable-session --enable-xml --enable-wddx --enable-opcache \
--with-pcre-regex --with-config-file-path=/etc/php7.2-lucky/conf \
--with-config-file-scan-dir=/etc/php7.2-lucky/etc --enable-cli --enable-maintainer-zts \
--with-tsrm-pthreads --enable-debug --enable-fpm --enable-phar --with-pear \
--with-fpm-user=www-data --with-fpm-group=www-data --enable-sysvmsg"
/tmp/php-7.2.16/./configure $CONFIGURE_STRING
make -C /tmp/php-7.2.16/
make -C /tmp/php-7.2.16/ install
chmod o+x /etc/php7.2-lucky/bin/phpize
chmod o+x /etc/php7.2-lucky/bin/php-config
git -C /tmp/php-7.2.16/ clone https://github.com/krakjoe/pthreads.git
cd pthreads
/etc/php7.2-lucky/bin/phpize
./configure \
--prefix='/etc/php7.2-lucky' \
--with-libdir='/lib/x86_64-linux-gnu' \
--enable-pthreads=shared \
--with-php-config='/etc/php7.2-lucky/bin/php-config'
make -C /tmp/php-7.2.16/pthreads
make -C /tmp/php-7.2.16/pthreads install
ln -s /etc/php7.2-lucky/bin/* /usr/local/bin/
cp /tmp/php-7.2.16/sapi/fpm/init.d.php-fpm /etc/init.d/php-fpm
cp /tmp/php-7.2.16/sapi/fpm/php-fpm.conf /etc/php-fpm.conf
mkdir -p /etc/php-fpm.d/
cp /tmp/php-7.2.16/sapi/fpm/www.conf /etc/php-fpm.d/
chmod -R 755 /etc/init.d/php-fpm
update-rc.d php-fpm defaults
cp /tmp/php-7.2.16/php.ini* /etc/php7.2-lucky/conf
cp /var/www/html/builder/php.ini /etc/php7.2-lucky/conf/php.ini
cp /var/www/html/builder/php.ini /etc/php7.2-lucky/conf/php-cli.ini
echo "extension=pthreads.so" | sudo tee -a /etc/php7.2-lucky/conf/php-cli.ini
echo "zend_extension=opcache.so" | sudo tee -a /etc/php7.2-lucky/conf/php-cli.ini
# if php build fail with icu error
# install icu for php intl
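
After the build, it can help to confirm that the new binary really provides the extensions the script is meant to enable. A minimal sketch, assuming it is run with the freshly built CLI; the function name missingExtensions is hypothetical and the list of names is taken from the configure flags above.

```php
<?php

// Return the names from $required that the running PHP binary does not provide.
function missingExtensions(array $required): array
{
    return array_values(array_filter($required, function ($name) {
        return !extension_loaded($name);
    }));
}

// Extensions the build script above is meant to enable.
$missing = missingExtensions(['pcntl', 'pthreads', 'sockets']);

echo $missing === []
    ? "All required extensions are loaded\n"
    : 'Missing: '.implode(', ', $missing)."\n";
```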

Note: asynchronous PHP works only on the CLI.
To make PHP scripts asynchronous you need one of these libraries: https://github.com/icicleio/icicle or: https://github.com/reactphp/react

In my experience asynchronous PHP is not very useful in many cases: there are many bugs, asynchronous scripts are not easy to develop, and there is little community to turn to when you hit a problem, so you lose a lot of time. Node.js is very powerful for those cases.
If you don't want to develop with Node.js, you can follow these steps to build real-time services.

Symfony enqueue bundle

The enqueue bundle uses a backend server such as RabbitMQ or Redis to process events; you can also use the filesystem, but that is not recommended.
My application already used SncRedisBundle for caching, so I reused the Redis server.

Redis server installation on Ubuntu 18.04 & 16.04 LTS:

apt update
apt upgrade
apt install redis-server
apt install php-redis
systemctl restart redis-server.service

SncRedisBundle: https://github.com/snc/SncRedisBundle/blob/master/Resources/doc/index.md

EnqueueBundle installation:

php composer.phar require enqueue/enqueue-bundle enqueue/redis
php composer.phar require predis/predis

AppKernel.php

class AppKernel extends Kernel
{
    public function registerBundles()
    {
        $bundles = [
            // ...
            new Enqueue\Bundle\EnqueueBundle(),
        ];

        // ...
    }
}

After the installation, you need to create a service to fire your events.

<?php

namespace YourBundle\AsyncTask;

use Enqueue\Client\ProducerInterface;

class FireEventService
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * FireEventService constructor.
     *
     * @param ProducerInterface $producer
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Publish the data to the "process_data_command" topic.
     *
     * @param $userId
     * @param $data
     */
    public function fireEvent($userId, $data)
    {
        $this->producer->sendEvent('process_data_command', [
            'user_id' => $userId,
            'data' => $data,
        ]);
    }
}
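
To see what fireEvent() hands to the producer without installing the Enqueue packages, the service can be exercised against a stand-in. This is only a sketch: RecordingProducer and FireEventSketch are hypothetical classes written for this example, and RecordingProducer merely records calls instead of implementing Enqueue\Client\ProducerInterface.

```php
<?php

// Hypothetical stand-in for the Enqueue producer: it only records calls.
final class RecordingProducer
{
    /** @var array[] list of [topic, message] pairs */
    public $sent = [];

    public function sendEvent(string $topic, $message): void
    {
        $this->sent[] = [$topic, $message];
    }
}

// Same shape as the service above, but typed against the stand-in so the
// sketch runs on its own.
final class FireEventSketch
{
    private $producer;

    public function __construct(RecordingProducer $producer)
    {
        $this->producer = $producer;
    }

    public function fireEvent($userId, array $data): void
    {
        $this->producer->sendEvent('process_data_command', [
            'user_id' => $userId,
            'data' => $data,
        ]);
    }
}

$producer = new RecordingProducer();
(new FireEventSketch($producer))->fireEvent(42, ['a', 'b']);

// One event on the topic, carrying the user id and the data to process.
print_r($producer->sent);
```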

Now you need to create an event processor:

<?php

namespace YourBundle\AsyncTask;

use Doctrine\ORM\EntityManagerInterface;
use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Util\JSON;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;

class EventProcessor implements Processor, TopicSubscriberInterface
{
    /** @var EntityManagerInterface */
    private $em;

    public function __construct(EntityManagerInterface $em)
    {
        $this->em = $em;
    }

    public function process(Message $message, Context $context)
    {
        $decodedMessage = JSON::decode($message->getBody());
        $userId = $decodedMessage['user_id'];
        $yourData = $decodedMessage['data'];

        // One progress queue per user; the controller reads from it.
        $fooQueue = $context->createQueue('your_queue_name_'.$userId);
        $producer = $context->createProducer();

        $progress = 0;
        $total = count($yourData);

        foreach ($yourData as $data) {
            // Count the item first so that the last one reports 100%.
            $progress++;
            $processed = round(($progress / $total) * 100, 2);

            try {
                // do what you want with data
                $statusMessage = "Update data: $data success !";
            } catch (\Exception $e) {
                $statusMessage = "Update data: $data failed ! Exception: ".$e->getMessage();
            }

            $response = [
                'progress_status' => $processed.'%',
                'progress' => $processed,
                'message' => $statusMessage,
            ];

            $queueMessage = $context->createMessage(json_encode($response));
            $producer->send($fooQueue, $queueMessage);
        }

        // Acknowledge only after every item has been processed.
        return self::ACK;
    }

    public static function getSubscribedTopics()
    {
        return ['process_data_command'];
    }
}
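
The progress arithmetic in the processor can be checked in isolation. A minimal sketch, assuming the same three fields as the processor's response; the helper name progressPayload and the empty-data guard are additions for this example.

```php
<?php

// Build the progress payload for item number $done out of $total.
// Hypothetical helper mirroring the fields used by the processor above.
function progressPayload(int $done, int $total, string $message): array
{
    // Guard against an empty data set to avoid a division by zero.
    $percent = $total > 0 ? round(($done / $total) * 100, 2) : 100.0;

    return [
        'progress_status' => $percent.'%',
        'progress' => $percent,
        'message' => $message,
    ];
}

$first = progressPayload(1, 3, 'Update data: a success !');
$last = progressPayload(3, 3, 'Update data: c success !');

echo json_encode($first), "\n";
echo json_encode($last), "\n";
```

With three items, the first message reports 33.33% and the last one 100%, which is what a progress bar needs to reach its end state.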

Now you must create a function that consumes the processor's messages and returns a JSON response. You can put it in a trait so that several controllers can use it.

<?php

namespace YourBundle\Helpers;

use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Redis\RedisDestination;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Response;

trait AsyncTaskObserverTrait
{
    private function observe($queueName)
    {
        // Check that the queue consumer process is running.
        $ps = (int) trim(shell_exec('ps -ax | grep -v grep | grep -c "enqueue:consume"'));
        if ($ps > 0) {
            $redisFactory = new RedisConnectionFactory();
            $context = $redisFactory->createContext();
            $destination = new RedisDestination($queueName);
            $consumer = $context->createConsumer($destination);

            // Wait up to 5 seconds for the next progress message.
            if ($message = $consumer->receive(5000)) {
                $messageBody = $message->getBody();
                $decodedMsg = json_decode($messageBody);

                // progress_status is a property of the decoded message itself.
                if (isset($decodedMsg->progress_status) && $decodedMsg->progress_status == 'complete') {
                    $context->purgeQueue($destination);
                }

                return new JsonResponse($decodedMsg);
            }

            return new JsonResponse('waiting');
        }

        return new JsonResponse('Queue service is down', Response::HTTP_INTERNAL_SERVER_ERROR);
    }
}
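
The decision the observer makes for each message (answer "waiting", forward the payload, purge when finished) can be separated from Redis and tested on its own. A sketch under stated assumptions: interpretQueueMessage is a hypothetical helper, and treating a progress of 100 as finished, in addition to an explicit "complete" status, is an assumption of this example rather than something the trait does.

```php
<?php

// Decide what the observer should answer for a raw queue message body.
// Hypothetical helper: returns the payload to send and whether to purge.
function interpretQueueMessage(?string $body): array
{
    if ($body === null) {
        // Nothing arrived within the timeout.
        return ['payload' => 'waiting', 'purge' => false];
    }

    $decoded = json_decode($body, true);
    if (!is_array($decoded)) {
        return ['payload' => 'invalid message', 'purge' => false];
    }

    // Assumption: the run is finished at 100% or on an explicit "complete".
    $status = $decoded['progress_status'] ?? null;
    $done = $status === 'complete' || (float) ($decoded['progress'] ?? 0) >= 100.0;

    return ['payload' => $decoded, 'purge' => $done];
}

$half = interpretQueueMessage('{"progress_status":"50%","progress":50,"message":"ok"}');
$full = interpretQueueMessage('{"progress_status":"100%","progress":100,"message":"ok"}');
```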

Use the trait in the controller:

use AsyncTaskObserverTrait;

/**
 * Return the next progress message for the current user.
 *
 * @Route("/observe-process", name="observe_process", options={"expose"=true})
 * @Method({"GET"})
 */
public function observeProcess()
{
    return $this->observe('your_queue_name_'.$this->getUser()->getId());
}

Then you can use AJAX polling or Server-Sent Events on this route to build a progress bar and show the messages to the user.
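
If you go the Server-Sent Events way, each progress message has to be written in the event-stream text format. A minimal sketch of the framing only; the function name formatSseFrame and the event name "progress" are hypothetical choices for this example.

```php
<?php

// Format one Server-Sent Events frame: an event name, a single-line JSON
// "data:" field, and a blank line that terminates the frame.
function formatSseFrame(array $payload, string $event = 'progress'): string
{
    return "event: {$event}\n".'data: '.json_encode($payload)."\n\n";
}

$frame = formatSseFrame(['progress' => 50, 'message' => 'Update data: a success !']);

echo $frame;
```

A streaming endpoint would send such frames with the Content-Type: text/event-stream header and flush the output after each one; with plain AJAX polling, the JSON returned by the route above can be used directly.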

Finally, don't forget to declare the services in service.yml:

fire.event.service:
    class: YourBundle\AsyncTask\FireEventService
    arguments: ["@enqueue.client.default.producer"]

YourBundle\AsyncTask\EventProcessor:
    arguments: ["@doctrine.orm.entity_manager"]
    tags:
        - { name: 'enqueue.topic_subscriber' }

Consumer command

php bin/console enqueue:consume --setup-broker -vvv
