Observables nedir ? Neden kullanmalıyız ?

Ahmet Cem Kaya
kodlama
Published in
7 min readJun 23, 2018

Merhaba ilk blog yazımda size javascript’te son zamanlarda sıkça kullandığımız observables nedir ondan bahsedeceğim. Yazım hataları için şimdiden özür dilerim. Türkçe hep en kötü dersim olmuştur. :)

Aslında observables yeni moda olmuş olsada nerdeyse hep vardı diyebiliriz. Reactive programming yapısını kullanan rxjs’in angular ve react’de sıkça kullanılmasıyla popülerliği artmıştır.

REACTIVE PROGRAMMING

Reactive programlama’nın alt yapısı tamamen stream’e dayalıdır. Stream en basit şekilde şöyle açıklanabilir. Bir olay gerçekleştiğinde elimize öncelikle bir data gelir ve stream akış baslar. Bu data’yi ben başka bir fonksiyona modifiye etmesi icin devredebilirim(Akış devam eder. ornek: map). Veya data’yi direk alarak(subscribe) akışı bitirebilirim.

Rxjs’i ele alacak olursak. Her akış fonksiyonu (operators) observable donuyor. Bunun sebebi fonksiyonları birbirine bağlayarak zincir oluşturabilmek ve zincirin sonunda akışı bitirebilmek.

Rx.Observable.from([1,2,3,4,5])
.filter(val => val % 2 == 0)
.map(val => val*2 }).subscribe();

Yukarıda ki kodu Türkçe’ye çevirirsek.

[1,2,3,4,5] array’i ile observable oluştur. Stream’e(Akışa) sırasıyla 1,2,3,4,5 sayılarını ver. Sayılar önce filter fonksiyonuna düşecekler. Eğer sayı 2 ‘ye tam bölünüyorsa map fonksiyonuna geçecek ve 2 ile çarpılacak. Subscribe ile akışı başlatıyoruz ve bitiş noktasını belirlemiş oluyoruz.

Reactive programming koskoca bir dünya ve kalın kitaplara sahip olması aslında ne kadar geniş bir konu oldugunu göstermektedir.

Bu yazının konusu olan observables’i daha detaylı şekilde anlatmanın zamanı geldi. Observables tamamen observer dizayn paterni’nin implement edilmiş hali olduğu icin öncelikle observer paterni nedir onu bilmemiz gerekiyor.

OBSERVER PATTERN

En basit şekilde observer paternin amacı dinlenilen bir objenin degişmesi durumunda dinleyenlerin son durum hakkında bildirilmesidir.

Detaya girecek olursak Dinlenilen obje’ye Subject ve dinleyenlerede Observer dersek. Observer subject’e abone olabilir(subscribe). Bir subject’e birden fazla observer abone olabilir. Bu da one-to-many relationship yapıya örnek olur. Subject’de herhangi bir durum değişikliği tüm abonelerine bildirilir.

Eğer UML bilginiz varsa aşağıda ki diagram yardimci olabilir. Eğer diagram bilginiz yoksa en basit sekilde anlatılan kısmı anlamış olmanız yeterli olacaktır.

Yukarıdaki diagramda
dinlenilen obje = Subject
dinleyen obje = Observer

Subject birden fazla observer’a sahip olabileceği için elinde observerCollection var. Herhangi bir degisiklik durumunda bu observer’larin update fonksiyonunu çağırıyor. Bu sayede degisiklik durum bilgisini göndermiş oluyor. Bunu rxjs observable’da next fonksiyonuna örnek verebiliriz.( yazının ilerleyen kısımlarında next fonksiyonundan bahsedeceğim.)

RXJS’den BAGIMSIZ KENDI OBSERVABLE’IMIZI YAZALIM

Observables yukarıdaki kısımdan anlayacağınız gibi basit bir yapisi var. Implement etmesi de bir okadar basit olduğundan ve daha iyi anlayabilmek için önce kendi observable’mizi yazalım. Aşağıda yazacağım kodlar es6 standardı ile yazılmıştır.

Öncelikle Observable class’imizi oluşturalım.

class Observable {
constructor(observer) {
this.observer = observer;
}
subscribe(next, error, complete) {
this.observer(next, error, complete);
}

Observable’a abone olduğumuzda çağırdığımız observer fonksiyonu bulunmaktadır. Bu fonksiyon herhangi bir observer abone olduğunda gerçekleştirilecek data oluşturma mantığını kapsar. Ornek olarak Angular’da HttpClient.get yaptığımızda arkada çalışan asenkron http request mantığını verebiliriz. Öncelikle asenkron bir get request yapılıyor ve gelen sonucdan http response’u ayıklayarak bizim abone olurken verdiğimiz fonksiyonu çağırıyor. (callback) Eğer sonuç hatalı ise yine bizim verdigimiz error fonksiyonunu çağırıyor. İşlem tamamlandığında ise complete’i çağırıyor.

Basit bir data oluşturan fonksiyon ile observer oluşturalım.

var myObservable = new Observable((next, error, complete) => {
next(1); // data oluşturduk
next(2); // data oluşturduk
next(3); // data oluşturduk
})

Observer fonksiyonumuz sırasıyla 1,2,3 parametreleri ile next fonksiyonunu çağırıyor. Gördüğünüz gibi herhangi bir hata olmayacağı için error fonksiyonunu çağırmadık. Ayrıca complete fonksiyonunuda yazacağım örnekle bir ilişkisi olmayacağı için eklemedim.

myObservable.subscribe((res) => {
console.log(res);
});

Yukarıda oluşturduğumuz observable’a abone olmuş oluyoruz. Abone olurken next fonksiyonunu, yani arkadaki mantığın çağıracağı callback fonksiyonu veriyoruz. Data oluşturan fonksiyonda ki her next buraya verdiğimiz fonksiyon olduğunu umarım anlamışsınızdır. Fonksiyonumuz sadece gönderilen data’yi konsola yazdırmaktadır. 1,2,3

Ozaman biraz daha detaya inip rxjs’de yine sıkça kullandığımız map operatorü ve zincirleme özelliğini ekleyelim.

Öncelikle zincirleme mantığını açıklayayım. Rxjs’de operator dediğimiz fonksiyonlar bulunmaktadır. Bunlar observable class’inin icindeki fonksiyonlar olarak düşünebiliriz. Örneğin map operatorü gelen data’yi modifiye etmemize yarayan bir operatordür. Map operatorü de subscribe fonksiyonu gibi bizden callback fonksiyonu istemektedir. Burada data’nin nasıl modifiye edilip akışa verileceğini tanımlamamız gerekmektedir.

myObservable.map((x) => x * 6);

Yukarıda görüldüğü gibi subscribe ile akışı tamamlamak yerine akışa map fonksiyonu ile devam ettim.Bu fonksiyonu Türkçe’ye çevirecek olursak : her gelen x sayısını 6 ile carp ve akışa devam et demektir.

myObservable.map((x) => x * 6).subscribe((res) => {
console.log(res);
});

subscribe ederek akışı bitiriyoruz. Modifiye edilmiş sayılar subscribe’a verdiğimiz fonksiyona gelecektir.

Peki sirada bunu nasıl observable class’ımıza implement edeceğiz ondan bahsedelim.

map(func, error, complete) {
return new Observable((next) => {
this.subscribe((x) => {
var x = func(x);
next(x);
}, error, complete);
});
}

Yukarıda gördüğünüz fonksiyon ilk bakışta karışık gelebilir o yüzden adım adım anlatacağım.

Öncelikle map fonksiyonu yeni bir observable dönmek zorunda ki zincirlemeye kaldığı yerden devam edebilsin. Bu sayede map’i zincire ekledikten sonra subscribe veya başka bir operatorü zincire ekleyebiliriz.

Yeni observable oluşturmamız demek yeni data oluşturan fonksiyonu tanımlamamız gerek demektir.

(next) => {
this.subscribe((x) => {
var x = func(x);
next(x);
}

Yeni data fonksiyonumuz parametre olarak next dediğimiz bir fonksiyon kabul etmektedir. Bu next fonksiyonu akışta map’den sonra gelen fonksiyonu temsil etmektedir. Örneğin map’den sonra subscribe ile akışı tamamlarsak subscribe’a verdiğimiz callback fonksiyonu burada ki next’e denk gelecektir.

this.subscribe() ile map’den once gelen operator’e map mantığını gönderiyoruz. Bizim senaryomuzda map’den once gelen observable’in kendisi var yani ilk data oluşturan fonksiyona map icin hazırladığımız fonksiyonu gönderiyoruz. Fonksiyonda 3 tane next methodunu 1,2,3 parametreleri ile çağırıyorduk. Her next burada ki this.subscribe’a verdiğimiz callback fonksiyonuna denk gelecektir.

İlk data oluşturan fonksiyonumuzu hatırlayalım:

(next) => {
next(1);
next(2);
next(3);
}

next’in bir fonksiyon aldığını biliyoruz. Bizim ana observer fonksiyonuna gönderdiğimiz fonksiyon ise

(x) => {
var x = func(x); // Data'yi modifiye etmek icin bizim yazdigimiz fonksiyon.
next(x); // buradaki next akistaki map'den sonra gelen fonksiyon.
}

Kisacasi map fonksiyonunun ne yaptigini ozetlersek;

Data’yı nasil modifiye edecegi fonksiyonu alıyor.
Yeni observable oluşturuyor.
Eski observable’a subscribe oluyor ve oradan gelen data’yı modifiye eden fonksiyonla modifiye ediyor.
Akıştaki diğer fonksiyonu(next) çağırıyor.

Bütün kod’a tıklayarak ulaşabilirsiniz.

RXJS OBSERVABLES KULLANIM

En basit sekilde yazılabilecek observable’ımızı daha iyi anlamak icin yazdık. Ama çok daha kapsamlı bir kütüphanemiz var RXJS!

Ayni işi yapan kodu RXJS kütüphanesini kullanarak yazalım.

var observable = Rx.Observable.create((observer) => {   observer.next(1);   
observer.next(2);
observer.next(3);
});
observable.map(value => value *6)
.subscribe(value => console.log(value));

Gördüğünüz gibi data oluşturan fonksiyonu veriyoruz ve subscribe oluyoruz! Bizim yazdığımız ile rxjs ne kadar birbirine benzese de tabi ki bizim basit implementasyonumuz rxjs’in guvenlikli ve optimize koduna yaklaşamaz.

Her subscribe’in bir unsubscribe’i var

Bir observable’a subscribe oluyorsak islem tamamlandığında unsubscribe olmayı unutmayalım. Çünkü her subscribe memory’de yer kaplamaktadır ve Sıcak observable’lar programda istenilmeyen hatalara sebep olabilirler.

Otomatik Unsubscribe etmek icin kullanabilecegimiz iki tane operator bulunmaktadir. Take ve First operator u.

ornegin observable.pipe(take(1)) yaptigimizda bu observable sadece ilk gonderilen elemani alip observable’i sonlandiracaktir. Eger sicak observable kullaniyorsaniz ve surekli asynchronous data akisini acik tutmak istiyorsaniz bu yontem ise yaramayacaktir cunku ilk eleman’dan sonrasi gelmeyecektir.

.pipe(first()) de adindan anlasilacagi gibi ilk gelen eleman’i alip observable’i sonlandiriyor. Take(1) ve First operatorlerini observable’i sonlandirmak icin kullaniyorsak aralarinda cok bir fark yok. Aklinizda bulunsun eger observable data akisi olmadan tamamlanirsa first() operatoru hata verecektir. Take operatoru hata vermeyecektir. Ama genel olarak data akisi olmadan tamamladigimiz observable cok nadir olmaktadir.

Sicak observable’lar icin takeWhile operator u kullanilabilir. takeWhile operatoru bir sarta bagli olarak data akisinin devam edip veya sonlanacagini belirler. While loop sart’i gibi dusunebilirsiniz. Eger verilen sart saglaniyorsa akis devam edecektir.

PROMISE VS OBSERVABLES

AngularJS’e alışmış kişilerin Angular 2' ye geçerken en büyük soruları bu olsa gerek.

Promise ile ben aynı işi yapabiliyorum niye observable kullanayım ?
Observable’i promise’e çevireyim bildiğim yoldan gideyim ?

Diyorsanız sizi böyle alalım.

Evet sadece asenkron programlama yapmak istiyorsanız aslında promise’lerede ihtiyacınız yok. Direk eski callback stili ile bile Asenkron programlama yapabilirsiniz.
Fakat geliştirilebilirlik, esneklik, kolay okuma, entegrasyon kolaylığı istiyorsanız observables bu konuda en iyisi çünkü yukarıda bahsettigimiz gibi Reactive programlamanın esnekliği, geliştirilebilirliği ve stream özelliği developerlara çok kolaylıklar sağlamaktadır. Daha detaylı bir sekilde reactive programlamayı belki başka bir yazıda anlatmak isterim. Size tavsiyem reactive programming’i duymadıysanız once onu araştırmanız.

Örneğin Angular’da http client modülünde get kullandınız ve onu promise’e cevirdiniz.

myFunc() {
return this.http.get<any>(endpoint)
.toPromise() //promise'e cevirdik..then((response) => {this.logSuccess(response, endpoint, funcName); // logladik.return response; // response u donduk...}).catch((response) => { //hata kontrolu var ise handleError fonksiyonunu cagirdik.this.handleError(response, endpoint, funcName, errorParserOverride);});
}

Birde rxjs’in guzelliklerini kullanalim.

myFunc() {
return this.http.get<any>(endpoint)
.do(() => {
this.logSuccess(response, endpoint, funcName);
})
.catch(err => {
this.handleError(response, endpoint, funcName, errorParserOverride);
});
}

Observable demek , stream(Akış) demek. Biz bir akış oluşturuyoruz onu tamamlamak ise subscribe ile oluyor.

Ornegin ben bu logSuccess’i sadece belirli yerlerde yapılmasını istiyorsam onu akıştan çıkarabilirim ve o istediğim yerlerde akışa eklerim ve subscribe ederim.

myFunc().do(() => {
this.logSuccess(response, endpoint, funcName);
}).subscribe();
VEYAmyFunc().subscribe();

SICAK VE SOGUK (HOT AND COLD) OBSERVABLES

Eğer akışa verilen data observable’in ana fonksiyonun icinde oluşturuluyor ise soğuk observable oluyor. Yani yukarıda ki örnekler soğuk observable diyebiliriz.

data akışı data oluşturma fonksiyonunun dışında bir yerden tetikleniyor ve tüm aboneler ayni kaynaktan data’yi alıyorsalar bu observable sıcak olmuş oluyor.

Soğuk observable’a abone olduğumuzda tüm data akışı baştan tetiklenir.
sıcak observable’a abone olduğumuzda zaten bir data akışı vardır ve ona ortak olmuş oluruz.

Sıcak observable’a ornek verecek olursak Rxjs kütüphanesinden fromEvent operatoru ile oluşturulan observable’lar birer sıcak observable’dir.

Rx.Observable.fromEvent(document, 'click').subscribe();

Browser mouse click event’i gerçekleştiğinde tüm abonelere aynı kaynaktan data gelir.

Soğuk observable’i sıcak observable’a yine rxjs operatoru kullanarak gerçekleştirebiliriz.

var observable = Rx.Observable.create((observer) => {   observer.next(1);   
observer.next(2);
observer.next(3);
}).share();

share operatorü ile gönderilen 1,2,3 dataları tekrar abone olan birisine gönderilmeyecektir. Eğer ilk fonksiyon tetiklenip bitmeden once ikinci bir abonelik gelmişse son abone olan baştan değil, ana fonksiyonun kaldığı yerden dataları almaya başlayacaktır.

YENI STIL RXJS: ZINCIRLEME YERINE PIPE KULLANALIM

Rxjs kütüphanesi her geçen gün yenileniyor ve yeni özellikler geliyor. Klasik zincirlemede çıkan bazı sorunlar nedeniyle( bende bilmiyorum o sorunları, Javascript bazen hayal kırıklığına uğratabiliyor..) daha esnek ve iyi olan yeni pipe metodunu eklediler.

Sadece istenilen operatorleri import ederek kullanabiliyoruz. Bu aplikasyon’un boyutundan da kazanmamızı sağlıyor. Kendi operator fonksiyonlarımızı yazabiliyoruz. Bu da tekrar kullanılabilinir internal operatorler oluşturmamızı sağlıyor.

Rx.Observable.interval(1000)
.pipe(
filter(x => x % 2 === 0),
map(x => x + x))
.subscribe(x => console.log(x))

pipe adından da anlaşılacağı gibi BORU. Su yerine data akıyor gibi düşünelim. Önce filtreliyor sonra mapliyor ve pipe bittikten sonra subscribe olarak akisi tamamlıyoruz.

Ayni kodu klasik zincirleme ile yazarsak;

Rx.Observable.interval(1000)
.filter(x => x % 2 === 0)
.map(x => x + x))
.subscribe(x => console.log(x))

Java veya C#’da reactive programming yapmış kisilere bu klasik zincirleme daha cazip gelsede Rxjs dünyası pipe yolunda ilerlemeye karar verdi.

--

--