How do I handle HTTP errors like 401, 403, 503,500 using RxJava Observer instead of Event Bus

    Observable.just(new Object())
            .subscribeOn(Schedulers.io())
            .subscribe(new DefaultObserver<Object>() {
                @Override
                public void onNext(Object o) {
                }

                @Override
                public void onError(Throwable e) {

                    if(e instanceof HttpException) {
                        HttpException httpException = (HttpException) e;

                        if(httpException.code() == 400)
                            Log.d(TAG, "onError: BAD REQUEST");
                        else if(httpException.code() == 401)
                            Log.d(TAG, "onError: NOT AUTHORIZED");
                        else if(httpException.code() == 403)
                            Log.d(TAG, "onError: FORBIDDEN");
                        else if(httpException.code() == 404)
                            Log.d(TAG, "onError: NOT FOUND");
                        else if(httpException.code() == 500)
                            Log.d(TAG, "onError: INTERNAL SERVER ERROR");
                        else if(httpException.code() == 502)
                            Log.d(TAG, "onError: BAD GATEWAY");

                    }
                }

                @Override
                public void onComplete() {

                }
            });

Please make a note that, all responses with code 2xx will be consumed in onNext and codes starts with 4xx and 5xx will be consumed in onError.

FYI. ResponseCodeCheckInterceptor is not required in this case. just give a try without custom interceptor and this should do the trick.

UPDATE

Custom Observer

public abstract class CustomObserver extends DefaultObserver implements Observer{

@Override
public void onNext(Object o) {

}

@Override
public void onError(Throwable e) {
    if(e instanceof HttpException) {
        HttpException httpException = (HttpException) e;

        if(httpException.code() == 400)
            onBadRequest(e);
        else if(httpException.code() == 401)
            onNotAuthorized(e);
        else if(httpException.code() == 502)
            onBadGateway(e);

    }
}

public abstract void onNotAuthorized(Throwable e);

public abstract void onBadGateway(Throwable e);

public abstract void onBadRequest(Throwable e);

@Override
public void onComplete() {

}

}

Implementation

Observable.just(new Object())
            .subscribeOn(Schedulers.io())
            .subscribe(new CustomObserver() {
                @Override
                public void onNext(Object o) {
                    super.onNext(o);
                }

                @Override
                public void onError(Throwable e) {
                    super.onError(e);
                }

                @Override
                public void onNotAuthorized(Throwable e) {

                }

                @Override
                public void onBadGateway(Throwable e) {

                }

                @Override
                public void onBadRequest(Throwable e) {

                }

                @Override
                public void onComplete() {
                    super.onComplete();
                }
            });

Kotlin way

If you use Kotlin and MVVM I suggest another simple way

You can use Kotlin extensions like below:

fun <T> Single<T>.handelNetworkError() =
    onErrorResumeNext { e ->
        when (e) {
            is OfflineException -> return@onErrorResumeNext Single.error(Exception("check your internet connection "))
            is SocketTimeoutException -> return@onErrorResumeNext Single.error(Exception("server not fount"))
            is retrofit2.HttpException -> {
                val responseBody = e.response().errorBody()
///if you want more custom error:
 /* when(e.response().code()){
                    500 ->  return@onErrorResumeNext Single.error( customException500(responseBody?.run { getErrorMessage(responseBody) }))
                    402 ->  return@onErrorResumeNext Single.error(customException402(responseBody?.run { getErrorMessage(responseBody) }))
            .... ->    }  */

                return@onErrorResumeNext Single.error(Exception(responseBody?.run { getErrorMessage(responseBody) }))
            }
            is IOException -> return@onErrorResumeNext Single.error(Exception("Network error"))
            else -> return@onErrorResumeNext Single.error(e)
        }
    }

fun <T> Observable<T>.handelNetworkError() =
    onErrorResumeNext { e : Throwable ->
        when (e) {
            is OfflineException -> return@onErrorResumeNext Observable.error(Exception("check your internet connection"))
            is SocketTimeoutException -> return@onErrorResumeNext Observable.error(Exception("server not fount"))
            is retrofit2.HttpException -> {
                val responseBody = e.response().errorBody()

                return@onErrorResumeNext Observable.error(Exception(responseBody?.run { getErrorMessage(responseBody) }))
            }
            is IOException -> return@onErrorResumeNext Observable.error(Exception("Network error"))
            else -> return@onErrorResumeNext Observable.error(e)
        }
    }


fun getErrorMessage(responseBody: ResponseBody): String? {
    return try {
        val jsonObject = JSONObject(responseBody.string())
        jsonObject.getString("message")
    } catch (e: Exception) {
        e.message
    }

}

class OfflineException : IOException() {
    override val message: String?
        get() = "no internet!"
}

Usage:

api.getUserList().handelNetworkError().subscribe { }