Create a websocket server with Symfony and Ratchet

I’m developing a large Symfony 3 project that relies on listeners, and I wanted to add a WebSocket notification server for its users.

The problem: send real-time notifications to connected users, which means being able to target a specific authenticated user.

The solution

Websocket server

I’m using Ratchet to run the WebSocket server: http://socketo.me/

Installation is simple:

composer require cboden/ratchet

Then create a new class that implements MessageComponentInterface.

In Symfony, I created a console command that starts the WebSocket server:

<?php

namespace NotificationsBundle\Command;

use NotificationsBundle\Server\Notification;
use Ratchet\Http\HttpServer;
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ServerCommand extends ContainerAwareCommand
{
    /**
     * Configure a new Command Line
     */
    protected function configure()
    {
        $this
            ->setName('Project:notification:server')
            ->setDescription('Start the notification server.');
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $server = IoServer::factory(new HttpServer(
            new WsServer(
                new Notification($this->getContainer())
            )
        ), 8080);

        $server->run();
    }
}

My server class looks like this:

<?php

namespace NotificationsBundle\Server;

use Lexik\Bundle\JWTAuthenticationBundle\Security\Authentication\Token\JWTUserToken;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use UserCrmBundle\Entity\User;

class Notification implements MessageComponentInterface
{
    /**
     * Open connections: numeric keys until the user authenticates, then keyed by email
     */
    protected $connections = array();

    protected $container;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * A new websocket connection
     *
     * @param ConnectionInterface $conn
     */
    public function onOpen(ConnectionInterface $conn)
    {
        $this->connections[] = $conn;
        $conn->send('..:: Hello from the Notification Center ::..');
        echo "New connection \n";
    }

    /**
     * Handle message sending
     *
     * @param ConnectionInterface $from
     * @param string $msg
     */
    public function onMessage(ConnectionInterface $from, $msg)
    {
        $messageData = json_decode(trim($msg));

        if (!isset($messageData->userData) || !is_string($messageData->userData)) {
            // Anything other than an authentication message is rejected
            $from->send("You're not able to do that!");
            return;
        }

        // First message from the app: it carries the connected user's token
        $token_user = $messageData->userData;

        echo "Check user credentials\n";
        $jwt_manager = $this->container->get('lexik_jwt_authentication.jwt_manager');
        $token = new JWTUserToken();
        $token->setRawToken($token_user);

        try {
            $payload = $jwt_manager->decode($token);
        } catch (\Exception $e) {
            // An invalid or expired token is treated as a failed authentication
            $payload = false;
        }

        // Get the user by email; stop here if the token or the user is not valid
        $user = is_array($payload) && isset($payload['username'])
            ? $this->getUserByEmail($payload['username'])
            : false;

        if (!$user) {
            $from->close();
            return;
        }

        echo "User found : " . $user->getFirstname() . "\n";
        $index_connection = $payload['username'];

        // Re-key this connection from its numeric index to the user's email
        foreach ($this->connections as $key => $conn) {
            if ($conn === $from) {
                // Unset first, so re-authenticating under the same key does not drop the connection
                unset($this->connections[$key]);
                $this->connections[$index_connection] = $from;
                $from->send('..:: Connected as ' . $index_connection . ' ::..');
                break;
            }
        }
    }

    /**
     * A connection is closed
     *
     * @param ConnectionInterface $conn
     */
    public function onClose(ConnectionInterface $conn)
    {
        foreach ($this->connections as $key => $conn_element) {
            if ($conn === $conn_element) {
                unset($this->connections[$key]);
                break;
            }
        }
    }

    /**
     * Error handling
     *
     * @param ConnectionInterface $conn
     * @param \Exception $e
     */
    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        $conn->send("Error : " . $e->getMessage());
        $conn->close();
    }

    /**
     * Get user from email credential
     *
     * @param $email
     *
     * @return false|User
     */
    protected function getUserByEmail($email)
    {
        if (!filter_var($email, FILTER_VALIDATE_EMAIL)) {
            return false;
        }

        $em = $this->container->get('doctrine')->getManager();
        $repo = $em->getRepository('UserCrmBundle:User');

        $user = $repo->findOneBy(array('email' => $email));

        return $user instanceof User ? $user : false;
    }
}

Explanation:

The browser WebSocket API does not let you set custom headers (such as Authorization) on the handshake request, so you need another way to authenticate users. My users are authenticated against my API with a JWT, thanks to the lexik/jwt-authentication-bundle bundle. When a user connects to the WebSocket server, the client must first send a message containing that token.
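To make the token step concrete: a JWT is three base64url segments separated by dots, and the middle segment is the JSON payload from which the server reads `username`. The sketch below is not part of the original project. It decodes that payload on the client for debugging purposes. `decodeJwtPayload` is a hypothetical helper name, and it does not verify the signature, so only the server-side decode through the Lexik JWT manager can be trusted.

```javascript
// Decode the payload (middle segment) of a JWT WITHOUT verifying its signature.
// Debugging aid only: the server must still validate the token itself.
function decodeJwtPayload(token) {
  var parts = String(token).split('.');
  if (parts.length !== 3) {
    throw new Error('Expected a JWT with three dot-separated segments');
  }

  // JWT segments use the base64url alphabet: map it back to standard base64
  // and restore the padding that base64url strips.
  var base64 = parts[1].replace(/-/g, '+').replace(/_/g, '/');
  while (base64.length % 4 !== 0) {
    base64 += '=';
  }

  // atob yields a binary string, which is fine for ASCII payloads such as
  // an email address; Node.js without atob falls back to Buffer.
  var json = typeof atob === 'function'
    ? atob(base64)
    : Buffer.from(base64, 'base64').toString('utf8');

  return JSON.parse(json);
}
```

Calling it on the token just before the authentication message is sent lets you check which email the server should index the connection under.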

After authentication, I index the WebSocket connection by the user’s email, so I can now send a message to a specific user.
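The PHP class above stores the connection under the user's email but does not show the sending side. As a rough model of the idea, written in JavaScript to match the client code and not taken from the original project, the registry could look like the sketch below. `ConnectionRegistry`, `register`, `remove` and `sendTo` are hypothetical names; in the PHP class the equivalent lookup would be on `$this->connections` with the email as key.

```javascript
// Model of the server-side registry: one connection per authenticated email.
// A "connection" here is any object exposing a send(message) method.
function ConnectionRegistry() {
  this.byEmail = new Map();
}

// Bind a connection to an email once its token has been accepted.
// A second login with the same email replaces the previous connection.
ConnectionRegistry.prototype.register = function (email, conn) {
  this.byEmail.set(email, conn);
};

// Forget a connection when it closes.
ConnectionRegistry.prototype.remove = function (conn) {
  for (const [email, candidate] of this.byEmail) {
    if (candidate === conn) {
      this.byEmail.delete(email);
      return;
    }
  }
};

// Send to one user; returns false when that user is not connected.
ConnectionRegistry.prototype.sendTo = function (email, message) {
  var conn = this.byEmail.get(email);
  if (!conn) {
    return false;
  }
  conn.send(message);
  return true;
};
```

Keying by email keeps the lookup direct, at the cost of allowing only one open connection per user: a second tab or device would replace the first.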

The client side:

You just need to follow this example:

// Start the connection to the notification server
function NotifServer() {
    var notif = new WebSocket("ws://IP_SERVER:8080");

    notif.onmessage = function (event) {
        console.log(event.data);
        // Insert the message as text, not HTML, so it cannot inject markup
        $('<p>').text(event.data).appendTo('.content');
    };

    notif.onopen = function () {
        $('.content').append('<p> >>> Connected</p>');
        var token_user = TOKEN_USER; // placeholder: replace with the user's JWT
        // First message: authenticate by sending the token as JSON
        notif.send(JSON.stringify({ userData: token_user }));
    };

    notif.onerror = function (error) {
        $('.content').append('<p> >>> Error...</p>');
    };
}

NotifServer();

And that’s it!

To run your server:

php bin/console Project:notification:server

To deploy it in production, you need something that checks that the notification server is still alive and restarts it when it is not. You can use Supervisord for that purpose.

Here is a simple tutorial on using Supervisord with a Node.js server; the same approach works for running the WebSocket server as a daemon: https://serversforhackers.com/monitoring-processes-with-supervisord
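As a starting point, a Supervisord program entry for this command could look like the fragment below. It is a sketch under assumptions, not a tested production setup: the program name, the project path, the `www-data` user and the log file locations are all placeholders to adapt to your own server.

```ini
[program:notification-server]
; Placeholder path: point this at your own Symfony project
command=php /path/to/project/bin/console Project:notification:server
directory=/path/to/project
; Start with Supervisord and restart the process whenever it exits
autostart=true
autorestart=true
; Assumed system user; use the one that owns your application
user=www-data
stdout_logfile=/var/log/supervisor/notification-server.out.log
stderr_logfile=/var/log/supervisor/notification-server.err.log
```

After adding the file to your Supervisord configuration directory, `supervisorctl reread` followed by `supervisorctl update` should load the new program.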
