AWS’ de Canlı Veri ile Uçtan Uca ETL Veri Hattı -2 (Extract)
AWS platformunda gerçekleştirilen ETL sürecinin veri çıkarma (Extract) aşaması ayrıntılı olarak ele alınacaktır. İlk yazımızda, ETL sürecinin genel hatları ve AWS hizmetlerinin bu süreçte nasıl kullanıldığı üzerinde durulmuştur. Bu yazıda ise canlı veri gönderimi, AWS IoT, Kinesis Stream ve Kinesis Firehose kullanarak veri toplama, veri akışı sağlama ve veri yükleme konuları üzerinde durulacaktır. Bu kapsamda, bir veri üretici ile simüle edilen sensör verilerinin MQTT servisi aracılığıyla nasıl gönderileceğini ve bu verilerin AWS IoT ve Kinesis Stream kullanılarak nasıl işleneceğini adım adım açıklayacağım. Daha sonra, AWS Firehose ve Lambda hizmetlerinin entegrasyonu ile farklı sensör verilerinin uygun S3 bucket’ larına nasıl yönlendirileceğini detaylı bir şekilde ele alacağım.
Bu makalenin sonunda, AWS platformunu kullanarak canlı verileri nasıl alacağınızı ve onları bir sonraki işlem için, yani dönüştürme aşamasına hazır hale getirmeyi öğrenmiş olacaksınız.
Sentetik Canlı Veri Oluşturma
Günümüzde, özellikle endüstri ve IoT alanlarında, sensörlerden gelen verilerin toplanması ve analiz edilmesi zaman alıcı ve maliyetli olabiliyor. Bu durumda, sentetik canlı veri devreye giriyor. Sentetik veri, gerçek verilerle aynı özelliklere sahip, yapay olarak üretilen verilerdir. Bu sayede, sistemlerimizi gerçek verilerle uğraşmadan test edebilir, hataları önceden tespit edebilir ve süreçlerimizi daha verimli hale getirebiliriz.
MQTT Protokolünün Kullanımı
Veri iletimi için MQTT (Message Queuing Telemetry Transport) protokolü kullanılmaktadır. MQTT, düşük bant genişliği ve düşük enerji tüketimi ile veri iletimi yapabilen hafif bir mesajlaşma protokolüdür. Bu özellikleri sayesinde, özellikle IoT cihazları ve sensörler arasında veri iletimi için idealdir. MQTT protokolü, verilerin güvenilir bir şekilde iletilmesini sağlar ve ağ üzerinden veri aktarımını kolaylaştırır.
Sentetik canlı veri oluşturma sürecinde, iki farklı sensörü temsil eden veriler simüle edilmiştir. Bu sensörler, sıcaklık ve nem sensörleri ile ışık şiddeti ve hava kalitesi sensörleridir. Her bir sensör, belirli aralıklarla rastgele değerler üretir ve bu veriler MQTT protokolü aracılığıyla gönderilir. Bu veri simülasyonu, gerçek dünyadaki sensörlerin davranışını taklit eder ve ETL sürecinin her adımında bu verilerin nasıl işlendiğini ve yönetildiğini görmemize olanak tanır.
Örnek olarak, aşağıdaki Python kodu, sıcaklık ve nem sensörleri ile ışık şiddeti ve hava kalitesi sensörlerini verilerini temsil etmektedir.
Şekil 4' te AWS IoT MQTT Client’ ının nasıl yapılandırıldığını ve kullanıldığını inceleyeceğiz. Aşağıdaki kod parçası, bir MQTT istemcisinin AWS IoT Core ile güvenli bir bağlantı kurması ve yapılandırılması için gerekli adımları içerir.
Şekil 4' te, bir AWS IoT MQTT Client’ının yapılandırılması ve AWS IoT Core endpointine bağlanmasını göstermektedir. İlk olarak, AWSIoTMQTTClient
sınıfından bir MQTT istemcisi oluşturulur ve configureEndpoint
fonksiyonu ile AWS IoT endpointine bağlanır. Alınan client ve endpoint bilgileri AWS IoT servisinin “MQTT test client” sekmesinden alınmıştır. Ardından, configureCredentials
fonksiyonu ile güvenlik sertifikaları ve özel anahtar dosyalarının yolları belirtilir. İstemcinin otomatik yeniden bağlanma (configureAutoReconnectBackoffTime
), çevrimdışı yayın kuyruğu (configureOfflinePublishQueueing
), kuyruğun boşaltılma sıklığı (configureDrainingFrequency
), bağlantı zaman aşımı (configureConnectDisconnectTimeout
) ve MQTT için zaman aşımı (configureMQTTOperationTimeout
) fonksiyonlarıyla ayarlanır. Son olarak, connect
fonksiyonu çağrılarak istemcinin AWS IoT endpointine bağlanması sağlanır. Bu yapılandırmalar, istemcinin güvenilir ve sürekli bir bağlantı kurmasını sağlar.
Şekil 5 iki sensörden gelen verilerin sürekli olarak üretilip MQTT aracılığıyla AWS IoT Core’a gönderilmesini sağlar. Aynı zamanda, kullanıcı müdahalesi ile bağlantının güvenli bir şekilde kesilmesini yönetir.
Bu kod bloğu, sürekli veri akışı sağlayan bir döngü ve kullanıcı tarafından kesildiğinde bağlantıyı düzgün bir şekilde sonlandıran bir hata yakalama yapısı içerir. try
bloğunda, (generate_sensor_data
) fonksiyonu ile iki sensörden veri üretilir ve (myMQTTClient.publish
) fonksiyonu ile bu veriler (sensor_data_topic
) bağlantısına JSON formatında gönderilir ve konsola yazdırılır. Bu işlem her saniye tekrarlanır. (except KeyboardInterrupt
) bloğu, kullanıcı tarafından bir kesinti (Ctrl+C) gerçekleştirildiğinde döngüyü sonlandırır ve (myMQTTClient.disconnect
) fonksiyonu çağrılarak MQTT istemcisinin bağlantısı güvenli bir şekilde kesilir. Bu yapı, veri akışı sağlarken, kullanıcı müdahalesiyle bağlantının düzgün bir şekilde sonlandırılmasını sağlar.
Canlı Verilerin AWS IoT ile Sisteme Alınması
AWS IoT (Internet of Things), cihazların AWS bulutuna güvenli bir şekilde bağlanmasını ve bu cihazlardan veri toplamasını sağlayan bir platformdur. Bu hizmet, birçok cihazın veri gönderip almasına, veri işleme ve analiz yapmasına olanak tanır.
Detaylı Kurulum
Sensörlerden gelen veriler, MQTT protokolü aracılığıyla AWS IoT Core’a gönderilir.
- IAM Role Oluşturma:
- AWS Management Console’ da “IAM” (Identity and Access Management) servisine gidin.
- “Roles” sekmesine tıklayın ve “Create role” butonuna tıklayın.
- “AWS service” seçeneğini seçin ve “IoT” servisini seçin.
- İzin politikalarını ekleyin.
- Role bir isim verin ve rolü oluşturun.
2. Cihaz Kaydı:
- IoT Core dashboardunda “Manage” sekmesine gidin ve “Things” seçeneğine tıklayın.
- “Create things” butonuna tıklayın.
- “Create a single thing” seçeneğini seçin.
- Cihaz adını girin ve “Next” butonuna tıklayın. Ardından cihaz sertifikası konfigrasyon alanında “Auto-generate a new certficate” seçeneğine tıklayın.
- Yapılan işlemlerin ardından “Policies” oluşturulması gerekmektedir.
- Oluşturulan “Policies” izinlerinde tüm kaynaklardan gelen eylemlere izin vermektedir. Bu durum güvenlik zafiyeti yaratabileceğinden daha spesifik izinlerin verilmesi sistem güvenliği açısından daha faydalı olacaktır.
- Sertifikaları indirin (AmazonRootCA1, Private key, Device certificate).
3. Sertifika Yapılandırma
- Oluşturulan “Policies” sertifikaya ekleyin.
4. MQTT Test Yayını
- IoT Core dashboardunda “MQTT test client” alanında sentetik veri gönderiminin yapıldığı topic girilerek bağlantı test edilir.
Canlı Verilerin AWS IoT’ den Kinesis Stream ve Kinesis Firehose ile Alınması
AWS Kinesis Stream ve Kinesis Firehose, gerçek zamanlı veri akışlarını toplamak, işlemek ve analiz etmek için kullanılan AWS hizmetleridir.
- AWS Kinesis Stream: Bu hizmet, gerçek zamanlı veri akışlarını toplamak ve işlemek için kullanılır. Veriler, Kinesis Stream’e gönderildikten sonra bir veya daha fazla tüketici (consumer) tarafından işlenebilir. Kinesis Stream, yüksek hacimli verilerin düşük gecikme ile toplanmasını sağlar.
- AWS Kinesis Firehose: Bu hizmet, gerçek zamanlı veri akışlarını toplamak ve hedeflere (Amazon S3, Amazon Redshift, Amazon Elasticsearch Service ve Splunk) yüklemek için kullanılır. Firehose, verileri dönüştürme ve sıkıştırma gibi işlemleri gerçekleştirebilir. Ayrıca, verilerin otomatik olarak yüklenmesini sağlar.
AWS IoT’den Kinesis Stream ve Kinesis Firehose’a Veri Aktarımı
AWS IoT Core’a gelen canlı verilerin Kinesis Stream ve Kinesis Firehose ile nasıl alındığını ve işlendiğini adım adım inceleyeceğiz.
Detaylı Kurulum
Sensörlerden gelen veriler, MQTT protokolü aracılığıyla AWS IoT Core’a gönderilir ve burada işlenmek üzere Kinesis Stream ve Firehose gibi diğer AWS hizmetlerine yönlendirilir.
- Kinesis Stream Oluşturma:
- Servisler menüsünden “Kinesis” seçeneğine tıklayın.
- “Create data stream” butonuna tıklayın.
- Stream adı ve shard sayısını belirleyin (örneğin,
sensor_data
). - “Create” butonuna tıklayarak Kinesis Stream’i oluşturun.
2. AWS IoT Rol Oluşturma:
- AWS IoT Core dashboard’unda “Message routing” sekmesine gidin ve “Rules” seçeneğine tıklayın.
- “Create” butonuna tıklayın.
- Kural adı ve açıklamasını girin.
- SQL sorgusu yazın. Örneğin, tüm verileri almak için
SELECT * FROM 'sensor_data_topic'
.
- “Rule actions” alanından “Kinesis Stream” eylemini seçin. Ardından oluşturulan kinesis stream seçilmelidir. IAM rol ise yazının başında oluşturulan IAM roldür.
Oluşturulan kinesis stream rolünün ardından ilgili stream alanına gidilerek alınan veriler görüntülenir.
3. Kinesis Firehose Oluşturma:
- AWS Management Console “Kinesis” servisine gidin.
- Sol menüden “Amazon Data Firehouse” seçeneğine tıklayın.
- Firehose kaynak ve hedef bilgilerini girin (örneğin, kaynak olarak Kinesis Stream, hedef olarak Amazon S3).
- Firehouse lambda fonksiyonu ekleyerek gelen sensör verilerini anlık olarak farklı bucketlara kaydedilmesi sağlanmaktadır. “Transform and convert records” alanından “Turn on data transformation” seçilmelidir.
- AWS lambda fonksiyonu alanında “create function” seçilmeldir. Şeçilen alandan sonra “General Amazon Data Firehose Processing” seçilerek fonksiyon oluşturulmalıdır.
- AWS lambda fonksiyonu oluştururken, “Author from scratch” seçeneğini kullanın ve “Process records sent to a Kinesis stream” blueprint’ini seçin. Fonksiyona isim verin, çalışacağı programlama dilini seçin (örneğin, Python 3.10) ve işlemci mimarisi olarak genellikle x86_64 seçeneğini tercih edin. Önceden oluşturulmuş bir IAM rolünü kullanabilir veya yeni bir tane oluşturabilirsiniz.
- AWS lambda fonksiyonu oluştururken triger olarak kinesis stream verilmelidir. Lambda fonksiyonu ile firehouse üzerinden veriler oluşturulan “sensor1datav1” ve “sensor2datav2” bucketlarına veri akışı sağlanacaktır. Lambda servisi ekranından iligli kod deploy edilerek sistem çalışır hale getirilir.
- Oluşturulan lambda fonksiyonu kinesis firehouse üzerinden seçilmelidir.
Kinesis firehouse oluşturma işlemi tamamlanmıştır. Artık sentetik olarak üretilen canlı sensör verileri başarılı bir şekilde ilgili bucket üzerinden depolanıyor.
Sonuç
Bu yazıda, canlı sentetik sensör verilerinin oluşturulması, toplanması, işlenmesi ve depolanması için uçtan uca bir çözüm hayata geçirildi. AWS IoT platformu kullanılarak simüle edilen sensör verileri gerçek zamanlı olarak toplanarak Kinesis Stream aracılığıyla güvenilir bir veri akışına aktarıldı. Kinesis Firehose ile veri akışının yönetimi sağlandı ve Lambda fonksiyonu kullanılarak veriler dinamik olarak ilgili S3 bucket’larına yönlendirildi. Bu aşamada, sensör verilerinin toplanmasından depolanmasına kadar olan tüm süreç başarıyla otomatikleştirilmiş ve verilerin ETL süreçlerinden transform ve load uygulamalarında kullanılmasın için hazır hale getirilmiştir.
Bir sonraki yazımızda, AWS ETL sürecinin aşaması olan Transform (dönüştürme) aşamasını derinlemesine inceleyeceğiz. Bu aşamada, verilerimizi nasıl temizleyeceğimizi, zenginleştireceğimizi ve analiz için hazır hale getireceğimizi adım adım göreceğiz.
Bir sonraki yazıda görüşmek üzere…