Angular’da RxJS

Selen Ece Özbay
6 min readNov 30, 2022

--

Angular öğrenirken sıkça karşılaştığım RxJS ‘in ne olduğunu araştırırken benim gibi programlamaya yeni giriş yapmış yazılımcılar için bir kaynak oluştumaya karar verdim. Bu yazıda RxJS nedir bundan bahsedeceğim.

RxJS Nedir?

RxJs gözlemleneblir dizileri kullanarak asenkron ve tepkisel olay tabanlı Single Page Aplications (SPA) oluşturmak için kullanılan JavaScript ile yazılan bir kütüphanedir. Reactive Programlama için tam bir çözüm değildir ancak asenkron ve olaylardan kaynaklanan hatalarda iyi çözümler sunar.

RxJS’i anlamayı kolaylaştıracak bazı sınıf ve nesneler,

  • Observable: RxJS’in temel sınıfıdır. Zaman içerisinde gelebilecek bir veri akışını veya kaynağını temsil eder. RxJs’te genel olarak butona veya sayfaya tıklama, arama kutusuna değer yazma veya uygulama sayfasının değiştirilmesi gibi olaylarda kullanılır. Veri akışını tüketmek amacıyla her Observable’da bir subscribe metodu bulunur.
  • Observer: Observable’a abone olurken (subscribe) kullanılan artçı çağrılardır. İçerisinde next, error ve complete metodlarını barındırır.
import {
NextObserver,
ErrorObserver,
CompletionObserver,
} from "rxjs";

const nextObserver: NextObserver<string> = {
next: (value: string) =>
console.log(`[nextObserver] next`, value),
};

const errorObserver: ErrorObserver<string> = {
error: (error: Error) =>
console.error(`[errorObserver] error`, error),
};

const completeObserver: CompletionObserver<string> = {
complete: () =>
console.log(`[completeObserver] complete!`),
};
  • Subscribtion (Abonelik): Subscribtion, bir Observable’ın ne zaman dinlenmesi gerektiğine ve onu dinlemeyi ne zaman bırakabileceğimize karar veren bir aktördür. Her abonelik bir “subscribe” ile başlar ve “unsubscribe” ile biter. Bellek sızıntısı gibi sorunları önlemek için aboneliğe ihtiyaç duyulmadığında “unsubscribe” çağırmak önemlidir.
import { Subscription } from "rxjs";

const observer = (value: string) =>console.log(`[unsubscribe method] next`,
value)

const subscription: Subscription = observable.subscribe(observer);
subscription.unsubscribe();
  • Pipe: Pipe fonksiyonu, bir observable’ı girdi olarak alan ve başka bir gözlemlenebilir döndüren işlevlerdir. Observable girdi değişmez ancak işlev yeni bir tane döndürür.Kaynakan gelen veriler, ihtiyaca göre filtreleme, dönüştürülme gibi süreçlerden geçebilir. RxJS kütüphanelerinin tree-shakeable(ihtiaç duyulmadıkça import edilmeyen ve bundle boyutunu büyütmeyen) olmasını sağlar. Third-party operatör yazmayı ve kullanmayı kolaylaştırır.

| Pipe fonksiyonuna verilen tüm fonksiyonlar aynı tipte değer döner.

import { Observable } from "rxjs";
import { map, tap } from "rxjs/operators";

new Observable<number>(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);

observer.complete();
})
.pipe(
map(val => val * 2),
tap(res => {
console.log("[pipeable tap]", res);
})
)
.subscribe();

Yukarıda yapılan işlemin çıktısı;

[pipeable tap] 2
[pipeable tap] 4
[pipeable tap] 6
[pipeable tap] 8

Aşağıda sizler için en sık kullanılan bazı operatörler listelenmiştir.

  1. Creation Operatörler:
  • of Operatörü, bir argümanı gözlemlenebilir bir argümana dönüştürmek için kullanılır. Bir örnekle anlatılmak istenirse,
import { of } from "rxjs";

of([a,e,i,o,u], b, c, d).subscribe({
next: res => console.log("[of]", res),
complete: () => console.log("[of] complete"),
});
// console'da yer alan çıktı aşağıdaki gibi olacaktır.

[of] [ a, e, i, o, u]
[of] b
[of] c
[of] d
[of] complete

Bu operatör bir argüman listesi alır ve bunları gözlemlenebilir değerlere dönüştürür;

şekil.1
  • from Operatörü, bir Diziden Gözlemlenebilir, dizi benzeri bir nesne, bir Söz, yinelenebilir bir nesne veya Gözlemlenebilir benzeri bir nesne oluşturur. Başlangıçta yer alan dizide Şekil2'deki değişim gözlemlenir.
from([1, 2, 3]).subscribe(res => console.log("[from]", res));

// console'da yer alan çıktı aşağıdaki gibi olacaktır.

[from] 1
[from] 2
[from] 3

şekil.2
  • interval Operatörü, belirli bir zaman aralığında sıralı sayılar yazan bir Gözlenebilir oluşturur.
import { interval } from "rxjs";

console.log(new Date().toLocaleTimeString(), "[interval] start");
interval(1000).subscribe({
next: res => console.log(new Date().toLocaleTimeString(), "[interval]", res),
complete: () =>
console.log(new Date().toLocaleTimeString(), "[interval] complete"),
});

// console'da yer alan çıktı aşağıdaki gibi olacaktır.

14:15:10 [interval] start
14:15:11 [interval] 0
14:15:12 [interval] 1
14:15:13 [interval] 2
14:15:14 [interval] 3
14:15:15 [interval] 4
...
...
...
  • timer Operatörü: 0 sayısını yaymadan önce milisaniye cinsinden belirli bir süre veya kesin tarih için bekleyecek bir gözlemlenebilir oluşturur.
import { timer } from "rxjs";

console.log(new Date().toLocaleTimeString(), "[timer] start");
timer(11000).subscribe({
next: res => console.log(new Date().toLocaleTimeString(), "[timer]", res),
complete: () =>
console.log(new Date().toLocaleTimeString(), "[timer] complete"),
});
//console'da yer alacak çıktı aşağıdaki gibidir.

14:14:34 [timer] start
14:14:45 [timer] 0
14:14:45 [timer] complete

2. Filtering Operatörler:

  • filter Operatörü, Observable kaynağı tarafından yayılan öğeleri, yalnızca belirli bir yüklemi karşılayanları yayarak filtreler.
import { Observable } from "rxjs";
import { filter } from "rxjs/operators";

const input$ = new Observable<number>(subscriber => {
let count = 0;
const id = setInterval(() => {
if (count < 10) {
subscriber.next(++count);
} else {
clearInterval(id);
subscriber.complete();
}
}, 1000);
});

input$.pipe(filter(x => x % 2 === 0)).subscribe({
next: x => console.log(`${new Date().toLocaleTimeString()} - [filter]: ${x}`),
});

// console'da yazacak çıktı aşağıdaki gibidir

11:46:43 - [filter]: 2
11:46:45 - [filter]: 4
11:46:47 - [filter]: 6
11:46:49 - [filter]: 8
11:46:51 - [filter]: 10
Şekil.3: filter operatörünün çalışması

3. Transformation Operatörler:

  • map Operatörü, gözlemlenebilir kaynak tarafından yayınlanan her değere belirli bir proje işlevi uygular ve elde edilen değerleri bir Gözlemlenebilir olarak yayınlar. map operatörü, dizilerin harita işlevi gibidir. Argüman olarak iletilen işlevi kullanarak değeri dönüştürür.
import { interval } from 'rxjs';
import { map, take, tap } from 'rxjs/operators';

const source$ = interval(1000, ).pipe(
take(5),
tap(val => {
console.log(`${new Date().toLocaleTimeString()}: Generated`, val);
}),
)

source$.pipe(
map(value => Math.pow(value, 2))
)
.subscribe({
next: value => console.log(`${new Date().toLocaleTimeString()}: map`, value),
complete: () => console.log(`${new Date().toLocaleTimeString()}: map: complete`),
})

// console'da yazacak çıktı aşağıdaki gibi olacaktır.

16:58:51: Generated 0
16:58:51: map 0
16:58:52: Generated 1
16:58:52: map 1
16:58:53: Generated 2
16:58:53: map 4
16:58:54: Generated 3
16:58:54: map 9
16:58:55: Generated 4
16:58:55: map 16
16:58:55: map: complete
  • scan Operatörü, akış ile gelen verilerin sonuncusunu ve sonuncusundan bir öncekine kadarkilerin toplamını ile işlem yapılmasını sağlar.
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5);

source$.pipe(
scan((acc, curr) => acc + curr, 0)
)
.subscribe({
next: value => console.log(`${new Date().toLocaleTimeString()}: scan`, value),
})

// console'da yazacak çıktı aşağıdaki gibi olacaktır.

18:20:26: scan 1
18:20:26: scan 3
18:20:26: scan 6
18:20:26: scan 10
18:20:26: scan 15

Bu operatör , dizinin reducer yöntemine benzer. Bir akümülatör oluşturulabilir ve kaynaktan her yeni bir değer yayıldığında bu akümülatörü güncellenebilir ve sonuç olarak döndürülebilir.

4. Utility Operatörler:

  • tap Operatörü, gözlemlenebilir kaynaktan gelen bildirimler için yan etkileri gerçekleştirmek için kullanılır. Bu operatör, yürütülmesi sırasında observable hakkında bazı bilgiler edinmemize yardımcı olur. Bu operatör aboneye eşittir ve bilgi almak için üç yöntem kullanır: next, error, complete.
import { of } from 'rxjs'
import { tap } from 'rxjs/operators'


of('a', 'b', 'c')
.pipe(
tap({
next: x => console.log(`tap: ${x}`),
complete: () => console.log('tap: complete'),
error: err => console.log(`tap: error: ${err}`)
})
)
.subscribe()

// console'da yazacak çıktı aşağıdaki gibi olacaktır.

tap: a
tap: b
tap: c
tap: complete

5. Conditional Operatörler:

  • find Operatörü, yalnızca bazı koşulları karşılayan Gözlemlenebilir kaynak tarafından yayınlanan ilk değeri yayar. Bu operatör dizilerdeki find metodu gibidir. Observable’daki bir koşulu karşılayan bir değer bulmak için kullanılabilir. Operatör koşula uyan bir değer bulduğunda observable tamamlanır.
import { Observer, of } from "rxjs";
import { find } from "rxjs/operators";

const observer: Observer<number | undefined> = {
next: x => console.log('value', x),
error: err => console.error('error', err),
complete: () => console.log('complete'),
};

of(1,2,3,4,5,6,7,8,9).pipe(
find(val => val === 5)
).subscribe(observer);

// console'da yazılacak değer aşağıdaki gibi olacaktır.

value 5
complete

Sonuç

Bu yazımda Angular öğrenme sürecinde SPA oluştururken sıkça karşılaştığım RxJS kütüphanesi ve bu kütüphaneye ait sıkça kullanılan operatörlerden bahsettim. RxJs çok kapmamlı bir kütüphane olması nedeniyle içeriğinde pek çok operatör barındırmaktadır. Eğer aradığınızı bulamadıysanız aşağıda yer alan bağlantıları ziyaret edebilir ve daha detaylı bilgiye ulaşabilirsiniz. ✌

Kaynaklar:

https://rxjs.dev/

https://dev.to/puppo/series/13743

https://bilisim.io/2019/10/08/rxjse-merhaba-deyin/

https://bilisim.io/2019/10/20/rxjse-yakindan-bakis-1-observables/

https://thrkardak.medium.com/rxjse-merhaba-deyin-1dd612b72e58

--

--