시냅스

Spring WebFlux 이해하기 - Reactor 본문

Java, Spring

Spring WebFlux 이해하기 - Reactor

ted k 2024. 2. 3. 17:19
Spring WebFlux 의 작동방식을 이해하기 위해 단계별로 진행하는 포스팅입니다.
이 글에서는 Reactor 에 대해 설명합니다.

 

이전 글 : https://liltdevs.tistory.com/209

 

Spring WebFlux 이해하기 - Reactive Streams

Spring WebFlux 의 작동방식을 이해하기 위해 단계별로 진행하는 포스팅입니다. 이 글에서는 Reactive Streams 에 대해 설명합니다. Reactive Programming 리액티브 프로그래밍은 빠른 반응을 하고자 하는 시스

liltdevs.tistory.com

 

이전 글에서는 Reactive Streams 에 대하여 알아보았습니다.

이번에는 Spring WebFlux 에서 사용하는 Reactive Streams 의 구현체인 Reactor 에 대해서 알아보도록 하겠습니다.

 

 

Reactor

Reactor is fully non-blocking and provides efficient demand management. It directly interacts with Java's functional API, CompletableFuture, Stream, and Duration.

Well-suited for a microservices architecture, Reactor offers backpressure-ready network engines for HTTP
(including Websockets), TCP, and UDP.

Reactor offers two reactive and composable APIs, Flux [N] and Mono [0|1], which extensively implement Reactive Extensions

요약하자면, Java 의 API 들과 호환이 가능하고 MSA 환경에서의 Network IO 에 backpressure 를 제공하고

Flux, Mono 라고 하는 객체를 제공한다고 합니다.

 

설명하는 것처럼 Reactor 는 JVM 환경에서 Reactive Programming 을 지원하기 위한 라이브러리 입니다.

Spring 5 이후부터는 Reactive 스택에 포함되어

Spring WebFlux 기반의 리액티브 애플리케이션을 제작하기 위한 핵심 역할을 담당합니다.

 

핵심이라고 할 수 있는 Reactor Core 는 Flux / Mono 라는 주요 API 를 제공합니다.

Flux / Mono 는 전 글에서 살펴본 Publisher 의 구현체로 데이터를 발행하는 주체입니다.

Flux 는 여러개의 데이터를 emit(방출) 할 때 사용하고,

Mono 는 0 또는 1 개의 데이터를 emit 할 때 사용됩니다.

Flux 와 Mono 는 당연히 Publisher 이므로 데이터 emit 과정에서 비동기적 수행 / 데이터 가공 및 변환 을 수행할 수 있습니다.

 

아래에서 예시코드들과 함께 살펴보겠습니다.

  

 

Mono

Mono.just("Hello Mono") // Hello Mono 라는 데이터를 emit
        .map(String::toUpperCase) // 데이터를 대문자로 변환
        .subscribe(System.out::println); // Hello Mono -> HELLO MONO

Mono.empty() // empty 는 대체로 작업을 통해 데이터를 전달받을 필요는 없지만 작업이 끝났음을 알리고 이에 따른 후처리를 하고 싶을 때 사용한다.
        .subscribe(
                none -> System.out.println("emitted onNext signal"), // onNext
                error -> {},  // onError
                () -> System.out.println("emitted onComplete signal") // onComplete
                // data 를 emit 하지 않고 onComplete signal 만 emit 한다.
        );
        
Mono.zip( // 두 개의 Mono 를 결합하여 Tuple2 를 emit
        Mono.just(true),
        Mono.just("Hello")
).doOnNext(data -> { // onNext signal 을 emit 할 때 호출
        Tuple2<Boolean, String> now = data;
        Boolean t1 = now.getT1();
        String t2 = now.getT2();
        System.out.println("t1 = " + t1);
        System.out.println("t2 = " + t2);
    }).subscribe();

 

Mono 는 0 또는 1 개의 데이터를 emit 할 수 있는 Publisher 입니다.

Mono.just 로 데이터를 emit 하거나 Mono.empty 로 데이터를 반환하지 않을 수 있습니다.

만약 Mono.empty 를 수행한다면 어떤 데이터도 emit 하지 않았으므로 subscribe 하는 쪽에서는 onComplete 만 수행하게 됩니다.

 

또한 Reactor 는 Mono 에서 여러개의 데이터를 emit 해야될 때 Tuple 이라는 자료구조를 지원합니다.

.zip Operator 를 사용하여 여러개의 데이터를 Tuple에 조합해 하나의 데이터인 것 처럼 사용할 수 있게 지원합니다.

 

Flux

Flux.just(6, 9, 13)
        .map(num -> num % 2) // 2로 나눈 나머지 값으로 변환
        .subscribe(System.out::println);

Flux.fromArray(new Integer[] {3, 6, 7, 9})
        .filter(num -> num > 6) // 6보다 큰 값만 통과
        .map(num -> num * 2) // 2를 곱함
        .subscribe(System.out::println);
        
Flux<String> stringFlux = Mono.justOrEmpty("Steve") // Mono 로 데이터 1개를 emit
        .concatWith(Mono.justOrEmpty("Jobs")); // 다른 Mono 를 연결하여 데이터를 emit

stringFlux.subscribe(System.out::println); // Steve, Jobs

Mono<List<String>> listMono = Flux.concat(
                Flux.just("alpha", "bravo", "charlie"),
                Flux.just("delta", "echo", "foxtrot")
        )
        .collectList(); // Flux 에서 emit 된 데이터를 List 로 변환
listMono.subscribe(System.out::println); // [alpha, bravo, charlie, delta, echo, foxtrot]

 

Flux.just 로 Data Stream 을 만들어 냅니다.

그 이후 filter / map 과 같이 Operator 를 사용하여 데이터를 가공 및 변형할 수 있습니다.

Java 8 의 Stream API 와 유사합니다.

또한 Flux 는 Data 여러개를 emit 할 수 있는 Publisher 이므로 Mono 나 Flux 로 병합하여 새로운 Flux 를 만들어 낼 수도 있습니다.

 

Flux 와 Mono 에 대해 간략히 알아보았으니, 아래에서 조금 더 확장된 기능을 알아보도록 하겠습니다.

 

 

Operator

Operator 는 Reactor 에서 사용하는 Processor 입니다.

이는 데이터 스트림을 처리하고 변환하는 데 사용되는 함수입니다.

Flux / Mono 에 적용 가능하고 Marble Diagram 이라는 도표로 설명합니다.

 

 

map은 데이터를 map 에 정의한대로 변환한 후 다음 publisher 에 전달합니다.

알고있는 Stream API와 비슷합니다.

 

 

 

flatMap은 Upstream 에서 emit 된 데이터가 Inner Sequence 에서 평탄화 작업을 거치면서

하나의 Sequence 로 병합되어 Downstream 으로 emit 됩니다.

Mono.just("Hello") // Mono<String>
    .flatMap(s -> Mono.just(s + " Mono!")) // Mono<Mono<String>> -> Mono<String>
    .subscribe(System.out::println); // Hello Mono!

 

위에서 설명을 난해하게 했으나, 결국 이중화되어 있는 Mono / Flux 를 풀어준다고 보시면 됩니다.

2차원 배열을 1차원으로 만드는 Stream API 의 flatMap 과 비슷합니다.

 

 

 

concat은 paramter 로 입력되는 Publisher 의 Sequence 를 연결해서 데이터를 순차적으로 emit합니다.

두개의 Publisherr 를 concat 하는 Operator 입니다.

 

 

 

defer는 subscribe 호출 시점에 emit 하는 operator 입니다.

lazy 한 동작으로 꼭 필요한 시점에 데이터를 emit 하여 불필요한 프로세스를 줄여줍니다.

        log.info(" # start {}", LocalDateTime.now());

        Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
        Mono<LocalDateTime> deferMono = Mono.defer(() -> Mono.just(LocalDateTime.now()));

        Thread.sleep(2000L);

        justMono.subscribe(t -> log.info("justMono: {}", t));
        deferMono.subscribe(t -> log.info("deferMono: {}", t));
//        12:17:57.881 [main] INFO prac.DeferPrac -- justMono: 2024-01-28T12:17:55.833663
//        12:17:57.881 [main] INFO prac.DeferPrac -- deferMono: 2024-01-28T12:17:57.881499

        Thread.sleep(2000L);

        justMono.subscribe(t -> log.info("justMono: {}", t));
        deferMono.subscribe(t -> log.info("deferMono: {}", t));
//        12:17:59.885 [main] INFO prac.DeferPrac -- justMono: 2024-01-28T12:17:55.833663
//        12:17:59.886 [main] INFO prac.DeferPrac -- deferMono: 2024-01-28T12:17:59.886306

 

위의 수행 결과 처럼 just 에서 subscribe 한 데이터와 시차가 발생된 로그가 찍히는 것을 확인하실 수 있습니다.

subscribe 를 하는 시점에 데이터를 생성하여 emit 합니다.

 

추가로

  • fromIterable : iterable 로부터 publisher 생성
  • justOrEmpty : data or null -> data 가 null 이어도 NPE 발생하지 않고 onComplete 호출
  • using : 파라미터로 전달받은 resource 를 emit 하는 Flux 를 생성
  • generate : 프로그래밍 방식으로 signal 이벤트를 발생시키며 파라미터로 전달받은 state를 이용해서 데이터를 생성하는 Flux 를 생성
  • skip : 파라미터로 전달받은 개수만큼 데이터를 skip 하고 나머지 데이터를 emit
  • take / takeLast : 파라미터로 전달받은 개수만큼 데이터를 emit 하고 onComplete
  • next : upstream 에서 emit 되는 데이터 중에서 첫 번째 데이터만 Downstream 으로 emit

등의 Operator 가 있습니다.

자세한 내용은 아래의 링크에서 확인하실 수 있습니다.

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

 

 

Scheduler

Reactor 의 Scheduler 는 병렬 처리와 비동기 처리를 지원하는 컴포넌트입니다.

성능 최적화와 리소스 관리를 위한 도구로서 사용됩니다.

 

Scheduler 를 사용하여 어떤 스레드에서 무엇을 처리할지 제어할 수 있습니다.

이런 동작은 동시에 여서 작업을 처리하게 하여 병렬성을 증가시켜 시스템의 응답성을 높여줍니다.

대용량 데이터를 처리하거나, 동시에 여러 요청을 처리해야하는 서버 애플리케이션에서 유용하게 사용할 수 있습니다.

또한 리소스 관리를 효율적으로 활용하여 애플리케이션 전반의 성능을 향상시키는 데에 도움이 될 수 있습니다.

 

아래에서 코드와 함께 살펴보도록 하겠습니다.

 

Flux.fromArray(new Integer[]{1, 3, 5, 7})
        .doOnNext(data -> log.info("# doOnNext: {}", data))
        .publishOn(Schedulers.parallel()) // 이후 연산을 수행할 스레드를 지정
        .filter(data -> data > 3) // parallel-2
        .doOnNext(data -> log.info("# doOnNext filtered: {}", data))
        .publishOn(Schedulers.parallel()) // 이후 연산을 수행할 스레드를 지정
        .map(data -> data * 10) // parallel-1
        .doOnNext(data -> log.info("# doOnNext mapped: {}", data))
        .subscribe(data -> log.info("# onNext: {}", data));

//        21:46:07.260 [main] INFO prac.PublishOnPrac -- # doOnNext: 1
//        21:46:07.262 [main] INFO prac.PublishOnPrac -- # doOnNext: 3
//        21:46:07.262 [main] INFO prac.PublishOnPrac -- # doOnNext: 5
//        21:46:07.262 [main] INFO prac.PublishOnPrac -- # doOnNext: 7
//        21:46:07.262 [parallel-2] INFO prac.PublishOnPrac -- # doOnNext filtered: 5
//        21:46:07.262 [parallel-2] INFO prac.PublishOnPrac -- # doOnNext filtered: 7
//        21:46:07.262 [parallel-1] INFO prac.PublishOnPrac -- # doOnNext mapped: 50
//        21:46:07.262 [parallel-1] INFO prac.PublishOnPrac -- # onNext: 50
//        21:46:07.262 [parallel-1] INFO prac.PublishOnPrac -- # doOnNext mapped: 70
//        21:46:07.262 [parallel-1] INFO prac.PublishOnPrac -- # onNext: 70

 

publishOn() 은 Downstream 으로 signal 을 전송할 때 실행되는 스레드를 제어하는 역할을 하는 Operator 입니다.

즉 publishOn 이후의 Operator sequence 에 대해 어떤 스케줄러를 사용할지 결정합니다.

 

  

Flux.fromArray(new Integer[]{1, 3, 5, 7})
        .doOnNext(data -> log.info("# doOnNext: {}", data)) // boundedElastic-1
        .subscribeOn(Schedulers.boundedElastic()) // 구독이 발생한 직후 실행될 스레드를 지정
        .filter(data -> data > 3) // boundedElastic-1, 따로 지정하지 않으면 위의 subscribeOn 스레드에서 실행
        .doOnNext(data -> log.info("# doOnNext filtered: {}", data))
        .publishOn(Schedulers.parallel())
        .map(data -> data * 10) // 위에서 parallel 로 스레드를 변경했기 때문에 parallel-1
        .doOnNext(data -> log.info("# doOnNext mapped: {}", data))
        .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
        
//        21:48:32.620 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext: 1
//        21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext: 3
//        21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext: 5
//        21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext filtered: 5
//        21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext: 7
//        21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext filtered: 7
//        21:48:32.622 [parallel-1] INFO prac.PubOnSubOnPrac -- # doOnNext mapped: 50
//        21:48:32.622 [parallel-1] INFO prac.PubOnSubOnPrac -- # onNext: 50
//        21:48:32.622 [parallel-1] INFO prac.PubOnSubOnPrac -- # doOnNext mapped: 70
//        21:48:32.623 [parallel-1] INFO prac.PubOnSubOnPrac -- # onNext: 70

 

subscribeOn() 으로 소스에서 데이터를 생성할 때 사용할 스케줄러를 결정합니다.

즉, 구독이 시작되는 시점부터 데이터의 생성과 초기 처리가 이루어지는 스레드를 지정합니다.

여러 subscribeOn 이 연속해서 호출되더라도 첫 번째 subscribeOn 의 스케줄러만 고려됩니다.

 

subscribeOn 과 publishOn 을 고루 활용하면 CPU-bound 작업과 IO-bound 작업 등을

적절한 스레드에서 처리하도록 스케쥴링하여 시스템의 병렬 처리 성능을 최적화할 수 있습니다.

 

스케쥴러의 종류는 다음과 같습니다.

  • immediate : 현재 스레드에서 실행
  • single : 단일 스레드에서 실행
  • newSingle : 새로운 스레드에서 실행
  • boundedElastic
    • 스레드 풀에서 실행(재사용 가능)
    • CPU 코어 수 * 10개의 스레드 생성
    • 이용가능한 스레드가 생길 때까지 최대 100,000 개의 작업이 큐에서 대기할 수 있음
    • Blocking IO 데이터를 처리하기에 유용
  • parallel
    • CPU 코어 수만큼 스레드 생성
    • Non-Blocking IO 데이터를 처리하기에 유용
  • fromExcutorService
    • ExecutorService 를 이용하여 스레드 생성
    • Reactor 에서는 이 방식을 권장하지 않음

 

 

이번 글에서는 Reactor 라이브러리와 핵심 요소인 Flux / Mono, Scheduler 에 대해서 알아보았습니다.

이외에도 Reactor 는 Cold / Hot Sequence, Backpressure, Sinks, Context 등을 제공합니다.

위에 대해서는 다음번에 다뤄보도록 하겠습니다.

다음 글은 실제로 Spring WebFlux 는 어떻게 구현되어있는지 코드를 통해 확인해보도록 하겠습니다.

 

끝!

 

 

참고

https://projectreactor.io/

 

Comments