Python’da Büyük Hacimli Verilerle Çalışmak — 2. Bölüm

Volkan Yurtseven
Akbank Teknoloji
Published in
29 min readJul 26, 2024

Pandas ile efektif veri okuma ve işleme yöntemleri

Bu serinin ilk kısmında Pandas dışında da veri okuma/işleme kütüphanelerinin varlığından bahsetmiştim. Evet, bunların çoğuyla daha verimli çalışılabilir ancak Pandas çok daha yaygındır. Bunun en büyük sebebi gerek kendisinin gerek baz aldığı Numpy’ın diğer kütüphanelerle (Ör: Sklearn) tam bir uyum içinde olmasıdır. O yüzden Pandas’la verimli çalışmanın yollarını bilmek oldukça önemlidir.

Not: Bu yazının veritabanıyla ilgili kısımlarında sadece Oracle örneklerini göreceksiniz, bununla beraber benzer yaklaşımlar diğer RDBMS’ler için de uygulanabilir, sadece kullanılacak kütüphaneler ve bağlantı yaratma yöntemleri değişecektir.

İçindekiler

Part I ToC

İçeriği üstte verilen Part I’e buradan ulaşabilirsiniz.

Veri Okuma

Herhangi bir veri kaynağından bir Pandas DataFrame’e veri okurken dikkat edilmesi gereken birçok madde olmakla birlikte, gözlemlediğim kadarıyla bu konu çok ihmal edilen ve bu nedenle verimsizliğe yol açan bir konudur, özellikle de veritabanından veri okuma kısmı. O yüzden buna geniş bir alan ayırmak istedim.

Pandas’ın mimarı Wes McKinney “Veri setinizin en az 5 katı büyüklüğünde RAM’iniz olsun” diyor ama bu kadar RAM’i kim kaybetmiş ki siz bulasınız. Veri kaynağının veritabanı olduğu bir durumda ve hele bir de compression yapılmışsa siz bunu 5 değil en az 10 kat diye düşünün. “Ne yani 10 GB’lık bir veri için 200 GB’lık bir RAM mi lazım?” diye düşünebilirsiniz. Teoride öyle ama pratikte elimizdeki tüm cephaneleri kullanarak ve Pandas’tan ayrılmadan bunu nasıl başarabileceğimizi göreceğiz.

Flat File Okuma

  • Mümkünse veri kaynağını oluşturan kişiye veriyi CSV/Excel yerine parquet veya feather formatında oluşturmasını söyleyin. Hem onlar diske daha hızlı yazmış olur ve daha az disk alanı kaplar, hem de siz çok daha hızlı okursunuz.
  • Eğer CSV mecburiyeti varsa, sadece gerekli kolonları okuyun ve kolonların veri tipini okuma sırasında (dtype parametresini vererek) belirtin.
  • Veriniz çok büyükse chunk’lar halinde okuyup, okurken de işlemek faydalı olacaktır.
  • Son olarak aslında işin temelinde veri tiplerini uygun veri tipine dönüştürme var ki yukarıda bahsettiğim işleme kısmı bunu da içeriyor. Bunun detaylarını da aşağıda göreceğiz.

Detaylar için basit bir “how to read csv files efficiently with pandas” araması size faydalı birkaç link çıkaracaktır.

Veri Tipi Dönüşümü

Elimde büyük bir CSV olmadığı için ben yine veritabanından bir tablo okuyup, bunu CSV olarak yazdıracağım, sonra onu chunk’lar halinde nasıl okuruz onu göstereceğim. Ancak tüm dönüşüm işini veritabanından okuduktan sonra yapmış olacağım. Aslında bu kısım doğrudan CSV’den okumayla alakalı olmayacak, ancak işlem bütünlüğü adına buraya koymanın daha uygun olacağını düşündüm. Devam edelim;

Şimdi ilk olarak Oracle veritabanından diskteki hacmi 23.5 GB olan bir tablo buldum. Siz de bir tablonun diskte ne kadar yer kapladığını şu sorguyla görebilirsiniz.

SELECT
to_char(bytes / 1024 / 1024 / 1024, '999.9') GB
FROM
dba_segments
WHERE
segment_type = 'TABLE'
AND owner||'.'||segment_name = 'ŞEMA.TABLO'

Bu tabloyu df değişkeni içine okuyorum, sonra da shape ve info bilgilerine bakıyorum. (Okuma kısmına ait farklı tipteki fonksiyonları daha aşağıda bulacaksınız, buraya bilerek onu koymadım.)

df.shape
# 30Mio satır, 25 kolon

df.info()
#......
# dtypes: datetime64[ns](2), int64(8), object(15)
# memory usage: 5.7+ GB

sys.getsizeof(df)>>30 #-->31 GB, sondaki '>>30' 1024^3'e bölmenin kısa yoludur

Memory usage kısmındaki ‘+’ için dokümantasyonun ne dediğine bakalım: “The + symbol indicates that the true memory usage could be higher, because pandas does not count the memory used by values in columns with dtype=object.” Yani yaklaşık 25 GB’lık (31–5.7) kısım bu object tipinden kaynaklı görünüyor. İşte biz de bu object tipli kolonların bazısını category tipine dönüştürerek optimize edeceğiz ama öncesinde hangi kolonları kapsama alacağımızı bulalım. Bu veri tipi, cardinalitesi (distinct değer adedi) düşük kolonlarda yüksek memory kazanımı sağlayacak.

Bildiğiniz gibi Pandas bir DataFrame yarattığında nümerik kolonlar için de en yüksek seviyedeki veri tiplerini kullanır, yani int64 ve float64. Bunların işi görece daha kolay, pd.to_numeric ile downcast edince iş bitiyor. Object (string) tipler için ise kapsam belirleme işi var. Bunlar için önerilen şudur: “Satır sayısının en fazla 2'de 1'i kadar distinct değer varsa bunu category tipine dönüştürün.” Bu 2'de 1 oranı bana çok yetersiz geldiği için ben emniyetli davranıp birçok projede 20'de 1 olarak ilerlemeyi seçiyorum. Bu durumda, “Ad Soyad” gibi bir alan çok sayıda distinct değer içereceği için böyle bir dönüşüme uygun olmayacakken, meslek bilgisi, il adı vs. gibi kolonlar kapsama girecektir.

Öncelikle bu yazı boyunca sıklıkla kullanacağımız iki fonksiyona bakalım: İlki, category tipindeki kolonları tespit eden fonksiyondur. İkincisi ise veri tipi dönüşümünü yapan fonksiyon

#bu iki fonksyion da dataanalysis.py(da olarak import edilir) modülü içinde yer almakta
def columnsFromObjToCategory(df:pd.DataFrame, n:int=20)->list:
dt=pd.DataFrame(df.dtypes, columns=["Type"])
dn=pd.DataFrame(df.nunique(), columns=["Nunique(Excl.Nulls)"])
d=pd.concat([dt,dn],axis=1).sort_index()
objtocat=d[(d["Nunique(Excl.Nulls)"]<(len(df)/n)) & (d["Type"]=="object")].index.tolist()
return objtocat

def optimize_types(df:pd.DataFrame, datetime_features:list, categoric_features:list)->pd.DataFrame:
floats = df.select_dtypes(include=['float64']).columns.tolist().removeItems_(datetime_features+categoric_features)
# df[floats] = df[floats].apply(pd.to_numeric, downcast='float') apply vektörize işlemi bozuyor, to_numeric zaten vectorized olduğu için faydalanalım
for f in floats:
df[f] = pd.to_numeric(df[f], downcast='float')

ints = df.select_dtypes(include=['int64']).columns.tolist().removeItems_(datetime_features+categoric_features)
# df[ints] = df[ints].apply(pd.to_numeric, downcast='integer') #aynı şekilde
for i in ints:
df[i] = pd.to_numeric(df[i], downcast='integer')

df[categoric_features] = df[categoric_features].astype('category')
return df

#optimize_types içnde kullandığım removeItems_ isimli extension metodu
def removeItems_(self,list2:list,inplace:bool=True)->list:
"""
Extension method for list type. Removes items from list2 from list1.
First, forbiddenfruit must be installed via https://pypi.org/project/forbiddenfruit/
"""
if inplace:
for x in set(list2):
try:
self.remove(x)
except:
pass
return self
else:
temp=self.copy()
for x in set(list2):
try:
temp.remove(x)
except:
pass
return temp

curse(list, "removeItems_", removeItems_)

Şimdi yukarıda elde ettiğim df için bu fonksyionları çalıştırıp sonuca bakalım:

cats=da.columnsFromObjToCategory(df.select_dtypes('O'),20) #burdan tesadüfen tüm object kolonlar geldi, yani elenen olmadı
dates = df.select_dtypes(np.datetime64).columns.tolist()

dfopt=da.optimize_types(df, dates, cats)
dfopt.info()
#.......
#dtypes: category(15), datetime64[ns](2), int32(6), int64(2)

Güzel bir dönüşüm sağladık; 8 int64 kolonundan 6’sını int32'ye, tüm object olanları da category’ye çevirdik. Memory kullanımına bakınca 31 GB’tan 2.3 GB’a düştüğünü gördüm, gerçekten çok başarılı.

Diske Yazarken Dosya Tipi Belirleme

Şimdi bunları bir de farklı formatlarda diske yazıp hem hacim hem süre kontrolü yapalım.

dfopt.to_csv("df.csv") #3 dk 4 sn, 6.55 GB
dfopt.to_pickle("df.pkl") #1 sn, 2.44 GB
dfopt.to_parquet("df.parquet") #11 sn, 1.13 GB
dfopt.reset_index().to_feather("df.feather") #2 sn, 1.44 GB, feather does not support serializing a non-default index for the index; you can .reset_index() to make the index into column(s)

Gördüğünüz gibi süre açısından diğer 3 yöntem CSV’ye göre fersah fersah önde, disk hacmi olarak yine oldukça fark var. Bu seçeneklere göre sanki parquet veya feather tercih edilebilir gibi duruyor. Bunlar yazma performansı, okuma performansı da benzer şekilde olacaktır.

CSV’den Okuma

CSV dosyamızı oluşturduğumuza göre artık chunklar halinde okumayı test edebiliriz. Burada da benchmark olması adına 3 farklı yöntem uygulayacağız.

  • Tek seferde okuma
  • Veri tipi belirterek okuma (yukarıda yaptığımız dönüşümü okurken yapmış olacağız)
  • Chunk’lar halinde okuma

Tek seferde okuma

Öncelikle dosyadan örnek bir 5 satır alıp kolonlara ve veri tiplerine bakalım.

df_tek = pd.read_csv("df.csv", nrows=5)
df_tek.info()

Bunun çıktısında ilk iki kolonun anlamsız olduğunu gördüm, o yüzden bunları okumayacağız. Şimdi sadece gereklileri okuyalım:

cols = df_tek.columns[2:]
df_tek = pd.read_csv("df.csv", usecols= cols)
len(df_tek) #30.5 mio satır var

Tüm okuma 1 dk 16 sn sürdü, memory kullanımı 33 GB

Veri tipi belirterek okuma

Şimdi öncelikle bir önceki senaryoda elde ettiğimiz dataframe üzerinden unique değer kontrolü yaparak hangi kolonlar category olabilir bunları belirleyelim.

df_tek.select_dtypes("O").nunique()

Sonuca baktığımda 2 kolonun aslında tarihsel kolon olduğunu(isimlerinden anladım), diğer hepsinin düşük cardinalite(distinclik) durumu sayesinde category yapılabileceğini gördüm. Bu iki tarihsel kolon ise muhtemelen pandas için geçersiz tarihleri içerdiği için object olmuş. Bunları pd.to_datetime converter’ı ile çevirmemiz gerekecek.

Şimdi veri tiplerini belirtmek üzere bir dictionary hazırlayalım. pandas 1.5.0 sürümünden itibaren read_csv içindeki dtype parametresi için defaultdict kullanılabiliyor, biz de bunu kullanalım.

from collections import defaultdict
types = defaultdict(lambda: "category") #default değer category olacak

Diğer tüm kolonları int32 yapacağız, dictionary’de bunları belirleyelim. (Burada bir sakınca var, az sonra bahsedeceğim)

for c in df_tek.select_dtypes("int").columns:
types[c]="int32"

Son olarak yukarıda bahsettiğimiz 2 tarihsel kolon için bi converter fonksiyon yazalım. İlgili kolonları converters parametresine vereceğiz ve bunları tarih tipine dönüştürmüş olacağız. Bu parametrenin kullanımını bilmiyorsanız pandas dokümantasyona bakabilirsiniz.

to_date = lambda x: pd.to_datetime(x, errors='coerce')
converters={'ZAMAN':to_date,'GUNC_TAR':to_date}

Son olarak okumayı yapalım:

df_dtyped=pd.read_csv("df.csv", dtype=types, converters=converters, 
keep_default_na=False, usecols = cols)

Burada bir uyarı çıkabilir, bu normal, dikkate almayın.

Both a converter and dtype were specified for column ZAMAN - only the converter will be used.

Bu yöntemde converterlardan kaynaklı olarak okuma biraz uzun sürdü: 37 dk (Nedeni hakkında bir fikrim yok maalesef), memory kullanımı ise 2 GB oldu. Converter kullanmayıp bu kolonları olduğu halde bırakırsam okuma 4 dk 47 sn sürüyor, memory kullanımı ise 8 GB oldu, hemen arkadan bu 2 kolon için manuel dönüşüm yapmak ise 15 sn sürdü.

Şimdi, yukarıda belirttiğim sakınca şu: Biz tüm int64 kolonları int32 yapmış olduk. Halbuki bazısı int32/int16/int8 olabilirdi. Maalesef bunları tek tek belirtmekten başka bir yöntem yok. Biz burada basitlik olması adına hepsine int32 dedik. Bununla beraber eğer bir kolon int32 olamayacak kadar büyük değerler içeriyorsa veri kaybı yaşardık.

İşte hem bu veri kaybı olasılığına karşı hem de gereksiz yere daha yüksek veri tipinde(int32) tutma ihtimaline karşı ben üçünü senaryoyu önereceğim.

Chunk’lar halinde okuma

Burada optimizasyon ve veri tipi dönüşüm işlemlerini chunk’lar üzerinde yapacağız. Öncelikle bu işlemleri yapacağımız bir fonksiyon tanımlayalım:

def process(df,dates,cats):
df=da.optimize_types(df,dates,cats)
for d in dates:
df[d]=pd.to_datetime(df[d],errors='coerce')
return df

Şimdi de nihai kod:

chunks=[]
cs=1000000
dates=["ZAMAN","GUNC_TAR"]
cats=da.columnsFromObjToCategory(df_tek2.select_dtypes('O'),20)
with pd.read_csv("df.csv", chunksize=cs, usecols=cols) as reader:
for chunk in reader:
chunk = process(chunk, dates, cats)
chunks.append(chunk)

Bunları bir de concat edip nihai df’mizi elde edelim:

df_chunk=pd.concat(chunks,ignore_index=True,axis=0)     

Bu senaryoda chunk’ların okunması, okunurken veri tipi dönüşümü ve chunkların birleştirilmesi 2,5 dk sürdü. Son durumda memory kullanımı da 2 GB oldu. Veri tiplerine baktığımda int16 ve int32ler görüyorum, demek ki hepsini int32 yapmak akıllıca değilmiş. Bu arada 1 kolon ise int64 kalmış. Demek ki 2.senaryodaki yöntemde bu kolon için veri kaybı yaşadık.

Not: Bu senaryoda object tiplerin category’ye dönüşmediğini görme ihtimaliniz yüksek, ki bende böyle oldu. Bunun için çözüm, concat edilmiş df üzerinden bir kez daha optimize_types fonksiyonunu çalıştırmak olacaktır. Bunun detaylarına biraz aşağıda gireceğim.

Özet

En hızlı yöntem tek seferde okuma gibi görünüyor. Veri tiplerini vererek okuduğumuzda memory kullanımı düşmekle birlikte çalışma süresi uzadı. Chunk’lar halinde okuma ise memory tüketimi açısından en iyi yöntem gibi duruyor.

Daha detaylı nasıl analiz yapılır, son durumdaki memory bilgisi mi önemli yoksa okuma sürecindeki tüm memory footprinti mi dikkate alınmalı, bütün bunları veritabanından okuma kısmında detaylıca göreceğiz. Benzer mantık CSV okumada da uygulanabilir.

Veritabanından Okuma

Pandas ile veritabanından okuma yaparken read_sql metodu kullanılır. Burada veri aslında öncelikle sqlalchemy kütüphanesi kullanılarak cursor aracılığıyla okunur. Bir profiler ile bakıldığında görülecektir ki, aslında okunan verinin gerek sunucu diskindeki hacmi gerek memory’deki cursor’lı okunmuş hali görece daha düşüktür. Cursor’la okunduktan sonra dataframe üretimi yapıldığında üzerine belli miktar memory tüketimi daha gelir. Python/Pandas/Jupyter üçlüsünden kaynaklı ilave overhead’leri saymıyorum bile (Bakınız Part I). Bu arada siz de manuel olarak cursor yaratabilir ve read_sql metodunu kullanmadan veri okuma ve dataframe üretme süreci üzerinde daha çok kontrol sahibi olabilirsiniz. Böyle yapıldığında cx_Oracle (sonradan python-oracledb oldu) kütüphanesi de kullanılabilir, ki biz de aşağıdaki örneklerde bunu kullanacağız.

Veri, okunduktan sonra Pandas API’sine paslanır. Yani Pandas veriyi doğrudan okumaz. Pandas’a paslandıktan sonra Pandas birçok ara (intermediate) veri yapısı oluşturarak memory tüketimini artırabilir, bu bazen birkaç kat olabilir, bunun detaylarını ilk yazıda görmüştük. Ve birçok durumda kullanıcı “Benim verim bu kadar büyük değil ki, neden memory patladı?” diye sorar. İşte çoğu zaman bunun sebebi bu ara yapılardır ve kendisini “dead kernel” hatasıyla gösterir.

SQLAlchemy vs cx_Oracle

Aşağıda kod örneklerinden ilkinde göreceksiniz, bunda hem SQLAlchemy hem cx_Oracle ile kod örneği var, diğerlerinde sadece cx_Oracle ile ilerledim. Sebebi belli; cx_Oracle daha performanslıdır.

Bunların ikisi de veri okuma için uygundur aslında. cx_Oracle daha doğrudan bir bağlantı imkânı sunarken, SQLAlchemy engine’i daha gelişmiş bir bağlantı yönetimine imkân veriyor ve Pandas ile daha sorunsuz çalışıyor. Zira Pandas’la cx_Oracle kullanıldığında şu uyarı çıkmakta:

UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.

Fakat ben cx_Oracle ile daha hızlı okuma ve daha düşük memory tüketimi olduğunu gözlemledim. Örnek vermek gerekirse bir tabloyu cx_Oracle ile 13 dakika içinde okudum ve 82 GB memory tüketimi oldu, SQLAlchemy ile ise bu sayılar 21 dakika ve 119 GB şeklindeydi. Bu farkın sebebini ChatGPT’ye ve Gemini’a sorduğumda şu cevapları aldım:

SQLAlchemy provides an object-relational mapping (ORM) layer that creates in-memory representations of your database tables (models). This additional layer can introduce some memory overhead compared to the more direct connection approach of cx_Oracle.

Lazy Loading: If using SQLAlchemy’s ORM features, explore lazy loading techniques (e.g., defer, joinedload) to minimize the amount of data loaded into memory at once.

Muhtemelen kompleks kullanımlar için SQLAlchemy daha iyidir ancak sadece veri okuma söz konusu olduğunda cx_Oracle daha uygun bir tercih gibi duruyor.

Bu arada cursor’la okuma konusunda, hatta genel olarak cx_Oracle kullanımı konusunda şurada özet bir bilgi bulunuyor, bakabilirsiniz.

SQL ve Veritabanı Mimarisi

Veriyi Python’da işlemeye geçmeden önce size önereceğim bazı hususlar olacak. Öncelikle iyi derecede SQL öğrenmenizi tavsiye ederim, sonrasında genel olarak bir veritabanı nasıl çalışır, genel mimarisi nasıldır, bir DBA kadar olmasa bile genel bir bilgi sahibi olmanız önemli. Böylece işin Python’a kalacak kısmını da hafifletmiş olursunuz. Bu konuda YouTube ve Udemy gibi platformlarda işleyişi görsel olarak da anlatan birçok kaynak bulunuyor.

Ben de size birkaç öneri vermek isterim. Benim vereceğim örneklerin hepsi Oracle için olacaktır. Farklı bir RDBMS’de çalışıyorsanız muadil işlemleri bulabilirsiniz.

  • Öncelikle, birçok işlemi SQL ile de yapabileceğinizi bilin. Mesela iki kolon arasındaki benzerliği bulmak için illa Python’ın NLP paketlerini kullanmanız gerekmiyor. Bu işi Oracle’ın UTL_MATCH paketindeki EDIT_DISTANCE ve JARO_WINKLER fonksiyonlarıyla da yapabilirsiniz, üstelik çok daha hızlı. Keza bir kolondaki outlierları bulmak için PERCENTILE (ve türevleri) fonksiyonunu veya analitik fonksiyonları kullanabilirsiniz. Veritabanı mimarisi hakkında bilginiz de iyi olursa tablo için uygun tipte bir index yaratılmasını da isteyebilirsiniz, böylece outlier hesabındaki sıralama işlemleriniz optimize edilmiş olur, özellikle sıklıkla bu hesabı yapmanız gerekecekse. Örnekler çoğaltılabilir.
  • Büyük hacimli bir veri okuyacaksanız, öncelikle bir sample çekin ve veriyi Python’da profilleyin derim. Profilleme sonunda sadece analizde işinize yarayacak olan kolonları belirleyin. Örneğin TCKN, müşteri ad-soyad gibi cardinalitesi (distinctlik) yüksek alanlar çoğu durumda bir veri analizinde işe yaramazlar. Bu yüzden “Select *” ile gereksiz okuma yaparak hem kodunuzun uzun sürede çalışmasına neden olursunuz hem de veritabanına gereksiz yük bindirmiş olursunuz hem de daha çok memory tüketmiş olursunuz.
  • Tablonuzda partition olup olmadığını bilmek de önemli. Her tabloda partition olmak zorunda değil fakat özellikle partition’lı kolonlara göre parçalayıp paralel okuma yaparsanız okuma performansı çok daha iyi olacaktır, zira partition olmayan veya partition olduğu halde buna göre parçalanmayan sorguların her biri tabloya full gidecektir (full table scan), bu da performans kaybı demektir. Bu konuya ait örnekler aşağıda olacak.
  • Okuma yapacağınız tablonun paralellik derecesi (DEGREE) de önemli, bu da sunucu tarafındaki paralellik miktarını gösteriyor. DEGREE değeri düşükse PARALLEL hint’i ile okuma performansını artırabilirsiniz. (Her zaman garanti olmamakla birlikte. Ayrıca veritabanının global max paralel limiti de bulunabilir. Mesela max limit 16 ise , veritabanı müsaitliği olsa bile siz 32 verseniz işe yaramaz, 16'yla sınırlanır)
  • Okuyacağınız tablonun disk hacmini bilmeniz de önemli ki bunun sorgusunu yukarıda vermiştim. Tabloda compression yapılıp yapılmadığına da bakmak gerek. Compress’li ise ve diyelim ki diskte 5 GB yer tutuyorsa gerçekte bundan daha fazla yer tutacağı aşikârdır. Ne yazık ki compress oranını öğrenmenin doğrudan bir yolu yok, 3rd party araçlar varmış, ben hiç kullanmadım, deneme yanılma yoluyla ilerliyorum.

Aşağıda bazı faydalı metadata sorgularını bulabilirsiniz.

--index
SELECT COLUMN_NAME
FROM dba_ind_columns
WHERE table_owner='....' and table_name='....'

--önemli alanlar: last analyzed önemli, güncel değilse sıkıntı, ama partitionlıysa parititona analiz tarihine bakmak lazım
Select DEGREE,LAST_ANALYZED,PARTITIONED,NUM_ROWS,COMPRESSION
FROM ALL_TABLES
WHERE owner='....' and table_name='....'

--partiton olduğundan emin olduktan sonra hangi kolonda olduğu
select OBJECT_TYPE, COLUMN_NAME, COLUMN_POSITION
from ALL_PART_KEY_COLUMNS
where OWNER='...' and name='....'

--sample okuma
Select * from Table1 sample(1) --%1'i okunacak

--birden fazla tablo durumunda sample okuma
--dbms_random.random fonksiyonu kullanılabilir
  • Execution plana bakmayı ve yorumlamayı bilmekte de fayda var, burada özellikle TEMP SPACE alanı önemli. Sizin kullanıcı adınızın bağlı olduğu user grubunun TEMP SPACE kotasını bilmeli ve bunun veri okuma sırasında ne durumda olduğunu da takip edebilmelisiniz. Bunun için aşağıdaki sorgudan faydalanabilirsiniz. Bunu bir fonksiyon haline getirip, döngüsel okuma sırasında bu değeri loglamanızda fayda var. Bu alan özellikle sıralama gibi maliyetli işlemlerde şişebilmektedir. Unutmayın, büyük hacimli veri okurken sadece kendi makinenizin RAM sınırı değil database tarafındaki kullanıcı grubunuzun TEMSPACE kotası da önemli bir faktör olacak. Yoksa işleriniz sürekli kill olur.
SELECT tablespace_name, tablespace_size/ 1024 / 1024 /1024 TOTAL_GB , 
round(free_space / 1024 / 1024 /1024,0) FREE_GB FROM dba_temp_free_space
WHERE tablespace_name = '.....' ORDER BY 1 ;

Memory_profiler ile Gözlem

Part I’de gördüğümüz memory_profiler’ı biraz hatırlayıp veritabanından veri okurken neler olduğunu biraz daha yakından görelim.

from memorytest_fornotebook import readData
%mprun -T mprof0.txt -f readData readData(sql)

Widget’ta 1.8’den başladık, 13.2’ye çıktı, sonra 9.6’ya düştü, biraz bekledi ve sonra bir anda 2 oldu, yani DataFrame’i memory’den sildi.

*** Profile printout saved to text file mprof0.txt.
Line # Mem usage Increment Occurrences Line Contents
=============================================================
12 287.4 MiB 287.4 MiB 1 def readData(sql,us,pwd):
13 7985.1 MiB 7697.7 MiB 1 dfmim1=da.oracleSql(sql,usr,pwd)

Ama bu çıktı bize tam olarak ne olduğunu söylemiyor, zaten tüm okuma için tek satırlık bir fonksiyon var. O zaman bu fonksiyonu olabildiğince açarak, yani cursor kullanarak tekrar yazalım.

def readCursor(sql, cols, usr,pwd):
try:
exadatahost, exadataport, db = getExadataHostPort() #utility function
with cx_Oracle.connect(usr, pwd, exadatahost + ":"+ exadataport + '/' + db, encoding = "UTF-8", nencoding = "UTF-8") as conn:
with conn.cursor() as cursor:
cursor.execute(sql)
rows = cursor.fetchall()
print(len(rows))
print(f":getsizeof rows: {sys.getsizeof(rows)>>20} MB")
print(rows[0])
df=pd.DataFrame(rows,columns=cols)
del rows
except Exception as e:
print(str(e))

Aşağıdaki gibi çağıralım:

from memorytest_cursor import readCursor
%mprun -T mprof1.txt -f readCursor readCursor(sql,cols)

Bunun çıktısı da şöyle:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
13 287.0 MiB 287.0 MiB 1 def readCursor(sql, cols, usr,pwd):
14 287.0 MiB 0.0 MiB 1 try:
15 287.1 MiB 0.0 MiB 1 exadatahost, exadataport, db = getExadataHostPort()
16 308.2 MiB 21.0 MiB 1 with cx_Oracle.connect(usr, pwd, exadatahost + ":"+ exadataport + '/'+ db,encoding = "UTF-8", nencoding = "UTF-8") as conn:
17 308.2 MiB 0.0 MiB 1 with conn.cursor() as cursor:
18 308.2 MiB 0.0 MiB 1 cursor.execute(sql)
19 7281.1 MiB 6972.9 MiB 1 rows = cursor.fetchall()
20 7281.1 MiB 0.1 MiB 1 print(len(rows))
21 7281.1 MiB 0.0 MiB 1 print(f":getsizeof rows: {sys.getsizeof(rows)>>20} MB")
22 7281.1 MiB 0.0 MiB 1 print(rows[0])
23 8169.1 MiB 888.0 MiB 1 df=pd.DataFrame(rows,columns=cols)
24 8088.4 MiB -80.8 MiB 1 del rows

Bu çıktıyı şöyle yorumlamak lazım:

Normalde okuduğumuz veri aslında 6972 MB. Bunu DataFrame’e çevirince ilave 888 MB geliyor, bunların sebebini yine 1. yazıda görmüştük. Rows’u silince sadece rows’a özgü 80 MB siliniyor, kalan 6892'lik kısım df içinde yaşamaya devam ediyor. Gördüğünüz gibi aslında okunan veriyi DataFrame’e çevirmek %15 civarında hacim artışına neden olabiliyor. İhmal edilmeyecek bir oran.

Pandas ile Paralel Veri Okuma

Veri tabanında bilmemiz gerekenleri öğrendik, şimdi tekrar pandas’a odaklanalım. Pandas’a yeni başladıysanız read_sql metodu ile datayı okumayı biliyorsunuzdur.

İlk soru şu: Veri çok büyükse ve yeterli miktarda memory’nin olduğundan eminseniz en ideal okuma şekli nedir? pd.read_sql ile tek seferde mi? Tabii ki hayır, böylesi çok uzun sürecektir. Veritabanları her ne kadar kendi içinde paralel okuma yapıyor olsa da client’ta yani sizin makinede bunu sadece tek bir proses yönetmektedir, yani veri size bu tek proses üzerinden topluca gelecektir. Ancak bu verinin size tek bir kanaldan gelmesine gerek yok. Mesela aşağıdaki görselde siz bağlantı kurup sorgu çekmeye çalıştığınızda veritabanının o anki müsaitliğine göre, tablonun paralellik derecesi 4 olduğu için veri de 4 paralel şekilde okunacaktır. Bu arada tablo üzerinde paralellik derecesi verilmediyse (DEGREE=1 ise) siz PARALLEL hint verip kendiniz de paralellik sağlayabilirsiniz. (Ancak DEGREE=X de olsa veya siz bu değeri hint olarak da verseniz illa X adet paralellik olmak durumunda değil, yani bu hiçbir zaman garanti edilmiyor. Veritabanı müsaitliği önemli ama bunun detayları şu an bizi ilgilendirmiyor, müdahil olabileceğimiz bir detay değil zaten.)

Single Thread okuma

Sadece veritabanı (server) tarafı değil client olarak sizin bilgisayarınız da paralel sorgulama yapabilir ve bizim odaklanacağımız kısım da bu olacak.

Tarih kolonuna göre partition yapılmış bir tablonuz olsun. Daha önce söylediğimiz gibi tabloda partition olmak zorunda değil. Çok büyük bir tabloysa muhtemelen vardır ama olmasa da herhangi bir kolona göre parçalama yapabiliriz. Biz partition olduğu varsayımıyla devam edelim. Bu tablo üzerinde 8 paralellik verilmiş olsun. Yani ideal durumda veri, veri tabanı tarafında 8 server-side process ile okunacak. Biz bu büyük hacimli veriyi partition kolonuna göre parçalara bölüp client tarafında da çoklu okuma yapabiliriz. Şimdi de ikinci soru geliyor: Client tarafında kaç paralellik vermeliyiz?

Normalde veri okuma işleri IO Bound işler olduğu için böyle bir işte multithreading yöntem tercih edilir ve paralellik derecesi için de deneme yanılma yoluyla bir değer bulunur. Bu arada DB adminlerin de müsaade edeceği bir üst limit de olabilir, ona da dikkat etmek lazım. Biz abartmayalım ve diyelim ki, 4 parallellik vereceğiz. Okuyacağımız veri de 1 yıllık (yuvarlak hesap 320 gün diyelim) olsun. Yani tarih kolonuna göre 4'e bölüp, PC’mizden çıkacak her bir kanal 80 günlük veri okuyacak. Tablonun da kendi içinde 8 paralelliği var ve DB de müsait, yani bundan tam faydalanacak diyelim, yani server tarafındaki paralel işlerin her biri de 10 günlük veri okuyacak. Bu durumda okuma sürecini şöyle bir temsille göstermek hatalı olmayacaktır:

Peki client tarafında paralellik vermek ne demek? Burada önemli olan detay da budur; sizin PC’nizde paralellik yönteminin ne olacağı ve kaç adet olacağı. Multithreading yönteminin ekstra bir thread yaratma maliyeti (overhead) var. Onun yerine data okumada daha çok asenkron yöntem önerilir, ki bunun maliyeti daha düşüktür. Şöyle bir analoji ile ilerleyelim:

İlk 80 günlük data yemeği için bir garson gönderip yemeği getirmesini beklerken diğer 80 günlük data için de ilave garson gönderebiliriz. Seçeceğimiz yönteme göre garson türü ve adedi değişecek. Thread kullanmayı normal garsona benzetebiliriz, asenkron yaklaşımda ise daha lightweight yapılar devreye girer, bunu da çaylak bir garson olarak düşünebilirsiniz. Üstelik thread’li okumada 4 garson göndermeniz lazımken, asenkron okumada tek çaylak da gidebilir, zaten bu garsonun yapacağı iş mutfağa siparişi vermek ve sadece bunların pişmelerini beklemektir, sonra bunlar çıktıkça arka arkaya servis yapabilir. Burada esas maliyetli(uzun süren) kısım servisi yapmak(veriyi pandasa paslamak) değil yemeklerin pişmesini(datanın okunmasını) beklemektir. (Bu arada asenkron yapıda işletim sistemi gerekirse 1/2/3/4 çaylak kullanmaya da karar verebilir, bu bizim karışabildiğimiz bir süreç değildir.)

Ne var ki, Pandas’la veritabanından asenkron veri okumanın doğrudan bir yolu şu an yok, biraz dolambaçlı işler yapmak gerekiyor. Bunlar da şu an size anlatmak istediğim senaryolar kadar basit olmadığı için bu yazımda bu senaryoya girmeyeceğiz. Belki ileride ayrı bir gönderi olarak paylaşabilirim.

O zaman tek seçeneğimiz thread (normal garson) kullanmak gibi duruyor. Ama biz veriyi okurken aynı zamanda veri tipi dönüşümü gibi bir processing(veri işleme) işlemi de yapacağız. (Biz basitlik olması adına sadece veri tipi dönüşümü yapacağız ama aslında başka processing işlemleri de yapılabilir, ki bunlar hep CPU-bound işlerdir.) O yüzden thread değil process kullanmamız gerekir, yani multithread değil multiprocess çalışacağız; bu yöntemdeki garsonları da kıdemli garsona benzetebiliriz, zira process yaratmanın overhead’i daha fazladır ama bir yandan buna mecburuz ve katlanacağız. Zaten en alttaki karşılaştırma tablosunda rakamlar beni doğrulayacaktır.

Veri dönüşüm işi olmasaydı belki threading tercih ederdik. Gerçi süre açısından bakınca multiprocess çalışmak daha avantajlı, en azından kendim için bu şekilde daha hızlı sonuç döndüğünü deneyimledim. Ancak bunun da kendi içinde 2 dezavantajı var: 1) Gereksiz yere CPU’ları işgal etmiş oluruz. Eğer CPU tüketimi açısından bir sıkıntımız yoksa endişe etmeden düşünülebilir. 2) Multithread çalışırken tek bir ConnectionPool yaratıp, connection’ları bunun üzerinden yaratmak şeklinde ilerliyoruz. Multiprocess’te ise process sayısı kadar connection yaratılmış olacaktır, bu da DBA’lerin çok arzu ettiği bir durum değildir. Pooling konusunda buradan detaylı bilgi alabilirsiniz. Bu konuda da bir sınırlamanız yoksa veya uygun sınır kadar process yaratarak yine multiprocessing’i düşünebilirsiniz.

Bu arada core sayımız az ise birçok thread yaratmak multiprocess çalışmaya göre daha avantajlı olabilir, o zaman yine multithreading düşünülebilir. Bunlar hep deneme yanılmayla tespit edeceğiniz durumlardır.

Not: Part I’in ilk bölümünde paralelleştirme kavramlarından bahsetmiştim, bunlarla ilgili daha detay bilgilere oradaki kaynaklardan ama özellikle şuradan bakabilirsiniz.

Şimdi yukarıda bahsettiğimiz alternatifler için kodlarımıza ve rakamlara(süre ve memory) bakalım.

Düz Okuma

Öncelikle düz okuma durumuna bakalım. Buradaki fonksiyonumuzda, yukarıda belirttiğim gibi hem SQLAlchemy hem cx_Oracle yöntemini kullanabileceğimiz bir parametre olacak. Sadece bu fonksiyon için bu iki opsiyonu da koydum, sonrakilerde sadece cx_Oracle olacak.

#memorytest.py dosyasının içeriği, diğer senaryolardaki fonksiyonlar da bu dosya içindedir
#o yüzden buradaki import kısmı diğerilerinde de geçerlidir
from urllib.parse import quote_plus
from sqlalchemy import create_engine
import sys
from utility import getExadataHostPort
import dataanalysis as da #2 önemli fonksiyon burada, yukarıda vermiştik
import numpy as np
import pandas as pd
import cx_Oracle
import gc

#DB bağlantısı için gerekli bilgileri bi fonksiyon aracılığı ile okuyorum, siz burda kendi bilgilerinizi elle yazabilirsiniz
exadatahost, exadataport, SERVICE_NAME = getExadataHostPort()

def duzOkuma(query, usr, pwd, sqlalchemy):
if not sqlalchemy:
with cx_Oracle.connect(usr, pwd,exadatahost + ":"+ str(exadataport) + '/'+SERVICE_NAME,encoding = "UTF-8", nencoding = "UTF-8") as connection:
df = pd.read_sql(query, connection)
return df
else:
DIALECT = 'oracle'
SQL_DRIVER = 'cx_oracle'
USERNAME = usr
PASSWORD = pwd
HOST = exadatahost
PORT = exadataport
ENGINE_PATH_WIN_AUTH = DIALECT + '+' + SQL_DRIVER + '://' + USERNAME + ':' + quote_plus(PASSWORD) +'@' + HOST + ':' + str(PORT) + '/?service_name=' + SERVICE_NAME
engine = create_engine(ENGINE_PATH_WIN_AUTH, encoding='iso-8859-9')
with engine.connect() as connection:
df = pd.read_sql(query, connection)
df.columns = [x.upper() for x in df.columns] #sqlalchemy olduğunda, kolon isimleri küçük harf döndüğü için
return df

Baştan söyleyeyim, yukarıda “select *” yapmak yerine analizde kullanacağınız kolonları belirleyin demiştik ama burada SQL metnini çok uzun göstermemek adına bu şekilde ilerleyeceğiz.

sql = 'select * from TABLO where CALL_DATE between trunc(sysdate)-32 and trunc(sysdate)-1'
df_duz = duzOkuma(sql, usr, pwd, True) #SQLAlchemy ile
df_duz = duzOkuma(sql, usr, pwd, False) #cx_Oracle ile

Süre ölçümü için nbextensions’taki execution time değerini, memory ölçümünde peak memory değeri için %%memit komutunu ve son durumdaki memory tüketimini görmek için resource_usage widget’ını (bundan sonra kısaca widget diyeceğim) kullanacağım. Ayrıca okuduğumuz DataFrame’in hacmini görmek için de sys.getsizeof fonksiyonunu veya DataFrame’in memory_usage metodunu kullanacağım. Bütün bunlardan Part I’de bahsetmiştik, o yüzden detaylarına girmiyorum.

Bu tablo özelinde rakamlar şöyle:

Tabloyu okuma süresi(SQLAlchemy ile): 20 dk
Tabloyu okuma süresi(cx_Oracle): 14 dk
Memory tüketimi(SQLAlchemy): peak =164 GB , widgetta 120 GB, sys.getsizeof(veya df.memory_usage): 72 GB.
Memory tüketimi(cx_Oracle): peak =164 GB, widget 88 GB, sysgetsize of yine 72.
Widget ve sys.getsizeof arasındaki farkın sebebini Part I’den biliyorsunuz.

Özetle hem süre açısından hem memory kullanımı açısından cx_Oracle kullanmak daha mantıklı duruyor. Bundan sonra da sadece bu şekilde bağlantı oluşturulacaktır. Şimdi bir de info bakalım:

df_duz.info()
#....
#dtypes: datetime64[ns](13), float64(34), int64(10), object(38)
#memory usage: 16.9+ GB

Flat_file kısmında gördüğümüz gibi burada görünen 16.9 GB, object tipli kolonların hacmini içermiyordu, zaten yanındaki + işareti de bunu anlatıyordu. Kalan yaklaşık 55 GB, object tipli kolonlardan geliyor. Biz bu 55 GB’lık metinsel veriyi ve ilaveten nümerik kolonlardaki 64 bitlik veriyi nasıl düşüreceğimize bakacağız. O yüzden şimdi bir de veri tipi optimizasyonu yapıp sayıları görelim:

read_as_date = df.select_dtypes(np.datetime64).columns.tolist() #ideal olanı veritabanı metadatasından bakmak, zira bazen oraclede 31.12.9999 veya 01.01.1900 gibi tarihler nedeniyle tip object gelebiliyor
read_as_category = da.columnsFromObjToCategory(df.select_dtypes('O'),20)

df_duz_opt=da.optimize_types(df_duz, read_as_date, read_as_category)

Buradaki işlem 15 dakika sürdü, df’in son memory tüketimi ise 14 GB. Dönüşüm işlemi inplace olmakla birlikte Part I’den bildiğiniz sebeplerle widget’ta bir miktar memory artışı da gözlemlenebilir. Şimdi tekrar info bakalım:

df_duz_opt.info()
#.....
#dtypes: category(34), datetime64[ns](13), float32(33), float64(1), int16(2), int32(2), int8(6), object(4)
#memory usage: 7.7+ GB

Özetle toplam süre, 14 + 15 =29 dakika. Memory işgali ise 72 GB’dan 14’e inmiş durumda, bunun da büyük kısmı 34 adet object tipli kolonun category’ye dönüşmesi sayesinde oldu, bir kısmı da downcast olan nümerik kolonlar sayesinde. Memory optimizasyonu oldukça başarılı, şimdi paralel yöntemlerle karşılaştırma zamanı.

Multithread Okuma

Öncelikle multithread’e bakalım. Her ne kadar ideal durumda bu yöntemi tercih etmeyecek olsak da, benchmark değeri olarak görmekte sakınca yok.

#memorytest.py içindeki fonksiyonumuz
def multiThreadOkuma(_sessionPool1, query, read_as_date, read_as_category):
connection=_sessionPool1.acquire() #Havuzdan connection çekiyoruz
df = pd.read_sql(query, connection)
df = da.optimize_types(df, read_as_date, read_as_category)
_sessionPool1.release(connection) #conneciton serbest bırakıyoruz
return df

Fonksiyon imzasından görüleceği üzere öncelikle bir SessionPool(ConnectionPool) hazırlamak gerekiyor, ayrıca 8 adet thread göndereceğiz.

import cx_Oracle
kanaladet=8
_sessionPool1 = cx_Oracle.SessionPool(usr, pwd, exadatahost + ":"+ str(exadataport) +'/'+SERVICE_NAME,encoding = "UTF-8",
nencoding = "UTF-8", min=kanaladet, max=kanaladet, increment=0,
threaded = True, getmode = cx_Oracle.SPOOL_ATTRVAL_WAIT) #min,max aynı olsun, incr=0

Her bir thread’de çalışacak SQL metinlerini hazırlayalım:

sqller = [f"select * from TABLO where CALL_DATE between trunc(sysdate)-{x+4} 
and trunc(sysdate)-{x}" for x in list(np.arange(0,32,4))]
print(sqller)
['select * from TABLO where CALL_DATE between trunc(sysdate)-4 and trunc(sysdate)-0',
'select * from TABLO where CALL_DATE between trunc(sysdate)-8 and trunc(sysdate)-4',
'select * from TABLO where CALL_DATE between trunc(sysdate)-12 and trunc(sysdate)-8',
'select * from TABLO where CALL_DATE between trunc(sysdate)-16 and trunc(sysdate)-12',
'select * from TABLO where CALL_DATE between trunc(sysdate)-20 and trunc(sysdate)-16',
'select * from TABLO where CALL_DATE between trunc(sysdate)-24 and trunc(sysdate)-20',
'select * from TABLO where CALL_DATE between trunc(sysdate)-28 and trunc(sysdate)-24',
'select * from TABLO where CALL_DATE between trunc(sysdate)-32 and trunc(sysdate)-28']

Fonksiyona gidecek tüm değerleri içeren “list of tuples”ı oluşturalım:

liste = [(_sessionPool1, sql, read_as_date, read_as_categor-y) for sql in sqller]

Fonksiyonumuzu çağıralım:

from memorytest import multiThreadOkuma
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(kanaladet) as tpe:
dfs= tpe.map(multiThreadOkuma, *zip(*liste))

Connection pool’u kapatalım:

_sessionPool1.close()

Son olarak her thread’den gelen sonuçları birleştirelim:

df_thread=pd.concat(dfs,ignore_index=True)

Rakamlar şöyle:
Thread’lerin çalışması: 23 dk, memory peak 91GB, widget son :74GB
Thread sonuçlarını birleştirme: 30 sn, memory peak 88, widget:86
Son hacim : 61 GB

Şimdi buraya baktığımızda aslında çok daha düşük bir memory kullanımı beklerdik, çünkü düz okumada en sonki optimizasyon sürecinden sonra 13 GB’a düşmüştü. Biz thread’ler içinde bu dönüşümü yapıyoruz, yine 13 görmeyi beklerdik. Öncelikle beklediğimiz dönüşüm olmuş mu diye info bakalım:

df_thread.info()
#dtypes: category(10), datetime64[ns](13), float32(30), float64(3), int16(2), int32(2), int8(6), object(29)

Düz okumanın aksine 34 değil sadece 10 kolon category olmuş, üstüne üstlük enteresan bir şekilde bazı nümerik kolonlar (her çalıştırmada değişebiliyor, bazen bir bazen birkaç kolon) da object’e dönmüş. Acaba her bir thread’deki durum böyle miydi, yoksa birleştirme sonrasında mı oldu, bakalım. (Kodları tekrar çalıştırmak gerekti, çünkü yukarıdaki birleştirme sırasında map sonucunda elde edilen generator yapısını erişilmez hale getirmiş oluyoruz.)

for d in dfs:
print(d.info())

#output
dtypes: category(34), datetime64[ns](13), float32(29), int16(2), int32(5), int8(7), object(5)
dtypes: category(34), datetime64[ns](13), float32(29), int16(1), int32(5), int8(8), object(5)
dtypes: category(34), datetime64[ns](13), float32(32), float64(1), int16(2), int32(2), int8(6), object(5)
dtypes: category(34), datetime64[ns](13), float32(29), int16(2), int32(4), int8(8), object(5)
dtypes: category(34), datetime64[ns](13), float32(29), int16(2), int32(3), int64(1), int8(8), object(5)
dtypes: category(34), datetime64[ns](13), float32(29), int16(3), int32(3), int8(8), object(5)
dtypes: category(34), datetime64[ns](13), float32(30), int16(2), int32(3), int64(1), int8(8), object(4)
dtypes: category(34), datetime64[ns](13), float32(29), int16(2), int32(3), int64(1), int8(8), object(5)

Thread’lerdeki veri tipi bazındaki rakamlar birbirinden farklı olmakla birlikte dönüşümler olmuş gibi görünüyor. Demek ki ne oluyorsa concat sırasında oluyor ve category kolonların bir kısmı tekrar object oluyor. Evet, aslında bu cateogry tipinin kırılgan doğasından kaynaklı ve bilinen bir durumdur ve size göstermek istediğim bir durumdu. Aşağıdaki linkte detaylı bir anlatım bulabilirsiniz.

Bizdeki kırılganlık sebebi şu: Diyelim ki 1.thread’daki category’ler A, B, C değerlerini içerirken 2.thread’de bir de ilaveten D olsun, işte o zaman bunlar birleşirken tekrar object olurlar. Nümeriklerin object’e dönüşmesi ise şöyle açıklanabilir: Farklı thread’lerde farklı veri tipleri (kiminde int8, kiminde float 16) ve farklı null gösterimleri (kiminde np.nan, kiminde None, kiminde pd.NA gelmiş olabilir) nedeniyle olabilirmiş.

O zaman concat edilmiş hali üzerinden bir kez daha optimizasyon kodunu çalıştıralım:

df_thread_opt=da.optimize_types(df_thread, read_as_date, read_as_category)
df_thread_opt.info()
#dtypes: category(34), datetime64[ns](13), float32(32), float64(1), int16(2), int32(2), int8(6), object(5)

İşte şimdi oldu. Bir de sys.getsizeof bakalım, o da 13 GB görünüyor. Ancak 3,5 dakika süren bu işlemle birlikte toplamda 27 dakika sürmüş oldu. Sonuçta süre olarak düz okumaya göre çok değişen bir şey yok ama memory’de daha avantajlı olduğumuz kesin.

Şimdi diyeceksiniz ki, madem sonda bir kez daha optimize ediyoruz neden thread’ler içinde de optimizasyon yaptık. Çünkü böyle olacağını bilmiyorduk(!). Bu arada tekrar object’e dönüş gibi bir durum olmazsa ikinci dönüşümü yapmayız, bunu da thread için dönüşümü çalıştırmadan bilemeyiz. Birazdan thread içinde dönüşüm yapılmamış haline de bakacağız.

Thread içinde Veri Dönüşümü Yapmadan Multithread Okuma

Yukarıda demiştik ki, CPU-bound bir iş olan data processing işlemleri yapmayacaksak multithreading tercih edilebilir. Hadi bu işi şimdi şöyle simüle edelim; thread’lere gönderilecek ana fonksiyondan veri tipi optimizasyonu kaldıralım, dönüşümü tıpkı düz okumada yaptığımız gibi nihai df üzerinde yapalım. Zaten az önce thread’ler içindeki optimizasyonun beklendiği gibi çalışmadığını ve sonra bir dönüşüm daha yapmak zorunda kaldığımızı görmüştük.

Düz okumaya göre bu senaryoda aslında sadece okumayı parçalara ayırarak süreyi kısaltmış olacağız, memory anlamında olumlu bir etki beklemiyoruz. (Ama unutmayın söz konusu Python ve Pandas ise beklenmedik durumlara hazır olmak lazım)

#memorytest.py dosyası içindeki fonksiyonumuz
def multiThreadOkumaNoDonusum(_sessionPool1, query, read_as_date, read_as_category):
connection=_sessionPool1.acquire()
df = pd.read_sql(query, connection)
_sessionPool1.release(connection)
return df
from memorytest import multiThreadOkumaNoDonusum
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(kanaladet) as tpe:
dfs= tpe.map(multiThreadOkumaNoDonusum, *zip(*liste))

Bu sefer thread’lerin çalışması 9,5 dakika sürdü. Peak memory: 109 GB, widget: 97 GB. Birleştirme 1 dakika sürdü ve birleştirme sırasında peak 118 GB, widget 115 GB, sys.getsizeof sonucu da düz okumadaki gibi 72 GB. Sonraki re-optimizasyon süresi ve memory artışları da düz okumayla benzer oldu.

Peki bir de 8 thread değil de 32 gün için 32 thread kullansak, hatta bir de mevcut durumda klasik for döngüsünden oluşan veri tipi optimizasyon fonksiyonunu multiprocess çalıştırsak nasıl olur? Bunun kodlamasını hayal etmeyi veya daha iyisi kendi veri setinizde yapmayı size bırakıyorum

Multiprocess Okuma

Son olarak artık multiprocess koda bakabiliriz:

#memorytest.py dosyası içindeki fonksiyonumuz
def multiProcessOkuma(query, usr, pwd, read_as_date, read_as_category):
with cx_Oracle.connect(usr, pwd,exadatahost + ":"+ str(exadataport) + '/'+SERVICE_NAME,encoding = "UTF-8", nencoding = "UTF-8") as connection:
df = pd.read_sql(query, connection)
df = da.optimize_types(df, read_as_date, read_as_category)
return df

Bu sefer liste içinde usr ve pwd de koyarız, sessionpool koymayız, çünkü her processte ayrı connection yaratılacak demiştik. Şimdi fonksiyona gönderilecek liste elemanlarını belirleyelim:

liste = [(sql, usr, pwd, read_as_date, read_as_category) for sql in sqller]

Fonksiyonu çağıralım:

from memorytest import multiProcessOkuma
from multiprocessing import Pool
with Pool(kanaladet) as p:
dfs= p.starmap(multiProcessOkuma, liste)

En hızlı çalışan bu oldu, 6 dakika. Üstelik memory değerleri de oldukça iyi, peak yaptığı nokta 115 GB olup son durumda widget 15 GB gösteriyor. (Peak noktasını bu senaryoda memit ile değil de widget’ı gözle takip ederek elde ettim, zira memit sadece tek bir process’tekine baktığı için 17 gösterdi.) Şimdi birleştirme yapalım:

df_cpu=pd.concat(dfs,ignore_index=True)

Birleştirme sırasında peak memory 28 GB, sonrasında widget 20 GB gösteriyorken, df için getsizeof bakınca ilginç şekilde 58 GB görüyoruz. İlginç olan bunun 58'e çıkması değil, bunu zaten bekliyorduk, yukarıda diğer senaryolarda gördüğümüz gibi category ve numeriklerin bazısı tekrar object oluyor sonuçta; ilginç olan widget’ın bu değerden daha düşük bir değer gösteriyor olması. Bunun için bir açıklama maalesef bulamadım. ChatGPT’ye de sorduğumda verdiği cevaplar çok tatmin etmedi açıkçası. Biz devam edelim. Info ile gerçekten category’lerin sayısını azaldığını görelim:

df_cpu.info()
#dtypes: category(10), datetime64[ns](13), float32(30), float64(3), int16(2), int32(2), int8(6), object(29)
#memory usage: 12.1+ GB

Şimdi tekrar bir optimizasyon yapalım:

df_cpu_opt = da.optimize_types(df_cpu,read_as_date,read_as_category)

Bu işlem de 2,5 dk sürmüş olup peak noktada memory 37 GB, widget ise son durumda 16 GB göstermektedir. df için getsizeof da beklendiği gibi 14 GB göstermektedir.

Bu yöntemler içinde süre açısından en iyisinin multiprocessing olduğunu görebiliyoruz. Memory açısından peak noktalara bakmak önemli, zira “dead kernel” hatasını peak nokta available memory’yi geçince alırız. Bu anlamda düz okuma en kötüsüyken diğer üçü yakın sonuçlar vermiş ama multiprcoessing en iyisi değil.

Burada kritik nokta şu: Toplam memory kullanımı 8 CPU’daki (Thread de olsa fark etmezdi.) memory toplamı kadar olacak. Üstelik veriler işlenirken geçici de olsa ara yapılar oluşabileceği (Bizim dönüşüm kodunda çok olmuyor ama sizde olabilir.) için toplam memory kullanımı artabilir. Peki bu ilave artış memory patlamasına neden olursa? Rakamsal gidecek olursak, bu yukarıdaki tablo için multiprocessing yöntemini seçtik ve elimizde o an 100 GB memory var diyelim ama peak noktada 115 GB oluyor, bu da dead kernel verir. O zaman çözüm nedir?

Chunklar Halinde Paralel Okumaya Genel Bakış

Evet, okunacak veri yine çok büyük ama bu sefer memory’nin yetmediğini fark etmiş olduk. Muhtemelen “chunk’lar halinde okurum,” diye düşünüyorsunuzdur; hem doğru hem yanlış. Şöyle ki, CSV okurkenki gibi chunk parametresini kullanmak işe yaramaz (buraya ve buraya bakın), zira aslında olayların oluş sırası özetle şöyledir: Önce cursor ile verinin tamamı okunur, sonra bu veri Pandas API’sine chunk’lar halinde paslanır. (Not: postgre gibi bazı RDBMS’lerde bir parametre ile gerçekten chunk parametresi işe yarar hale geliyormuş). Yani “chunking” aslında veri okunduktan sonra devreye girer, ama siz zaten veriyi okuyamamaktan şikayetçiyseniz chunk belirtmek çözüm değildir. Çözüm, veriyi veritabanından chunk’lar halinde okumaktır.

Şimdi yöntemimiz şu olacak: Veriyi partition’lı kolona göre bölüp yine farklı CPU’lara (veya thread’lere) dağıtacağız ve bu dağıttığımız her işin de veriyi chunk’lar halinde okumasını sağlayacağız. Aslında memory sorununu çözen kısım chunk’lar halinde okumaktır, farklı CPU’lara dağıtmak ise çalışma süresini düşürecektir.

Örneğin çalıştığınız makinede 30 CPU olsun. 300 günlük veri okuyacaksınız ve tarih kolonuna göre de partition yapılmış diyelim. O zaman her CPU’da 10 günlük veriyi paralel şekilde okuyabilirsiniz. Her bir güne ait veri diyelim ki 10 milyon satır olsun, bunları 1'er milyonluk chunk’lar halinde okuyup veri tipini optimize ederek sıradakine geçeceğiz ve 10 tur atacağız. Bunları bir tablo olarak gösterelim:

Okuyacağımız toplam veri miktarı 300 GB olsun (Bu Pandas’taki karşılığı, DB’de daha düşük olabilir). Memory’miz yetseydi, tek seferde multiprocess okuyabilirdik ve 300 GB’lık datayı optimizasyon sonrasında 60 GB’a indirmiş olurduk. Ama diyelim ki o sırada 100 GB memory müsait durumda; memory yetmediği için chunk’lar haline okursak, her bir CPU’daki ilk chunk grubunu optimize ettiğimizde toplam 6 GB veri olacak, 2.chunk sonrasında 12 GB ve böylece 10.chunk’a geldiğimizde 60 GB’a ulaşmış olacağız. Tabii chunk’sız okumaya göre daha uzun sürecek ama en azından memory’yi patlatmamış olacağız. Hiç paralelleştirme yapmazsanız, sürenin çok daha fazla süreceğini söylemeye de gerek yok.

Bu arada aklınıza veriyi PySpark veya Dask ile okumak gelebilir. Eğer single-node bir makinede çalışıyorsanız, CPU adediniz istediği kadar çok olsun memory ve tempspace kısıtları hep devrede olacaktır. Bu kütüphanelerin güzelliği bir cluster ortamında devreye girer. O zaman, veri memory problemi olmadan cluster’a parça parça dağıtılır ve sonra siz bu cluster’ın file system’i üzerinden parçayı flat file şeklinde okursunuz, ki bu okuma da Pandas gibi tek seferde tüm veriyi memory’ye alma şeklinde değil, lazy evaluation şeklinde olmaktadır, ama bunun detaylarına bu yazımızda girmeyeceğimiz söylemiştik. Bunlardan bahsetme sebebim, veriyi boş yere PySpark veya Dask ile okumaya çalışmamanız içindir.

Chunklar halinde farklı okuma yöntemleri

Pandas’ın API’si sizi yanıltmasın, her ne kadar pd.read_sql metodunun chunk parametresi olsa da yukarıda belirttiğimiz gibi aslında verinin tamamı önce memory’ye alınıyor, sonra Pandas API’sine chunk’lar halinde paslanıyor (en azından Oracle için böyle.). O halde bu chunk işlemini nasıl yapacağız? Bunun için benim tespit ettiğim kadarıyla 3 yöntem var.

  1. yöntem: SQLAlchemy , stream_results=True ayarlaması ile

Detaylarına buradan ulaşabileceğiniz bu yöntemi ben denedim, bende işe yaramadı, üstelik yukarıda bahsettiğim gibi cx_Oracle ile okuma yöntemi hem süre açısından hem memory açısından daha uygun. O yüzden burada bir kod örneği vermeyeceğim, dileyen linkten kod örneğine bakabilir.

2. yöntem: cursor + fetch_many()

Bu yöntemde cx_Oracle’ın cursor’ının fetch_many metoduna chunk_size miktarını vererek ilerliyoruz. Başka bir alternatif de cursor objesinin arraysize property’sine bu değeri atayıp fetch_many’yi parametresiz çalıştırmak da olabilir. Aşağıda, kullanılacak fonksiyonu bulabilirsiniz ancak bu da bende sürekli olarak “connection closed” hatası veriyor, sebebini tespit edemedim, muhtemelen bir ayar yapmak gerekiyor ama ben uğraşmadım, zira 3.yöntem benim işimi görüyor.

def fetch_data_in_chunks_WithCursor(query, usr, pwd, read_as_date, read_as_category, chunk_size):    
with cx_Oracle.connect(usr, pwd,exadatahost + ":"+ str(exadataport) + '/'+SERVICE_NAME,encoding = "UTF-8", nencoding = "UTF-8") as connection:
with connection.cursor() as cursor:
cursor.arraysize = chunk_size
cursor.prefetchrows = chunk_size
cursor.execute(query)
chunks=[]
while True:
rows = cursor.fetchmany()
if not rows:
break
cols = [row[0] for row in cursor.description]
chunk = pd.DataFrame(rows, columns=cols)
chunk = da.optimize_types(chunk, read_as_date, read_as_category)
chunks.append(chunk)
del chunk
return pd.concat(chunks,ignore_index=True)

Yukarıdaki arraysize, prefetchrows ve hataya sebep olan ancak ilave araştırma gerektiren diğer detaylar için kütüphanenin dokümantasyonuna bakabilirsiniz.

3. yöntem: Sql, “offset X rows fetch next N rows only” kullanımı

Bu yöntem çalışan tek yöntem oldu. Fonksiyonu aşağıdaki gibidir.

def fetch_data_in_chunks_withSQL(query, usr, pwd, read_as_date, read_as_category, chunk_size):
with cx_Oracle.connect(usr, pwd,exadatahost + ":"+ str(exadataport) + '/'+SERVICE_NAME,encoding = "UTF-8", nencoding = "UTF-8") as connection:
chunks = []
offset = 0
while True:
chunk_query = f"{query} OFFSET {offset} ROWS FETCH NEXT {chunk_size} ROWS ONLY"
chunk = pd.read_sql(chunk_query, connection)
chunk = da.optimize_types(chunk, read_as_date, read_as_category)
if chunk.empty:
break
chunks.append(chunk)
offset += chunk_size
return pd.concat(chunks, ignore_index=True)

Bu yöntemle düz multiprocessing’e göre bir tık yavaşlama bekliyorduk, nitekim öyle oldu. İlki 6 dakika sürerken bu 7 dakika sürdü. Memory değerleri ise tam da beklediğimiz gibi diğer tüm senaryolara göre çok daha iyiydi; peak noktada 62 GB, widget son durumda 22 GB. Bu arada bu değerlere chunk adedi 1 milyonken ulaştım. Chunk adedini 100 bin yaparsam süre 2 dakika daha uzuyor ama peak memory değeri 42 GB oluyor. Available memory miktarına göre siz de chunk değeri ile oynayabilirsiniz. Sonrasındaki süreç de normal multiprocessing’e benzer şekilde ilerledi. Diğer sayıları biraz daha aşağıda bulabilirsiniz.

from memorytest import fetch_data_in_chunks_withSQL
chunk=1000000
liste = [(sql, usr, pwd, read_as_date, read_as_category, chunk) for sql in sqller]
with Pool(kanaladet) as p:
dfs= p.starmap(fetch_data_in_chunks_withSQL, liste)

Özet

  • Düz okuma: Çok uzun süreceği gibi memory hatasına da neden olabilir
  • Paralel okuma(chunk’sız): Okuma hızını artırır, ama hala memory hatası alabiliriz.
  • Chunk’lar halinde paralel okuma: Okuma hızı paralele göre düşecektir ancak memory sorununu çözmüş oluruz.

Peki hangi yöntemi ne zaman tercih etmeliyiz?

  • Düz okuma: Küçük tablolarda veya sonucu az kayıt dönen sorgularda
  • Paralel okuma(chunk’sız): Büyük tablolarda veya sonucun çok sayıda kayıt dönen sorgularda, ancak memory sorunu olmayacaksa
  • Chunk’lar halinde paralel okuma: Büyük tablolarda veya sonucun çok sayıda kayıt dönen sorgularda, ve chunksız okuma durumunda memory sorunu olma ihtimali varsa.

Nihai Karşılaştırma Tablosu

Bu tabloda peak memory değeri, dead kernel hatası alıp almayacağımızın bir ölçüsü, son değer ise gerçek değer üzerine Jupyter/Pandas/Python üçlüsü tarafından eklenen miktarı görmek adına önemli.

Veri İşleme üzerine notlar

Processler Arası Veri Paylaşımı

Veriyi okuduk, şimdi bunu işleyeceğiz. Diyelim ki çalıştığımız makine için bize ayrılan kaynak 500 GB, datasetimiz ise veri tipi optimizasyonuna rağmen 50 GB.

Veri işlerken multithread çalışamayız, çünkü bu tür işler CPU-bound işlerdir. DataFrame’mizi CPU’lara dağıtırken hepsinde kopyalanacağını unutmamak lazım, zira datasetimiz büyük. 50 GB’lık DataFrame’i 10 CPU’ya dağıtsak patlatırız. Çözüm olarak az sayıda CPU’ya dağıtma yoluna gidilebilir. Tabi bu iş, ilgili DataFrame’in serialization’ını (geçici diske yazılması ve tekrar okunması) gerektirdiği için performans kayıpları da söz konusu olabilecektir. Dağıtılacak CPU adedi ve serialization süreci arasında bir trade-off var, deneme yanılmayla uygun CPU sayısını bulabilirsiniz.

Konu sadece DataFrame paylaşımı ile sınırlı değil. Process’ler arasında belli bir veri yapısını (list, dict v.s) da paylaşmanız ve buna eş zamanlı kayıt ekleme/çıkarma yapmanız gerekebilir. Böyle bir durumda Manager sınıfını kullanmanız gerekecektir. Bu biraz kompleks bir konu olduğu için detaylarına şu an girmeyi düşünmüyorum, arzu eden üstte verdiğim linki ve tabii resmi dokümantasyonu inceleyebilir.

Bir ihtimal da joblib paketini kullanmak olacaktır ki, bu arka planda veri paylaşım işini sizin için yapmaktadır.

Apply Metodu

Öncelikle buraya, buraya ve şuraya bakmanızı rica edeceğim. Buralarda gerekli detaylı açıklama var, ancak özetlemek gerekirse apply metodu her ne kadar az kod yazmayı sağlasa da uzak durulması gereken bir metottur. Elinizdeki veri seti küçükse, daha az kod yazabilme imkânı vermesi sebebiyle tercih edilebilir, ne de olsa işlemi 20 ms yerine 25 ms içinde yapması sizin için dert olmayacaktır. Ancak büyük bir veri setinde hem 3 dakika yerine 10 dakika sürebilir, hem de oluşturduğu ara yapılar (Part I’de görmüştük) nedeniyle memory patlamasına neden olabilir.

Peki çözüm ne? Mümkünse Pandas veya NumPy’ın vektörel işlemlerini kullanın. Örnek bir kod merak edenler, en yukarıda verdiğim optimize_types fonksiyonu içindeki comment’li satırlara bakabilir. Vektörizasyondan faydalanılamıyorsa klasik for döngüsü ile işlemi yapabilir veya yukarıda veri okumada kullandığımız multiprocessing yöntemini kullanılarak paralelleştirebilirsiniz. Emin olun klasik for döngüsü apply’a göre çok daha hızlı olacak ve ilave memory tüketimi olmayacaktır. (Başta verdiğim linklerdeki birkaç istisna dışında)

--

--

Volkan Yurtseven
Akbank Teknoloji

Once self-taught-Data Science Enthusiast,now graduate one