5 reasons to use RxJava in your projects

  • April 10, 2019

Reactive Extensions (Rx) are a set of interfaces and methods that solve many of the problems developers face with asynchronous, event-driven code. The approach may look complicated at first glance, but in fact it helps you write elegant code without sacrificing simplicity.

Let’s take Java as an example. Reactive extensions in Java let you handle multiple actions triggered by system events at the same time. Rx lets you apply functional transformations to streams of events, without resorting to nasty things like callbacks and global state management.

Inexperienced programmers might find RxJava too difficult, its tools redundant, and its usage pointless. But those who take the time to understand Rx recognize the advantages it offers:

  • Intuitiveness: Rx describes actions with the same syntax used in functional-style APIs such as Java Streams.
  • Declarativeness: you can express what is required at a higher level of abstraction.
  • Extensibility: you can extend Rx with your own custom extension methods.
  • Composability: operators in RxJava are easily combined to carry out complex operations.
  • Convertibility: operators in RxJava can transform data types by filtering, processing and expanding data streams.

Rx is built on the Observer pattern. The key elements are the Observable and the Subscriber. Observable is the base type: this class contains the main part of the Rx implementation and includes all the basic operators.
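
To make the pattern concrete, here is a minimal sketch of the Observer pattern in plain Java, without RxJava. The names SimpleObserver, SimpleObservable and ObserverPatternDemo are hypothetical and only mirror the roles described above; RxJava's real Observable adds operators, error handling and scheduling on top of this idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface: receives each emitted item.
interface SimpleObserver<T> {
    void onNext(T item);
}

// Hypothetical observable: keeps a list of observers and pushes items to them.
final class SimpleObservable<T> {
    private final List<SimpleObserver<T>> observers = new ArrayList<>();

    void subscribe(SimpleObserver<T> observer) {
        observers.add(observer);
    }

    void emit(T item) {
        for (SimpleObserver<T> observer : observers) {
            observer.onNext(item);
        }
    }
}

final class ObserverPatternDemo {
    // Emits two items and returns what a single subscriber received.
    static List<String> run() {
        SimpleObservable<String> source = new SimpleObservable<>();
        List<String> received = new ArrayList<>();
        source.subscribe(received::add);
        source.emit("settings");
        source.emit("messages");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subscriber never polls the source; it is simply called back for every item, which is the relationship Rx builds on.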

 

So, why RxJava?

  1. Asynchronous streams
  2. Functional approach
  3. Caching becomes easy
  4. Operators with schedulers
  5. Using Subjects


1. Asynchronous streams

Here’s an example:

You need to send a request to the database, and then you should start getting both messages and settings immediately. After everything is completed, you need a welcome message to be displayed.

If we want to do the same in Java SE or Android, we have to do all of the following:

  1. Run 3-4 different AsyncTasks
  2. Create a semaphore that waits for both requests to complete (settings and messages)
  3. Create fields at the object level to store the results
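
For comparison, the manual approach listed above might look roughly like the sketch below. It uses only the JDK, with an ExecutorService and a CountDownLatch standing in for the AsyncTasks and the semaphore. The class name, the loadSettings and loadMessages methods and their return values are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ManualWelcomeFlow {
    // Step 3 from the list: fields at the object level to hold the results.
    private volatile String settings;
    private volatile List<String> messages;

    // Hypothetical stand-ins for the real database requests.
    private String loadSettings() { return "dark-theme"; }
    private List<String> loadMessages() { return Arrays.asList("hi", "hello"); }

    String run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Step 2: a latch that waits for both requests to complete.
        CountDownLatch bothDone = new CountDownLatch(2);
        try {
            // Step 1: one background task per request.
            pool.execute(() -> { settings = loadSettings(); bothDone.countDown(); });
            pool.execute(() -> { messages = loadMessages(); bothDone.countDown(); });
            bothDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return "Welcome! settings=" + settings + ", messages=" + messages.size();
    }

    public static void main(String[] args) {
        System.out.println(new ManualWelcomeFlow().run());
    }
}
```

Even in this small form, the coordination logic (latch, shared fields, pool lifecycle) is spread across the class rather than expressed as one chain.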

We can streamline this with RxJava. The code then reads as a single chain kept in one place and built on the functional paradigm.

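The chain might look like this (the User and Settings type names and the fetchSettingsAndMessages helper are placeholders used for illustration):

Observable.fromCallable(createNewUser())
        .subscribeOn(Schedulers.io())
        .flatMap(new Func1<User, Observable<Pair<Settings, List<Message>>>>() {
            @Override
            public Observable<Pair<Settings, List<Message>>> call(User user) {
                // works with settings
                return fetchSettingsAndMessages(user); // hypothetical helper
            }
        })
        .doOnNext(new Action1<Pair<Settings, List<Message>>>() {
            @Override
            public void call(Pair<Settings, List<Message>> pair) {
                System.out.println("Received settings " + pair.first);
            }
        })
        .flatMap(new Func1<Pair<Settings, List<Message>>, Observable<Message>>() {
            @Override
            public Observable<Message> call(Pair<Settings, List<Message>> pair) {
                // works with messages
                return Observable.from(pair.second);
            }
        })
        .subscribe(new Action1<Message>() {
            @Override
            public void call(Message message) {
                System.out.println("New message " + message);
            }
        });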


2. Functional approach

If you are familiar with functional programming, especially with the map and zip concepts, RxJava will be even easier to handle. Functional programming relies heavily on functions that take other functions as parameters and return functions as results. For example, map is a higher-order function found in many programming languages. It is applied to each element of a list and returns a list of results.

Here’s how it looks:

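// jsonFile is a single File, so it is wrapped with just()
Observable.just(jsonFile)
        .map(new Func1<File, String>() {
            @Override
            public String call(File file) {
                try {
                    // parse the file, then serialize it back to a JSON string
                    Gson gson = new Gson();
                    return gson.toJson(gson.fromJson(new FileReader(file), Object.class));
                } catch (FileNotFoundException e) {
                    // OnErrorThrowable is part of RxJava; it attaches the
                    // offending value to the error delivered to onError()
                    throw OnErrorThrowable.from(OnErrorThrowable.addValueAsLastCause(e, file));
                }
            }
        });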
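
The map concept is not specific to RxJava. As a minimal sketch, the same idea with plain Java Streams might look like this; the MapDemo class and its map helper are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class MapDemo {
    // Applies the given function to each element and returns the list of results.
    static <T, R> List<R> map(List<T> items, Function<T, R> fn) {
        return items.stream().map(fn).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> lengths = map(Arrays.asList("rx", "java", "streams"), String::length);
        System.out.println(lengths); // [2, 4, 7]
    }
}
```

The function passed in (String::length here) is an ordinary value, which is what makes map a higher-order function.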

3. Caching becomes easy

The next piece of code uses the cache() method so that the underlying work runs only once: the Single keeps its result after the first successful completion.

Single<List<Todo>> todosSingle = Single.create(emitter -> {
    Thread thread = new Thread(() -> {
        try {
            // fetchTodosFromWeb() is a placeholder for the web service query
            List<Todo> todosFromWeb = fetchTodosFromWeb();
            System.out.println("I am only called once!");
            emitter.onSuccess(todosFromWeb);
        } catch (Exception e) {
            emitter.onError(e);
        }
    });
    thread.start();
});

// cache the result of the single, so that the web query is only done once
Single<List<Todo>> cachedSingle = todosSingle.cache();
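
To illustrate the "run once, reuse the result" idea without RxJava, here is a small sketch of a memoizing wrapper in plain Java. CachedSupplier and CacheDemo are hypothetical names, and this is not how cache() is implemented internally; it only shows the behavior the caller observes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: wraps a supplier so the underlying work runs at most once.
final class CachedSupplier<T> implements Supplier<T> {
    private final Supplier<T> source;
    private T value;
    private boolean loaded;

    CachedSupplier(Supplier<T> source) {
        this.source = source;
    }

    @Override
    public synchronized T get() {
        if (!loaded) {
            value = source.get(); // an exception here leaves the cache empty
            loaded = true;
        }
        return value;
    }
}

final class CacheDemo {
    // Returns how many times the expensive source was invoked for two reads.
    static int countSourceCalls() {
        AtomicInteger calls = new AtomicInteger();
        Supplier<String> cached = new CachedSupplier<>(() -> {
            calls.incrementAndGet(); // stands in for the web query
            return "todos";
        });
        cached.get();
        cached.get();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(countSourceCalls()); // 1
    }
}
```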


4. Operators with schedulers

Many operators need a Scheduler in order to function. They also have overloads that take no Scheduler argument, such as delay(), and these fall back to Schedulers.computation() by default.
Here’s an example:

TestSubscriber<Integer> subscriber = new TestSubscriber<>();
Observable.just(1).delay(1, TimeUnit.SECONDS).subscribe(subscriber);
subscriber.awaitTerminalEvent();
Logger.d("LastSeenThread: " + subscriber.getLastSeenThread().getName()); 

Although we did not declare any Scheduler, the output shows that the item arrived on a computation thread:

LastSeenThread: RxComputationThreadPool-1

If you do not want the computation scheduler, pass the required Scheduler explicitly as the third argument:

.delay(1, TimeUnit.SECONDS, Schedulers.immediate())

Apart from delay(), many other operators run on a default Scheduler and accept one as an argument: timer(), interval(), debounce(), and overloads of buffer(), take(), skip(), timeout(), etc.
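
The same "implicit default versus explicit choice" situation exists in the JDK itself. The sketch below is an analogy only and does not use RxJava: CompletableFuture.supplyAsync() picks a default executor unless one is passed in. The ExecutorChoiceDemo class and the "my-scheduler" thread name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExecutorChoiceDemo {
    // Runs a task on an explicitly supplied executor and reports the thread name.
    static String threadNameWithExplicitExecutor() {
        ExecutorService single = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "my-scheduler"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), single)
                    .join();
        } finally {
            single.shutdown();
        }
    }

    // Runs a task without naming an executor; the JDK picks a default one.
    static String threadNameWithDefaultExecutor() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("default:  " + threadNameWithDefaultExecutor());
        System.out.println("explicit: " + threadNameWithExplicitExecutor());
    }
}
```

As with the RxJava operators, the work lands on a thread you did not choose unless you name the executor yourself.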

5. Using Subjects

When working with Subjects, keep in mind that by default the items passed to the subject's onNext() are processed on the same thread that called onNext(), until an observeOn() operator appears in the chain.

BehaviorSubject<Object> subject = BehaviorSubject.create();
subject
        .doOnNext(obj -> Logger.logThread("doOnNext"))
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                // not used in this example
            }

            @Override
            public void onError(Throwable e) {
                // not used in this example
            }

            @Override
            public void onNext(Object o) {
                Logger.logThread("onNext");
            }
        });
subject.onNext("str");
Handler handler = new Handler();
handler.postDelayed(() -> subject.onNext("str"), 1000);

Both observeOn() and subscribeOn() are used here, yet the output is as follows:

doOnNext: RxCachedThreadScheduler-1
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1 

This means that when we subscribe to the subject, it returns the value immediately, and that value is processed on a Schedulers.io() thread. When the next item arrives at the subject, the chain runs on the thread that called onNext().
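
The "runs on the caller's thread" behavior can be shown with a stripped-down stand-in for a Subject in plain Java. SimpleSubject and SubjectThreadDemo are hypothetical names, and the sketch has no schedulers at all; it only demonstrates that, without something like observeOn(), listeners execute on whichever thread calls onNext().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a Subject: no scheduling at all.
final class SimpleSubject<T> {
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Listeners run synchronously on whichever thread calls onNext().
    void onNext(T item) {
        for (Consumer<T> listener : listeners) {
            listener.accept(item);
        }
    }
}

final class SubjectThreadDemo {
    // Runs the emission on a named thread and waits for it to finish.
    private static void emitFrom(String threadName, Runnable emission) {
        Thread thread = new Thread(emission, threadName);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Returns the names of the threads on which the listener was invoked.
    static List<String> run() {
        SimpleSubject<String> subject = new SimpleSubject<>();
        List<String> seenThreads = new CopyOnWriteArrayList<>();
        subject.subscribe(item -> seenThreads.add(Thread.currentThread().getName()));
        emitFrom("caller-1", () -> subject.onNext("first"));
        emitFrom("caller-2", () -> subject.onNext("second"));
        return seenThreads;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [caller-1, caller-2]
    }
}
```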


Conclusion

When we use Rx, it helps us by:

  • handling the cache without creating caching classes
  • combining the sending of requests with the processing of results, getting rid of the standard AsyncTask
  • decreasing memory leaks by 90%
  • optimizing the code to improve application responsiveness
  • making methods easier to combine

As you can see, using RxJava is fully justified. It may seem like a fifth wheel, but programmers around the world appreciate how it simplifies and shortens code. It has proven effective for network requests and threading, response processing, and memory leak prevention. This tool will save you a lot of nerves.

The post 5 reasons to use RxJava in your projects appeared first on JAXenter.
