Symfony Legacy: Delete After ACK

Mücahit Cücen
istanbulphp
Published in
2 min readNov 6, 2023

Yazılım geliştiriciler olarak her ne kadar kullandığımız dillerin ve framework’lerin güncel sürümleriyle çalışmak istesek de bazen eski sürümlerle uğraşmak zorunda kalabiliyoruz. Bu yazıda ise Symfony 4 sürümünde Redis üzerinden yönetilen kuyrukların Redis’i şişirmesi sorununu ele alacağız.

Symfony Messenger paketinin güncel versiyonlarında delete_after_ack parametresini true işaretleyerek işlenen mesajların Redis’ten silinmesini kolayca sağlayabiliyoruz. Eski sürümlerde ise bize böyle bir seçenek sunulmuyor.

Bu sorunu çözmek için yapmamız gereken şey ise bir EventSubscriber yazmak. Bu subscriber kuyrukta iş tamamlandığı zaman ilgili kaydı Redis’ten silecek.

Symfony Messenger paketi Redis üzerinde işleri tutarken stream özelliğini kullanıyor. Stream üzerinden bir kaydı silmek için ise streamName ve messageId bilgilerine ihtiyacımız var. Aşağıdaki kod bloğunda farklı kuyrukların streamName ve kuyrukta işlenen bir işin messageId bilgisini nasıl alabileceğimizi görebilirsiniz. Kendi projenizde receiverName/streamName eşleşmelerini messenger.yaml dosyasından teyit edip kod örneğindeki getStreamName metodunu kendinize göre düzenlemeniz gerekir.

<?php

namespace App\EventSubscriber;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceivedStamp;

use function Sentry\captureException;

class MessageProcessedSubscriber implements EventSubscriberInterface
{
private \Redis $redis;

public function __construct(\Redis $redis)
{
$this->redis = $redis;
}

public function onMessageHandled(WorkerMessageHandledEvent $event)
{
$streamName = $this->getStreamName($event->getReceiverName());

$messageDetails = $event->getEnvelope()->all();
$receivedMessages = $messageDetails[RedisReceivedStamp::class];
/** @var RedisReceivedStamp $message */
foreach ($receivedMessages as $message) {
try {
$this->redis->xDel($streamName, [$message->getId()]);
} catch (\Exception $exception) {
captureException($exception);
}
}
}

public static function getSubscribedEvents(): array
{
return [
WorkerMessageHandledEvent::class => 'onMessageHandled',
];
}

private function getStreamName(string $receiverName): string
{
if ($receiverName === 'mail') {
return 'mail_messages';
}

if ($receiverName === 'export') {
return 'export_messages';
}

return 'messages';
}
}

--

--