سورس اپلیکیشن اندروید TODO (برنامه ریزی روزانه)
سورس اپلیکیشن اندروید TODO (برنامه ریزی روزانه)
2019-11-09
تمامی محصولات فایربیس برای اندروید
معرفی تمامی محصولات یا سرویس های فایربیس firebase برای اندروید
2019-11-16
مفاهیم Observable و Observer در RxJava

مفاهیم Observable و Observer در RxJava

مفاهیم Observable و Observer در RxJava ، ما چندین نوع از Observable ها یا انواع Observable در RxJava داریم که نحوه فراهم کردن داده توسط هرکدام از آنها متفاوت با دیگری ست.با اینکه تمامی Observable ها داده ها را منتشر یا emit میکنند اما بسته به استفاده مورد نیاز بهتر است که بهترین و نزدیک ترین گزینه را انتخاب کنیم تا کدهای بهینه تری داشته باشیم.

مفاهیم Observable و Observer در RxJava

ما در RxJava2 انواع Observable های زیر را داریم.

  • Observable
  • Single
  • Maybe
  • Flowable
  • Completable

در مقابل هرکدام از تولید کننده های داده فوق ، ما مشاهده گر یا observer های زیر را داریم

  • Observer
  • SingleObserver
  • MaybeObserver
  • CompletableObserver

هر Observeble یا تولید کننده داده در تعداد داده تولیدی با یکدیگر متفاوت است.
میتوانیم تفاوت را در جدول زیر مشاهده کنیم.

Observable و Observer در RxJava
Observable و Observer در RxJava

خب حالا بریم سراغ مقایسه های ملموس تر از مفهوم هر Observable تا دقیق تر متوجه تفاوت emit داده ها شویم.

مفهوم های Observable و Observer در RxJava

در RxJava میتوان گفت بیشترین استفاده به Observable تعلق میگیرد. Observable میتواند یک یا چند داده emit یا منتشر کند.همچنین میتوانیم یک لیست را بصورت یکجا و یکباره در Observable دریافت کنیم اما اگر میخواهید که یک یا تعدادی operator برروی داده های emit شده اعمال کنید بهتر است که بصورت تکی یا single داده ها را emit کنید.

 // emitting single Note
 Observable
// emitting list of notes at once, but in this case considering Single Observable is best option
 Observable>

import android.os.Bundle;
 import android.support.v7.app.AppCompatActivity;
 import android.util.Log;
import java.util.ArrayList;
 import java.util.List;
import info.androidhive.rxandroidexamples.R;
 import info.androidhive.rxandroidexamples.observers.model.Note;
 import io.reactivex.Observable;
 import io.reactivex.ObservableEmitter;
 import io.reactivex.ObservableOnSubscribe;
 import io.reactivex.Observer;
 import io.reactivex.android.schedulers.AndroidSchedulers;
 import io.reactivex.disposables.Disposable;
 import io.reactivex.schedulers.Schedulers;

public class ObserverActivity extends AppCompatActivity {
private static final String TAG = ObserverActivity.class.getSimpleName();
private Disposable disposable;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_observer);

    Observable<Note> notesObservable = getNotesObservable();

    Observer<Note> notesObserver = getNotesObserver();

    notesObservable.observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribeWith(notesObserver);
}

private Observer<Note> getNotesObserver() {
    return new Observer<Note>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
            disposable = d;
        }

        @Override
        public void onNext(Note note) {
            Log.d(TAG, "onNext: " + note.getNote());
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };
}

private Observable<Note> getNotesObservable() {
    final List<Note> notes = prepareNotes();

    return Observable.create(new ObservableOnSubscribe<Note>() {
        @Override
        public void subscribe(ObservableEmitter<Note> emitter) throws Exception {
            for (Note note : notes) {
                if (!emitter.isDisposed()) {
                    emitter.onNext(note);
                }
            }

            // all notes are emitted
            if (!emitter.isDisposed()) {
                emitter.onComplete();
            }
        }
    });
}

private List<Note> prepareNotes() {
    List<Note> notes = new ArrayList<>();
    notes.add(new Note(1, "Buy tooth paste!"));
    notes.add(new Note(2, "Call brother!"));
    notes.add(new Note(3, "Watch Narcos tonight!"));
    notes.add(new Note(4, "Pay power bill!"));
    return notes;
}

@Override
protected void onDestroy() {
    super.onDestroy();
    disposable.dispose();
}
}

کلاس قالب

 public class Note {
     int id;
     String note;
// getters an setters
}

خروجی

onSubscribe
! onNext: Buy tooth paste
! onNext: Call brother
! onNext: Watch Narcos tonight
! onNext: Pay power bill
onComplete

Single و SingleObserver

برخلاف Observable در Single ما تنها یک داده را منتشر یا emit میکنیم و یا خطای رخ داده را اعلام میکنیم.یکی از موارد استفاده Single را میتوان زمان اتصال به وب سرویس ها دانست. چرا که در این درخواست ها تنها response گرفتن یا نگرفتن و مشاهده خطا در یک مرحله مدنظر است.

نکته
در مفهوم SingleObserver ما دیگر onNext نخواهیم داشت و در عوض آن ما از onSuccess برای گرفتن ریسپانس استفاده میکنیم

import android.os.Bundle;
 import android.support.v7.app.AppCompatActivity;
 import android.util.Log;
import info.androidhive.rxandroidexamples.R;
 import info.androidhive.rxandroidexamples.observers.model.Note;
 import io.reactivex.Single;
 import io.reactivex.SingleEmitter;
 import io.reactivex.SingleObserver;
 import io.reactivex.SingleOnSubscribe;
 import io.reactivex.android.schedulers.AndroidSchedulers;
 import io.reactivex.disposables.Disposable;
 import io.reactivex.schedulers.Schedulers;
public class SingleObserverActivity extends AppCompatActivity {
private static final String TAG = SingleObserverActivity.class.getSimpleName();
private Disposable disposable;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_single_observer);

    Single<Note> noteObservable = getNoteObservable();

    SingleObserver<Note> singleObserver = getSingleObserver();

    noteObservable
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(singleObserver);

}

private SingleObserver<Note> getSingleObserver() {
    return new SingleObserver<Note>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
            disposable = d;
        }

        @Override
        public void onSuccess(Note note) {
            Log.e(TAG, "onSuccess: " + note.getNote());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError: " + e.getMessage());
        }
    };
}

private Single<Note> getNoteObservable() {
    return Single.create(new SingleOnSubscribe<Note>() {
        @Override
        public void subscribe(SingleEmitter<Note> emitter) throws Exception {
            Note note = new Note(1, "Buy milk!");
            emitter.onSuccess(note);
        }
    });
}

@Override
protected void onDestroy() {
    super.onDestroy();
    disposable.dispose();
}
}

خروجی

onSubscribe
onSuccess: Buy milk!

Maybe و MaybeObserver

زمانی از Maybe Observable استفاده کنید که مطمئن نیستید داده ای حتما emit می شود.چرا که Maybe Observable زمانی استفاده می شود که داده ای که شما انتظارش را دارید که emit یا منتشر شود کاملا اختیاری یا Optional است.

import android.support.v7.app.AppCompatActivity;
 import android.os.Bundle;
 import android.util.Log;
import info.androidhive.rxandroidexamples.R;
 import info.androidhive.rxandroidexamples.observers.model.Note;
 import io.reactivex.Maybe;
 import io.reactivex.MaybeEmitter;
 import io.reactivex.MaybeObserver;
 import io.reactivex.MaybeOnSubscribe;
 import io.reactivex.android.schedulers.AndroidSchedulers;
 import io.reactivex.disposables.Disposable;
 import io.reactivex.schedulers.Schedulers;
public class MaybeObserverActivity extends AppCompatActivity {
private static final String TAG = MaybeObserverActivity.class.getSimpleName();
private Disposable disposable;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_maybe_observer);

    Maybe<Note> noteObservable = getNoteObservable();

    MaybeObserver<Note> noteObserver = getNoteObserver();

    noteObservable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(noteObserver);
}

private MaybeObserver<Note> getNoteObserver() {
    return new MaybeObserver<Note>() {
        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
        }

        @Override
        public void onSuccess(Note note) {
            Log.d(TAG, "onSuccess: " + note.getNote());
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete");
        }
    };
}

/**
 * Emits optional data (0 or 1 emission)
 * But for now it emits 1 Note always
 */
private Maybe<Note> getNoteObservable() {
    return Maybe.create(new MaybeOnSubscribe<Note>() {
        @Override
        public void subscribe(MaybeEmitter<Note> emitter) throws Exception {
            Note note = new Note(1, "Call brother!");
            if (!emitter.isDisposed()) {
                emitter.onSuccess(note);
            }
        }
    });
}

@Override
protected void onDestroy() {
    super.onDestroy();
    disposable.dispose();
}
}

Completable و CompletableObserver

زمانی از Completable Observable استفاده کنید که انتظار دریافت هیچ داده ای را ندارید. و تنها می خواهیم مطلع شوید که درخواست موفق بوده یا شکست خورده.از موارد استفاده Completable Observable میتوان به درخواست هایی مثل به روز رسانی داده های سرور با PUT اشاره کرد که نیازی به دریافت داده خاصی از وب سرویس نیست و تنها اطلاع از موفق بودن یا نبودن درخواست کافی ست.

import android.support.v7.app.AppCompatActivity;
 import android.os.Bundle;
 import android.util.Log;
import info.androidhive.rxandroidexamples.R;
 import info.androidhive.rxandroidexamples.observers.model.Note;
 import io.reactivex.Completable;
 import io.reactivex.CompletableEmitter;
 import io.reactivex.CompletableObserver;
 import io.reactivex.CompletableOnSubscribe;
 import io.reactivex.android.schedulers.AndroidSchedulers;
 import io.reactivex.disposables.Disposable;
 import io.reactivex.schedulers.Schedulers;
public class CompletableObserverActivity extends AppCompatActivity {
private static final String TAG = CompletableObserverActivity.class.getSimpleName();
private Disposable disposable;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_completable_observer);

    Note note = new Note(1, "Home work!");

    Completable completableObservable = updateNote(note);

    CompletableObserver completableObserver = completableObserver();

    completableObservable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(completableObserver);
}


/**
 * Assume this making PUT request to server to update the Note
 */
private Completable updateNote(Note note) {
    return Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter emitter) throws Exception {
            if (!emitter.isDisposed()) {
                Thread.sleep(1000);
                emitter.onComplete();
            }
        }
    });
}

private CompletableObserver completableObserver() {
    return new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
            disposable = d;
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: Note updated successfully!");
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }
    };
}

@Override
protected void onDestroy() {
    super.onDestroy();
    disposable.dispose();
}
}

خروجی

onSubscribe
! onComplete: Note updated successfully

Flowable و Observer

با استفاده از Flowable observable شما می توانید مقدار بسیار بیشتری از آنچه Observer ها میتوانند بررسی کنند را مدیریت کنید.طبق داکیومنت اصلی ، Flowable میتواند برای سورس هایی که داده ی تولیدی آنها بیش از 10K است مورد استفاده قرار گیرد.

 import android.os.Bundle;
 import android.support.v7.app.AppCompatActivity;
 import android.util.Log;
import info.androidhive.rxandroidexamples.R;
 import io.reactivex.Flowable;
 import io.reactivex.SingleObserver;
 import io.reactivex.android.schedulers.AndroidSchedulers;
 import io.reactivex.disposables.Disposable;
 import io.reactivex.functions.BiFunction;
 import io.reactivex.schedulers.Schedulers;
public class FlowableObserverActivity extends AppCompatActivity {
private static final String TAG = FlowableObserverActivity.class.getSimpleName();
private Disposable disposable;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_flowable_observer);

    Flowable<Integer> flowableObservable = getFlowableObservable();

    SingleObserver<Integer> observer = getFlowableObserver();

    flowableObservable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .reduce(0, new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer result, Integer number) {
                    //Log.e(TAG, "Result: " + result + ", new number: " + number);
                    return result + number;
                }
            })
            .subscribe(observer);
}

private SingleObserver<Integer> getFlowableObserver() {
    return new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
            disposable = d;
        }

        @Override
        public void onSuccess(Integer integer) {
            Log.d(TAG, "onSuccess: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }
    };
}

private Flowable<Integer> getFlowableObservable() {
    return Flowable.range(1, 100);
}

@Override
protected void onDestroy() {
    super.onDestroy();
    disposable.dispose();
}
}
}

خروجی

onSubscribe
onSuccess: 5050

امیدوارم مقاله مفهوم های Observable و Observer در RxJava برای شما مفید بوده باشد.

پاسخی بگذارید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *