Spring Webflux Sink

2023. 7. 15. 10:58Spring/webflux

728x90

Sinks란?

Reactor에서 Processor(Publisher + Subscriber) 을 개선한 구현체

Signal을 프로그래밍 방식으로 푸시

Flux 및 Mono의 의미 쳬계를 가짐

멀티 스레드 방식으로 Signal을 전송해도 스레드 안정성 보장

 

Sinks 사용 방법

 

Sinks.One

한 건의 데이터를 전송

Mono 방식으로 Subscriber가 데이터를 소비할 수 있도록 함

 

    @Test
    @DisplayName("mono test")
    void sinkTest() {
        Sinks.One<String> sinkOne = Sinks.one();
        Mono<String> mono = sinkOne.asMono();
        
        sinkOne.emitValue("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
        
        mono.subscribe(v -> log.info("{}", v));
    }
    
    // 실행 결과
    
  Hello

FAIL_FAST 는 에러 발생 시 빠른 실패 처리를 의미함

    @Test
    @DisplayName("mono test")
    void sinkTest() {
        Sinks.One<String> sinkOne = Sinks.one();
        Mono<String> mono = sinkOne.asMono();

        sinkOne.emitValue("1 data", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkOne.emitValue("2 data", Sinks.EmitFailureHandler.FAIL_FAST);

        mono.subscribe(v -> log.info("첫 시도 : {}", v));
        mono.subscribe(v -> log.info("두번째 시도 : {}", v));
    }


// 실행 결과

첫 시도 : 1 data
두번째 시도 : 1 data

단 하나의 데이터만 처리하기 때문에 나머지 데이터는 드롭됨

 

 

Sinks.Many

여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 가짐

Flux 방식으로 Subscriber가 데이터를 소비할 수 있도록 함

 

ManySpec을 통해 다음 기능을 정의

UnicastSpec : 

단일 Subscriber에게 데이터를 방출

    @Test
    @DisplayName("unicast Test")
    void unicastTest() {
        Sinks.Many<String> sinkMany = Sinks.many().unicast().onBackpressureBuffer();

        Flux<String> result = sinkMany.asFlux();

        sinkMany.emitNext("data 1", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 2", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 3", Sinks.EmitFailureHandler.FAIL_FAST);

        result.subscribe(v -> log.info("{}", v)); // 정상 실행

        result.subscribe(); // 두번째 구독 시 에러 발생 : unicast는 단일 Subscriber만 지원
    }
    
    // 실행 결과
10:51:32.340 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- data 1
10:51:32.342 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- data 2
10:51:32.342 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- data 3
10:51:32.343 [main] ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
Caused by: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
	at reactor.core.publisher.SinkManyUnicast.subscribe(SinkManyUnicast.java:422)
......
Process finished with exit code 0

MulticastSpec : 

데이터를 복제하여 다수 Subscriber에게 독립적으로 방출

    @Test
    @DisplayName("multicast Test")
    void multicastTest() {
        Sinks.Many<String> sinkMany = Sinks.many().multicast().onBackpressureBuffer();

        Flux<String> result = sinkMany.asFlux();

        sinkMany.emitNext("data 1", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 2", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 3", Sinks.EmitFailureHandler.FAIL_FAST);

        result.subscribe(v -> log.info("1차 실행 : {}", v));
        result.subscribe(v -> log.info("0002차 실행 : {}", v));
        sinkMany.emitNext("data 4", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 5", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 6", Sinks.EmitFailureHandler.FAIL_FAST);
    }
    
    //실행 결과
    
10:54:31.200 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 1
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 2
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 3
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 4
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 4
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 5
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 5
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 6
10:54:31.202 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 6

MulticastReplaySpec : 

데이터를 복제하여 다수 Subscriber에게  방출하면서 재전송을 지원, 나중에 구독한 Subscriber가 이전 데이터를 받을 수 있음

    @Test
    @DisplayName("replay Test")
    void replayTest() {
        Sinks.Many<String> sinkMany = Sinks.many().replay().all();

        Flux<String> result = sinkMany.asFlux();

        sinkMany.emitNext("data 1", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 2", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 3", Sinks.EmitFailureHandler.FAIL_FAST);

        result.subscribe(v -> log.info("1차 실행 : {}", v));
        result.subscribe(v -> log.info("0002차 실행 : {}", v));
        sinkMany.emitNext("data 4", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 5", Sinks.EmitFailureHandler.FAIL_FAST);
        sinkMany.emitNext("data 6", Sinks.EmitFailureHandler.FAIL_FAST);
    }
    
10:55:30.143 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 1
10:55:30.144 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 2
10:55:30.144 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 3
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 1
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 2
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 3
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 4
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 4
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 5
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 5
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 1차 실행 : data 6
10:55:30.145 [main] INFO com.example.webfluxdemo.WebfluxTest2 -- 0002차 실행 : data 6

 

 

 

728x90