Reactive Programming using RxJS




Choose an Operator:


  • Get acquainted with the big list of operators (functions) for transforming, combining, and creating Observables.
  • Designing streams: simply write down your requirement, e.g. “User calls URL A to get data, and then on refresh click the data gets cleared and we call the same URL for new data.” Then create streams for the things that happen or change over time, and subscriptions for the actions to take when they happen or change:
    • refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click') // refreshButton: the refresh button's DOM element
    • closeClickStream = Rx.Observable.fromEvent(closeButton, 'click') // closeButton: the close button's DOM element
    • requestStream = refreshClickStream.startWith('URL A').map(function() { return 'URL?s=' + data; })
    • responseStream = requestStream
        .flatMap(function(requestUrl) {
          return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
        })
        .share();
      // responseStream is a cold observable (observables are cold by default), and it has 3 observers: suggestion1Stream, suggestion2Stream, and suggestion3Stream. Without sharing, each of the 3 creates its own internal response stream, which causes 3 API requests. share() (i.e. .publish().refCount()) lets them all use one.
    • suggestionStream = closeClickStream.startWith('startup click')
        .combineLatest(responseStream, function(click, listUsers) {
          return listUsers[Math.floor(Math.random()*listUsers.length)];
        })
        .merge(refreshClickStream.map(function() { return null; })); // on refresh, clear the suggestions
    • suggestionStream.subscribe(function(suggestion) {
        // tip: you can get rid of the if and else by using filter()
        if (suggestion === null) {
          // hide the first suggestion DOM element
        } else {
          // show the first suggestion DOM element
          // and render the data
        }
      });
  • One hint for “thinking in the right way”: avoid saying that the SuggestionRemoved event would trigger the API stream. Think the other way around: the API stream is triggered by SuggestionRemoved. It’s an important distinction because Rx is “Push” while imperative is “Pull”. It makes a difference because SuggestionRemoved has no knowledge of the API stream. It’s the API stream that listens to (“subscribes” to) events from SuggestionRemoved.
  • Thinking in streams basically means orchestrating your business logic as a composition of functions, chained with dot notation instead of written as nested function calls.
    • Everything can be converted into a stream: sync calls, static collections (e.g. Rx.Observable.fromArray), single values (the Just operator converts an item into an Observable that emits that item, e.g. Rx.Observable.just(42)), events (Rx.Observable.fromEvent(refreshButton, 'click')), or async actions:
    • requestStream.subscribe(function(requestUrl) {
        // execute the request
        var responseStream = Rx.Observable.create(function (observer) {
          jQuery.getJSON(requestUrl)
            .done(function(response) { observer.onNext(response); })
            .fail(function(jqXHR, status, error) { observer.onError(error); })
            .always(function() { observer.onCompleted(); });
        });
        responseStream.subscribe(function(response) {
          // do something with the response
        });
      });
    • Rx.Observable.create() builds your own custom stream by explicitly informing each observer (in other words, each “subscriber”) about data events (onNext()), errors (onError()), and completion (onCompleted()). All we did was wrap the jQuery Ajax Promise. Does this mean that a Promise is an Observable? Yes.
    • A Promise is simply an Observable with one single emitted value. Rx streams go beyond promises by allowing many returned values.
    • Now back to our example, if you were quick to notice, we have one subscribe() call inside another, which is somewhat akin to callback hell. Also, the creation of responseStream is dependent on requestStream. As you heard before, in Rx there are simple mechanisms for transforming and creating new streams out of others, so we should be doing that.
    • var responseMetastream = requestStream
        .map(function(requestUrl) {
          return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
        });
    • Then we will have created a beast called a “metastream”: a stream of streams. Don’t panic yet. A metastream is a stream where each emitted value is yet another stream. You can think of it as pointers: each emitted value is a pointer to another stream. In our example, each request URL is mapped to a pointer to the promise stream containing the corresponding response.
    • A metastream for responses looks confusing, and doesn’t seem to help us at all. We just want a simple stream of responses, where each emitted value is a JSON object, not a ‘Promise’ of a JSON object. Say hi to Mr. Flatmap: a version of map() that “flattens” a metastream, by emitting on the “trunk” stream everything that will be emitted on “branch” streams. Flatmap is not a “fix” and metastreams are not a bug, these are really the tools for dealing with asynchronous responses in Rx.
      var responseStream = requestStream
        .flatMap(function(requestUrl) {
          return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
        });
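To make the map() versus flatMap() difference concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS: fromArray, map, flatMap, and fakeRequest are hypothetical stand-ins written only for this illustration, and the "request" answers synchronously with made-up data instead of calling a server.

```javascript
// Assumption: a "stream" here is just a function that accepts an onNext callback.
// This only mimics the shape of the RxJS operators; it is not the library itself.
function fromArray(items) {
  return function subscribe(onNext) {
    items.forEach(onNext);
  };
}

// map(): each input value becomes exactly one output value.
function map(source, project) {
  return function subscribe(onNext) {
    source(function (value) { onNext(project(value)); });
  };
}

// flatMap(): each input value becomes an inner ("branch") stream, and every
// value of that inner stream is forwarded onto the outer ("trunk") stream.
function flatMap(source, project) {
  return function subscribe(onNext) {
    source(function (value) {
      var inner = project(value);
      inner(onNext);
    });
  };
}

// Hypothetical replacement for jQuery.getJSON: emits one fake response per URL.
function fakeRequest(url) {
  return fromArray([{ url: url, users: ['a', 'b'] }]);
}

var requestStream = fromArray(['URL?s=1', 'URL?s=2']);

// With map() we get a metastream: every emitted value is itself a stream.
var metastreamValues = [];
map(requestStream, fakeRequest)(function (v) { metastreamValues.push(typeof v); });

// With flatMap() we get plain responses on a single stream.
var responses = [];
flatMap(requestStream, fakeRequest)(function (v) { responses.push(v.url); });

console.log(metastreamValues); // [ 'function', 'function' ]
console.log(responses);        // [ 'URL?s=1', 'URL?s=2' ]
```

The subscriber of the flattened stream never sees the inner streams, only the response objects, which is why the nested subscribe() calls are no longer needed.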
  • Reactive programming is programming with asynchronous data streams.
    • In a way, this isn’t anything new. Event buses or your typical click events are really an asynchronous event stream, on which you can observe and do some side effects.
    • Streams are cheap and ubiquitous, anything can be a stream: variables, user inputs, properties, caches, data structures, etc. For example, imagine your Twitter feed would be a data stream in the same fashion that click events are. You can listen to that stream and react accordingly.
  • A stream is a sequence of ongoing events ordered in time. It can emit three different things: a value (of some type), an error, or a “completed” signal. For a stream of button clicks, for instance, “completed” takes place when the window or view containing the button is closed.
  • We capture these emitted events only asynchronously, by defining a function that will execute when a value is emitted, another function when an error is emitted, and another function when ‘completed’ is emitted. Sometimes these last two can be omitted and you can just focus on defining the function for values. The “listening” to the stream is called subscribing. The functions we are defining are observers. The stream is the subject (or “observable”) being observed. This is precisely the Observer Design Pattern.
  • In common Reactive libraries, each stream has many functions attached to it, such as map, filter, scan, etc. When you call one of these functions on, say, a click stream, it returns a new stream based on the click stream. It does not modify the original click stream in any way. This is a property called immutability, and it goes together with Reactive streams. That is what allows us to chain these functions one after another.
  • Cold observables start running upon subscription, i.e., the observable sequence only starts pushing values to the observers when subscribe() is called. Values are also not shared among subscribers. This is different from hot observables such as mouse move events or stock tickers, which are already producing values even before a subscription is active. When an observer subscribes to a hot observable sequence, it gets only the values that are emitted after it subscribes. The hot observable sequence is shared among all subscribers, and each subscriber is pushed the next value in the sequence. For example, even if no one has subscribed to a particular stock ticker, the ticker will continue to update its value based on market movement. When a subscriber registers interest in this ticker, it will automatically receive the next tick.
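The cold versus hot distinction can be sketched without any library. The snippet below is a plain JavaScript illustration, not RxJS: coldSubscribe and createHotTicker are hypothetical names invented for this example, and the tick values are made up.

```javascript
// Cold (assumed model): the values are produced inside the subscribe call,
// so every subscriber triggers its own run and gets its own copy.
var runs = 0;
function coldSubscribe(onNext) {
  runs += 1;
  [1, 2, 3].forEach(onNext);
}

var coldA = [];
var coldB = [];
coldSubscribe(function (v) { coldA.push(v); });
coldSubscribe(function (v) { coldB.push(v); });

// Hot (assumed model): one producer exists independently of its subscribers,
// and only whoever is listening at emit time receives the value.
function createHotTicker() {
  var listeners = [];
  return {
    subscribe: function (onNext) { listeners.push(onNext); },
    emit: function (value) {
      listeners.forEach(function (listener) { listener(value); });
    }
  };
}

var ticker = createHotTicker();
var early = [];
var late = [];

ticker.emit(100); // nobody is subscribed yet, so this tick is missed
ticker.subscribe(function (v) { early.push(v); });
ticker.emit(101);
ticker.subscribe(function (v) { late.push(v); });
ticker.emit(102);

console.log(runs);         // 2
console.log(coldA, coldB); // [ 1, 2, 3 ] [ 1, 2, 3 ]
console.log(early);        // [ 101, 102 ]
console.log(late);         // [ 102 ]
```

The cold producer ran twice, once per subscriber, which is the same effect that causes one API request per observer when a cold response stream is not shared.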
