Spring Webflux Sink
2023. 7. 15. 10:58ㆍSpring/webflux
728x90
Sinks란?
Reactor에서 Processor(Publisher + Subscriber) 을 개선한 구현체
Signal을 프로그래밍 방식으로 푸시
Flux 및 Mono의 의미 쳬계를 가짐
멀티 스레드 방식으로 Signal을 전송해도 스레드 안정성 보장
Sinks 사용 방법
Sinks.One
한 건의 데이터를 전송
Mono 방식으로 Subscriber가 데이터를 소비할 수 있도록 함
@Test
@DisplayName("mono test")
void sinkTest() {
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
mono.subscribe(v -> log.info("{}", v));
}
// 실행 결과
Hello
FAIL_FAST 는 에러 발생 시 빠른 실패 처리를 의미함
@Test
@DisplayName("mono test")
void sinkTest() {
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("1 data", Sinks.EmitFailureHandler.FAIL_FAST);
sinkOne.emitValue("2 data", Sinks.EmitFailureHandler.FAIL_FAST);
mono.subscribe(v -> log.info("첫 시도 : {}", v));
mono.subscribe(v -> log.info("두번째 시도 : {}", v));
}
// 실행 결과
첫 시도 : 1 data
두번째 시도 : 1 data
단 하나의 데이터만 처리하기 때문에 나머지 데이터는 드롭됨
Sinks.Many
여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 가짐
Flux 방식으로 Subscriber가 데이터를 소비할 수 있도록 함
ManySpec을 통해 다음 기능을 정의
UnicastSpec :
단일 Subscriber에게 데이터를 방출
@Test
@DisplayName("unicast Test")
void unicastTest() {
Sinks.Many<String> sinkMany = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> result = sinkMany.asFlux();
sinkMany.emitNext("data 1", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 2", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 3", Sinks.EmitFailureHandler.FAIL_FAST);
result.subscribe(v -> log.info("{}", v)); // 정상 실행
result.subscribe(); // 두번째 구독 시 에러 발생 : unicast는 단일 Subscriber만 지원
}
// 실행 결과
10:51:32.340 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- data 1
10:51:32.342 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- data 2
10:51:32.342 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- data 3
10:51:32.343 [main] ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
Caused by: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
at reactor.core.publisher.SinkManyUnicast.subscribe(SinkManyUnicast.java:422)
......
Process finished with exit code 0
MulticastSpec :
데이터를 복제하여 다수 Subscriber에게 독립적으로 방출
@Test
@DisplayName("multicast Test")
void multicastTest() {
Sinks.Many<String> sinkMany = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> result = sinkMany.asFlux();
sinkMany.emitNext("data 1", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 2", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 3", Sinks.EmitFailureHandler.FAIL_FAST);
result.subscribe(v -> log.info("1차 실행 : {}", v));
result.subscribe(v -> log.info("0002차 실행 : {}", v));
sinkMany.emitNext("data 4", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 5", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 6", Sinks.EmitFailureHandler.FAIL_FAST);
}
//실행 결과
10:54:31.200 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 1
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 2
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 3
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 4
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 4
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 5
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 5
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 6
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 6
MulticastReplaySpec :
데이터를 복제하여 다수 Subscriber에게 방출하면서 재전송을 지원, 나중에 구독한 Subscriber가 이전 데이터를 받을 수 있음
@Test
@DisplayName("replay Test")
void replayTest() {
Sinks.Many<String> sinkMany = Sinks.many().replay().all();
Flux<String> result = sinkMany.asFlux();
sinkMany.emitNext("data 1", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 2", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 3", Sinks.EmitFailureHandler.FAIL_FAST);
result.subscribe(v -> log.info("1차 실행 : {}", v));
result.subscribe(v -> log.info("0002차 실행 : {}", v));
sinkMany.emitNext("data 4", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 5", Sinks.EmitFailureHandler.FAIL_FAST);
sinkMany.emitNext("data 6", Sinks.EmitFailureHandler.FAIL_FAST);
}
10:55:30.143 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 1
10:55:30.144 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 2
10:55:30.144 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 3
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 1
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 2
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 3
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 4
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 4
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 5
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 5
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 6
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 6
728x90
'Spring > webflux' 카테고리의 다른 글
Spring Webflux 및 MVC Security환경 비교 (0) | 2023.07.15 |
---|---|
Spring webflux를 활용한 SSE(Server Sent Event) 구현 (0) | 2023.07.15 |
Spring Webflux - Scheduler (0) | 2023.07.15 |
리액티브 스트림즈, Mono, 그리고 Flux (0) | 2023.07.14 |
Reactive programming - Spring webflux (0) | 2023.07.14 |