Understanding RxJava & RxAndroid Scheduler thread in Android


RxJava is awesome once you start using it in an Android application. In this article, we will talk about the RxJava schedulers, that is, which threads to use to build an awesome Android application. Multiple threads are a good thing when you have several background tasks to run. At the same time, always remember that an Android application has very limited memory and resources to execute those tasks.

The primary reason that you should consider using thread pools is that they maintain a number of pre-created threads that are idle and waiting for work. This means that when you have work to be done, you don’t need to go through the overhead of creating a thread. Once your work is done, that thread can also be re-used for future work instead of constantly creating and destroying threads.
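To make the reuse point concrete, here is a minimal sketch that uses plain java.util.concurrent rather than RxJava. The class name ThreadPoolReuseDemo and the numbers (2 threads, 10 tasks) are assumptions chosen only for illustration. It runs ten small tasks on a two-thread pool and collects the names of the threads that actually did the work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; name and numbers are illustrative only.
final class ThreadPoolReuseDemo {

    // Runs taskCount small tasks on a pool of poolSize threads and returns
    // the names of the worker threads that executed them.
    static Set<String> workerThreadsUsed(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records which pooled thread picked it up.
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // stop accepting work; already queued tasks still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = workerThreadsUsed(2, 10);
        System.out.println("10 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

Running main should report that all ten tasks were handled by at most two worker threads, instead of ten freshly created ones.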

Google has traditionally recommended using Android's own threading components, such as AsyncTask, Handler, Looper and Service, to execute any task that needs to run in the background. Creating raw Java threads or unmanaged thread pools is discouraged, because every thread consumes resources and too many of them will hurt performance.

Android also lets you run such tasks on a custom thread pool executor to address this problem, for example through AsyncTask.executeOnExecutor. We all know how difficult multithreading can sometimes get in Java. Executing a piece of code on a background thread and getting the results back on the UI thread might sound easy, but in reality there are a lot of tricky situations to take care of.

Let’s talk about these things in the Rx world, because today is the reactive world. Because Rx is targeted at asynchronous systems and naturally supports multithreading, new Rx developers sometimes assume that Rx is multithreaded by default. It is important to clarify before anything else that Rx, and therefore RxJava, is single-threaded by default.

Schedulers

Threading in RxJava is done with the help of Schedulers. A Scheduler can be thought of as a thread pool managing one or more threads. Whenever a Scheduler needs to execute a task, it takes a thread from its pool and runs the task on that thread. The observeOn and subscribeOn methods both take a Scheduler as an argument. Let’s see which Schedulers are available for multithreading.

  1. Schedulers.io()
  2. Schedulers.computation()
  3. Schedulers.newThread()
  4. Schedulers.from(Executor executor)
  5. AndroidSchedulers.mainThread()
  6. Schedulers.single()
  7. Schedulers.trampoline()

Let’s discuss these Schedulers and their uses in detail.

Schedulers.io() 

Schedulers.io() is backed by an unbounded thread pool. It is used for non CPU-intensive I/O type work including interaction with the file system, performing network calls, database interactions, etc. This thread pool is intended to be used for asynchronously performing blocking IO.

As we know, AsyncTask is awkward to use for networking calls because the work is split across two threads: the task runs on a background thread and the result is delivered to the UI thread, usually through a callback. Callbacks are painful here because they can easily leak memory, for example when a callback still holds a reference to a component that Android has already destroyed because memory was running low. RxJava is more suitable in this case because it is based on subscriptions: there are no nested callbacks, and releasing the subscription when the screen goes away helps avoid such leaks.

Let’s see an example of the RxJava IO scheduler, as used for networking calls, with subscribeOn and observeOn in combination with a Scheduler.

Observable.just("io")
  .subscribeOn(Schedulers.io())
  .subscribe(i -> result += Thread.currentThread().getName());

Result: RxIoScheduler-2 (the exact thread name depends on the RxJava version)

subscribeOn tells RxJava which Scheduler the source Observable should do its work on; that work starts only when the method subscribe() is called on the Observable. observeOn tells RxJava to run the operators and the Subscriber declared after it on a thread provided by the given Scheduler, so it can be used to switch threads in the middle of a chain. Always remember that observeOn affects everything downstream of it, while subscribeOn affects the upstream source. When subscribeOn is applied more than once, the call closest to the source takes effect, whereas each observeOn switches the thread again for the operators that follow it.

// ViewModel side: wrap the callback-based Twitter call in an Observable.
Observable.create { subscriber ->
    val callback = object : Callback<Tweet>() {
        override fun success(result: Result<Tweet>) {
            Log.i(TAG, "Tweet tweeted")
            subscriber.onNext(result)
        }

        override fun failure(e: TwitterException) {
            Log.e(TAG, e.message, e)
            subscriber.onError(e)
        }
    }
    // The callback is then handed to the Twitter API call (omitted here).
}

// Fragment side: deliver the result on the main thread so the UI can be updated.
timelineFragmentVM!!.sendTweet(tweetText.text.toString())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { x -> showMessage(getString(R.string.alert_tweet_successful)) },
        { e -> showMessage(getString(R.string.alert_tweet_failed)) })

Schedulers.computation()

Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.

Let’s take an example that uses the computation scheduler to generate a bitmap for the user.

public Observable<Bitmap> generateAsync(String userName) {
    return Observable.create(new Observable.OnSubscribe<Bitmap>() {
        @Override
        public void call(Subscriber<? super Bitmap> subscriber) {
            Bitmap bitmap;
            try {
                bitmap = generate(userName);
                subscriber.onNext(bitmap);
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    }).subscribeOn(Schedulers.computation()).observeOn(AndroidSchedulers.mainThread());
}

Schedulers.newThread()

Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive as the new thread is spawned every time and no reuse happens.

Let’s take an example of the new thread.

Observable.just("Hello")
  .observeOn(Schedulers.newThread())
  // Runs on the first new thread.
  .doOnNext(s ->
    System.out.println("Thread=:" + Thread.currentThread().getName())
  )
  .observeOn(Schedulers.newThread())
  // Runs on a second new thread.
  .subscribe(s ->
    System.out.println("Thread=:" + Thread.currentThread().getName())
  );
// Give the background threads time to print before the program exits.
Thread.sleep(500);

result : RxNewThreadScheduler-1, RxNewThreadScheduler-2

Schedulers.from(Executor executor)

Schedulers.from(Executor executor) creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Schedulers.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until the pool is explicitly shut down.
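The queuing behaviour comes from the executor itself, so it can be sketched without RxJava. The class BoundedPoolDemo and its numbers are hypothetical; with RxJava on the classpath, the same pool object would simply be passed to Schedulers.from(pool). The sketch submits more blocking tasks than there are threads and records how many ever ran at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only the executor side of Schedulers.from is shown.
final class BoundedPoolDemo {

    // Submits taskCount short blocking tasks to a fixed pool of poolSize threads
    // and returns the highest number of tasks that were running at the same time.
    static int maxConcurrentTasks(int poolSize, int taskCount) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                try {
                    Thread.sleep(50); // simulate blocking work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown(); // the pool threads live until this explicit shutdown
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency: " + maxConcurrentTasks(2, 6));
    }
}
```

With a pool of two threads and six tasks, the peak should never exceed two: the remaining tasks wait in the executor's queue until a thread becomes free.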

AndroidSchedulers.mainThread()

The main thread scheduler, AndroidSchedulers.mainThread(), is provided by RxAndroid, the extension library to RxJava. The main thread (also known as the UI thread) is where user interaction happens. Care should be taken not to overload this thread, to prevent a janky, non-responsive UI or, worse, an “Application Not Responding” (ANR) dialog.

// Deliver the result on the main thread so the UI can be updated safely.
timelineFragmentVM!!.sendTweet(tweetText.text.toString())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { x -> showMessage(getString(R.string.alert_tweet_successful)) },
        { e -> showMessage(getString(R.string.alert_tweet_failed)) })

Schedulers.single()

Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.
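Schedulers.single() itself needs RxJava, so the sketch below only illustrates the underlying idea with a single-thread executor from java.util.concurrent. It is an analogy, not RxJava's implementation, and the class name SingleThreadOrderDemo is hypothetical. Tasks submitted to one worker thread run one after another in the order they were submitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; an analogy for a scheduler backed by one thread.
final class SingleThreadOrderDemo {

    // Submits tasks numbered 0..taskCount-1 to a single worker thread and
    // returns the order in which they actually ran.
    static List<Integer> runInOrder(int taskCount) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            single.execute(() -> order.add(id)); // each task records its own number
        }
        single.shutdown();
        try {
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```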

Schedulers.trampoline()

Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.

Observable.just(2, 4, 6, 8)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Thread.sleep(500);
System.out.println(result);

Result: 246813579
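The remark about recursion can be illustrated with a hand-written trampoline. This is a simplified sketch of the general technique, not the code behind Schedulers.trampoline(), and the names TrampolineSketch and countDown are assumptions. A task that schedules itself again is only put into a FIFO queue, and a single loop drains that queue, so the call stack does not grow with the number of steps.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of a trampoline; intended for use from a single thread.
final class TrampolineSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    // Enqueues the task; only the outermost call runs the queue in FIFO order.
    void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // an outer call is already draining, so do not nest
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    // Performs n "recursive" steps (n >= 1) and returns how many steps ran.
    static int countDown(int n) {
        TrampolineSketch trampoline = new TrampolineSketch();
        int[] steps = {0};
        trampoline.schedule(new Runnable() {
            int remaining = n;

            @Override
            public void run() {
                steps[0]++;
                if (--remaining > 0) {
                    trampoline.schedule(this); // re-schedule instead of calling run() directly
                }
            }
        });
        return steps[0];
    }

    public static void main(String[] args) {
        // A direct recursion of this depth could overflow the stack.
        System.out.println(countDown(100_000)); // prints 100000
    }
}
```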

As a final note, RxJava makes it very easy to write multithreaded code using simple declarations with subscribeOn and observeOn in combination with a Scheduler.

For a better understanding, I recommend checking the other RxJava posts:

  1. Java 8 Features to start using in Android
  2. Understanding Java 8 Stream and RxJava2 Observable in Android
  3. Basic understanding and practice of RxJava2 functions in Android part -1
  4. Understanding and practice of RxJava & RxBinding of Android part -2
  5. Understanding Runtime permission and RxPermission of RxJava in Android part -3

Please subscribe with your email to get every newsletter from this blog, and if you feel that this post helped you, do not forget to share it and comment below. The references I used to write this post are linked here and here.

Happy coding 🙂
