AWS’ de Canlı Veri ile Uçtan Uca ETL Veri Hattı -2 (Extract)

Tarik Yilmaz
8 min readJun 20, 2024

--

AWS platformunda gerçekleştirilen ETL sürecinin veri çıkarma (Extract) aşaması ayrıntılı olarak ele alınacaktır. İlk yazımızda, ETL sürecinin genel hatları ve AWS hizmetlerinin bu süreçte nasıl kullanıldığı üzerinde durulmuştur. Bu yazıda ise canlı veri gönderimi, AWS IoT, Kinesis Stream ve Kinesis Firehose kullanarak veri toplama, veri akışı sağlama ve veri yükleme konuları üzerinde durulacaktır. Bu kapsamda, bir veri üretici ile simüle edilen sensör verilerinin MQTT servisi aracılığıyla nasıl gönderileceğini ve bu verilerin AWS IoT ve Kinesis Stream kullanılarak nasıl işleneceğini adım adım açıklayacağım. Daha sonra, AWS Firehose ve Lambda hizmetlerinin entegrasyonu ile farklı sensör verilerinin uygun S3 bucket’ larına nasıl yönlendirileceğini detaylı bir şekilde ele alacağım.

Şekil 1. ETL pipline diyagramı.

Bu makalenin sonunda, AWS platformunu kullanarak canlı verileri nasıl alacağınızı ve onları bir sonraki işlem için, yani dönüştürme aşamasına hazır hale getirmeyi öğrenmiş olacaksınız.

Sentetik Canlı Veri Oluşturma

Günümüzde, özellikle endüstri ve IoT alanlarında, sensörlerden gelen verilerin toplanması ve analiz edilmesi zaman alıcı ve maliyetli olabiliyor. Bu durumda, sentetik canlı veri devreye giriyor. Sentetik veri, gerçek verilerle aynı özelliklere sahip, yapay olarak üretilen verilerdir. Bu sayede, sistemlerimizi gerçek verilerle uğraşmadan test edebilir, hataları önceden tespit edebilir ve süreçlerimizi daha verimli hale getirebiliriz.

Şekil 2. Mqtt client örneği.

MQTT Protokolünün Kullanımı

Veri iletimi için MQTT (Message Queuing Telemetry Transport) protokolü kullanılmaktadır. MQTT, düşük bant genişliği ve düşük enerji tüketimi ile veri iletimi yapabilen hafif bir mesajlaşma protokolüdür. Bu özellikleri sayesinde, özellikle IoT cihazları ve sensörler arasında veri iletimi için idealdir. MQTT protokolü, verilerin güvenilir bir şekilde iletilmesini sağlar ve ağ üzerinden veri aktarımını kolaylaştırır.

Sentetik canlı veri oluşturma sürecinde, iki farklı sensörü temsil eden veriler simüle edilmiştir. Bu sensörler, sıcaklık ve nem sensörleri ile ışık şiddeti ve hava kalitesi sensörleridir. Her bir sensör, belirli aralıklarla rastgele değerler üretir ve bu veriler MQTT protokolü aracılığıyla gönderilir. Bu veri simülasyonu, gerçek dünyadaki sensörlerin davranışını taklit eder ve ETL sürecinin her adımında bu verilerin nasıl işlendiğini ve yönetildiğini görmemize olanak tanır.

Örnek olarak, aşağıdaki Python kodu, sıcaklık ve nem sensörleri ile ışık şiddeti ve hava kalitesi sensörlerini verilerini temsil etmektedir.

Şekil 3. Sentetik veri örneği.

Şekil 4' te AWS IoT MQTT Client’ ının nasıl yapılandırıldığını ve kullanıldığını inceleyeceğiz. Aşağıdaki kod parçası, bir MQTT istemcisinin AWS IoT Core ile güvenli bir bağlantı kurması ve yapılandırılması için gerekli adımları içerir.

Şekil 4. Mqtt bağlantı oluşturma örneği.

Şekil 4' te, bir AWS IoT MQTT Client’ının yapılandırılması ve AWS IoT Core endpointine bağlanmasını göstermektedir. İlk olarak, AWSIoTMQTTClient sınıfından bir MQTT istemcisi oluşturulur ve configureEndpoint fonksiyonu ile AWS IoT endpointine bağlanır. Alınan client ve endpoint bilgileri AWS IoT servisinin “MQTT test client” sekmesinden alınmıştır. Ardından, configureCredentials fonksiyonu ile güvenlik sertifikaları ve özel anahtar dosyalarının yolları belirtilir. İstemcinin otomatik yeniden bağlanma (configureAutoReconnectBackoffTime), çevrimdışı yayın kuyruğu (configureOfflinePublishQueueing), kuyruğun boşaltılma sıklığı (configureDrainingFrequency), bağlantı zaman aşımı (configureConnectDisconnectTimeout) ve MQTT için zaman aşımı (configureMQTTOperationTimeout) fonksiyonlarıyla ayarlanır. Son olarak, connect fonksiyonu çağrılarak istemcinin AWS IoT endpointine bağlanması sağlanır. Bu yapılandırmalar, istemcinin güvenilir ve sürekli bir bağlantı kurmasını sağlar.

Şekil 5 iki sensörden gelen verilerin sürekli olarak üretilip MQTT aracılığıyla AWS IoT Core’a gönderilmesini sağlar. Aynı zamanda, kullanıcı müdahalesi ile bağlantının güvenli bir şekilde kesilmesini yönetir.

Şekil 5. Sensör verilerinin gönderimi örneği.

Bu kod bloğu, sürekli veri akışı sağlayan bir döngü ve kullanıcı tarafından kesildiğinde bağlantıyı düzgün bir şekilde sonlandıran bir hata yakalama yapısı içerir. try bloğunda, (generate_sensor_data) fonksiyonu ile iki sensörden veri üretilir ve (myMQTTClient.publish) fonksiyonu ile bu veriler (sensor_data_topic) bağlantısına JSON formatında gönderilir ve konsola yazdırılır. Bu işlem her saniye tekrarlanır. (except KeyboardInterrupt) bloğu, kullanıcı tarafından bir kesinti (Ctrl+C) gerçekleştirildiğinde döngüyü sonlandırır ve (myMQTTClient.disconnect) fonksiyonu çağrılarak MQTT istemcisinin bağlantısı güvenli bir şekilde kesilir. Bu yapı, veri akışı sağlarken, kullanıcı müdahalesiyle bağlantının düzgün bir şekilde sonlandırılmasını sağlar.

Şekil 6. MQTT ile gönderilen Sensör verileri örneği.

Canlı Verilerin AWS IoT ile Sisteme Alınması

AWS IoT (Internet of Things), cihazların AWS bulutuna güvenli bir şekilde bağlanmasını ve bu cihazlardan veri toplamasını sağlayan bir platformdur. Bu hizmet, birçok cihazın veri gönderip almasına, veri işleme ve analiz yapmasına olanak tanır.

Detaylı Kurulum

Sensörlerden gelen veriler, MQTT protokolü aracılığıyla AWS IoT Core’a gönderilir.

  1. IAM Role Oluşturma:
  • AWS Management Console’ da “IAM” (Identity and Access Management) servisine gidin.
  • Roles” sekmesine tıklayın ve “Create role” butonuna tıklayın.
  • AWS service” seçeneğini seçin ve “IoT” servisini seçin.
  • İzin politikalarını ekleyin.
  • Role bir isim verin ve rolü oluşturun.
Şekil 7. IAM rol oluşturma örneği.

2. Cihaz Kaydı:

  • IoT Core dashboardunda “Manage” sekmesine gidin ve “Things” seçeneğine tıklayın.
Şekil 8. Cihaz oluşturma örneği -1.
  • Create things” butonuna tıklayın.
  • Create a single thing” seçeneğini seçin.
Şekil 9. Cihaz oluşturma örneği -2.
  • Cihaz adını girin ve “Next” butonuna tıklayın. Ardından cihaz sertifikası konfigrasyon alanında “Auto-generate a new certficate” seçeneğine tıklayın.
Şekil 10. Cihaz oluşturma örneği -3.
  • Yapılan işlemlerin ardından “Policies” oluşturulması gerekmektedir.
Şekil 11. Policies oluşturma örneği.
  • Oluşturulan “Policies” izinlerinde tüm kaynaklardan gelen eylemlere izin vermektedir. Bu durum güvenlik zafiyeti yaratabileceğinden daha spesifik izinlerin verilmesi sistem güvenliği açısından daha faydalı olacaktır.
Şekil 12. Oluşturulan sertifika örneği.
  • Sertifikaları indirin (AmazonRootCA1, Private key, Device certificate).

3. Sertifika Yapılandırma

  • Oluşturulan “Policies” sertifikaya ekleyin.
Şekil 13. Policies sertifikaya ekleme örneği.

4. MQTT Test Yayını

  • IoT Core dashboardunda “MQTT test client” alanında sentetik veri gönderiminin yapıldığı topic girilerek bağlantı test edilir.
Şekil 14. MQTT test yayını örneği.

Canlı Verilerin AWS IoT’ den Kinesis Stream ve Kinesis Firehose ile Alınması

AWS Kinesis Stream ve Kinesis Firehose, gerçek zamanlı veri akışlarını toplamak, işlemek ve analiz etmek için kullanılan AWS hizmetleridir.

  • AWS Kinesis Stream: Bu hizmet, gerçek zamanlı veri akışlarını toplamak ve işlemek için kullanılır. Veriler, Kinesis Stream’e gönderildikten sonra bir veya daha fazla tüketici (consumer) tarafından işlenebilir. Kinesis Stream, yüksek hacimli verilerin düşük gecikme ile toplanmasını sağlar.
  • AWS Kinesis Firehose: Bu hizmet, gerçek zamanlı veri akışlarını toplamak ve hedeflere (Amazon S3, Amazon Redshift, Amazon Elasticsearch Service ve Splunk) yüklemek için kullanılır. Firehose, verileri dönüştürme ve sıkıştırma gibi işlemleri gerçekleştirebilir. Ayrıca, verilerin otomatik olarak yüklenmesini sağlar.

AWS IoT’den Kinesis Stream ve Kinesis Firehose’a Veri Aktarımı

AWS IoT Core’a gelen canlı verilerin Kinesis Stream ve Kinesis Firehose ile nasıl alındığını ve işlendiğini adım adım inceleyeceğiz.

Detaylı Kurulum

Sensörlerden gelen veriler, MQTT protokolü aracılığıyla AWS IoT Core’a gönderilir ve burada işlenmek üzere Kinesis Stream ve Firehose gibi diğer AWS hizmetlerine yönlendirilir.

  1. Kinesis Stream Oluşturma:
  • Servisler menüsünden “Kinesis” seçeneğine tıklayın.
  • Create data stream” butonuna tıklayın.
  • Stream adı ve shard sayısını belirleyin (örneğin, sensor_data).
  • Create” butonuna tıklayarak Kinesis Stream’i oluşturun.
Şekil 15. Kinesis stream oluşturma örneği.

2. AWS IoT Rol Oluşturma:

  • AWS IoT Core dashboard’unda “Message routing” sekmesine gidin ve “Rules” seçeneğine tıklayın.
  • Create” butonuna tıklayın.
  • Kural adı ve açıklamasını girin.
Şekil 16. Iot rol oluşturma örneği -1.
  • SQL sorgusu yazın. Örneğin, tüm verileri almak için SELECT * FROM 'sensor_data_topic'.
Şekil 17. Iot rol oluşturma örneği -2.
  • Rule actions” alanından “Kinesis Stream” eylemini seçin. Ardından oluşturulan kinesis stream seçilmelidir. IAM rol ise yazının başında oluşturulan IAM roldür.
Şekil 18. Iot rol oluşturma örneği -3.

Oluşturulan kinesis stream rolünün ardından ilgili stream alanına gidilerek alınan veriler görüntülenir.

Şekil 19. Kinesis stream alınan veriler örneği.

3. Kinesis Firehose Oluşturma:

  • AWS Management Console “Kinesis” servisine gidin.
  • Sol menüden “Amazon Data Firehouse” seçeneğine tıklayın.
  • Firehose kaynak ve hedef bilgilerini girin (örneğin, kaynak olarak Kinesis Stream, hedef olarak Amazon S3).
Şekil 20. Kinesis firehouse oluşturma örneği -1.
  • Firehouse lambda fonksiyonu ekleyerek gelen sensör verilerini anlık olarak farklı bucketlara kaydedilmesi sağlanmaktadır. “Transform and convert records” alanından “Turn on data transformation” seçilmelidir.
Şekil 21. Kinesis firehouse oluşturma örneği -2.
  • AWS lambda fonksiyonu alanında “create function” seçilmeldir. Şeçilen alandan sonra “General Amazon Data Firehose Processing” seçilerek fonksiyon oluşturulmalıdır.
Şekil 22. Kinesis firehouse oluşturma örneği -3.
  • AWS lambda fonksiyonu oluştururken, “Author from scratch” seçeneğini kullanın ve “Process records sent to a Kinesis stream” blueprint’ini seçin. Fonksiyona isim verin, çalışacağı programlama dilini seçin (örneğin, Python 3.10) ve işlemci mimarisi olarak genellikle x86_64 seçeneğini tercih edin. Önceden oluşturulmuş bir IAM rolünü kullanabilir veya yeni bir tane oluşturabilirsiniz.
Şekil 23. Lambda fonksiyonu oluşturma örneği.
  • AWS lambda fonksiyonu oluştururken triger olarak kinesis stream verilmelidir. Lambda fonksiyonu ile firehouse üzerinden veriler oluşturulan “sensor1datav1” ve “sensor2datav2” bucketlarına veri akışı sağlanacaktır. Lambda servisi ekranından iligli kod deploy edilerek sistem çalışır hale getirilir.
Şekil 24. Lambda fonksiyonu örneği.
  • Oluşturulan lambda fonksiyonu kinesis firehouse üzerinden seçilmelidir.
Şekil 25. Kinesis firehouse oluşturma örneği -4.

Kinesis firehouse oluşturma işlemi tamamlanmıştır. Artık sentetik olarak üretilen canlı sensör verileri başarılı bir şekilde ilgili bucket üzerinden depolanıyor.

Şekil 26. Gerçek zamanlı kaydedilen sensör 1 verileri örneği .
Şekil 27. Gerçek zamanlı kaydedilen sensör 2 verileri örneği .

Sonuç

Bu yazıda, canlı sentetik sensör verilerinin oluşturulması, toplanması, işlenmesi ve depolanması için uçtan uca bir çözüm hayata geçirildi. AWS IoT platformu kullanılarak simüle edilen sensör verileri gerçek zamanlı olarak toplanarak Kinesis Stream aracılığıyla güvenilir bir veri akışına aktarıldı. Kinesis Firehose ile veri akışının yönetimi sağlandı ve Lambda fonksiyonu kullanılarak veriler dinamik olarak ilgili S3 bucket’larına yönlendirildi. Bu aşamada, sensör verilerinin toplanmasından depolanmasına kadar olan tüm süreç başarıyla otomatikleştirilmiş ve verilerin ETL süreçlerinden transform ve load uygulamalarında kullanılmasın için hazır hale getirilmiştir.

Bir sonraki yazımızda, AWS ETL sürecinin aşaması olan Transform (dönüştürme) aşamasını derinlemesine inceleyeceğiz. Bu aşamada, verilerimizi nasıl temizleyeceğimizi, zenginleştireceğimizi ve analiz için hazır hale getireceğimizi adım adım göreceğiz.

Bir sonraki yazıda görüşmek üzere…

--

--