이 글에서는 Springboot에 Kafka를 연동하는 방법에 대해 알아보겠습니다.

Kafka가 궁금하시다면 이글을 참고해주세요!

 

이 글에서 다루는 내용은 다음과 같습니다.

Springboot에서 Kafka의 특정 Topic에 메시지를 생산(Produce)하고 해당 Topic을 Listen합니다.

Kafka 서버에 해당 메시지가 전달되고, Springboot에서 이를 소비(Consume)할 준비가 되면 메시지를 pull 하는 아주아주 간단한 예제입니다.

 

전체 소스는 깃헙을 참고하시기 바랍니다.

 

 

 

01. 개발 환경 셋팅

1) 프로젝트 구조

2) Kafka 설치

Kafka 설치는 Docker로 설치하는 글Window에서 설치하는 글을 참고해주시길 바랍니다.

이 글에서는 Docker로 설치한 것을 기준으로 작성하였는데, 시스템에 Kafka만 설치되어 있다면 실행에 크게 문제가 발생하진 않을듯 합니다.

 

3) build.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'

    /* kafka */
    implementation 'org.springframework.kafka:spring-kafka'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

kafka 연동을 위해 spring-kafka 의존성을 추가합니다.

 

 

 

02. 구현하기

1) application.yml

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer와 producer에 대한 설정을 해줍니다.

  • spring.kafka.consumer
    •  bootstrap-servers
      • Kafka 클러스터에 대한 초기 연결에 사용할 호스트:포트쌍의 쉼표로 구분된 목록입니다.
      • 글로벌 설정이 있어도, consumer.bootstrap-servers가 존재하면 consuemer 전용으로 오버라이딩 합니다.
    • group-id
      • Consumer는 Consumer Group이 존재하기 때문에, 유일하게 식별 가능한 Consumer Group을 작성합니다.
    • auto-offset-reset
      • Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 없는 경우 수행할 작업을 작성합니다.
      • Consumer Group의 Consumer는 메시지를 소비할 때 Topic내에 Partition에서 다음에 소비할 offset이 어디인지 공유를 하고 있습니다. 그런데 오류 등으로 인해. 이러한 offset 정보가 없어졌을 때 어떻게 offeset을 reset 할 것인지를 명시한다고 보시면 됩니다.
        • latest : 가장 최근에 생산된 메시지로 offeset reset
        • earliest : 가장 오래된 메시지로 offeset reset
        • none : offset 정보가 없으면 Exception 발생
      • 직접 Kafka Server에 접근하여 offset을 reset할 수 있지만, Spring에서 제공해주는 방식은 위와 같습니다.
    • key-deserializer / value-deserializer
      • Kafka에서 데이터를 받아올 때, key / value를 역직렬화 합니다.
      • 여기서 key와 value는 뒤에서 살펴볼 KafkaTemplate의 key, value를 의미합니다.
      • 이 글에서는 메시지가 문자열 데이터이므로 StringDeserializer를 사용했습니다. JSON 데이터를 넘겨줄 것이라면 JsonDeserializer도 가능합니다.
  • spring.kafka.producer
    •  bootstrap-servers
      • consumer.bootstrap-servers와 동일한 내용이며, producer 전용으로 오버라이딩 하려면 작성합니다.
    • key-serializer / value-serializer
      • Kafka에 데이터를 보낼 때, key / value를 직렬화 합니다.
      • consumer에서 살펴본 key-deserializer, value-deserializer와 동일한 내용입니다.

더 많은 설정 내용은 여기서 다룰수 없기 때문에 공식 문서에서 kafka를 검색하셔서 참고해주시기 바랍니다.

 

💡 참고

여기서는 Producer/Consumer 설정을 application.yml에 작성했지만, bean을 통해 설정하는 방법도 있습니다.

Producer, Consumer의 설정을 여러 개로 관리하고 싶다면 bean으로 구현하는 것도 좋은 방법일듯 합니다.

 

 

 

2) KafkaController.java

import com.victolee.kafkaexam.Service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);

        return "success";
    }
}

post 방식으로 message 데이터를 받아서, Producer 서비스로 전달합니다.

 

 

3) KafkaProducer.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

KafkaTemplate에 Topic명과 Message를 전달합니다.

KafkaTemplate.send() 메서드가 실행되면, Kafka 서버로 메시지가 전송됩니다.

 

 

4) KafkaConsumer.java

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "exam", groupId = "foo")
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message : %s", message));
    }
}

Kafka로부터 메시지를 받으려면 @KafkaListener 어노테이션을 달아주면 됩니다.

 

 

03. 테스트

1) Springboot & Kafka연동 확인

테스트를 해보기전에, Kafka가 잘 실행되고 있는지 확인을 해보시기 바랍니다.

Springboot 애플리케이션 실행시 아래의 이미지처럼 커넥션 실패 로그가 계속 출력이 되면, Kafka 서버가 실행이 안됐다던지 등의 이유로 연동이 안되고 있는 상태이므로 우선적으로 해결이 필요합니다.

 

애플리케이션이 실행되면 Kafka 서버와 커넥션이 이루어지는데, Cosunmer의 @KafkaListener에서 설정한 exam 토픽을 자동으로 생성하는 것을 확인할 수 있습니다.

참고로 이는 broker의 설정과 관련이 있습니다.

auto.create.topcis.enable이 설정되어 있으면 topci이 없을경우 topic을 자동으로 생성합니다.

 

 

2) 메시지 pub/sub

Springboot와 Kafka 연동이 확인되었으므로 이제 메시지 pub/sub 테스트를 해보겠습니다.

먼저 Kafka 컨테이너에서 exam 토픽에 메시지가 전송되었는지 확인해봅니다.

# docker exec -it {컨테이너명} bash 
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic exam

아직은 메시지 발송된 것이 없으니 아무것도 출력되고 있지 않습니다.

 

 

이제 포스트맨으로 API를 call 합니다.

응답은 success가 출력되었으니, 정상 실행된 것을 확인할수 있습니다.

 

다음으로 Kafka 컨테이너의 topic 메시지를 확인해보겠습니다.

 # kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic exam

 

메시지를 publish하면 로그를 남기도록 Producer에서 작성했으므로 Springboot에서도 로그를 확인해보겠습니다.

produce와 consume 메시지가 잘 출력되고 있습니다.

 

 

 

이상으로 Springboot에서 Kafka를 연동해보는 간단한 예제를 구현해보았습니다.

더 많은 사용방법은 역시 공식문서를 참고하시면 좋을듯 합니다.

 

참고 자료

  • https://docs.spring.io/spring-kafka/reference/html/
  • https://www.confluent.io/blog/apache-kafka-spring-boot-application/
  • https://team-platform.tistory.com/37
댓글 펼치기 👇
  1. iam 2021.03.03 20:53

    안녕하세요 블로그 잘 보면서 따라하고 있습니다.
    다름이 아니라 저도 하던 중에 커넥션 에러가 떴는데, 혹시 victolee님은 어떤 문제로 저게 떴었고 어떻게 해결했는지 알 수 있을까요?ㅠㅜ