Wróć do strony głównej
RxJS

RxJS w Angular – co wypada wiedzieć

Tym razem post skierowany do początkujących deweloperów Angulara, dla których pierwsze zderzenie z biblioteką RxJS może stanowić nie lada problem. W tym artykule podsumuję zagadnienia, z jakimi początkujący programista/ka powinien być zaznajomiony. Samo mięsko, minimum niezbędnej wiedzy!

Jeśli nie poznałeś/aś jeszcze liba RxJS – jest to biblioteka do reaktywnego programowania, Angular korzysta z niej aby pomóc sobie z asynchronicznością i przekazywaniem danych pomiędzy komponentami.

Lista tematów, przez które wspólnie przejdziemy:

  1. Czym jest Observable, Observer
  2. Promise vs Observable
  3. Strumienie w Angular
  4. Subskrypcja do strumieni
  5. Cold vs Hot Observable
  6. Wiele subskrypcji do jednego strumienia
  7. Unsubscribe
  8. Kiedy korzystać z unsubscribe()
  9. AsyncPipe
  10. Operatory
  11. Importowanie
  12. Subjects
  13. Konwencja nazewnicza

Brzmi zachęcająco? No to wio 😉

1. Czym jest Observable, Observer

Zapewne nie raz już zapisałaś/eś się do newslettera na stronie z ulubioną tematyką, klikając przycisk „Subscribe”, który najczęściej jest umieszczony w widocznym miejscu. Gdy tylko na ów stronie pojawi się nowy news / post, dostajesz natychmiastowo powiadomienie na maila o pojawieniu się nowych treści. W dowolnym momencie możesz kliknąć na link „Unsubscribe”, który z reguły znajduje się na końcu każdego, takiego maila. Oczywistym jest, że póki nie klikniesz Unsubscribe, to będziesz cały czas dostawać powiadomienia, co prowadzi często do niezłego spamu na skrzynce :). Podobnym przykładem są płatne subskrypcje SMS na telefon.

Wyobraź sobie, że powyższą sytuację można przedstawić w postaci strumienia z newsami, do którego Ty się podłączyłeś/aś. W tej sytuacji:

  • Ty jesteś obserwatorem (obserwujesz ten strumień i nasłuchujesz na wysyłane posty)
  • Strumień z newsami to obiekt obserwowany czyli Observable, który emituje wartości (newsy)
  • Subskrypcja do strumienia z newsami udostępnia Ci funkcjonalność w postaci „unsubscribe”, która powoduje, że już nie otrzymujesz powiadomień
  • W opcjach na stronie możesz filtrować powiadomienia („chcę być powiadamiany tylko o newsach w dziale „Programowanie”) i nasłuchiwać tylko na wybrane

Jeśli wzorce projektowe są Ci znane, to powyższy opis już przypomina Ci o wzorcu obserwatora… dokładnie! Observable to mniej więcej wzorzec Obserwatora  na sterydach (z tym, że Observable nie trzyma listy obserwatorów, z wyjątkiem Subject).

Podsumowując:
Observable to strumień z wartościami (z pojedynczą lub wieloma, dowolnego typu – np. stringami, liczbami, obiektami, tablicami), na który możesz nasłuchiwać i od którego w dowolnym momencie możesz się odłączyć.

2. Promise vs Observable

Po co tam ten cały RxJS? Już śpieszę z wyjaśnieniem. Jest genialnym narzędziem jeśli chodzi o obsługę asynchroniczności w Twoich aplikacjach i przekazywanie danych pomiędzy komponentami. Teraz sobie myślisz:

„przecież poznałam/em już Promise, które pozwalają mi ogarniać asynchroniczność, po co mi kolejne narzędzie?”

Nic bardziej mylnego. Observable w swoich możliwościach jest o wiele potężniejszy niż Promise.

Podstawowe różnice:
Promise:

  • jest zawsze asynchroniczny
  • jest „eager”
  • zwraca tylko jedną wartość
  • nie posiada dedykowanych operatorów
  • nie może być anulowany (co jest problematyczne chociażby w przypadku próby anulowania zapytań HTTP)

Observable:

  •  może być asynchroniczny lub synchroniczny (np. strumień stworzony za pomocą of() jest synchroniczny)
  • jest „lazy”
  • może zwracać jedną lub wiele wartości
  • możemy skorzystać z ogromnej ilości operatorów
  • może być anulowany
  • lepsze możliwości obsługi errorów (np. poprzez operator retryWhen)

3. Strumienie w Angular

Angular w wielu miejscach zwraca nam strumienie, min.:

W każdym momencie, możemy się podłączyć pod strumień i nasłuchiwać na wartości.

4. Subskrypcja do strumieni

Podłączenie się do strumienia jest banalnie proste. Observable posiada metodę subscribe, do której parametry możemy przekazać na dwa sposoby:

  • jako obiekt z metodami (TIP: poniższy zapis metod w obiekcie od ES 2015+ jest możliwy):

  • jako zestaw funkcji (callbacków):

Taki obiekt, przekazany do metody subscribe nazywamy Observerem.

Biorąc pod uwagę poszczególne składowe Observera:

  • Pierwsza funkcja (lub metoda next) jest wywołana, jeśli z sukcesem odbierzemy wartość ze strumienia. Każda nowa wartość otrzymana ze strumienia, powoduje wywołanie next() na nowo.

 

  • Druga funkcja (lub metoda error) jest wywołana, jeśli w strumieniu wystąpi error (np. w http.get dostaniemy status 404). W przypadku errora, observer nie przejdzie do wykonania funkcji completed. Natomiast zostanie oznaczony jako closed i nie będzie już emitował więcej kolejnych wartości.

 

  • Trzecia funkcja (lub metoda complete) jest wywołana, gdy observer otrzyma ostatnią wartość ze strumienia ze sukcesem, mówiąc bardziej po chłopsku, gdy Observable wypstryka się z wartości.

Korzystanie z wersji z obiektem jest przydane, jak np. chcemy skorzystać np. tylko z callbacka na next i complete , wtedy nie musimy przekazywać pustej funkcji na error.

Spójrzmy jeszcze na dwa strumienie, stworzone za pomocą funkcji of(), do tworzenia synchronicznych strumieni:


Zwróć uwagę, że tablica jest traktowana jako pojedyncza wartość w strumieniu, a nie seria wartości.

Unikaj w .subscribe() wszelkiej maści sideEffects (Np. przestawiania jakichś flag, mapowania wartości). Od tego są operatory, które poznasz w kolejnej części.

Warto również wydzielać callbacki do osobnych metod, w celu poprawienia czytelności kodu, np:

5. Cold vs Hot observables

Observables dzielimy na HOT & COLD:

Cold Observables:

  • Zaczynają emitować wartości, dopiero jak pojawia się subskrybent (czyli gdy pojawi się pierwszy subscribe())
  • Dla nowego subksrybenta, zwracają od nowa te same zasoby (patrz pkt. 4)
  • Przykład: Observable zwrócony z httpClient.get

Hot Observables:

  • Produkują już wartości nawet jak nie ma jeszcze subskrybenta
  • Współdzielą te same zasoby dla kolejnych subskrypcji
  • Przykład: Observable stworzony z eventa click: fromEvent(document, 'click’);

Możemy samemu przestawić Cold Observable na Hot Observable, np. za pomocą operatora share().

Poniżej przykład Hot Observable:
https://stackblitz.com/edit/angular-bwjpnu?file=src/app/app.component.ts

Zwróć uwagę na indentyczne TimeStamps w konsoli.

 

6. Wiele subskrypcji do jednego strumienia

Poniższy pomysł w pewnych przypadkach, może skończyć się niedobrze:

Za każdym razem gdy pojawia się nowy Observer dla danego Observable, zasoby owego Observabla zostają wyemitowane ponownie. W powyższym przykładzie zostaną wykonane dwa requesty HTTP.  Wynika to z tego, że powyższy Observable z pochodzący z http.get jest COLD. Uważaj na to!

7. Metoda unsubscribe()

Zapoznałaś/eś się już z metodą subscribe(). Owa metoda, oprócz tego że jako parametr przyjmuje Observera, zwraca również typ Subscription:

Z kolei obiekt Subscription posiada metodę Unsubscribe(), która mówi:
„ja, pan subskrybent (obserwator), nie jestem już dalej zainteresowany otrzymywaniem wartości ze strumienia, proszę już mnie nie informować o nowych wartościach : ]”.

Unsubscribe najczęściej wołamy w hooku OnDestroy, aby posprzątać sobie w momencie zniszczenia komponentu:

Wystarczy:

  1. Stworzyć pole o typie Subscription
  2. Następnie przypisać do tego pola obiekt Subscription zwracany przez metodę subscribe
  3. Zawołać na polu o typie Subscription metodę unsubscribe();

Wołanie unsubscribe() jest bardzo ważne, w przypadku braku odpinania się od strumieni, może dojść do wycieków pamięci (będzie istniało więcej observerów, niż myślisz, a Ty nawet nie będziesz o tym wiedzieć!). Jeśli nie jesteś pewny, czy użyć unsubscribe – to lepiej go użyj.

Natomiast w przypadku wielu subskrypcji w jednej klasie, możemy je zgromadzić pod jedną subskrypcją za pomocą metody add(), obiektu Subscription:

Tworzymy nową instancję Subscription, a następnie do metody ADD, przekazujemy jako parametr daną subskrypcję. Dzięki temu, możemy jednym unsubscribe(), odpiąć się od wszystkich subskrypcji na raz, dodatkowo nie dostaniemy błędu, jeśli nie są zdefiniowane.

8. Kiedy korzystać z unsubscribe()

W poprzednim punkcie poznaliśmy metodę Unsubscribe(). Teraz czas nauczyć się, kiedy faktycznie musimy odpinać się od strumieni.

 

Unsubscribe konieczny, gdy korzystasz z :

1) Inifinite Observables (np. te stworzone za pomocą rxjs – Interval)

2) z subskrypcji do Subjects (poznasz później)

3) Strumieni z WebSockets

4) AbstractControl.valueChanges()

5) Renderer2.listen

6) Strumienie stworzone z eventów za pomocą rxjs – fromEvent

7) NgRx Store (np. store.select(’products).subscribe(console.log))

 

NIE musisz korzystać z Unsubscribe, gdy:
1) Strumień sam się kompletuje (np. strumień stworzony za pomocą operatora of(1,2,3,4));

2) Router.events i wszystkie inne Observables z Routera (Angular automatycznie je odpina za Ciebie)

3) Korzystasz z AsyncPipe (poznasz dalej)

4) używasz Observables zwracane z HttpClient.get, post etc., czyli wszystkie finite Observales (chyba, że połączysz je z rxjs Interval, wtedy już musisz unsubscribe!)

5) korzystasz z EventEmittera

 

9. AsyncPipe

 Angular udostępnia nam pipe o nazwie AsyncPipe o dzięki któremu:
  • nie musimy jawnie wołać subscribe, następuje to automatycznie.
  • nie musimy jawnie wołać unsubscribe, AsyncPipe robi to za nas gdy komponent zostaje zniszczony
  • mniejszy narzut kodu
  • asyncPipe zwraca ostatnią wyemitowaną wartość ze strumienia
  • działa również z Promises
  • gdy przyjdzie nowa wartość, uruchamia system detekcji
  • gdy dostanie jako input Observabla z nową referencją, to odepnie się od poprzedniej i zasubskrybuje się do nowego strumienia

Przykład użycia:

Jak widzisz, AsyncPipe wykonuje subskrypcję za nas. Ladnie i czysto. No to wyświetlmy w templatce jeszcze ilość produktów, korzystając ponownie z asyncPipe:

Brawo! Właśnie zrobiłeś/aś dwa strzały do API (pod getProducts() siedzi http.get()) …niefajnie, prawda? 😉

Odkryj zatem prosty trik 😉

Wystarczy wrzucić AsyncPipe do *ngIf i udostępnić w templatce odebraną wartość ze strumienia poprzez „as” i wybraną przez nas nazwę. W tym momencie jest tylko jeden AsyncPipe, czyli wyłącznie jedna subskrypcja, a co za tym idzie, jedno zapytanie do serwera.

Jest również drugi sposób, aby uniknąć wielu zapytań HTTP, a jednocześnie móc nadal skorzystać z wielu asyncPipe na jednym strumieniu, bez użycia *ngIf.  Wystarczy skorzystać z operatora ShareReplay():

Teraz możesz do bólu walić asyncPipe na jednym strumieniu 😉 ShareReplay powoduje, że kolejni subskrybenci mają dostęp to wcześniej wyemitowanej wartości, oraz, że zasoby strumienia nie są produkowane na nowo dla każdej subskrypcji (stąd jedno zapytanie HTTP w powyższym przypadku).

Chciałbym Cię jeszcze ostrzec przed popularnym błędem, w którym asyncPipe nakłada się na metodę, która zwraca nowy Observable:

Założmy, że FightsService jest wstrzyknięty do klasy komponentu, a metoda getFligts() robi http.get do serwera po loty i zwraca strumień z lotami. Prosty, typowy przypadek.

W czym więc problem?

Angular przy każdym uruchomieniu ChangeDetection, odswieża bindingi ( {{ … }} ) i na nowo woła metodę getFlights() z FlightsService,  która zwraca nowego Observabla, i poprzez asyncPipe robi unsubscribe od poprzedniego i subskrybuje się do nowego, wiec suma sumarum, Angular nie ma szansy wyświetlić tej wartości, wszystko trwa milisekundy. Dlatego nigdy nie przekazuj do AsyncPipe metody, która po zawołaniu zwraca nowy strumień! Przypisuj strumienie do pól klasy, aby mieć pewność, że działasz ciągle na tej samej referencji strumienia.

 

PS! Staraj się zawsze używać AsyncPipe jeśli możesz, zamiast wołać jawnie subscribe.

10. Operatory

Potężną funkcjonalnością Observables, są operatory. Możemy łatwo filtrować, mapować, transformować wartości ze strumienia. Istnieje ich wiele (chyba z ponad 150?), stąd oczywiście nie będziemy robić przeglądu wszystkich. Nauczę Cię jak ich używać.

Każdy Observable, posiada metodę .pipe():

  • do niej wrzucamy operatory
  • pipe robi kompozycję z przekazanych operatorów
  • wartości przepływają poprzez kolejne operatory (stąd kolejność ma znaczenie)
  • kolejne operatory oddzielamy przecinkiem
  • pipe zwraca typ Observable<T>, stąd możemy po danym pipe, wołać znowu pipe, a potem znowu pipe, a w końcu np. subscribe
  • możemy stworzyć custom operator i przekazać go do pipe

Przykład wykorzystania operatorów, na strumieniu 3 tablic:

No początku trzeba przestawić myślenie jeśli chodzi o użycie chociażby map i filter, które dotyczą kolejnych wartości przychodzących ze strumienia, a nie pojedynczych wartości w danej tablicy. Stąd jak chcę przemapować tablicę, to wołam w operatorze map, Array.prototype.map.

Pamiętaj, że na pipe() możesz wołać kolejny pipe(). Jest bardzo przydatne, np. pierwszą obróbkę danych możemy już zrobić w serwisie (tam już pierwszy pipe), a następnie kolejną np. w klasie komponentu (drugi pipe, nałożony na ten sam Observable).

Operatorów jest multum, poniżej podaję operatory, które wypada znać:

  • Filtrowanie:  filter, first, debounce, distinctUntilChanges
  • Kombinacje: merge, forkJoin, startWith
  • Transformowanie: map, switchMap, pluck
  • Obsługa błędów: catchError
  • Inne: tap, finalize

I nie bój się korzystać z operatorów! Jak widzisz kod:

To ochrzań kolegę, koleżankę lub siebie i zadbaj o wykonanie operacji w odpowiednich operatorach : -)

11. Importy

Od wersji RxJS 5.5.0, importy mocno się uprościły:

  • operatory:  import { map, filter, debounce, tap } from „rxjs/operators”
  • typingi oraz funkcje które zwracają strumienie: import { Observable, of, fromEvent } from „rxjs”

Także wystarczy zapamiętać prostą zasadę, że operator z rxjs/operators, a cała reszta z rxjs.

12. Subjects

Subjects to cały, duży odrębny temat, stąd wyłącznie parę słów, abyś miał/a świadomość istnienia takiego tematu.
Jest to jednocześnie HOT Observable jak i Observer. W Angularze najczęściej wykorzystywany do tworzenia strumieni, które przekazują nam dane pomiędzy komponentami na różnych poziomach zagnieżdżenia. Rozróżniamy parę rodzajów Subjects:

  • Subject – subskrybent otrzyma wartości, które zostały wyemitowane po subksrypcji
  • BehaviorSubject – ostatnia wartość zostaje zapisana w pamięci (w cache). Wszyscy subskrybenci dostaną „skeszowaną” wartość jako początkową.
  • ReplaySubject – może „skeszować” więcej wartości niż tylko ostatnią i subskrybenci po subskrypcji dostaną na starcie wszystkie te wartości

Przykład stworzenia serwisu dwukierunkowego za pomocą Subject:

ANGULAR 2 – Bidirectional Service

13. Konwencja nazewnicza

Observables oznaczamy znakiem dolara umieszczonym na końcu nazwy. Nie pomyl z jQuery, gdzie dolarek znajduje się na początku 😉 Przykład:

Podsumowanie

Uff, udało nam się dojść do końca ;). Mam nadzieję, że dzięki temu artowi, rozjaśniłem Ci nieco, czym są Observable oraz jaka jest ich rola w Angularze. A przede wszystkim, rozpoczynając pracę z Angularem, nie będziesz zielony w tematach Rxowych. Nie krępuj się zadawać pytań w komentarzach! 🙂

PS. Spodobał się artykuł? To kliknij przycisk FB „Like”, bądź na bieżąco z nowymi artykułami i pomóż mi zwiększać zasięg :)!

 

O autorze

Tomasz Nastały

JavaScript Developer, entuzjasta frameworka Angular oraz TypeScripta. Na co dzień lubię dzielić się wiedzą poprzez prowadzenie zajęć w jednym z trójmiejskich bootcampów i nagrywaniem kursów z Angulara.

Zapisz się do naszego newslettera. Bądź na bieżąco z najnowszymi trendami, poradami, meetupami i stań się częścią społeczności Angulara w Polsce. Rynek pracy docenia członków społeczności.

23 komentarzy

    • Grzegorz

      Cześć, może ktoś będzie się znał i wiedział jak w RX obsłużyć sytuację:

      Użytkownik na stronie ma dostępna wyszukiwarkę.
      Podczas wpisywania wysyłany jest event z aktualną wartością z pola.
      Pierwszy request ma zostać wysłany po wpisaniu 3 pierwszych znaków. (To mam) Następny ma zostać wysłany w odstępie co najmniej 1s. Niezależnie od szybkości wpisywania.

      Jak można to zrobić za pomocą operatorów? Może jakiś inny sposób byłby lepszy?

  1. Sebastian

    Świetny artykuł! Dużo wiedzy!
    A jak najlepiej to wykorzystać? – odpytując API jakieś strony gdzie dane zmieniają się nawet co 5 sekund? Jak to zgrabnie odświeżać?

      • Sebastian

        w Angularze mam serwis, gdzie http.get(’xxx’); pobieram dane z API, w komponencie mam funkcję, którą subskrybuję to. W ngOnInit wywołuję tę funkcję oraz ustawiam setInterval co 5 sekund:
        setInterval(() => {
        this.getData();
        }, 5000);

        Jeśli ustawie na 2-3 sekundy dostaję blokadę na kilka sekund za zbyt częste odpytywanie.

        Czy to dobre rozwiązanie? Jest lepsze? Pomijając fakt websocketów… 🙂

        • odpytywanie API to odpowiedzialność serwisu, pomysł z setInterval w klasie komponentu, zastąpiłbym z intervalem RxJS:

          getData() {
          return interval(5000)
          .pipe(
          switchMap(() => this.http.get('...'))
          .pipe(map(data => ...))
          )
          );
          }

          oczywiście należy pamiętać o unsubscribe w klasie komponentu, bo to jest infinite observable i sam się nie skompletuje

  2. Przemek

    a jak korzystać z asyncPipe gdy często operujemy na danych? Przykładowo na init komponentu przychodzi jakaś lista, na froncie usuwam jeden element ale bez sensu jest wtedy odpytywanie api o nowa liste, ja po prostu usuwam z listy na froncie, jakoś nie potrafię tego ogarnąć na asyncPipie. Mam na myśli takie coś:

    ngOnInit(private service: SomeService) {
    this.service.get().subscribe(data => {
    this.data = data;
    });
    }

    protected getIndex(id) {
    return this.data.map(el => el.id).indexOf(id);
    }

    public delete(id) {
    const index = this.getIndex(id);
    this.service.delete(id).subscribe(
    () => {
    this.data.splice(index, 1);
    }
    );
    }

    Jak to ładniej opakować?

    Pozdrawiam

  3. Marcin

    Artykul fajny. Tylko zawsze ciekawi mnie tendencja do promowania fragmentarycznej notacji wegierskiej i uzywania przedrostkow i przyrostkow w typescipt. Bo, ze juz nikt nie oczekuje nazewnictwa a_pui32SomeArray, ale stream$ i $element sie trzymaja 😉

    Kazde nowoczesne IDE podpowie typ, a kazdy kompilator (a wczesniej najpewniej IDE) pokaze blad przy niewlasciwym uzyciu. Kontekstowo tez przeciez widac uzycie – pipe’y/subscribe’y w logice, | async w widokach. Dla mnie osobiscie to archaizm i nie uzywam 🙂 W przykladach do observables na angular.io tez juz chyba zniknelo.

    Stad ciekawosc dlaczego p.13 😉

    • kwestia ustalonych standardów w projekcie, byłem w teamie gdzie ustaliliśmy, że odpuszczamy dolara dla streamów. U mnie np to nawyk, dodawanie dolara do nazwy streama 🙂 nie uważam tego za nic złego, ani szczególnie też dobrego :).

  4. Pingback: Angular - Asynchroniczne walidatory - Angular.love

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *