Buildtide
Author: Hussain Mir Ali

I am interested in web and mobile technologies.

If you have any questions or feedback then message me at devtips@Buildtide.com.

Observable in RxJS


The Observer design pattern: 

The Observer design pattern allows the object of interest to notify other objects if there is a change in its state. The object of interest is called the 'Subject' and other objects that will be notified are called 'Observer'. As it can be seen in Fig 1.0 that the 'Subject' class has the 'registerObserver(observer)' method that allows multiple observers to register for notification. This allows the 'Subject' to notify the 'Observer' if there is any change in its state. 
The 'notifyObservers()' method in 'Subject' class calls the 'notify()' method for each of its observers. The RxJS 'Observable' works in a similar way where the '.subscribe()' method in RxJS allows observers to get notified if the state of stream changes.The 'Observable' in RxJS is comparable to the 'Subject' in Observer design pattern. 



Fig 1.0 UML diagram for Observer pattern


Hot vs Cold Observable:

Cold Observable is not created unless it is subscribed to. The code example below shows how the '.defer()' method can be used to demonstrate a cold Observable. When 'B' is subscribed only then the value is returned. In this example by the value of 'A' will be changed from 3 to 5. 

let A = 3;

let B = Rx.Observable.defer(()=> {
return Rx.Observable.return(A);
});

A = 5;

B.subscribe((value)=>{
console.log(value === 5);//true
},
(error) =>{

},
(complete)=>{

});

Hot Observable exists even before it is subscribed. Most use cases are in the UI where events are represented as streams. For instance the moving of a mouse pointer on the window generates a stream of events. This stream can be subscribed by an observer to perform any computation relating to X and Y coordinates. The code below shows an example of how the mouse move stream can be subscribed. 

let mousestrm = Rx.Observable.fromEvent(document, 'mousemove');

mousestrm.subscribe(
(e)=> {console.log(e.clientX, e.clientY);},
(error)=>{ console.log(error)},
()=>{ console.log('complete')});

Shared Observable:

By default  an Observable is not shared in RxJS. Each time an Observable is subscribed it will lead to a new chain of operations. While this is good for implementing factories it might have some unintended consequences for other use cases.

The code below shows how an observable that is not shared might lead to unintended output.


var observable = Rx.Observable.interval(500).take(5)
.do(i => console.log("obs value "+ i) );

observable.subscribe(value => console.log("observer A received " + value));

observable.subscribe(value => console.log("observer B received " + value));

The above code leads to the following output.

obs value 0
observer A received 0
obs value 0
observer B received 0
obs value 1
observer A received 1
obs value 1
observer B received 1
obs value 2
observer A received 2
obs value 2
observer B received 2
obs value 3
observer A received 3
obs value 3
observer B received 3
obs value 4
observer A received 4
obs value 4
observer B received 4

So each time an observer subscribes it creates a new chain of process execution. In this case for both observers 'A' and 'B' the observable's '.do()' method fires twice. But in specific use cases if we want it to only fire once for both observers it can be an issue. For this reason it is better to share the observable among its observers. The '.share()' operator allows an 'Observable' to be shared between observers.

var observable = Rx.Observable.interval(500).take(5)
.do(i => console.log("obs value "+ i) ).share();
observable.subscribe(value => console.log("observer A received " + value));

observable.subscribe(value => console.log("observer B received " + value));

Now the observable will be shared and the anomaly will disappear.


obs value 0
observer A received 0
observer B received 0
obs value 1
observer A received 1
observer B received 1
obs value 2
observer A received 2
observer B received 2
obs value 3
observer A received 3
observer B received 3
obs value 4
observer A received 4
observer B received 4

It is evident that the '.do()' operator is only fired once for both observers 'A' and 'B'.

Observable from Events:

Below are some of the most common use cases in RxJS to handle UI events.

Click Event

let loginbuttn = $('#login');
let loginbuttnstrm = Rx.Observable.fromEvent(loginbuttn, 'click');


loginbuttnstrm .subscribe(
(success)=>{/*Login button successfully clicked.*/ },
(error)=>{/*Error*/ },
()=>{/*Complete*/}
);

Input Event


let inputname = $('#name');
let inputNameStrm = Rx.Observable.fromEvent(inputname, 'keyup');

inputNameStrm.subscribe(
(e)=> {console.log(e.target.value);},
(error)=>{ console.log(error)},
()=>{ console.log('complete');});

Mouse Event

let mousestrm = Rx.Observable.fromEvent(document, 'mousemove');

mousestrm.subscribe(
(e)=> {console.log(e.clientX, e.clientY);},
(error)=>{ console.log(error)},
()=>{ console.log('complete')});