spring 환경에서 redis pub-sub을 이용한 멀티 캐스팅

2023. 7. 15. 15:41Spring

728x90

도입 계기

기존에 대기자를 매칭하는 서비스를 제작하고 있었다. 해당 서비스는 SSE(Server Sent Event)로 구현하며,

각 서비스가 Sink 상태를 직접 관리하였다.

문제는 서비스가 Sink를 관리하다보니 스케일 아웃 진행시에 각 확장 서버가 대기자를 동기화가 되지 않는 문제가 있었다.

대기자를 동기화하기 위해서 새로운 대기자가 생기면 각 서버에 multicasting 하는 방법을 구상했다.

 

멀티 캐스팅이란?

네트워크에서 그룹으로 지정된 여러 대상에게 대이터를 전송하는 방식.

송신자는 그룹에 데이터를 전송하고, 해당 그룹에 가입된 수신자만 데이터를 수신하는 구조.

 

redis pub-sub란?

인메모리 DB인 redis는 publish-subscribe 메시징 패턴을 지원하여 메시지를 발행하고 구독하는 기능을 제공

 

redis pub-sub 주요 구성 요소

publisher :

Redis에 메시지를 발행하는 클라이언트, 특정 채널에 메시지를 게시

subscriber :

메시지를 수신하는 클라이언트, 하나 이상의 채널을 구독하고, 해당 채널에 게시된 메시지를 수신

channel :

메시지를 발행하고 구독하는 Redis의 주체

 

redis pub-sub 특징

 

비동기 메시지 전달

비동기적으로 메시지를 발행/구독하여 발행자와 구독자 간 결합도 낮음

확장성

redis는 고성능을 제공하는 분산 시스템. 수천 개의 동시 구독자에게 메시지 전달 가능

다중 채널 지원

각 구독자는 여러개의 채널을 구독 가능

지속성

redis는 메모리에 저장하는 동시에 디스크에도 저장하여 안정성을 보장

 

예시

public record Person(String name, int age) {}
@Configuration
public class RedisConfig {
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer3(RedisConnectionFactory connectionFactory, Subscriber subscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(subscriber, channelTopic());
        return container;
    }

    @Bean
    public RedisTemplate<String, Person> redisMemberTemplate(RedisConnectionFactory connectionFactory) {
        Jackson2JsonRedisSerializer<Person> serializer =
                new Jackson2JsonRedisSerializer<>(Person.class);
        RedisTemplate<String, Person> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.afterPropertiesSet();
        template.setDefaultSerializer(serializer);
        template.setKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8));
        template.setValueSerializer(serializer);
        template.setHashKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8));
        template.setHashValueSerializer(serializer);

        return template;
    }

    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic("broadcast-channel");
    }
}

 

레디스에 바이트 형식으로 저장되므로 Object 저장 시에 Serializer를 명시해주어야 한다.

 

@Component
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
@RequiredArgsConstructor
public class Publisher {
    RedisTemplate<String, Person> redisTemplate;
    ChannelTopic topic;


    public void publish(Person message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}
@Slf4j
@Component
public class Subscriber implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        Jackson2JsonRedisSerializer<Person> serializer =
                new Jackson2JsonRedisSerializer<>(Person.class);
        final Person person = serializer.deserialize(message.getBody());

        log.info("{}", channel);
        log.info("{}", person.name());
        log.info("{}", person.age());
    }
}

마찬가지로, byte 형식으로 된 데이터를 가져올 때는 Deserialize하는 작업이 필요하다.

@RestController
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
@RequiredArgsConstructor
public class TestController {

    Publisher publisher;


    @PostMapping("test")
    public void send(@RequestBody Person person) {
        publisher.publish(person);
    }
}

application.yml

spring:
  data:
    redis:
      port: 6379
      host: localhost

 

실행 결과

다른 포트로 서버를 여러 대 띄워두고 실행했다

1회 전송 시 redis 채널을 구독한 모든 서버에게 동일한 메시지를 전달하는 것을 볼 수 있다.

728x90