Symfony Legacy: Delete After ACK
Yazılım geliştiriciler olarak her ne kadar kullandığımız dillerin ve framework’lerin güncel sürümleriyle çalışmak istesek de bazen eski sürümlerle uğraşmak zorunda kalabiliyoruz. Bu yazıda ise Symfony 4 sürümünde Redis üzerinden yönetilen kuyrukların Redis’i şişirmesi sorununu ele alacağız.
Symfony Messenger paketinin güncel versiyonlarında delete_after_ack
parametresini true
işaretleyerek işlenen mesajların Redis’ten silinmesini kolayca sağlayabiliyoruz. Eski sürümlerde ise bize böyle bir seçenek sunulmuyor.
Bu sorunu çözmek için yapmamız gereken şey ise bir EventSubscriber
yazmak. Bu subscriber kuyrukta iş tamamlandığı zaman ilgili kaydı Redis’ten silecek.
Symfony Messenger paketi Redis üzerinde işleri tutarken stream
özelliğini kullanıyor. Stream üzerinden bir kaydı silmek için ise streamName
ve messageId
bilgilerine ihtiyacımız var. Aşağıdaki kod bloğunda farklı kuyrukların streamName
ve kuyrukta işlenen bir işin messageId
bilgisini nasıl alabileceğimizi görebilirsiniz. Kendi projenizde receiverName/streamName
eşleşmelerini messenger.yaml
dosyasından teyit edip kod örneğindeki getStreamName
metodunu kendinize göre düzenlemeniz gerekir.
<?php
namespace App\EventSubscriber;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceivedStamp;
use function Sentry\captureException;
class MessageProcessedSubscriber implements EventSubscriberInterface
{
private \Redis $redis;
public function __construct(\Redis $redis)
{
$this->redis = $redis;
}
public function onMessageHandled(WorkerMessageHandledEvent $event)
{
$streamName = $this->getStreamName($event->getReceiverName());
$messageDetails = $event->getEnvelope()->all();
$receivedMessages = $messageDetails[RedisReceivedStamp::class];
/** @var RedisReceivedStamp $message */
foreach ($receivedMessages as $message) {
try {
$this->redis->xDel($streamName, [$message->getId()]);
} catch (\Exception $exception) {
captureException($exception);
}
}
}
public static function getSubscribedEvents(): array
{
return [
WorkerMessageHandledEvent::class => 'onMessageHandled',
];
}
private function getStreamName(string $receiverName): string
{
if ($receiverName === 'mail') {
return 'mail_messages';
}
if ($receiverName === 'export') {
return 'export_messages';
}
return 'messages';
}
}