Postgres ve Apache Spark ile Büyük Veri Üzerinde Çalışma

Postgres ve Apache Spark ile Büyük Veri Üzerinde Çalışma

PostgreSQL, en gelişmiş açık kaynak veritabanı olarak bilinir ve veri kümesi ne kadar büyük, küçük veya farklı olursa olsun verilerinizi yönetmenize yardımcı olur. Bu yazıda, PostgreSQL ve Apache Spark ile birlikte çalışarak Büyük Veri üzerinde nasıl çalışılır konusunu anlatacağım.

OKAN KURNAZ
4 min readAug 23, 2022

--

Büyük veri analitiği için iki farklı türde analiz bulunur:

  • Batch Analitik : Belirli bir süre boyunca toplanan verilere dayanır.
  • Gerçek Zamanlı Analitik : Anlık bir sonuç için anlık verilere dayanır.

Apache Spark Nedir ?

Apache Spark, hem toplu hem de gerçek zamanlı analitik üzerinde daha hızlı ve daha kolay bir şekilde çalışabilen, büyük ölçekli veri işleme için birleşik bir analitik motorudur.

Java, Scala, Python ve R’de üst düzey API’ler ve genel yürütme grafiklerini destekleyen optimize edilmiş bir motor sağlar.

Apache Spark Componentleri

Apache Spark, PostreSQL ile Nasıl Kullanılır ?

PostgreSQL kümenizin hazır ve çalışır durumda olduğunu varsayacağım. Bu görev için Ubuntu 20.04 üzerinde çalışan bir PostgreSQL 12 sunucusu kullanacağız.

Öncelikle PostgreSQL sunucumuzda test veritabanımızı oluşturalım:

postgres=# CREATE DATABASE spark_deneme; 
CREATE DATABASE
postgres=# c spark_deneme
You are now connected to database "spark_deneme" as user "postgres".

Şimdi, tablo_1 adında bir tablo oluşturacağız ve birkaç test kayıt insert edeceğiz :

spark_deneme=# CREATE TABLE tablo_1 (id int, name text); 
CREATE TABLE
spark_deneme=# INSERT INTO tablo_1 VALUES (1,'name1');
INSERT 0 1
spark_deneme=# INSERT INTO tablo_1 VALUES (2,'name2');
INSERT 0 1

Apache Spark’ı PostgreSQL veritabanımıza bağlamak için bir JDBC bağlayıcısı kullanacağız. Buradan indirebilirsiniz.

$ wget https://jdbc.postgresql.org/download/postgresql-42.2.6.jar

Şimdi Apache Spark’ı kuralım. Bunun için spark paketlerini buradan indirmemiz gerekiyor.

$ wget http://us.mirrors.quenda.co/apache/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz $ tar zxvf spark-2.4.3-bin-hadoop2.7.tgz $ cd spark-2.4.3-bin-hadoop2.7/

Spark kabuğunu çalıştırmak için sunucumuzda yüklü JAVA’ya ihtiyacımız olacak:

$  yum install java

Artık Spark Shell’imizi çalıştırabiliriz:

$ ./bin/spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://ApacheSpark1:4040
Spark context available as 'sc' (master = local[*], app id = local-1563907528854).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ '_/
/___/ .__/_,_/_/ /_/_ version 2.4.3
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

Sunucumuzdaki 4040 portundan Web Kullanıcı Arayüzümüze erişebiliriz:

Apache Spark UI

Spark kabuğuna PostgreSQL JDBC sürücüsünü eklememiz gerekiyor:

scala> :require /path/to/postgresql-42.2.6.jar 
Added '/path/to/postgresql-42.2.6.jar' to classpath.
scala> import java.util.Properties
import java.util.Properties

Ve Spark tarafından kullanılacak JDBC bilgilerini ekleyelim:

scala> val url = "jdbc:postgresql://localhost:5432/spark_deneme" 
url: String = jdbc:postgresql://localhost:5432/spark_deneme
scala> val connectionProperties = new Properties() connectionProperties: java.util.Properties = {}
scala> connectionProperties.setProperty("Driver", "org.postgresql.Driver")
res6: Object = null

Artık SQL sorgularını çalıştırabiliriz. Öncelikle query1'i tablo_1 tablomuza SELECT * FROM tablo_1 olarak tanımlayalım.

scala> val query1 = "(SELECT * FROM tablo_1) as q1" 
query1: String = (SELECT * FROM tablo_1) as q1

Ve DataFrame’i oluşturalım:

scala> val query1df = spark.read.jdbc(url, query1, connectionProperties) 
query1df: org.apache.spark.sql.DataFrame = [id: int, name: string]

Şimdi bu DataFrame üzerinde bir eylem gerçekleştirebiliriz:

scala> query1df.show() 
+---+-----+
| id| name|
+---+-----+
| 1|name1|
| 2|name2|
+---+-----+
scala> query1df.explain
== Physical Plan ==
*(1) Scan JDBCRelation((SELECT * FROM t1) as q1) [numPartitions=1] [id#19,name#20] PushedFilters: [], ReadSchema: struct

Mevcut değerleri döndürdüğünü doğrulamak için Postgres’e daha fazla değer ekleyebilir ve tekrar çalıştırabiliriz.

PostgreSQL

testing=# INSERT INTO t1 VALUES (10,'name10'), (11,'name11'), (12,'name12'), (13,'name13'), (14,'name14'), (15,'name15'); 
INSERT 0 6
testing=# SELECT * FROM t1;
id | name
----+--------
1 | name1
2 | name2
10 | name10
11 | name11
12 | name12
13 | name13
14 | name14
15 | name15
(8 rows)

Spark

scala> query1df.show() 
+---+------+
| id| name|
+---+------+
| 1| name1|
| 2| name2|
| 10|name10|
| 11|name11|
| 12|name12|
| 13|name13|
| 14|name14|
| 15|name15|
+---+------+

Örneğimizde, Büyük Veri bilgilerimizi nasıl yönettiğini değil, yalnızca Apache Spark’ın PostgreSQL veritabanımızla nasıl çalıştığını gösteriyoruz.

Günümüzde, bir şirkette büyük verileri yönetme zorluğu oldukça yaygındır ve gördüğümüz gibi, bununla başa çıkmak ve daha önce bahsettiğimiz tüm özelliklerden yararlanmak için Apache Spark kullanabiliriz. Apache Spark ve PostgreSQL kullanımı hakkında daha fazla bilgi için resmi belgelere göz atabilir ve gereksinimlerinize uygun hale getirebilirsiniz.

--

--