Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 자격증
- serverless
- codedeploy
- ec2
- goorm x kakao
- sqs
- CICD
- CodeCommit
- Redis
- bootcamp
- 티스토리챌린지
- mapping
- rds
- codebuild
- backenddeveloper
- DynamoDB
- jpa
- 백엔드
- 스터디
- Docker
- 오블완
- 개발자
- MSA
- Spring Boot
- aws
- QueryDSL
- goorm
- s3
- orm
- spring
Archives
- Today
- Total
gony-dev 님의 블로그
[Apache Kafka] Spring boot 3.x.x과 Kafka 연동하기 본문
Apache Kafka의 구성 요소와 동작 방식에 대해 알아보았으니
간단한 실습을 통해 이를 더 이해해보도록 하겠다.
📌 프로젝트 환경
- Spring boot version | 'org.springframework.boot' version '3.3.5'
- JDK | 17
- 가독성 있는 실습을 진행하기 위해 Consumer과 Producer에 대한 프로젝트를 따로 생성해 주었다!
의존성
- 두 어플리케이션 모두 같은 의존성을 추가하였다.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
// kafka 의존성 추가
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
Producer Application
1. ProducerConfig
@Configuration
public class ProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- 코드를 좀 살펴보자.
- producerFactory()
- ProducerFactory는 카프카 설정을 위한 역할을 맡는다.
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG | 카프카 브로커의 주소를 지정한다. 즉, 중개 장소를 지정하는 것이다.
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG | 메시지의 키를 직렬화한다.
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG | 메시지의 값을 직렬화한다.
- kafkaTemplate() | Kafka에 메시지를 송신하는 데 사용된다!
2. Producer Service
@Component
@RequiredArgsConstructor
public class TestProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("topic", message);
System.out.println("Sent message: " + message);
}
}
Consumer Application
1. ConsumerConfig
@Configuration
public class ConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
- Consumer는 Producer에서 넣은 키-값의 직렬화의 값을 역직렬화하여 네트워크를 통해 전달받은 저수준의 형식을
읽을 수 있는 객체로 변환하여 메시지를 해석한다.
2. Consumer Service
@Component
public class TestConsumer {
@KafkaListener(topics = "topic", groupId = "group_1")
public void listen(Object message) {
System.out.println(message);
}
}
- KafkaListener는 특정 경로의 메시지를 받을 수 있게 하는 어노테이션으로
이에 대해선 자세하게 다루어볼 예정이다.
Docker-compose
- 우리는 도커를 이용하여 kafka와 zookeeper의 포트를 개방하여 테스트를 진행할 것이다.
블로그가 정말 많은 도움이 되었다 ㅠㅠ- docker-compose 실행 | docker-compose up -d
- docker-compose 종료 | docker-compose down
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
이제 Producer에서 메시지를 보내보자!
@SpringBootTest
class ProducerTest {
@Autowired
private TestProducer testProducer;
@DisplayName("broker로 메시지를 전달합니다.")
@Test
void test(){
testProducer.sendMessage("test Producer");
}
}
- docker-compose를 실행하고, Consumer Application을 실행하고, ProducerTest를 실행시키면..
- 아래의 캡쳐본의 맨 아래 로그줄과 같이 성공적으로 메시지가 보내졌음을 확인할 수 있다!
📌 참고 자료
[서버] Kafka 와 Spring Boot 애플리케이션 연동
Kafka 개념과 Spring Boot + Kafka 간단한 연동
'Kafka' 카테고리의 다른 글
[Apache Kafka] 카프카란? (0) | 2024.11.16 |
---|