Arakatmanlar -3. MQTT ile Adım Adım Programlama

Huseyin Kutluca
Yazılım Mimarileri
4 min readMay 27, 2019

Ortamı Hazırlama

Şimdi diyelim ki karar verdik ve MQTT’yi kullanacağız. Nereden başlamak gerekiyor. Tabi ki ilk önce broker yani aracı yazılımı indirip kurmalıyız. Bu konuda benim önerim Mosquitto olur. Windows ortamına zip dosyasını indirip Mosquitto.exe uygulamasını komut satırından çalıştırmak yetiyor. Doğru mu kurduğunuzdan emin olmak istiyorsanız başka bir komut satırı açıp:

mosquitto_sub.exe -t selamlama_konusu

Yazarak selamlama_konusu’na abone olursunuz. Hata almayıp komut satırı bekliyorsa tamamdır. Bir başka komut satırından ise:

mosquitto_pub.exe -t selamlama_konusu –m -m “Nasilsiniz Bakalim”

yazın. Böylelikle selamlama_konusu üzerinden bir mesaj yayımlamış olursunuz. Eğer bu mesaj bir önceki mosquitto_sub uygulamasında da görülüyorsa ortamınız tamam demektir.

Şimdi programlamaya hazırsınız demektir. İnternette açık kaynak olarak farklı MQTT istemcileri (client) mevcut. C++ için Mosquitto’nun kendi istemcisi da mevcuttur. Ben bu istemci ile başladım ama aynı anda hem veri yayımlama hem de veriye abone olma konusunda sorunlar ile karşılaştım. Daha sonra geçtiğim PAHO istemci kütüphanesi sorunsuz olarak işimi gördü.

Java tarafında yine PAHO istemcisi kullandık.

Uygulamayı C++ ile geliştirdiğim halde C kütüphanesi kulandım fakat kendim ilgili kütüphaneyi C++ sınıfı ile sarmaladım. Üçüncü parti kütüphanelerini tek bir sınıf içinde tutarak diğer sınıflara yaymamak tavsiye edilen bir yaklaşımdır. Böylelikle farklı bir kütüphane ye geçiş yapıldığında daha kolay ve sorunsuz bir geçiş yapılabilir.

İlklendirme

Ara katman kütüphanesini projenize ekledikten sonra ilk yazmanız gereken ilklendirme kodu. Bu kapsamda ben initialize methodu içine bu kodları koydum. Aşağıda bu kodu görebilirsiniz.

void MQTTCommunicator::Initialize(const char *id, const char *host, int port)
{
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts =
MQTTAsync_disconnectOptions_initializer;
MQTTAsync_create(&client, host,
“DeviceInterface”,MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, client, connectionLost,
messageArrived, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if ((rc = MQTTAsync_connect(client, &conn_opts)) !=
MQTTASYNC_SUCCESS)
{
spdlog::get(“LOGGER”)->error(
“Failed to Connect, return code {}”, rc);
exit(EXIT_FAILURE);
}
}

Bu kod ilklendirmenin yanı sıra asenkron olarak bağlanmaya çalışıyor ve bağlandığı takdirde çağrılacak olan OnConnect methodunu bağlantı opsiyonları (connectOptions) olarak verdim. Benzer şekilde mesaj geldiğinde çağırmak üzere messageArrived methosunu verdim. Burada onConnect ve messageArrived metotlarının statik olduğunu hatırlatmak isterim.

void MQTTCommunicator::onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts =
MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
opts.onSuccess = NULL;
opts.onFailure = onSubscribeFailure;
opts.context = client;
MQTTAsync_subscribe(client, “Sselamlama_konusu”, 1, &opts);}

Abone Olma

Bağlantı kurulunca hangi konulara abone olunacak ise o konular için MQTT?Asynch_subscribe fonksiyonu çağrılıyor.

Yukarıdaki örnekte selamlama_konusu’na abone olunuyor.

Peki abone olduk ve veri geldiğinde ne olacak diye soruyorsanız messageArrived metodumuz ara katman tarafından çağrılacaktır. Burada hem mesaj adı hem de mesaj içeriği gelmektedir. Burada mesaj çok basit olduğu için tabi ki yazdırıp geçiyoruz. Ama normal bir uygulamada ne yapmanız gerekir diyorsanız öncelikle mesajı geldiği biçimden struct ya da sınıf yapısına çözmeniz (decode) daha sonra oluşan sınıf ya da structı veriyi işleyecek olan başka bir thread’in iş kuyruğuna(active object pattern) atmalısınız. Veriyi işleyen thread tarafında visitor pattern uygulayarak veriyi daha etkin işleyebilirsiniz. Konuyu dağıtmamak için bunların detaylarını vermiyorum.

Yazılımı derleyip çalıştırdıktan sonra mosquitto_pub ile veri yayımlayıp yazılımı test edebilirsiniz.

int MQTTCommunicator::messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
char messageData[256];
memcpy(messageData, message->payload, message->payloadlen);
messageData[message->payloadlen] = 0;
if(!strcmp(topicName, “selamlama_konusu”))
{
printf(message.payload);
// Burada veriyi önce decode edip sınıf ya da struct oluşturun
// sonra veriyi işlediğiniz thread'in fifo kuyruğuna atın.
// Veriyi işlediğiniz yerde visitor pattern kullanabilirsiniz.
}
}

Yayımlama

Veri yayımlama işlemi MQTT de daha kolay olmaktadır. Öncelikle MQTT veriyi yayımlamak için önceden ara katmana bildirmeyi gerektirmiyor. Veriyi yayımlayacak iken MQTTAsynch_sendMessage fonksiyonu çağrılarak veri yayımlanır. Tabi ki yayımlayacağınız konuyu ve mesaj içeriğini parametre olarak veriyoruz.

void MQTTCommunicator::PublishSelamlamaMessage(char* buffer)
{
int msgSize = strlen(buffer);
MQTTAsync_responseOptions opts =
MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
opts.onSuccess = 0;
opts.context = client;
pubmsg.payload = buffer;
pubmsg.payloadlen = msgSize;
pubmsg.qos = 0;
pubmsg.retained = 0;
int rc;
if ((rc = MQTTAsync_sendMessage(client, “selamlama konusu”,
&pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Error Sending Message");
}
}

Burada da basit bir test mesajı yayımladık. Daha kompleks bir veriyi yayımlarken önce ilgili veriyi arakatman seviyesinde yayımlayacağımız formata örneğin json formatına çevirip daha sonra json textini yayımlayacağız.

Test ve Entegrsyon

Arakatmanlar ile çalışır iken hayatımızı kolaylaştıran geliştirme ve test araçları çok kritiktir. Örnek uygulama yapar iken bir konuyu yayımlayacağı bir konuya abone olup işleri kolay görebiliriz. Özellikle onlarca konunun olduğu orta karmaşıklıkta bir sistemde ya da yüzlerce konunun olduğu yüksek karmaşıklıktaki sistemlerde bu araçlar ne kadar gelişmiş ise bizim yazılımı geliştirebilmemiz de o kadar kolay olur. MQTT için bu kapsamda MQTT-SPY ürünü bulunmaktadır.

Java ile geliştirilmiş olan bu ürünü tek bir jar dosyasını indirerek çalıştırıyorsunuz. Brorker uygulamasının çalıştığı bilgisayarın IP bilgilerini vs. güncelledikten sonra bağlantı var ise yeşil olan düğmeye basıp yeni bir tab açabilirsiniz.

Öncelikle sizin uygulamanın yayımladığı mesajları görmek için mesajın konu ismini Topic alanına yazıp abone olmalısınız. Bunu denediğinizde veriler yayımlandıkça bu uygulamadan da görebildiğinizi ve bunun hayatınızı çok kolaylaştırdığını göreceksiniz.

Varsayalım ki sizin uygulama bir mesaj bekliyor ve gelen mesaja göre bir veya birden fazla konuda veri yayımlıyor. Bu durumda ihtiyacınız olan veri yayımlamayı da bu SPY aracı ile kolayca yapıyorsunuz. Çoğu durumda simulatör vb. geliştirmeniz gerekir iken SPY gibi bir ürün ile yazılımı kolayca test edebilir ve diğer yazılımcıların geliştireceği diğer modüller ile entegre edebilirsiniz.

Ben MQTT SPY ürünün orta boy bir projenin ihtiyacını karşılayabilecek kadar yetenekli buldum. Ama gelişmesi gereken daha çok özellik var.

Güvenlik:

MQTT ile isterseniz kullanıcı adı ve şifre ile güvenlik sağlayabileceğiniz gibi SSL TLS kullanarak veri güvenliği de sağlayabilirsiniz.

--

--

Huseyin Kutluca
Yazılım Mimarileri

Highly motivated Software Architect with hands-on experience in design and development of mission critical distributed systems.