본문 바로가기
Language/Java

[Webflux]💙Mono와 Flux > Reactor > Reactive Streams 이해

by 카프리썬 2022. 2. 8.
728x90

 

이제 진짜 webflux 코드를 보면 낯설은 구문?문법?들이 너무 많다.

그냥 느낌적으로만 이해하고, 정확하게 어떤의미인진 몰라서 mono랑 flux 간단한 튜토리얼처럼 구현해보려고 했다.

그런데 어쩌다 Reactor를 알게 되고, 그 근본인 Reactive Streams까지 올라가버렸다..

근데 이런 흐름으로 이해하고 공부하는건 좋은거 같다.힘들지만 이런 흐름으로 이해하면 mono와 flux를 더 쉽게 이해할 수 있겟지ㅣ..

 

반응형

Webflux

Webflux로 구현한다고 하면, Mono와 Flux를 알아야한다. 대부분의 메소들이 Mono와 Flux로 반환하기 때문이다. 

 Mono와 Flux는 Reactor 라이브러리의 주요객체이다. 

그리고 Reactor 라이브러리는 Reactive Streams의 구현체로 Webflux에서 사용하는 라이브러리이다. 

 

결국 줄줄이 사탕처럼 webflux -> Mono와 Flux -> Reactor -> Reactive Streams 이렇게 계속 끝도 없이 파고들어간다..

그래서 먼저 근본이 되는 Reactive Streams부터 이해하면서 차근차근 올라가보자.

 

Reactive Streams

공식문서에는 아래와 같이 정의한다. 

Reactive Streams is a standard for asynchronous data processing in a streaming fashion with non-blocking back pressure.

직역하면 논블로킹(Non-blocking) 백 프레셔(back pressure)를 이용한 비동기 데이터 처리의 표준. 

 

무슨말인지 1도 모르겠다..

키워드를 정리해보자면 "논블로킹/백프레셔/비동기처리/표준" 이정도 될텐데 여기에서 잘 설명해놨다 

논블로킹, 비동기에 대한 개념을 알려면 이글을 참고하라. 내가 정리해서 그런건지 이해하기 쉽다! 

2022.02.05 - ⚽️MVC와 WebFlux의 차이점, MVC가 '벽에 공 던지기'라면 WebFlux는..?

 

아무튼 Reactive Streams API 내부를 간단하게 살펴보면 아래와 같다.

public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s); //Subscriber를 받음
}
 
public interface Subscriber<T> {
   public void onSubscribe(Subscription s); //Subscription을 받음
   public void onNext(T t); //받은 데이터를 처리
   public void onError(Throwable t); //에러처리
   public void onComplete(); //작업완료시 사용
}
 
public interface Subscription {
   public void request(long n); //n개의 데이터요청
   public void cancel(); //구독 취소
}

크게 4가지 구성으로볼 수 있다. 약간 카프카때 했던 pub-sub 구조와 비슷하다. 

  • Publisher : 순차적 데이터를 생성하는 컴포넌트로, Subscriber의 구독을 받기 위한 subscribe API 하나만 있다. 
  • Subscriber : 순차적 데이터를 받아서 처리하는 컴포넌트로, 어떻게 처리해야할지에 따라 onNext,onError,onComplete가 있다.
  • Subscription : publisher가 생성한 구독정보 컴포넌트로, request에 의해 backpressure가 가능하다. 
  • 추가로, Processor: Publisher와 Subscriber의 미들웨어로, Reactor에서 제공하는 프로세이다. 

전체적인 flow는 아래와 같다.

출처&amp;nbsp;https://engineering.linecorp.com/ko/blog/reactive-streams-with-armeria-1/

1. Subscriber  -> Publisher :  구독요청을 한다

2. Publisher -> Subscriber :  Subscription 정보를 전달한다. 

이제 Subscription이 Subscriber 와 Publisher의 통신매개체가 된다.

3. Subscriber -> Subscription -> Publisher 

Subscriber가 Publisher한테 바로 요청하지 않고, Subscription의 requet함수를 통해 Publisher한테 전달한다. 

4. Publisher ->Subscription -> Subscriber

PublisherSubscriber한테 바로 응답하지 않고, Subscription을 통해 시그널(onNext,onComplete,onError 등)을 전달한다.

5. 이제 Subscriber, Subscription, Publisher 이 유기적으로 연결되서 구독요청부터 onComplete까지 백프레셔(?)가 완성

 

Reactor

기본적으로 Reactive Stream은 비동기 프로그래밍 표준을 목표로 하고 있다.

그래서 그 구현체인 Reactor 라이브러리 또한 비동기 지원을 위해 함수형 프로그래밍 형태로 구현이 되어 있다.

결국 Reactor의 가장 중요한 컴포넌트인 Flux와 Mono는 비동기 방식을 구현해주는 주요객체라고 볼 수 있다. 

 

Reactor를 사용하면 Publisher를 구현하고있는 Mono 와 Flux타입을 다양하게 구성할 수 있다. 

간단하게 Mono 와 Flux의 개념을 살펴보고가면, 

Mono는 0~1개의 결과를 주는 반면에 Flux는 1~N개의 결과를 주는 것이라고 볼 수 있다. 

 

예를 들어, HTTP요청은 항상 하나의 응답만 생성하기 때문에 응답개수에 따라서 구분할 필요가 없엇다.

이럴경우 하나의 응답결과만 받기 때문에 Mono<HttpRepsonse>이렇게 표현할 수 있다. 

 

이어서 공식문서에서 설명하는 대로 각각 어떻게 응답하는지 살펴보겠다.

Mono

최대1개의 응답을 만드는데 특화되어 있는 Publisher 구현체이다.

데이터의 전달처리가 완료되면 onComplete, 데이터를 전달하는 과정에서 오류가 발생하면 onError로 종료된다. 

하나의응답결과만 리턴하면 되기 때문에 별도로 값은 필요없고, 완료개념만 있으면 되는 비동기 처리도 표현할 수 있다. 

 

Flux

0개부터 N개까지 다수의 응답을 만드는데 특화되어 있는 Publisher 구현체이다.

0개 또는 1개로 명확하게 구분되는것은 mono를 사용해야한다.

onComplete나 onError가 되기 전까지는 무한 생성가능한 Stream으로 생각해야한다.

데이터를 전달할때마다 onNext 이벤트를 발생시키고, 

모든 데이터의 전달처리가 완료되면 onComplete, 데이터를 전달하는 과정에서 오류가 발생하면 onError로 종료된다. 

 

728x90

Flux 생성하기 

flux를 생성하는 가장 간단한 방법은 flux 클래스에서 제공하는 팩토리 메소드(Factory method)를 사용하는 것이다.

참고로, 팩토리 메소드는란 객체 생성을 반환하는 메소드이다. >> 팩토리 메소드 참고문서

 

대표적으로 아래와 같은 팩토리 메소드를 제공한다.

  • just
  • range
  • empty
  • fromArray, fromIterable, fromStream

just

string을 전달하는 flux 메소드이다.

이때 flux는 subscribe가 실행하기 전까지는 어떤일도 발생하지 않는다.

Publisher는 구독이 되어 있을 경우에만 데이터를 Subscriber에게 전달한다. 

즉,  Publisher의 subscribe() 메소드를 사용해야만 Publisher가 구독이 되어 있다고 인식한다. 

 

이때 subscribe()의 매개변수로 Consumer 함수를 가지고 데이터를 전달할 수 있다.

그래서 onNext()이벤트가 발생했을때 그 데이터를 가지고 어떤걸 실행할 수 있다. 

 

예를 들어 위의 코드에서는 Consumer 함수에 데이터를 하나식 받을때마다 system.out.print를 실행하고, 리스트에 넣었다.

 

그래서 로그를 찍어서 결과를 확인해보면

이벤트들은 OnSubscribe -> request- > onNext -> onNext -> onComplete로 실행이 된다.

 

range

int범위를 지정해서 순차적인 데이터를 전달해주는 flux 메소드이다.

main에서 호출하려고 Static 메소드로 만들어서 테스트해봣다. 

로그결과를 보면 아래처럼 1부터 5까지 데이터를 하나하나 전달하는 onNext()를 볼 수 수 있다.

예를 들어 위의 코드에서는 Consumer 함수에 데이터를 하나식 받을때마다 system.out.print를 실행하고, 리스트에 넣었다.

 

그래서 로그를 찍어서 결과를 확인해보면 이벤트들은
 OnSubscribe -> request- > onNext(1)->onNext(2)->onNext(3)->onNext(4)->onNext(5)-> onComplete
실행이 된다. 1부터 5까지 계속해서 연속적으로 데이터를 받아서 처리한 것이다. 

 

참고로, suscribe()를 실행하면서 매개변수가 consumer 함수 하나인 메소드를 실행할때 이 경우에 request(unbounded)가 실행된다. 

내부적으로 request(MAX)로 적용이 되면서 모든 데이터를 전달하라고 요청하는 것이다. 

 

 

empty

아무값도 전달하지 않는 empty flux메소드이다.

 

출력 결과를 보면 Empty flux값이기 때문에 매개변수로 add할 값이 없어서 안나온다

 

fromXXX(ex: fromArray, fromIterable)

Array, iteralbe, stream을 사용해서 데이터를 전달해주는 flux 메소드이다.

이미 생성되어 있는 데이터 구조를 그대로 가지고 와서 사용할 수 있다. 

 

fromarray 

fromarray에 들어갈 값이 array이여야한다.

fromiterable

fromiterable에 들어갈 값이 list이여야한다.

 

참고

https://brunch.co.kr/@springboot/154

반응형