Spark ile Veri Transferi: AWS S3 Bucket, MinIO, PostgreSQL ve PySpark İle Depolama Çözümleri ve Büyük Veri Analizi

Enes Öztürk
Miuul AWS Community
13 min readJan 24, 2024

--

Veri bilimi ve büyük veri analitiği uygulamalarında, verileri işlemek ve depolamak için güçlü araçlara duyulan ihtiyaç giderek artmaktadır. Bu yazıda, Apache Spark’ın güçlü büyük veri işleme yeteneklerini kullanarak, bir Docker Compose projesindeki PySpark tabanlı bir Spark servisinden, PostgreSQL veritabanına veri transferi sürecini keşfedeceğiz. Ayrıca, MinIO servisi üzerinden Amazon S3 uyumlu depolama ve Kafka mesajlaşma sistemi ile entegrasyonu göreceğiz.

Oluşturulan Docker Compose dosyasında, üç ana servis bulunmaktadır: PySpark tabanlı bir Spark servisi, Amazon S3 Bucket uyumlu depolama için MinIO servisi, ve Kafka mesajlaşma sistemini sağlayan bir Kafka servisi. Ayrıca, PostgreSQL veritabanı servisi de bulunmaktadır.

Bu terminal çıktısı, belirli bir dizin içinde bulunan docker-compose.yaml adlı Docker Compose dosyasını kullanarak bir Docker projesini başlattığınızı göstermektedir.

docker-compose up -d komutu, Docker Compose'u kullanarak proje servislerini başlatır. Docker Compose, docker-compose.yaml dosyasını okur ve projedeki servisleri ayağa kaldırmak için gerekli konteynerleri oluşturur.

  • sprk_ext_vbo adlı bir Docker ağı oluşturur.
  • PostgreSQL, Kafka, MinIO ve Spark servislerini ayrı konteynerlerde başlatır.

İşlem başarıyla tamamlandı ve her servis belirtilen Docker içinde başlatıldı. Her servis, aynı Docker ağında birbirleriyle iletişim kurabilir ve belirtilen portlardan dış dünyaya açılmış durumdadır.

Docker konteynerine bağlanarak JupyterLab yükleyip, çalıştırılır.

docker exec -it spark bash: Bu komut, spark adlı Docker konteynerine interaktif bir terminal bağlantısı yapar. -it bayrakları, interaktif bir terminal bağlantısı sağlar ve bash komutu, konteyner içinde Bash kabuğunu başlatır. Böylece, konteyner içinde komutlar çalıştırabilirsiniz.

pip list | grep jupyter: Bu komut, Python paketlerini listeler ve bu listeyi grep komutuyla filtreler. grep jupyter, listede "jupyter" kelimesini içeren paketleri bulur. Bu, konteyner içinde yüklü olan Jupyter ile ilgili paketleri görmek için kullanılır.

pip install jupyterlab: Bu komut, konteyner içindeki Python ortamına JupyterLab adlı Python paketini yükler.

jupyter lab --ip 0.0.0.0 --port 8888 --allow-root: Bu komut, JupyterLab'ı başlatır.

  • --ip 0.0.0.0: JupyterLab'ın tüm ağ arayüzlerinden erişilebilir olmasını sağlar.
  • --port 8888: JupyterLab'ı 8888 portunda başlatır.
  • --allow-root: JupyterLab'ın root kullanıcı olarak çalışmasına izin verir.

PostgreSQL

Spark’ı kullanmak için gerekli ayarlamalar yapılır ve PostgreSQL JDBC sürücüsünü Spark’a eklenir.

!pip install findspark: Bu komut, findspark adlı Python paketini yükler. findspark, Spark'ın bulunduğu dizini otomatik olarak bulmaya yardımcı olur ve PySpark'ı Python ortamına bağlamak için kullanılır.

import configparser: Bu komut, Python'ın configparser modülünü içe aktarır. Bu modül, yapılandırma dosyalarını okumak ve yazmak için kullanılır.

import findspark: Bu komut, findspark modülünü içe aktarır. Bu modül, SPARK_HOME ortam değişkenini otomatik olarak bulmaya yardımcı olur.

from pyspark.sql import SparkSession, functions as F: Bu komutlar, PySpark'ın SparkSession sınıfını ve functions modülünden F takma adını içe aktarır. SparkSession, Spark işlemlerini başlatmak için kullanılır ve F takma adı, PySpark'ta bulunan yaygın fonksiyonlara kısa bir erişim sağlar.

findspark.init("/opt/spark"): Bu komut, findspark modülünü kullanarak SPARK_HOME ortam değişkenini belirtir ve Spark'ı Python ortamına bağlar.

!ls -l $SPARK_HOME/jars | grep postgresql: Bu komut, Spark'ın jars dizinindeki dosyaları listeler ve bunlar arasında "postgresql" kelimesini içeren dosyayı bulur. Bu durumda, postgresql-42.3.5.jar isimli dosyanın varlığı kontrol edilir. Bu dosya, PostgreSQL veritabanına bağlantı sağlamak için gerekli olan JDBC sürücü dosyasıdır.

Yapılandırmalar yapılarak bir Spark oturumu oluşturur.

SparkSession.builder: Bu ifade, bir SparkSession örneği oluşturmak için kullanılan SparkSession.Builder nesnesini başlatır. SparkSession, Spark işlemlerini başlatmak ve yapılandırmak için kullanılır.

.appName("JDBC and SQL"): Bu metot, Spark uygulamasının adını belirler. Bu ad, Spark UI ve günlük dosyalarında görünecek olan uygulama adıdır.

.master("local[2]"): Bu metot, Spark uygulamasının çalışacağı master düğümünü belirler. Bu örnekte, local[2] kullanılmıştır, bu da yerel modda çalışan iki worker düğümü anlamına gelir.

.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,org.postgresql:postgresql:42.7.1"): Bu metot, Spark uygulamasının bağımlılıklarını ayarlar. Bu örnekte, Delta Lake ve PostgreSQL JDBC sürücüsü bağımlılıkları eklenmiştir. Bu, Delta Lake işlemleri için io.delta:delta-core_2.12:2.4.0 ve PostgreSQL veritabanı bağlantısı için org.postgresql:postgresql:42.7.1 bağımlılıklarını içerir.

.getOrCreate(): Bu metot, mevcut bir SparkSession örneğini alır veya varsa yeni bir örnek oluşturur. Böylece, bir Spark oturumu oluşturulur veya mevcut bir oturuma erişim sağlanır.

Spark’ı yapılandırarak ve gerekli bağımlılıkları ekleyerek bir Spark oturumu oluşturur. Oluşturulan bu oturum, Spark üzerinde SQL sorguları yapmak, veritabanlarına bağlanmak veya Delta Lake işlemleri gerçekleştirmek gibi çeşitli görevler için kullanılabilir.

SparkSession konfigürasyonunda belirtilen Delta Lake ve PostgreSQL bağımlılıklarını başarıyla indirdiğini gösterir. Maven, indirilen her bağımlılığın durumunu bildirir. Buradan 5 tane paketi başarılı şekilde indirdiğini görebiliriz.

Tekrar terminale gelerek, Docker PostgreSQL konteynerine bağlanarak PostgreSQL veritabanı yönetim aracını (psql) kullanarak bazı işlemleri gerçekleştirir.

docker exec -it postgresql bash: Docker'da çalışan postgresql adlı konteynere interaktif bir terminal bağlantısı sağlar.

psql -U postgres: PostgreSQL veritabanı yönetim aracına (psql) bağlanır. -U postgres parametresi, "postgres" adlı kullanıcıyla giriş yapmayı ifade eder. Bu, genellikle PostgreSQL'in varsayılan yönetici kullanıcısıdır.

postgres=# \l: PostgreSQL komut satırı aracılığıyla mevcut veritabanlarını listeler.

postgres=# create database traindb;: Yeni bir PostgreSQL veritabanı oluşturur. traindb adında bir veritabanı oluşturulmuştur.

PostgreSQL konteynerine bağlanarak PostgreSQL veritabanı oluşturulması işlemlerini gerçekleştirdi.

PostgreSQL veritabanı içinde bir kullanıcı oluşturulması, bu kullanıcıya gerekli yetkilerin verilmesi ve ardından bu kullanıcı ile oluşturulan veritabanına bağlanılması işlemlerini gerçekleştirir.

psql -U train -d traindbdemo: train kullanıcısı ile traindbdemo veritabanına bağlanır.

create table test(id int, name varchar(50));: train kullanıcısı, traindbdemo veritabanında test adında bir tablo oluşturmayı denedi ancak şemaya erişim izni olmadığı için hata aldı.

psql -U postgres: PostgreSQL yönetici kullanıcısı ile bağlantı kurar.

\c traindbdemo: PostgreSQL yönetici kullanıcısı, traindbdemo veritabanına bağlanır.

grant all privileges on schema public to train;: PostgreSQL yönetici kullanıcısı, train kullanıcısına public şemasındaki tüm yetkileri verir.

psql -U train -d traindbdemo: train kullanıcısı ile tekrar traindbdemo veritabanına bağlanır.

create table testdemo(id int, name varchar(50));: train kullanıcısı, traindbdemo veritabanında testdemo adında bir tablo oluşturur.

select * from testdemo;: train kullanıcısı, traindbdemo veritabanındaki testdemo tablosunu sorgular.

Böylelikle yeni oluşturduğumuz kullanıcının “permission denied” sorununu çözülmüş oldu.

1. Write to Postgresql:

Konfigürasyon dosyasından veritabanı bağlantı bilgilerini okuma işlemini gerçekleştirir.

config = configparser.RawConfigParser(): Python'ın configparser modülünden RawConfigParser sınıfından bir yapılandırma nesnesi oluşturur. Bu nesne, konfigürasyon dosyasındaki verileri okumak için kullanılacaktır.

config.read('./db_conn'): db_conn adlı konfigürasyon dosyasını okur ve bu dosyadaki veritabanı bağlantı bilgilerini alır.

user_name = config.get('DB', 'user_name'), password = config.get('DB', 'password'), db_ip = config.get('DB', 'db_ip'): Konfigürasyon dosyasından sırasıyla kullanıcı adı, şifre ve veritabanı IP adresi gibi veritabanı bağlantı bilgilerini alır.

Genellikle veritabanı bağlantısı için kullanılan kullanıcı adı, şifre ve IP adresi gibi bilgilerin bulunduğu bir konfigürasyon dosyasından bu bilgileri okuma amacını taşır. Bu sayede bu bilgiler, kod içinde doğrudan belirtilmek yerine dış bir yapılandırma dosyasından alınabilir, böylece güvenlik ve esneklik sağlanabilir.

from pyspark import SparkFiles: PySpark'ın SparkFiles modülünü içe aktarır. SparkFiles, Apache Spark uygulamasında dosyaların paylaşılmasını ve erişilmesini sağlayan bir yardımcı modüldür.

sc = spark.sparkContext: SparkContext'i alır. sc, Spark uygulamasının ana bağlam nesnesidir ve çeşitli işlemleri başlatmak ve Spark kaynaklarına erişmek için kullanılır.

github_url = "https://raw.githubusercontent.com/erkansirin78/datasets/master/Churn_Modelling.csv": Okunacak CSV dosyasının URL'sini tanımlar. Bu örnekte, bir GitHub deposundan bir CSV dosyası kullanılmaktadır.

sc.addFile(github_url): Belirtilen URL'den dosyayı Spark uygulamasına ekler. Bu işlem, dosyanın Spark işçi düğümlerine dağıtılmasını ve uygulama tarafından erişilmesini sağlar.

df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"), header=True, inferSchema=True): PySpark DataFrame'ini oluşturur ve CSV dosyasını bu DataFrame'e yükler. SparkFiles.get("Churn_Modelling.csv"), eklenen dosyaya erişmek için SparkFiles modülünü kullanır. header=True parametresi, CSV dosyasının ilk satırının başlık olduğunu belirtir ve inferSchema=True parametresi, Spark'ın sütun veri tiplerini otomatik olarak çıkarmasını sağlar.

Uzaktan bir CSV dosyasını Apache Spark uygulamasına ekleyerek ve ardından bu dosyayı bir PySpark DataFrame’e yükleyerek veri analizi ve işleme işlemleri yapmaya hazır hale getirir.

1. Method:

PySpark DataFrame’inin içeriğini PostgreSQL veritabanına yazma işlemini gerçekleştirir.

jdbcUrl = f"jdbc:postgresql://{db_ip}:5432/traindbdemo?user={user_name}&password={password}": PostgreSQL veritabanına bağlantı için kullanılacak JDBC URL'sini oluşturur. Bu URL, veritabanı IP adresini (db_ip), port numarasını (varsayılan olarak 5432), veritabanı adını (traindbdemo), kullanıcı adını (user_name) ve şifreyi (password) içerir.

df.write.jdbc(url=jdbcUrl, table="churn_spark", mode="overwrite", properties={"driver": 'org.postgresql.Driver'}): PySpark DataFrame'ini PostgreSQL veritabanına yazma işlemini gerçekleştirir. İlgili parametreler şunlardır:

  • url: Bağlantı URL'si.
  • table: Hedef PostgreSQL tablosu adı (churn_spark).
  • mode: Yazma modu, bu örnekte "overwrite" olarak belirlenmiştir. Bu, var olan tabloyu ezer ve yeni verilerle günceller.
  • properties: Veritabanı bağlantısı için ek özellikler. Bu örnekte sadece kullanılan JDBC sürücüsü belirtilmiştir (org.postgresql.Driver).

PySpark DataFrame’ini PostgreSQL veritabanına yazma işlemini gerçekleştirmek için kullanılır. Büyük veri kümelerini veritabanına aktarmak veya güncellemek için yaygın olarak kullanılan bir yöntemdir.

PostgreSQL veritabanı içindeki traindbdemo veritabanında bulunan ilişkisel tabloları ve bu tabloların bir örnek içeriğini gösterir.

\dt: Veritabanındaki tüm tabloların listesini görüntüler. Bu komut, public şemasındaki tüm tabloları listeler.

2. Method:

PostgreSQL veritabanına yönelik bir JDBC bağlantısı kullanarak gerçekleştirilmektedir.

format("jdbc"): Yazma işlemi için JDBC formatını belirtir. Bu, PySpark'ın JDBC bağlantısını kullanacağını gösterir.

mode("overwrite"): Yazma modunu belirtir. Bu durumda, var olan tabloyu silip üzerine yazar. Diğer seçenekler arasında "append" (ekleme) ve "ignore" (yoksayma) bulunabilir.

option("driver", "org.postgresql.Driver"): PostgreSQL veritabanına bağlantı için kullanılacak JDBC sürücüsünü belirtir.

option("url", f"jdbc:postgresql://{db_ip}:5432/traindbdemo"): PostgreSQL veritabanına bağlanmak için kullanılacak JDBC URL'sini belirtir.

option("dbtable", "churn_spark2"): Hedef PostgreSQL veritabanı tablosunun adını belirtir.

option("user", user_name): PostgreSQL veritabanına bağlanmak için kullanıcı adını belirtir.

option("password", password): PostgreSQL veritabanına bağlanmak için şifreyi belirtir.

save(): DataFrame'i belirtilen PostgreSQL tablosuna yazma işlemini başlatır.

\dt: Veritabanındaki tüm tabloların listesini görüntüler. Bu komut, public şemasındaki tüm tabloları listeler.

2. Read From Postgresql:

1. Method:

spark.read.format("jdbc"): PySpark DataFrame'i oluşturmak için JDBC formatını belirtir. Bu, PySpark'ın JDBC bağlantısını kullanacağını gösterir.

.option("url", jdbcUrl): PostgreSQL veritabanına bağlanmak için kullanılacak JDBC URL'sini belirtir.

.option("driver", 'org.postgresql.Driver'): PostgreSQL JDBC sürücüsünü belirtir.

.option("query", "select * from churn_spark"): PostgreSQL veritabanından okunacak veriyi seçmek için bir SQL sorgusu belirtir.

.load(): PostgreSQL veritabanından veriyi yükler ve bu veriyi PySpark DataFrame'ine dönüştürür.

2. Method:

MinIO

MinIO, açık kaynaklı bir nesne depolama sistemidir. Bu yazılım, büyük ölçüde dağıtılmış bulut altyapıları ve büyük veri uygulamaları için optimize edilmiştir. MinIO’nun temel amacı, yüksek performanslı ve ölçeklenebilir nesne depolama çözümleri sunmaktır.

MinIO, S3 uyumlu bir API (Amazon Simple Storage Service) ile çalışır, bu da mevcut S3 tabanlı uygulamaların MinIO’yu kolayca entegre etmelerini sağlar. S3 uyumluluğu sayesinde MinIO, geniş bir ekosistemde kullanılabilir ve özellikle bulut tabanlı depolama çözümlerinde tercih edilebilir.

Kullanıcılara yüksek performanslı, ölçeklenebilir ve dayanıklı bir nesne depolama çözümü sağlamak amacıyla tasarlanmıştır. MinIO’nun özellikleri arasında veri paralelliği, otomatik ölçekleme, erişim kontrolü, veri şifreleme ve yedekleme gibi özellikler bulunmaktadır. Ayrıca, MinIO’nun dağıtılmış mimarisi, kullanıcılara depolama altyapılarını hızlı bir şekilde genişletme ve ölçekleme yeteneği sunar.

Docker Compose içinde tanımlanan kullanıcı adı (user) ve şifre (password) bilgileri sayesinde, MinIO arayüzüne güvenli bir şekilde localhost üzerinden erişebilirsiniz. Arayüz, tarayıcınızı localhost:9001 adresine yönlendirerek, belirtilen port üzerinden MinIO'nun kullanıcı dostu web tabanlı kontrol paneline giriş yapabilirsiniz. Bu bilgilerle MinIO'ya güvenli bir bağlantı kurarak obje depolama, yönetme ve analiz işlemlerinizi gerçekleştirebilirsiniz.

PySpark kullanarak MinIO ile bir Apache Spark uygulaması başlatma işlemlerini gerçekleştirir.

findspark.init(“/opt/spark”): findspark, Spark’ın bulunduğu dizini otomatik olarak bulmaya yardımcı olur ve PySpark’ı Python ortamına bağlamak için kullanılır. Bu komut, Spark’ın bulunduğu dizini belirler.

SparkSession Oluşturma:

  • SparkSession.builder: SparkSession oluşturmak için kullanılan bir yapıyı başlatır.
  • appName(“Spark Example MinIO”): Oluşturulan Spark uygulamasına bir ad verir.
  • master(“local[2]”): Spark uygulamasının yerel modda çalışacağını ve iki işçi düğümü kullanacağını belirtir.
  • config(“spark.jars.packages”, “…”): Kullanılan bağımlılıkları ayarlar. Bu örnekte Hadoop AWS ve Delta Lake bağımlılıkları eklenmiştir.
  • config(“spark.hadoop.fs.s3a.access.key”, accessKeyId): MinIO’ya erişim için kullanılacak AWS S3 uyumlu erişim anahtarını belirtir.
  • config(“spark.hadoop.fs.s3a.secret.key”, secretAccessKey): MinIO’ya erişim için kullanılacak AWS S3 uyumlu gizli anahtarını belirtir.
  • config(“spark.hadoop.fs.s3a.path.style.access”, True): MinIO ile etkileşimde bulunurken yol tarzı erişiminin kullanılacağını belirtir.
  • config(“spark.hadoop.fs.s3a.impl”, “org.apache.hadoop.fs.s3a.S3AFileSystem”): S3A dosya sistemini kullanmayı belirtir.
  • config(“spark.hadoop.fs.s3a.endpoint”, “http://minio:9000"): MinIO’nun endpoint URL’sini belirtir.
  • getOrCreate(): Eğer mevcut bir SparkSession varsa onu alır, yoksa yeni bir tane oluşturur.

MinIO ile etkileşimde bulunacak bir Apache Spark uygulamasını yapılandırmak ve başlatmak için gerekli temel adımları içerir.

Bucket oluşturma işlemi, MinIO’nun depolama alanını organize etmek ve verileri gruplandırmak için kullanılan bir adımdır. “dataset” adındaki bu depo, içerisine ilerleyen süreçte yüklenecek verileri saklamak için oluşturulmuştur.

Daha sonra, “Object Browser” sekmesine geçtik. Bu sekme, oluşturduğumuz bucket ve içerisindeki verilere erişim sağlamak için kullanılır. Bu sayede, yüklenen verilere göz atabilir, düzenleyebilir ve silebiliriz.

Yüklediğimiz deneme amaçlı örnek veri, MinIO’nun sunduğu kullanıcı dostu arayüz üzerinden kolayca gerçekleştirilen bu işlemlerle dataset adlı bucket içerisine eklenmiştir.

df = spark.read.format('csv'): Bu satır, bir PySpark DataFrame'i oluşturmak için kullanılır. format('csv') ifadesi, DataFrame'in CSV formatında veri okuyacağını belirtir. Bu şekilde, CSV dosyalarından veri çekmek için gerekli olan ayarlamalar yapılır.

.option('header','true'): Bu ifade, CSV dosyasının ilk satırının başlık (header) bilgilerini içerdiğini belirtir. Yani, okunan veri setinde sütun isimleri ilk satırda bulunacaktır. 'header' seçeneği true olarak belirlendiği için bu bilgi kullanılır.

.load('s3a://datasets/iris.csv'): Bu kısım, okuma işleminin gerçekleştirilecek olan CSV dosyasının konumunu belirtir. 's3a://datasets/iris.csv' ifadesi, verinin Amazon S3 (Simple Storage Service) üzerindeki bir konumdan çekileceğini gösterir. Bu adres, s3a protokolü ile belirtilmiş bir URI'dir.

Sonuç olarak, verilen kod parçası, MinIO üzerindeki ‘datasets’ adlı bir klasör içinde bulunan ‘iris.csv’ adlı CSV dosyasını okuyarak bir PySpark DataFrame’i oluşturur.

DataFrame’i Parquet formatında MinIO üzerine yazma ve ardından bu Parquet dosyasını okuma işlemini gerçekleştirir.

df.write.save('s3a://datasets/iris_parquet'): Bu satır, DataFrame'i Parquet formatında kaydetmek için kullanılır. 's3a://datasets/iris_parquet' ifadesi, Parquet dosyasının yazılacağı konumunu belirtir. Parquet formatı, sıkıştırma özellikleri ve sütun tabanlı depolama nedeniyle büyük veri setleri için etkili bir tercihtir.

spark.read.load('s3a://datasets/iris_parquet').show(3): Bu kısım, önceki adımda Parquet formatında kaydedilen dosyayı Amazon S3 üzerinden okuma ve ilk üç satırını görüntüleme işlemini gerçekleştirir. load fonksiyonu, belirtilen konumdan DataFrame'i yükler, ardından show(3) ile ilk üç satırını görüntüler.

Veri setini Parquet formatında MinIO üzerine kaydedip daha sonra bu kaydedilen Parquet dosyasını tekrar okuyarak içeriğini görüntüler. Parquet, sıkıştırma ve sütun tabanlı depolama avantajlarıyla büyük veri setlerinin etkili bir şekilde işlenmesine olanak tanır.

AWS S3 Bucket

Amazon Simple Storage Service (Amazon S3), Amazon Web Services (AWS) tarafından sunulan bir bulut depolama hizmetidir. S3, kullanıcılara büyük miktarlarda veriyi depolamak ve bu veriye internet üzerinden güvenli bir şekilde erişim sağlamak için kullanılır. S3, ölçeklenebilir, dayanıklı ve yüksek performanslı bir depolama çözümüdür.

AWS S3'de veri depolamak için kullanılan temel öğe “bucket” olarak adlandırılır. Bir S3 bucket, bir depolama alanını temsil eder ve bu alanda veri depolanabilir. Bucket’lar, AWS hesabınızdaki benzersiz bir adla tanımlanır ve bu adın global olarak benzersiz olması gerekir. Her AWS hesabı, birçok bucket oluşturabilir ve bu bucket’lar arasında veri bölünebilir.

İlk olarak AWS Management Console’u açtıktan sonra, bulutta veri depolama ve yönetim hizmetleri sunan Amazon S3 servisine giriş yaparız.

Burada “create bucket” komutuyla yeni bir depolama alanı oluşturabilir ve bu alana istediğimiz adı verebiliriz.

Oluşturulan bu bucket içine örnek veri eklemek de oldukça kolaydır. “Add Object” veya “Upload” seçeneklerini kullanarak, belirli dosyaları veya klasörleri seçip bu bucket’a yükleyebiliriz.

Read AWS S3 Credentials:

Amazon S3 servisine bağlanabilmek için ilk adım olarak IAM (Identity and Access Management) servisi üzerinden güvenlik kimlik bilgileri oluşturuyoruz. Bu kimlik bilgileri, S3 servisine erişim yetkilerini kontrol etmemizi ve güvenli bir şekilde veri depolamamızı sağlar.

IAM üzerinde yeni bir kullanıcı oluşturarak, bu kullanıcıya Amazon S3 erişim izinlerini tanımlarız. Ardından, kullanıcıya özel bir erişim anahtarı ve güvenlik kimliği (access key ve secret key) sağlanır. Bu anahtarlar, S3 servisine programatik olarak erişim sağlamak için kullanılır.

Amazon S3 servisine güvenli ve yetkilendirilmiş bir şekilde erişim sağlamak için IAM servisini kullanmamıza olanak tanır. Bu güvenlik önlemleri, AWS hizmetlerini kullanırken veri güvenliğini ve yetkilendirme kontrolünü sağlamak için önemlidir.

Read Data From Local:

Amazon S3 depolama alanındaki bir CSV dosyasından veriyi okur.

df = spark.read: Bu satırda, SparkSession nesnesi olan spark üzerinden bir DataFrame oluşturulur. SparkSession, Spark uygulamalarının başlatılmasını sağlar.

.option("inferSchema", True): Bu seçenek, Spark'ın veri tipini otomatik olarak çıkarmasını sağlar. Yani, her sütunun veri tipi Spark tarafından çıkarılır.

.option("header", True): Bu seçenek, CSV dosyasının ilk satırının sütun başlıklarını içerdiğini belirtir. Bu şekilde, DataFrame'in sütun adları doğru bir şekilde atanır.

.csv('s3a://sparkdemode/iris.csv'): Bu kısım, S3 depolama alanındaki bir CSV dosyasının konumunu belirtir ve bu dosyadan veriyi okur. s3a protokolü, Apache Hadoop tarafından Amazon S3 ile etkileşimde bulunmak için kullanılır.

Sonuç olarak, belirtilen S3'deki CSV dosyasını okuyarak bir Spark DataFrame oluşturur.

Write to AWS S3, Read From AWS S3

DataFrame’in içindeki veriyi Parquet formatında S3 depolama alanına yazma ve ardından bu Parquet verisini tekrar bir DataFrame’e okuma işlemlerini gerçekleştirmektedir.

df.write.format('parquet').option('header','true').save('s3a://sparkdemode/iris_data_parquet', mode='overwrite'): Bu satırda, önceden oluşturulmuş olan DataFrame (df) içindeki veriyi Parquet formatında S3 depolama alanına yazma işlemi gerçekleştirilir. format('parquet') ifadesi, veriyi Parquet formatında kaydetmek istediğimizi belirtir. option('header','true'), Parquet dosyasının başlık bilgisi içermesini sağlar. save fonksiyonu, veriyi belirtilen S3 yoluna kaydeder. mode='overwrite' parametresi, var olan dosyaların üzerine yazılmasını sağlar.

df_s3 = spark.read.parquet('s3a://sparkdemode/iris_data_parquet'): Bu satırda ise, önceki adımda Parquet formatında kaydedilmiş olan veriyi S3 depolama alanından okuma işlemi gerçekleştirilir. spark.read.parquet fonksiyonu kullanılarak Parquet dosyası okunur ve bu veri ile yeni bir DataFrame olan df_s3 oluşturulur.

Apache Spark’ın büyük veri işleme yeteneklerini kullanarak veriyi Parquet formatında S3 depolama alanına yazma ve ardından bu veriyi tekrar okuma işlemlerini gerçekleştirmektedir. Parquet formatı, sıkıştırma ve sütun-tabanlı depolama gibi avantajlarıyla büyük veri setlerinin etkili bir şekilde saklanması için kullanılan bir sütun-tabanlı dosya formatıdır.

En son yayınlardan haberdar olmak veya benimle iletişime geçmek için:

LinkedIn: https://www.linkedin.com/in/enessoztrk/

Yeni yazılarımda görüşmek üzere…🧠

--

--