Angular RxJS Kütüphanesi nedir ?

Emircan Omak
7 min readNov 26, 2022

--

Son zamanlarda sıkça duyulan “rxjs” kütüphanesini bu yazımda birlikte inceleyeceğiz. Genel olarak rxjs, olay ve veri kaynaklarını abone olunabilir(subscribable) nesnelere dönüştürüp, bunlar üzerinde operatörler yardımıyla dönüşümler gerçekleştirilebilen, Observer aracılığıyla kod üretimini kolaylaştıran JavaScript’le yazılmış bir reaktif programlama(reactive programming) kütüphanesidir.

Verileri bütün olarak değilde küçük parçalar halinde transfer etmemizi sağlayan bu yapının sınıf ve nesnelerini inceleyelim.

Observable Nedir ?

Observable zaman içerisinde gelebilecek veri akışını veya kaynağını temsil eder. Neredeyse her şeyden Observable yaratabilirsiniz. RxJs’te Observablelar genellikle olaylardan(butona veya sayfaya tıklama, arama kutusuna değer yazma veya uygulama sayfasının değişmesi vb.) oluşturulur. Observable yaratmanın en kolay yolu; RxJs’te bulunan “built-in observable” fonksiyonlarını kullanmaktır.

Observable için bir kaç karakteristik özelliklere sahip fonksiyon diyebiliriz. İçinde “next, error, complate” metotlarına sahip bir nesne olan “observer” alır ve iptal mantını döndürür.

Bir observable; abonelik yapıldıktan sonra değer yaymaya başlar. Peki nedir bu abonelik ?

Subscription(Abonelik)

Subscription; her şeyi harekete geçiren işlemdir. Bir dokunuşla(subscribe) su akmaya başlayan bir musluk gibi düşünebilirsiniz. Birinin sadece musluğu çevirmesi gerekiyor. Musluktan su akışınn gözlenebilir(observable) yapılması rolü aboneye(subscriber) aittir.

Bir subsciption yaratmak için subscribe( ) metodunu çağırmak gerekir. Observer olarakta bilinen, bir fonksiyon veye nesne sağlayan subscribe( ) metodunun çağrılması gerekir. Reactive Programlamada; her olaya nasıl tepki vereceğine karar verebileceğiniz yer burasıdır.

Pipe

Pipe fonksiyonu, observable veri kaynağınızdaki operatörler ile montaj hattıdır. Tıpkı bir arabanın bitmiş halinin fabrikadan çıkmadan önce uğradığı bir dizi üretim bandı gibi düşünebilirsiniz. Kaynakan gelen veriler, ihtiyaca göre filtreleme, dönüştürülme gibi süreçlerden geçebilir. Pipe fonksiyonunda, observable zinciri içinde 3 veya daha fazla fonskiyon kullanmak olağan dışı bir durum değildir. Örneğin observable ile oluşturulmuş bir arama işleminde yapılan işlemleri optimize etmek için birden çok operatör kullanılabilir.

Operatörler

Operatörler, bir kayaktan gelen değerleri manipüle etmemize yardımcı olurlar ve dönüştürülen değerlerin observable değerini döndürürler. JavaScript Array metotlarına aşına iseniz, RxJS operatörlerinin çoğu size tanıdık gelecektir. Gelin sık kullanılan operatörleri inceleyelim.

of Operatörü

of Operatörü, JavaScript’ten aşina olduğumuz .map( ) fonksiyonu gibi davranır. Farkı; elinizde bir dizi olmasa bile istediğiniz sayıda ve türde veriyi bir dizi gibi bir araya getirip observable yayar. Örneğin elimizde aşağıdaki gini birbiri ile alakasız bazı veriler olsun.

1 - “mike” - true - 4 - { country: “German” }

of() ile bu verileri dizi gibi sıra ile console’a yazdıralım:

const source = of(1, “mike”, true, 4, { country: “German” });
const subscribe = source.subscribe((val) =>
console.log(val)
)

Çıktısı şu şekilde olacaktır:

1
mike
true
4
{ country: ‘German’ }

from Operatörü

from diğer nesneleri ve türlerini Observable’a dönüştürür. Bunlar Promise, dizi veya benzeri nesneler olabilir.

Gerçek bir stream olması için htpps://swapi.dev/ sitesinden /people Endpoint’i ile bir data çekelim ve bunu from ile observable’a çevirelim.

import express from "express";
import axios from "axios";
import { from } from "rxjs";
const opfrom = express.Router();opfrom.route("/").get(async (req, res, next) => {
await axios.get("https://swapi.dev/api/people").then((result) => {
const array = from(result.data.results);
array.subscribe((val) => console.log(val));
});
});
module.exports = opfrom;

Görüldüğü üzere gelen datayı from operatörü ile observable’a çevirip daha sonra buna .subscirbe( ) metodu ile abone olabiliyoruz.

Çıktısı şöyle olacaktır:

{
name: ‘Luke Skywalker’,
height: ‘172’,

}
{
name: ‘C-3PO’,
height: ‘167’,

}
...

interval Operatörü

Önceden belirlenmiş zaman aralıkları ile ardışık Observable sayılar çıktısı verir.

import express from "express";
import { interval } from "rxjs";
const opinterval = express.Router();opinterval.route("/opinterval").get(async (req, res, next) => {
//emit value in sequence every 1 second
const source = interval(1000).subscribe((val) => console.log(val));
});
module.exports = opinterval;

Çıktısı:

1
2
3
...

timer Operatörü

Observable bir değer yayınlamaya başlamadan önce istenen süre kadar işlemi bekletmeye yarar.

import express from "express";
import { timer, of } from "rxjs";
import { concatMapTo } from "rxjs/operators";
const optimer = express.Router();optimer.route("/optimer").get(async (req, res, next) => {
example - 1;
const source = timer(1000);
//output: 0
const subscribe = source.subscribe((val) => console.log("example-1: " + val));
//example-2
const source2 = timer(1000, 2000);
//output: 0,1,2,3,4,5......
const subscribe2 = source2.subscribe((val) =>
console.log("example-2: " + val)
);
// example-3
// This could be any observable
const source = of(1, 2, 3);
const result = timer(3000)
.pipe(concatMapTo(source))
.subscribe((x) => console.log("example-3:" + x));
});
module.exports = optimer;

tap Operatörü

tap operatörü ile logging gibi çıktıyı etkilemeyecek işlemler yapabilirsiniz.

import express from "express";
import { of } from "rxjs";
import { tap, map } from "rxjs/operators";
const optap = express.Router();optap.route("/optap").get(async (req, res, next) => {
const source = of(1, 2, 3, 4, 5)
.pipe(
tap((val) => console.log(`map'ten önce: ${val}`)),
map((val) => val + 10),
tap((val) => console.log(`map'ten sonra: ${val}`))
)
.subscribe((val) => console.log(val));
});
module.exports = optap;

Çıktısı:

map’ten önce: 1
map’ten sonra: 11
11
map’ten önce: 2
map’ten sonra: 12
12
map’ten önce: 3
map’ten sonra: 13
13
map’ten önce: 4
map’ten sonra: 14
14
map’ten önce: 5
map’ten sonra: 15
15

throwError Operatörü

Veri akışı sırasında hata oluşursa, hatayı ve hataya dair bir mesajı yazdırabileceğiniz bir operatördür.

import express from "express";
import { throwError, of } from "rxjs";
import { concatMap } from "rxjs/operators";
const opthrowerror = express.Router();opthrowerror.route("/opthrowerror").get(async (req, res, next) => {
const delays$ = of(1000, 2000, 3000, 4000, "5000");
delays$
.pipe(
concatMap((val) => {
if (typeof val === "number") {
return of(val);
} else {
// This is probably overkill.
return throwError(() => new Error(`Invalid number ${val}`));
}
})
).subscribe({
next: (val) => console.log(val),
complete: () => console.log("Complete!"),
error: (val) => console.log(`Error: ${val}`),
});
});
module.exports = opthrowerror;

filter Operatörü

Akış ile yayılan değerleri filter operatörü ile filtreleyebilirsiniz. Bu filtre kuralı sizin belirleyeceğiniz bir kural olacaktır. Aşağıdaki örnekte StarWars server’ından gelen people bilgilerinden boy uzunluğu konsola bastırılmaktadır. Filtreleme kuralı 100 değerinden büyük olmasıdır.

import express from "express";
import axios from "axios";
import { from } from "rxjs";
import { filter } from "rxjs/operators";
const opfilter = express.Router();opfilter.route("/opfilter").get(async (req, res, next) => {
await axios.get("https://swapi.dev/api/people").then((result) => {
const array = from(result.data.results);
array
.pipe(filter((val) => parseInt(val["height"]) > 100))
.subscribe((val) => console.log("height: " + parseInt(val["height"])));
});
});
module.exports = opfilter;

map Operatörü

map operatörü ile iterable bir nesnenin her bir öğesi için transformasyon uygulayabiliriz. Bu operatörler JavaScript’in .map( ) fonksiyonuna çok benzer bir şekilde çalışır. Farkı ise; var olan bir dizideki öğeler üzerinde değil de, akış(stream) anında elde edilen (bir observable’dan emin olan) öğeler üzerinde değişiklikler yapmaya yarar.

Aşağıdaki örnekte server’dan gelen nesnenin bir field’ı alınıp integer değere çevriliyor ve buna 50 eklenerek consol’a yazdırılıyor.

import express from "express";
import axios from "axios";
import { from } from "rxjs";
import { map } from "rxjs/operators";
const opMap = express.Router();opMap.route("/").get(async (req, res, next) => {
await axios.get("https://swapi.dev/api/people").then((result) => {
const array = from(result.data.results);
array
.pipe(map((val) => parseInt(val["height"]) + 50))
.subscribe((val) => console.log(val));
});
});
module.exports = opMap;

Burada .pipe( ) metodu göze çarpmakta. Bu metod ile birden fazla operatör virgüller ile ayrılarak kullanılabilir. Yani bir değer bir çok kez arka arkaya işlenebilir.

map operatörü Observable döndürdüğü için elde edilen değere abone olarak(subscribe) ulaştığımıza da dikkat edelim.

scan Operatörü

Akış ile gelen verilerin sonuncusunu ve sonuncusundan bir öncekine kadarkilerin toplamını ile işlem yapılmasını sağlar.

Aşağıda bir dizide scan ile döngü yapıp gelen elemanı öncekilerin toplamına ekleyen bir örnek görüyoruz.

import express from "express";
import axios from "axios";
import { from, Subject } from "rxjs";
import { scan, map } from "rxjs/operators";
const opScan = express.Router();opScan.route("/opscan").get(async (req, res, next) => {
const array = from([10, 20, 30, 40, 50, 60]);
array
.pipe(
map((val) => {
return val;
}),
scan((total, n) => {
return total + n;
}),
map((sum, index) => sum)
)
.subscribe(console.log);

});
module.exports = opScan;

Çıktısı:

Infinity
30
30
33.333333333333336
37.5
42

concatMap Operatörü

Bu operatör veri akışında aktif subscription işlemi bitmeden bir sonrakini başlatmaz ve veri akışını, veri kaynağındaki sıraya göre sağlar. Bir önceki veriyi alıp sonrakinde değişken olarak kullanmak mümkündür.

import express from "express";
import { of } from "rxjs";
import { concatMap, delay, mergeMap } from "rxjs/operators";
const opconcatmap = express.Router();opconcatmap.route("/opconcatmap").get(async (req, res, next) => {
// If order is important at getting data from source then concatMap is usefull
const source = of(3000, 2000, 1000)
.pipe(concatMap((val) => of(`Delayed by: ${val}ms`).pipe(delay(val))))
.subscribe((val) => console.log(`With concatMap: ${val}`));
});
module.exports = opconcatmap;

Çıktısı :

With concatMap: Delayed by: 3000ms
With concatMap: Delayed by: 2000ms
With concatMap: Delayed by: 1000ms

şeklinde olacaktır ve gecikme işlemi; kaynaktaki verilerin sırasına göre olacağı için ilk veri 3sn sonra ikinci veri 2sn sonra ve son veri 1sn sonra basılacaktır.

mergeMap Operatörü

mergeMap, concatMap’in aksine veri kaynağındaki sıraya bakmaksızın ve bir önceki veri işleminin bitmesine dikakt etmeksizin hızlı bir şekilde tüm subscription’ları arka arkaya basar.

import express from "express";
import { of } from "rxjs";
import { delay, mergeMap } from "rxjs/operators";
const opmergemap = express.Router();opmergemap.route("/opmergemap").get(async (req, res, next) => {
// mergeMap handles the data without order unlike concatMap, in this case mergeMap result inner subscription without to wait previous datas ends And does it so fast
const source = of(3000, 2000, 1000)
.pipe(mergeMap((val) => of(`Delayed by: ${val}ms`).pipe(delay(val))))
.subscribe((val) => console.log(`With mergeMap: ${val}`));
});
module.exports = opmergemap;

Çıktısı:

With mergeMap: Delayed by: 1000ms
With mergeMap: Delayed by: 2000ms
With mergeMap: Delayed by: 3000ms

şeklinde olup, sıraya dikkat edilmediği gibi konsolda gözlemleneceği üzere; bir önceki subscription sonucunu bir sonrakinde değişken olarak kullanmadığı için gecikme fonksiyonuda sağlıklı çalışmamaktadır.

switchMap Operatörü

mergeMap ve concatMap’den ayıran özellik; bir veri switchMap ile yayıldığında bir önceki yayılma(emission) iptal edilir ve yeni sonuca abone(subscribed) olunur.

Aşağıdaki örnekte map ve switchMap operatörleri kıyaslanmaktadır. Görüldüğü gibi map operatörü Observable değerler yayarken, switchMap iç Observable(inner Observable) sonucu verip buna abone olur ve iç Observable’ın sonucunu yayar.

import express from "express";
import { of } from "rxjs";
import { map, switchMap } from "rxjs/operators";
const opswitchmap = express.Router();opswitchmap.route("/opswitchmap").get(async (res, req, next) => {
let obs = of(1, 2, 3, 4);
//Using MAP
obs
.pipe(
map((val) => {
return val * 2; //Returning Value
})
)
.subscribe((ret) => {
console.log("Recd from map : " + ret);
});
// Using SWITCHMAP
obs
.pipe(
switchMap((val) => {
return of(val * 2); //Returning Value
})
)
.subscribe((ret) => {
console.log("Recd from map : " + ret);
});
});
module.exports = opswitchmap;

Sonuç

Bu yazımda RxJS kütüphanesi ve sık kullanılan operatörleri üzerinde durdum. Bunların dışında çok daha fazla operatör mevcut. Diğer operatörleri incelemek ve RxJS hakkında daha detaylı bilgi için:

Bir sonraki RxJS yazısında görüşmek üzere :)

Kaynak :

Örnek kodların verildiği gitHub adresi:

--

--