2023. 7. 15. 12:59ㆍSpring/webflux
Scheduler란
Reactor Sequence에서 사용되는 스레드를 관리해주는 관리자
주요 역할
스레드 관리
비동기 작업을 실행할 스레드 풀을 관리
별도의 스레드에서 실행되며, 블로킹되지 않고 비동기적으로 처리
작업 스케줄링
비동기 작업 실행을 스케줄링. 즉시/지연/주기적 실행 등 다양한 옵션 제공
백프레셔 관리
백프레셔 : Publisher와 Subscriber 간 데이터 흐름 제어, 작업 속도를 조절하여 효율적 데이터 처리를 가능하게 함
Scheduler 전용 Operator
subscribeOn(), publishOn() 이 Scheduler 전용 Operator임
적절한 Scheduler를 전달하면 해당 특성에 맞는 스레드가 Reactor Sequence에 할당
subscribeOn()
구독이 발생한 직후 실행될 스레드 지정
구독 발생시 원본
Publisher가 데이터를 최초로 emit하는데 subscribeOn()은 구독 시점 직후에 실행됨.
원본 Publisher의 동작을 수행하기 위한 스레드
체인에서 어디에 있든 실행 구독 발생 직후 실행
@Test
@DisplayName("subscribeOn test")
void runSubscribeOn() {
Flux.just("A", "B", "C")
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("doOnNext:{}", data))
.doOnSubscribe(sub -> log.info("doOnSubscribe"))
.subscribe(data -> log.info("subscribe : {}", data));
}
// 실행 결과
12:42:33.292 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- doOnSubscribe
12:42:33.295 [boundedElastic-1] INFO com.example.webfluxdemo.WebfluxTest2 -- doOnNext:A
12:42:33.296 [boundedElastic-1] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : A
12:42:33.296 [boundedElastic-1] INFO com.example.webfluxdemo.WebfluxTest2 -- doOnNext:B
12:42:33.296 [boundedElastic-1] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : B
12:42:33.296 [boundedElastic-1] INFO com.example.webfluxdemo.WebfluxTest2 -- doOnNext:C
12:42:33.296 [boundedElastic-1] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : C
실행 결과를 보면 subscribeOn()을 통해 다른 스레드에서 실행되는 것을 볼 수 있다.
publishOn()
Downstream으로 Signal을 전송할 때 실행되는 스레드를 제어하는 역할
코드 기준으로 publishOn() 기준 아래쪽 스트림들의 실행 스레드를 변경
@Test
@DisplayName("publishOn test")
void runPublishOn() {
Flux.just("A", "B", "C")
.map(data -> {
log.info("-----상단 map:{}", data);
return data;
})
.publishOn(Schedulers.parallel())
.map(data -> {
log.info("---중단 map:{}", data);
return data;
})
.publishOn(Schedulers.parallel())
.map(data -> {
log.info("하단 map:{}", data);
return data;
})
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("subscribe : {}", data));
}
//실행 결과
12:46:32.506 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- -----상단 map:A
12:46:32.508 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- -----상단 map:B
12:46:32.508 [parallel-3] INFO com.example.webfluxdemo.WebfluxTest2 -- ---중단 map:A
12:46:32.508 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- -----상단 map:C
12:46:32.508 [parallel-3] INFO com.example.webfluxdemo.WebfluxTest2 -- ---중단 map:B
12:46:32.508 [parallel-2] INFO com.example.webfluxdemo.WebfluxTest2 -- 하단 map:A
12:46:32.508 [parallel-3] INFO com.example.webfluxdemo.WebfluxTest2 -- ---중단 map:C
12:46:32.508 [parallel-2] INFO com.example.webfluxdemo.WebfluxTest2 -- 하단 map:B
12:46:32.508 [parallel-1] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : A
12:46:32.508 [parallel-2] INFO com.example.webfluxdemo.WebfluxTest2 -- 하단 map:C
12:46:32.508 [parallel-1] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : B
12:46:32.508 [parallel-1] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : C
parallel()
subscribeOn과 publishOn은 논리적 스레드에 해당,
parallel() 병령성을 가지는 물리적 스레드 해당 -> CPU 코어 개수만큼 병렬 실행
논리적 스레드 : SW 레벨에서 동시에 실행하는 작업 단위, 각 스레드가 독립적으로 실행되는 것 처럼 보이나, OS가 번갈아가며 실행하는 것 : SW적인 개념으로 프로그램 내에 실행하는 작업 단위를 나타냄
물리적 스레드 : CPU가 실제 하드웨어 스레드를 나타냄
@Test
@DisplayName("parallel test")
void parallelOn() {
Flux.just("A", "B", "C")
.parallel()
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("subscribe : {}", data));
}
//실행 결과
12:49:23.725 [parallel-1] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : A
12:49:23.725 [parallel-3] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : C
12:49:23.725 [parallel-2] INFO com.example.webfluxdemo.WebfluxTest2 -- subscribe : B
Scheduler 종류
Scheduler.immediate()
별도 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업 처리
Scedulers.single()
스레드를 하나만 생성해서 Scheduler 제거 전까지 재사용
Schedulers.newSingle()
호출때마다 매번 새로운 스레드 생성
Schedulers.boundedElastic()
스레드 풀을 생성한 후 그 안에서 정해진 수 만큼 스레드 사용, 작업 처리 후 종료된 스레드는 반납
Blocking I/O 작업 효과적으로 처리 가능
Schedulers.parallel()
CPU 코어 수 만큼 스레드 생성
non-Blocking I/O 작업 효과적으로 처리 가능
Schedulers.fromExecutorService()
기존에 이미 사용하고 있는 ExecutorService(스레드풀)이 있다면 해당 서비스로 부터 Scheduler를 생성
Reactor에서는 이 방식을 권장하지 않음
Schedulers.newXXXX()
Reactor에서 제공하는 디폴트 Scheduler 인스턴스 사용
'Spring > webflux' 카테고리의 다른 글
Spring Webflux 및 MVC Security환경 비교 (0) | 2023.07.15 |
---|---|
Spring webflux를 활용한 SSE(Server Sent Event) 구현 (0) | 2023.07.15 |
Spring Webflux Sink (0) | 2023.07.15 |
리액티브 스트림즈, Mono, 그리고 Flux (0) | 2023.07.14 |
Reactive programming - Spring webflux (0) | 2023.07.14 |