gony-dev 님의 블로그

[Apache Kafka] Spring boot 3.x.x과 Kafka 연동하기 본문

Kafka

[Apache Kafka] Spring boot 3.x.x과 Kafka 연동하기

minarinamu 2024. 11. 18. 14:40
Apache Kafka의 구성 요소와 동작 방식에 대해 알아보았으니
간단한 실습을 통해 이를 더 이해해보도록 하겠다.

📌 프로젝트 환경

  • Spring boot version | 'org.springframework.boot' version '3.3.5'
  • JDK | 17
  • 가독성 있는 실습을 진행하기 위해 Consumer과 Producer에 대한 프로젝트를 따로 생성해 주었다!

의존성

  • 두 어플리케이션 모두 같은 의존성을 추가하였다.
dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    
    // kafka 의존성 추가
	implementation 'org.springframework.kafka:spring-kafka'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
}

 

Producer Application

1. ProducerConfig

@Configuration
public class ProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • 코드를 좀 살펴보자.
  • producerFactory()
    • ProducerFactory는 카프카 설정을 위한 역할을 맡는다.
    • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG | 카프카 브로커의 주소를 지정한다. 즉, 중개 장소를 지정하는 것이다.
    • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG | 메시지의 키를 직렬화한다.
    • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG | 메시지의 값을 직렬화한다.
  • kafkaTemplate() | Kafka에 메시지를 송신하는 데 사용된다!

2. Producer Service

@Component
@RequiredArgsConstructor
public class TestProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("topic", message);
        System.out.println("Sent message: " + message);
    }
}

 

Consumer Application

1. ConsumerConfig

@Configuration
public class ConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}
  • Consumer는 Producer에서 넣은 키-값의 직렬화의 값을 역직렬화하여 네트워크를 통해 전달받은 저수준의 형식을
    읽을 수 있는 객체로 변환하여 메시지를 해석한다.

 

2. Consumer Service

@Component
public class TestConsumer {

    @KafkaListener(topics = "topic", groupId = "group_1")
    public void listen(Object message) {
        System.out.println(message);
    }
}
  • KafkaListener는 특정 경로의 메시지를 받을 수 있게 하는 어노테이션으로
    이에 대해선 자세하게 다루어볼 예정이다.

Docker-compose

  • 우리는 도커를 이용하여 kafka와 zookeeper의 포트를 개방하여 테스트를 진행할 것이다.
  • 블로그가 정말 많은 도움이 되었다 ㅠㅠ
  • docker-compose 실행 | docker-compose up -d
  • docker-compose 종료 | docker-compose down
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
        - /var/run/docker.sock:/var/run/docker.sock

 

이제 Producer에서 메시지를 보내보자!

@SpringBootTest
class ProducerTest {

    @Autowired
    private TestProducer testProducer;

    @DisplayName("broker로 메시지를 전달합니다.")
    @Test
    void test(){
     testProducer.sendMessage("test Producer");
    }
}

 

  • docker-compose를 실행하고, Consumer Application을 실행하고, ProducerTest를 실행시키면..
  • 아래의 캡쳐본의 맨 아래 로그줄과 같이 성공적으로 메시지가 보내졌음을 확인할 수 있다!

📌 참고 자료

[서버] Kafka 와 Spring Boot 애플리케이션 연동

Kafka 개념과 Spring Boot + Kafka 간단한 연동

 

'Kafka' 카테고리의 다른 글

[Apache Kafka] 카프카란?  (0) 2024.11.16