Redis pub sub ( Publisher / Subscriber )  

Redis는 key / value 저장소이며, 캐시 또는 메시지 중개자로 사용되는 DB입니다.

Redis는 메시지 중개자로서 역할을 할 수 있는데, 이를 Redis pub sub이라 합니다.


pub sub 디자인은 메시지를 전송하는 Publishers와 메시지를 구독 받는 Subscribers로 나뉩니다.

그렇다면 메시지를 주고 받는 공간이 있어야 하는데, 이를 Channel( 채널 )이라고 표현합니다.

Subscriber는 여러 개의 채널을 구독할 수 있습니다.

( 유튜브와 비슷한 개념이라고 생각하면 좋을 것 같습니다. )


즉 Publisher가 채널에 메시지를 보내면, 그 채널을 구독하고 있는 Subscriber들에게 메시지가 전달됩니다.

메시지를 보낼 때 Publisher는 채널에 어떤 Subscriber가 있는지 알 필요가 없이, 그냥 메시지를 전송만 하면 됩니다.

메시지를 받은 Subscriber들은 각자 다양한 패턴을 가질 수 있습니다.

예를 들어 어떤 Subscriber는 log를 남길 수 있고, 어떤 Subscriber는 Slack에 메시지를 남길 수 있고, 다른 애플리케이션 알림을 보낼 수도 있는 등의 다양한 패턴을 갖습니다.




Redis pub sub 특징

Reids pub sub은 다음과 같은 특징이 존재합니다.

1) No persistence

Redis pub sub은 지속적이지 않습니다.
메시지는 Subscriber에게 직접 전달된 후 삭제되며, 메모리 또는 디스크에 기록이 남지 않습니다.
pub sub은 오직 새로운 메시지 전달만 용이하게 한다는 점에서 live stream과 같습니다.

2) No delivery guarantees
pub sub 모델에서 Subscriber가 메시지 받는 것을 보장하지는 않습니다.
예를 들어 Subscriber의 네트워크에 문제가 생기면, 메시지를 읽는데 실패할 것입니다.
즉, Subscriber가 만약 메시지를 놓쳤다면, 메시지를 받을 수 없습니다.

Publisher는 구독하는 Subscriber가 없는 경우에도 채널에 메시지를 보낼 수 있지만, 그 메시지는 삭제됩니다.
이 방식은 빠른 처리가 가능하다는 것이 장점입니다.





이제 Spring에서 Redis pub sub을 사용하는 방법에 대해 알아보겠습니다.

Spring Redis에 대한 환경 설정은 이글을 참고해주세요 !

( 이 글의 프로젝트 구조 역시 위 링크의 글을 그대로 사용합니다. )


준비 작업

프로젝트 구조는 다음과 같습니다.


1)

Redis 설정 클래스인 SpringRedisConfig에서 두 개의 Bean을 등록했었습니다.

JedisConnectionFactory  :  jedis를 사용하기 위해 Redis와 연결하는 Factory

RedisTemplate                 : 메시지 제공자가 메시지를 보내기 위한 Template

            => 문자열을 다룰 때 문자열 깨짐 방지를 위해 StringRedisTemplate을 사용합니다.


마찬가지로 Spring에서 pub sub 모델을 사용하기 위해서, 메시지 큐를 위한 Bean을 추가해야 합니다.

com.victolee.controller.SpringRedisConfig
@Bean
MessageListenerAdapter messageListener() { 
    return new MessageListenerAdapter(new RedisMessageSubscriber());
}

MessageListnerAdapter Bean은 pub sub 모델에서 subscriber 역할을 수행하며, 조금 뒤에 구현 할 RedisMessageSubscriber을 포함합니다.




2)

다음으로 Channel의 메시지를 받는데 사용되는 컨테이너를 구현해야 합니다.

com.victolee.controller.SpringRedisConfig

@Bean
RedisMessageListenerContainer redisContainer() {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory()); 
    container.addMessageListener(messageListener(), topic()); 
    return container; 
}

RedisMessageListenerContainer는 Spring Data Redis에서 제공하는 클래스입니다.

컨테이너는 Redis 채널로부터 메시지를 받는데 사용하며, 구독자들에게 메시지를 dispatch 하는 역할을 합니다.

즉 메시지를 수신하는데 관련한 비즈니스 로직을 작성할 수 있습니다


topic() 메서드는 조금 뒤에 구현할 것인데, 채널을 반환하는 메서드입니다.




3)

1,2) 단계에서 메시지를 구독하기 위한 Bean을 생성했습니다.

이제 메시지를 게시하는 Bean을 작성하겠습니다.

com.victolee.controller.SpringRedisConfig

@Bean
MessagePublisher redisPublisher() { 
    return new RedisMessagePublisher(strRedisTemplate(), topic());
}

MessagePublisher 타입은 인터페이스로서 조금 뒤에 구현할 것입니다.

그리고 RedisMessagePublisher는 MessagePublisher 인터페이스를 구현한 클래스이며, 여기서는 이를 반환하는 Bean을 생성합니다.

RedisMessagePublisher 인스턴스를 생성할 때 생성자에 StringRedisTemplate과 topic을 넘겨줍니다.

이렇게 작성을 하면, 메시지를 게시하는 API를 사용할 수 있습니다.




4)

메시지를 게시하고 구독하는 Bean을 모두 작성했습니다.

마지막으로 메시지를 주고 받는 공간인 채널 Bean을 작성합니다.

@Bean
ChannelTopic topic() {
    return new ChannelTopic("messageQueue");
}

채널의 이름은 messageQueue입니다.





메시지 보내기

Bean을 모두 등록했으므로 이제 메시지를 채널로 보내보겠습니다.

메시지를 보내기 위해서, 준비 작업 3) 단계에서 미뤄뒀던 MessagePublisher 인터페이스와 RedisMessagePublisher 클래스를 구현해야 합니다.


1)

먼저 MessagePublisher 인터페이스를 작성하겠습니다.

com.victolee.redis.MessagePublisher 인터페이스

public interface MessagePublisher {
    void publish(String message);
}

Spring Data Redis는 메시지 분배를 위한 기능을 제공하지 않으므로, MessagePublisher 인터페이스를 생성합니다.




2)

이제 실제로 메시지를 보내기 위해 MessagePublisher 인터페이스를 구현한 RedisMessagePublisher 클래스를 작성해 보겠습니다.

com.victolee.redis.RedisMessagePublisher
public class RedisMessagePublisher implements MessagePublisher{
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private ChannelTopic topic;
 
    public RedisMessagePublisher() { }
 
    public RedisMessagePublisher(StringRedisTemplate redisTemplate, ChannelTopic topic) {
      this.redisTemplate = redisTemplate;
      this.topic = topic;
    }
 
    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

RedisMessagePublisher는 Template을 이용하여 메시지 게시를 위한 디테일한 작업을 정의합니다.

Template에는 다양한 기능이 있는데, 그 중에서 convertAndSend() 메서드는 목적지( 채널 )에 메시지를 전달하는 역할을 합니다.

pub sub 모델의 특징 상 채널을 구독하고 있는 모든 구독자에게 메시지가 전달됩니다.





메시지 구독하기

준비 작업 1) 단계에서 MessageListenerAdapter 생성자의 인자로 RedisMessageSubscriber 인스턴스를 전달합니다.

RedisMessageSubscriber는 MessageListner 인터페이스를 구현한 클래스입니다.

메시지를 구독하기 위해서는 Spring Data Redis에서 제공하는 MessageListner 인터페이스를 구현해야 하는데,

이 인터페이스는 메시지와 채널에 대한 접근을 제공합니다.


이제 RedisMessageSubscriber 클래스를 구현해보겠습니다.

com.victolee.redis.RedisMessageSubscriber.java

public class RedisMessageSubscriber implements MessageListener {
 
    public static List<String> messageList = new ArrayList<String>();
 
    public void onMessage(Message message, byte[] pattern) {
        messageList.add(message.toString());
        System.out.println("Message received: " + message.toString());
    }
}

onMessage() 메서드를 통해 메시지를 받을 때 동작하는 로직을 작성할 수 있습니다.

또한 onMessage() 메서드의 두 번째 파라미터인 pattern을 통해 채널 매칭을 위한 패턴을 정의할 수 있습니다. ( 여기서는 사용하지 않습니다. )

pattern을 작성하면 여러 채널로부터 구독을 할 수 있다.





메시지 주고 받기

Redis pub sub을 사용하기 위한 모든 작업이 끝났습니다.

준비 작업에서 Publisher, Subscriber, Channel에 대한 Bean을 모두 등록했고,

Bean에서 사용하는 메시지를 보내고, 구독하기 위한 클래스도 모두 작성했습니다.


이제 메시지를 보내는 함수를 작성하겠습니다.

com.victolee.service.SpringRedisExample

@Service
public class SpringRedisExample {
	public void exam() {
		ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(SpringRedisConfig.class);
		try {
			StringRedisTemplate redisTemplate = (StringRedisTemplate)ctx.getBean("strRedisTemplate");
			ChannelTopic topic = (ChannelTopic)ctx.getBean("topic");
			
			RedisMessagePublisher redisMessagePublisher
						= (RedisMessagePublisher)ctx.getBean("redisPublisher", redisTemplate, topic);
			String message = "Message!!";
	        	redisMessagePublisher.publish(message);
			
		}	
		catch(Exception e) {
			e.printStackTrace();
		}
		finally {
			ctx.close();
		}
		
	}
}

메시지를 보내려면 RedisMessagePublisher의 publish() 메서드를 호출하기만 하면 됩니다.


이제 테스트를 위해 컨트롤러에서 exam() 메서드를 호출하도록 코드를 작성한 후 서버를 실행하여 브라우저에서 접근해보세요.


보시는 바와 같이 콘솔에 메시지가 출력되는 것을 확인할 수 있습니다.

이 내용은 RedisMessageSubscriber 에서 작성한 onMessage() 메서드의 내용입니다.

( 그런데 왜 두 번이 출력되는지 모르겠네요... )




이번에 redis-cli에서 messageQueue 채널을 구독하여, 메시지가 오는지 확인해보겠습니다.

redis-cli에서 아래의 명령어를 작성하신 후, 브라우저에서 새로 고침을 눌러보세요.

pubsub channels            :  채널 목록을 확인

subscribe messageQueue  : messageQueue 채널을 구독




이상으로 Spring에서 Redis pub sub을 사용하는 방법에 대해 알아보았습니다.

Publisher, Subscriber, Channel에 대한 Bean을 등록하는 방법을 알아보았고,

메시지를 보내는 publish() 메서드, 메시지를 받는 onMessage() 메서드를 구현했습니다.


[ 참고 자료 ]

http://arahansa.github.io/docs_spring/redis.html#pubsub

http://www.baeldung.com/spring-data-redis-pub-sub