일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- IT
- MySQL
- spring webflux
- Spring
- redis
- C
- 컴퓨터구조
- 디자인 패턴
- Galera Cluster
- 파이썬
- Algorithm
- Kafka
- 운영체제
- 자료구조
- Heap
- OS
- Java
- JPA
- react
- MSA
- mongoDB
- JavaScript
- 자바
- design pattern
- 백준
- 네트워크
- 알고리즘
- Data Structure
- Proxy
- c언어
- Today
- Total
시냅스
Spring WebFlux 이해하기 - Reactor 본문
Spring WebFlux 의 작동방식을 이해하기 위해 단계별로 진행하는 포스팅입니다.
이 글에서는 Reactor 에 대해 설명합니다.
이전 글 : https://liltdevs.tistory.com/209
이전 글에서는 Reactive Streams 에 대하여 알아보았습니다.
이번에는 Spring WebFlux 에서 사용하는 Reactive Streams 의 구현체인 Reactor 에 대해서 알아보도록 하겠습니다.
Reactor
Reactor is fully non-blocking and provides efficient demand management. It directly interacts with Java's functional API, CompletableFuture, Stream, and Duration.
Well-suited for a microservices architecture, Reactor offers backpressure-ready network engines for HTTP
(including Websockets), TCP, and UDP.
Reactor offers two reactive and composable APIs, Flux [N] and Mono [0|1], which extensively implement Reactive Extensions
요약하자면, Java 의 API 들과 호환이 가능하고 MSA 환경에서의 Network IO 에 backpressure 를 제공하고
Flux, Mono 라고 하는 객체를 제공한다고 합니다.
설명하는 것처럼 Reactor 는 JVM 환경에서 Reactive Programming 을 지원하기 위한 라이브러리 입니다.
Spring 5 이후부터는 Reactive 스택에 포함되어
Spring WebFlux 기반의 리액티브 애플리케이션을 제작하기 위한 핵심 역할을 담당합니다.
핵심이라고 할 수 있는 Reactor Core 는 Flux / Mono 라는 주요 API 를 제공합니다.
Flux / Mono 는 전 글에서 살펴본 Publisher 의 구현체로 데이터를 발행하는 주체입니다.
Flux 는 여러개의 데이터를 emit(방출) 할 때 사용하고,
Mono 는 0 또는 1 개의 데이터를 emit 할 때 사용됩니다.
Flux 와 Mono 는 당연히 Publisher 이므로 데이터 emit 과정에서 비동기적 수행 / 데이터 가공 및 변환 을 수행할 수 있습니다.
아래에서 예시코드들과 함께 살펴보겠습니다.
Mono
Mono.just("Hello Mono") // Hello Mono 라는 데이터를 emit
.map(String::toUpperCase) // 데이터를 대문자로 변환
.subscribe(System.out::println); // Hello Mono -> HELLO MONO
Mono.empty() // empty 는 대체로 작업을 통해 데이터를 전달받을 필요는 없지만 작업이 끝났음을 알리고 이에 따른 후처리를 하고 싶을 때 사용한다.
.subscribe(
none -> System.out.println("emitted onNext signal"), // onNext
error -> {}, // onError
() -> System.out.println("emitted onComplete signal") // onComplete
// data 를 emit 하지 않고 onComplete signal 만 emit 한다.
);
Mono.zip( // 두 개의 Mono 를 결합하여 Tuple2 를 emit
Mono.just(true),
Mono.just("Hello")
).doOnNext(data -> { // onNext signal 을 emit 할 때 호출
Tuple2<Boolean, String> now = data;
Boolean t1 = now.getT1();
String t2 = now.getT2();
System.out.println("t1 = " + t1);
System.out.println("t2 = " + t2);
}).subscribe();
Mono 는 0 또는 1 개의 데이터를 emit 할 수 있는 Publisher 입니다.
Mono.just 로 데이터를 emit 하거나 Mono.empty 로 데이터를 반환하지 않을 수 있습니다.
만약 Mono.empty 를 수행한다면 어떤 데이터도 emit 하지 않았으므로 subscribe 하는 쪽에서는 onComplete 만 수행하게 됩니다.
또한 Reactor 는 Mono 에서 여러개의 데이터를 emit 해야될 때 Tuple 이라는 자료구조를 지원합니다.
.zip Operator 를 사용하여 여러개의 데이터를 Tuple에 조합해 하나의 데이터인 것 처럼 사용할 수 있게 지원합니다.
Flux
Flux.just(6, 9, 13)
.map(num -> num % 2) // 2로 나눈 나머지 값으로 변환
.subscribe(System.out::println);
Flux.fromArray(new Integer[] {3, 6, 7, 9})
.filter(num -> num > 6) // 6보다 큰 값만 통과
.map(num -> num * 2) // 2를 곱함
.subscribe(System.out::println);
Flux<String> stringFlux = Mono.justOrEmpty("Steve") // Mono 로 데이터 1개를 emit
.concatWith(Mono.justOrEmpty("Jobs")); // 다른 Mono 를 연결하여 데이터를 emit
stringFlux.subscribe(System.out::println); // Steve, Jobs
Mono<List<String>> listMono = Flux.concat(
Flux.just("alpha", "bravo", "charlie"),
Flux.just("delta", "echo", "foxtrot")
)
.collectList(); // Flux 에서 emit 된 데이터를 List 로 변환
listMono.subscribe(System.out::println); // [alpha, bravo, charlie, delta, echo, foxtrot]
Flux.just 로 Data Stream 을 만들어 냅니다.
그 이후 filter / map 과 같이 Operator 를 사용하여 데이터를 가공 및 변형할 수 있습니다.
Java 8 의 Stream API 와 유사합니다.
또한 Flux 는 Data 여러개를 emit 할 수 있는 Publisher 이므로 Mono 나 Flux 로 병합하여 새로운 Flux 를 만들어 낼 수도 있습니다.
Flux 와 Mono 에 대해 간략히 알아보았으니, 아래에서 조금 더 확장된 기능을 알아보도록 하겠습니다.
Operator
Operator 는 Reactor 에서 사용하는 Processor 입니다.
이는 데이터 스트림을 처리하고 변환하는 데 사용되는 함수입니다.
Flux / Mono 에 적용 가능하고 Marble Diagram 이라는 도표로 설명합니다.
map은 데이터를 map 에 정의한대로 변환한 후 다음 publisher 에 전달합니다.
알고있는 Stream API와 비슷합니다.
flatMap은 Upstream 에서 emit 된 데이터가 Inner Sequence 에서 평탄화 작업을 거치면서
하나의 Sequence 로 병합되어 Downstream 으로 emit 됩니다.
Mono.just("Hello") // Mono<String>
.flatMap(s -> Mono.just(s + " Mono!")) // Mono<Mono<String>> -> Mono<String>
.subscribe(System.out::println); // Hello Mono!
위에서 설명을 난해하게 했으나, 결국 이중화되어 있는 Mono / Flux 를 풀어준다고 보시면 됩니다.
2차원 배열을 1차원으로 만드는 Stream API 의 flatMap 과 비슷합니다.
concat은 paramter 로 입력되는 Publisher 의 Sequence 를 연결해서 데이터를 순차적으로 emit합니다.
두개의 Publisherr 를 concat 하는 Operator 입니다.
defer는 subscribe 호출 시점에 emit 하는 operator 입니다.
lazy 한 동작으로 꼭 필요한 시점에 데이터를 emit 하여 불필요한 프로세스를 줄여줍니다.
log.info(" # start {}", LocalDateTime.now());
Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
Mono<LocalDateTime> deferMono = Mono.defer(() -> Mono.just(LocalDateTime.now()));
Thread.sleep(2000L);
justMono.subscribe(t -> log.info("justMono: {}", t));
deferMono.subscribe(t -> log.info("deferMono: {}", t));
// 12:17:57.881 [main] INFO prac.DeferPrac -- justMono: 2024-01-28T12:17:55.833663
// 12:17:57.881 [main] INFO prac.DeferPrac -- deferMono: 2024-01-28T12:17:57.881499
Thread.sleep(2000L);
justMono.subscribe(t -> log.info("justMono: {}", t));
deferMono.subscribe(t -> log.info("deferMono: {}", t));
// 12:17:59.885 [main] INFO prac.DeferPrac -- justMono: 2024-01-28T12:17:55.833663
// 12:17:59.886 [main] INFO prac.DeferPrac -- deferMono: 2024-01-28T12:17:59.886306
위의 수행 결과 처럼 just 에서 subscribe 한 데이터와 시차가 발생된 로그가 찍히는 것을 확인하실 수 있습니다.
subscribe 를 하는 시점에 데이터를 생성하여 emit 합니다.
추가로
- fromIterable : iterable 로부터 publisher 생성
- justOrEmpty : data or null -> data 가 null 이어도 NPE 발생하지 않고 onComplete 호출
- using : 파라미터로 전달받은 resource 를 emit 하는 Flux 를 생성
- generate : 프로그래밍 방식으로 signal 이벤트를 발생시키며 파라미터로 전달받은 state를 이용해서 데이터를 생성하는 Flux 를 생성
- skip : 파라미터로 전달받은 개수만큼 데이터를 skip 하고 나머지 데이터를 emit
- take / takeLast : 파라미터로 전달받은 개수만큼 데이터를 emit 하고 onComplete
- next : upstream 에서 emit 되는 데이터 중에서 첫 번째 데이터만 Downstream 으로 emit
등의 Operator 가 있습니다.
자세한 내용은 아래의 링크에서 확인하실 수 있습니다.
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
Scheduler
Reactor 의 Scheduler 는 병렬 처리와 비동기 처리를 지원하는 컴포넌트입니다.
성능 최적화와 리소스 관리를 위한 도구로서 사용됩니다.
Scheduler 를 사용하여 어떤 스레드에서 무엇을 처리할지 제어할 수 있습니다.
이런 동작은 동시에 여서 작업을 처리하게 하여 병렬성을 증가시켜 시스템의 응답성을 높여줍니다.
대용량 데이터를 처리하거나, 동시에 여러 요청을 처리해야하는 서버 애플리케이션에서 유용하게 사용할 수 있습니다.
또한 리소스 관리를 효율적으로 활용하여 애플리케이션 전반의 성능을 향상시키는 데에 도움이 될 수 있습니다.
아래에서 코드와 함께 살펴보도록 하겠습니다.
Flux.fromArray(new Integer[]{1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext: {}", data))
.publishOn(Schedulers.parallel()) // 이후 연산을 수행할 스레드를 지정
.filter(data -> data > 3) // parallel-2
.doOnNext(data -> log.info("# doOnNext filtered: {}", data))
.publishOn(Schedulers.parallel()) // 이후 연산을 수행할 스레드를 지정
.map(data -> data * 10) // parallel-1
.doOnNext(data -> log.info("# doOnNext mapped: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
// 21:46:07.260 [main] INFO prac.PublishOnPrac -- # doOnNext: 1
// 21:46:07.262 [main] INFO prac.PublishOnPrac -- # doOnNext: 3
// 21:46:07.262 [main] INFO prac.PublishOnPrac -- # doOnNext: 5
// 21:46:07.262 [main] INFO prac.PublishOnPrac -- # doOnNext: 7
// 21:46:07.262 [parallel-2] INFO prac.PublishOnPrac -- # doOnNext filtered: 5
// 21:46:07.262 [parallel-2] INFO prac.PublishOnPrac -- # doOnNext filtered: 7
// 21:46:07.262 [parallel-1] INFO prac.PublishOnPrac -- # doOnNext mapped: 50
// 21:46:07.262 [parallel-1] INFO prac.PublishOnPrac -- # onNext: 50
// 21:46:07.262 [parallel-1] INFO prac.PublishOnPrac -- # doOnNext mapped: 70
// 21:46:07.262 [parallel-1] INFO prac.PublishOnPrac -- # onNext: 70
publishOn() 은 Downstream 으로 signal 을 전송할 때 실행되는 스레드를 제어하는 역할을 하는 Operator 입니다.
즉 publishOn 이후의 Operator sequence 에 대해 어떤 스케줄러를 사용할지 결정합니다.
Flux.fromArray(new Integer[]{1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext: {}", data)) // boundedElastic-1
.subscribeOn(Schedulers.boundedElastic()) // 구독이 발생한 직후 실행될 스레드를 지정
.filter(data -> data > 3) // boundedElastic-1, 따로 지정하지 않으면 위의 subscribeOn 스레드에서 실행
.doOnNext(data -> log.info("# doOnNext filtered: {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10) // 위에서 parallel 로 스레드를 변경했기 때문에 parallel-1
.doOnNext(data -> log.info("# doOnNext mapped: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
// 21:48:32.620 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext: 1
// 21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext: 3
// 21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext: 5
// 21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext filtered: 5
// 21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext: 7
// 21:48:32.622 [boundedElastic-1] INFO prac.PubOnSubOnPrac -- # doOnNext filtered: 7
// 21:48:32.622 [parallel-1] INFO prac.PubOnSubOnPrac -- # doOnNext mapped: 50
// 21:48:32.622 [parallel-1] INFO prac.PubOnSubOnPrac -- # onNext: 50
// 21:48:32.622 [parallel-1] INFO prac.PubOnSubOnPrac -- # doOnNext mapped: 70
// 21:48:32.623 [parallel-1] INFO prac.PubOnSubOnPrac -- # onNext: 70
subscribeOn() 으로 소스에서 데이터를 생성할 때 사용할 스케줄러를 결정합니다.
즉, 구독이 시작되는 시점부터 데이터의 생성과 초기 처리가 이루어지는 스레드를 지정합니다.
여러 subscribeOn 이 연속해서 호출되더라도 첫 번째 subscribeOn 의 스케줄러만 고려됩니다.
subscribeOn 과 publishOn 을 고루 활용하면 CPU-bound 작업과 IO-bound 작업 등을
적절한 스레드에서 처리하도록 스케쥴링하여 시스템의 병렬 처리 성능을 최적화할 수 있습니다.
스케쥴러의 종류는 다음과 같습니다.
- immediate : 현재 스레드에서 실행
- single : 단일 스레드에서 실행
- newSingle : 새로운 스레드에서 실행
- boundedElastic
- 스레드 풀에서 실행(재사용 가능)
- CPU 코어 수 * 10개의 스레드 생성
- 이용가능한 스레드가 생길 때까지 최대 100,000 개의 작업이 큐에서 대기할 수 있음
- Blocking IO 데이터를 처리하기에 유용
- parallel
- CPU 코어 수만큼 스레드 생성
- Non-Blocking IO 데이터를 처리하기에 유용
- fromExcutorService
- ExecutorService 를 이용하여 스레드 생성
- Reactor 에서는 이 방식을 권장하지 않음
이번 글에서는 Reactor 라이브러리와 핵심 요소인 Flux / Mono, Scheduler 에 대해서 알아보았습니다.
이외에도 Reactor 는 Cold / Hot Sequence, Backpressure, Sinks, Context 등을 제공합니다.
위에 대해서는 다음번에 다뤄보도록 하겠습니다.
다음 글은 실제로 Spring WebFlux 는 어떻게 구현되어있는지 코드를 통해 확인해보도록 하겠습니다.
끝!
참고
'Java, Spring' 카테고리의 다른 글
Spring WebFlux 에서 ProxySQL 을 사용할 때 문제점 (1) | 2024.02.07 |
---|---|
구현하며 이해하는 Spring WebFlux (1) | 2024.02.04 |
Spring WebFlux 이해하기 - Reactive Streams (0) | 2024.02.03 |
JVM, Spring 에서의 시스템 변수와 환경변수 이해 (0) | 2023.11.28 |
Spring Boot 3++ 를 위한 Spring Batch migration 요약 (1) | 2023.11.18 |