본문 바로가기

안드로이드/ReactiveX

RxJava 기초

우선 app모듈의 build.gradle에 라이브러리를 추가합니다.

implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
implementation 'io.reactivex.rxjava2:rxjava:2.x.x'

https://github.com/ReactiveX/RxAndroid 최신버전은 여기서 볼 수 있습니다.

 

rxjava는 보통 람다식과 함께 사용합니다.

람다식을 쓰려면 다음과 같이 설정해줘야 합니다.

람다식을 쓰면,

 

로 표현할 수 있습니다.

매개변수 -> 리턴값으로 이해하면 됩니다.

Observable.map(s -> MainActivity.someThing(s))

 

Observable.map(MainActivity::someThing)

도 가능합니다.

● Rxjava의 기본적인 형태는 데이터를 받고 변형시키고 구독하는 것입니다.

<데이터를 받고>

 

Observable.just(): 데이터를 추가해줍니다.

예) Observable.just(2, 4, 5, 6) Observable.just(textView.getText().toString())

Observable.range(1, 9)는 숫자를 1부터 9 순서로 데이터가 들어갑니다.

Observable.doOnNext()은 받은 데이터를 실행하는 것입니다. 데이터를 변형하기 전의 subscibe의 onNext라고 생각하면 됩니다.

예) Observable.doOnNext(s -> Log.d("test", s))

Observable.create()

Observable observable = Observable.create(emitter -> {
		emitter.onNext("첫 번째");
		emitter.onNext("두 번째");
		emitter.onComplete();
	}
);
observable.subscribe();

onNext() - 다음에 나올 데이터 , onError() - 오류, onCompleted() - 완료 를 설정해줄 수 있다.

Observable.defer()

Observable observable = Observable.defer(() -> {
	String test = "test";
	return Observable.just(test);
});
observable.subscribe();

Observable객체를 리턴받는다.

Observable.fromCallable()

Observable observable = Observable.fromCallable(() -> 3);
observable.subscribe(s -> Log.d("첫번 째 observer", s + ": 성공"));

observable.defer()과 비슷하지만 Observable로 리턴받지 않아도 된다.

<변형>

 

Observable.filter(): 데이터를 걸러줍니다.

예) Observable.filter(s -> s <10): 10이하가 아니면 에러 처리가 됩니다.

Obsevable.map(): 데이터를 변형시켜줍니다.

예) Observable.map(s -> s + 3)

Observable.flatMap: 데이터를 변형시켜줍니다. Observable 객체 형태로 리턴해줘야 합니다.

예) Observable.flatMap(s -> Observable.just(s + 3))

Observable.flatMap(a -> Observable.just(" test"),

(s, a) -> s + a) // 만약 데이터가 Rxjava였다면, Rxjava test가 됩니다.

Observable.flatMap(a -> Observable.just(" test"), 
	(s, a) -> s + a)				// 만약 데이터가 Rxjava였다면, Rxjava test가 됩니다.

<구독>

 

Observable.subscribeOn(): Observable의 작업을 시작할 Thread를 정합니다. 여러 번 선언되었을 경우, 처음에 선언한 것으로 설정되고 이후에 선언한 것은 무시됩니다.

Observable.observeOn(): 이 연산자가 선언된 이후에, Observable이 처리될 Thread를 정합니다. 여러 번 선언될 수 있으며, 선언될 때마다 이후의 Thread가 바뀝니다. 보통 observeOn()은 subscribe연산자를 MainThread에서 처리되도록 사용합니다.(결과를 보여주기 위해)

Observable.subscribe(): 구독합니다.

subscribe(Consumer<? super T> onNext)
subscribe(Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError)
subscribe(Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError, Action onComplete)

onNext는 어떻게 실행할 건지를 정의하는 부분

onError는 에러가 났을 때

onComplete은 데이터를 다 받고 나서

-Observable을 subscribe하지 않을 시에는 단순히 정의한 형태입니다. subscribe를 해야지 실행이 됩니다.

● Observable의 여러 형태

<BlockingObservable>

구독을 하지 않더라도 BlockingObservable형태로 실행이 가능합니다.

toBlocking()은 Observable 객체를 BlockingObservable 객체로 변형시켜서 동기적으로 실행할 수 있도록 준비합니다.

single()은 변형이 완료된 데이터를 리턴해줍니다.

예) Object result = observable.toBlocking().single()

만약 observable이 담는 데이터가 여러 개라면,

예) Object result = observable.toList().toBlocking().single();

<Single>

Observable은 임의의 연속된 값을 배출하지만, Single같은 경우에는 한 가지 값이나 오류알림 둘 중 하나만 배출한다.

Single을 구독할 때, 다음의 두 메서드를 사용할 수 있다:

onSuccess: Single은 자신이 배출하는 하나의 값을 이 메서드를 통해 배출한다.

onError: Single은 항목을 배출할 수 없을 때 이 메서드를 통해 Throwable 객체를 전달한다.

예) Single.just("테스트").subscribe(s -> onSuccess(s), throwable -> onError(throwable))

<Completable>

Completable은 onCompleted()와 onError(throwable)만을 가진다. 성공했는지 실패했는지만 알 수 있다.

반환되는 결과가 없을 경우, 사용한다.

예) Completable.just("테스트").subscibe(() -> onCompleted(), throwable -> onError(throwable))

<Maybe>

Maybe는 하나의 아이템을 방출하거나, 아무것도 방출하지 않거나 오류를 방출할 수 있다. Single과 Completable을 합쳐놓은 것 같다.

예) Maybe.just("테스트").subscribe(s -> onSuccess(s), throwable -> onError(throwable), () -> onComplete())

참고:

https://brunch.co.kr/@yudong/33

https://tmondev.blog.me/220330530620?Redirect=Log&from=postView

http://gaemi.github.io/rxjava/2016/12/02/async-task-with-rxjava.html

'안드로이드 > ReactiveX' 카테고리의 다른 글

RxJava - Scheduler 연습  (0) 2020.03.20
뜨거운/차가운 Observable  (0) 2020.03.20
RxJava - Scheduler  (0) 2020.03.20