본문 바로가기
kafka

카프카(Kafka)를 스프링부트와 함께 사용해보자!

by khds 2023. 12. 14.

 

들어가기

 

최근 모놀리식 아키텍처와 MSA( MircroService Architecture)에 대해 공부를 하였다. 모놀리식 아키텍처는 하나의 애플리케이션 안에 서비스의 모든 부분을 담아서 개발을 하는 것이고, MSA는 서비스의 기능별로 다른 서버를 두어 각각의 애플리케이션으로 개발하는 것이다.

이러한 MSA는 기능별로 독립된 서버이기에 서로의 의존성을 낮추고 나눠서 개발할 수 있으며, 단위테스트가 용이하다는 장점이 있다.  단점으로는 모놀리식 아키텍처보다 훨씬 복잡하다는 점이 있다. 

그렇다면 MSA 방식으로 기능을 구현한 애플리케이션들 간에는 어떻게 소통이 이루어질 수 있을까?

gateway를 통해 서로 간의 api를 호출하는 방식이 있고 메시지를 주고받아 소통을 하는 방식이 있다고 한다.

메시지 처리 방식으로는 '카프카(kafka)'가 있는데, 이 글에선 카프카를 Springboot와 함께 간단하게 실행해 볼 것이다.

스프링부트로 프로젝트를 진행하기전 주키퍼, 브로커를 미리 실행한다. 카프카에 대한 기본 정리 및 카프카 브로커 실행에 대한 것은 https://khdscor.tistory.com/107를 참고하길 바란다.

 

 

의존성 추가

 

우선 카프카를 스프링부트에서 사용하기 위해 build.gradle 파일에 의존성을 추가해 주어야 한다.

implementation 'org.springframework.kafka:spring-kafka'

 

 

Producer 애플리케이션 구현

1. Config 파일 작성

 

Producer에 대한 기본 정보를 나타내는 Config 파일을 작성해야 한다.

아래의 코드를 봐보자.

@Configuration
public class KafkaProducerConfig {

    private static final String BOOTSTRAP_SERVER = "localhost:9092";

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

KafkaTemplate는 직접 카프카를 사용하기 위한 객체이며 세부 내용은 ProducerFactory 내부에 작성한다. 

configProps에 put하는 내용들을 살펴보면 첫 번째는 카프카의 브로커 주소를 입력하는 것으로, 현재 동일 PC에서 진행 중이기에 localhost로 작성하였다.

두 번째와 세 번째는 각각 key와 value를 어떻게 직렬화하여 메시지를 보낼지 정하는 것이다. key는 토픽 이름에 해당하고 value는 전하고자 하는 메시지 내용을 뜻한다.

참고로 value의 직렬화를 String으로 지정하였기 때문에 무조건 String으로 변환한 값을 지정해야 한다. 하지만 보내고자 하는 값을 형태가 dto이고, 바로 value로 집어넣어 메시지를 보내고 싶다면 아래와 같이 직렬화 방식을 JSON으로 변경하고 해당 dto의 타입으로 바꾸고 하면 된다.

@Bean
    public ProducerFactory<String, TestMessage> newProducerFactory(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, TestMessage> newKafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }

 

 

2. Service 작성

 

본격적인 서비스에선 controller로부터 받은 데이터를 서버에 저장하고 메시지를 브로커로 보내려 한다.

아래의 코드를 봐보자.

@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private static final String TOPIC_NAME = "topic-test";

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final ArticleRepository articleRepository;

    private final ObjectMapper objectMapper = new ObjectMapper();

    public void send(TestMessage testMessage){
        try {
            String toJson = objectMapper.writeValueAsString(testMessage);
            kafkaTemplate.send(TOPIC_NAME, toJson);
            articleRepository.save(testMessage);
        } catch (Exception e) {
            System.out.println("에러 발생: " + e.getMessage());
        }
    }
}

 

config 파일에서 등록한 KafkaTemplate를 의존성 주입하여 사용할 수 있다.

우선 dto 객체를 json 형식으로 변환한 후 kafkaTemplate.send()를 통해 토픽 이름인 key와 dto를 json 형식으로 변환시킨 value를 지정한 브로커로 보내도록 하였다. 

이후 추가적인 작업을 실시할 수 있는데, 여기선 단순히 데이터를 저장하는 것으로 마쳤다.

dto를 json 형식의 String으로 변환하기위해 ObjectMapper를 사용하였지만, 위에서 언급한 것처럼 config 파일에 원하는 dto에 맞게 template를 구성할 수도 있다.

 

이렇게 send()를 함으로써 카프카 브로커로 메시지를 보낼 수 있었다.

정말 간단하지 않는가? 

그렇다면 이제 어떻게 수신이 되는지를 알아볼 차례이다.

 

 

Consumer 애플리케이션 구현

1. Config 파일 작성

 

이제 수신측인 Consumer를 작성해 보자.

우선 기본적인 설정 정보를 작성한 config 파일을 작성한다.

아래의 코드를 봐보자.

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private final static String BOOTSTRAP_SERVER = "localhost:9092";

    private final static String GROUP_ID = "group-test";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

Producer와 비슷하게 ConsumerFactory 객체 내에 필요한 설정 정보들을 작성한다.

configProps.put()으로 넣을 때 첫 번째는 카프카 브로커의 주소를 적는다. Producer와 마찬가지로 로컬서버로 테스트하는 것이기에 localhost로 지정하였다.

두 번 째는 그룹 Id를 지정해 주는 것이다. Consumer 그룹은 무슨 역할을 할까?

같은 Consumer 역할을 가지는 서버가 여러 개 있다고 하자. Consumer는 메시지를 받으면 데이터베이스에 메시지 내용을 저장을 해야 하고 로그 저장도 해야 한다. 하지만 모든 서버가 로그도 저장해야 하고 데이터베이스에도 저장을 해야 할까? 아니다. 오직 한 서버만 위 사항들을 실시하면 된다. 여기서 같은 Consumer들을 같은 그룹으로 지정해 주면 오직 한 서버에서만 메시지를 받을 수 있다. 그리고 메시지를 받는 Consumer 서버가 장애가 생기면 메시지를 받는 다른 서버가 지정되어 지장 없이 서비스를 진행할 수 있다.

그룹 Id를 지정한 다음 Key와 Value를 어떻게 역직렬화할지를 지정해 준다. Producer과 다르게 역직렬화를 뜻하는 DESERIALIZER를 사용한다는 것을 잊지 말자.

이후 DefaultKafkaConsumerFactory로 묶어서 리턴해준다.

 

Producer와는 다르게 Kafkatemplate가 아니라 ConcurrentKafkaListenerConstainerFactory 객체를 빈으로 등록해 줘야 한다.

 

 

2. Service 작성

 

본격적인 서비스에선 특정 토픽에 대한 메시지가 수신되면 지정 메서드를 실행하고 싶다.

아래의 코드를 봐보자.

@Component
@RequiredArgsConstructor
public class KafkaConsumer {

    private static final String TOPIC_NAME = "topic-test";

    private final ArticleRepository articleRepository;

    ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = TOPIC_NAME)
    public void listenMessage(String jsonMessage) {
        try {
            TestMessage message = objectMapper.readValue(jsonMessage, TestMessage.class);
            System.out.println("수신된 메시지: " + message.getTitle() + " : " + message.getContent());
            articleRepository.save(message);
        } catch (Exception e) {
            System.out.println("에러 발생: " + e.getMessage());
        }
    }
}

 

여기서 핵심은 @KafkaListener 어노테이션을 지정해 준 것이다. 토픽 이름을 지정하고 서버를 구동하면 해당 토픽으로 메시직 수신될 때마다 어노테이션이 지정된 메서드가 실행된다.

메서드 내에선 ObjectMapper를 통해 json 형식의 dto를 객체 타입으로 변환한 후 출력하고 데이터베이스에 저장하는 간단한 코드를 작성하였다. 

이후 상황에 따라 추가적인 내용도 진행할 수 있다.

 

 

결론 

 

이렇게 간단하게 카프카를 스프링부트와 함께 실행하였다. 

MSA 방식에서는 서버와 서버 간의 연계된 동작을 위해선 서로 소통할 수 있는 방식이 필요하고, 그중 카프카가 있다. 

실제로 카프카만을 써보았을 때는 단순히 메시지 전달방식이 어떻게 연계될 수 있을지 의문이 들었지만, 실제로 실습을 해보니 제법 유용하게 사용될 수 있다는 것을 알았다.

아직 부족한 것이 많지만 하나하나 공부하고 차후 MSA 방식으로 프로젝트를 진행해 봐야겠다.

 

 

참고

 

https://velog.io/@taehodot/SpringBoot-%EC%B9%B4%ED%94%84%EC%B9%B4%EC%99%80-%EC%8A%A4%ED%94%84%EB%A7%81%EB%B6%80%ED%8A%B8-%EC%97%B0%EB%8F%99

 

[SpringBoot] 카프카와 스프링부트 연동

아파치 카프가 공식 홈페이지 에서 다운로드 (https://kafka.apache.org/downloads)나는 리눅스에서 wget으로 다운로드, 설치는 압축을 풀면 끝wget https://downloads.apache.org/kafka/3.2.0/

velog.io

 

https://taptorestart.tistory.com/entry/Q-kafka%EC%97%90%EC%84%9C-groupid%EB%A5%BC-%EC%84%A4%EC%A0%95%ED%95%98%EB%A9%B4-%EC%96%B4%EB%96%BB%EA%B2%8C-%EB%90%A0%EA%B9%8C

 

Q. kafka에서 group_id를 설정하면 어떻게 될까?

A. group id가 같을 경우 한 클라이언트에서만 메시지를 소비(consume)할 수 있다. 테스트를 위한 코드: python kafka - producer and consumer 아래처럼 group_id를 설정한 경우에 consumer를 터미널 창을 두 개 띄워

taptorestart.tistory.com

 

 

'kafka' 카테고리의 다른 글

카프카(Kafka)는 어떻게 사용하는가?  (0) 2023.12.09