Kafka
Kafka란, 분산 메시지 스트리밍 플랫폼으로, 여러 개의 서비스가 분리되어 있더라도 각각의 서비스간에 데이터를 실시간으로 전송 및 처리할 수 있도록 돕는 역할을 한다. 이를 통해 마이크로 서비스간의 데이터 동기화 상태를 유지할 수 있다.
위의 그림을 보면 알 수 있듯이 Kafka Cluster는 이벤트 메시지 브로커를 담당한다. Producer에게 이벤트를 받고, 해당 이벤트를 Consumer 가 가져갈 수 있게 한다.
내부 구조는 다음과 같다.
- 브로커
- 브로커는 Kafka 클러스터의 핵심 구성 요소로, Kafka 서버이다.
- 여러 개의 브로커가 모여 Kafka Cluster를 형성하며, 각 브로커는 특정 파티션 데이터를 관리한다.
- Producer가 보낸 메시지를 받아 저장하고, Consumer가 메시지를 요청하면 데이터를 제공한다.
- 토픽
- 토픽은 데이터가 저장되는 논리적인 단위로, 메시지 스트림을 관리하는 개념이다.
- 각각의 토픽은 다수의 파티션(Partition)으로 나뉜다.
- 토픽은 여러 서비스가 데이터를 주고받을 때 그 목적에 맞는 경로를 제공한다. 예를 들어, decrease-product-stock 라는 토픽은 ‘상품 재고 감소’ 를 담당하는 토픽인 것이다.
- 파티션
- 파티션은 토픽을 나눈 물리적인 데이터 단위로, 각각의 파티션에 메시지들이 순서대로 저장된다.
- 하나의 토픽은 반드시 하나 이상의 파티션을 보유한다.
- 파티션은 클러스터 내의 여러 브로커에 분산 저장되며, 이를 통해 병렬 처리와 확장성을 제공한다.
- 각 파티션은 고유한 오프셋(Offset)을 가지고 있어, 메시지의 순서를 보장한다.
(참고) 이벤트 메시지 브로커란?
시스템에서 발생하는 이벤트(상태 변화나 작업 완료 같은 사건)를 다른 서비스에 전달해주는 중간 역할을 하는 시스템이다.
Order 서비스에서 주문이 발생하면, Product에서 해당 제품의 재고가 감소해야 한다. 여기선 Order 서비스가 발행자(Producer)가 되고, Product 서비스가 구독자(Consumer)가 되는 것이다. 그리고 이 두 서비스 사이에 메시지 브로커가 중재자 역할로 존재하는 것이다.
Kafka를 적용한 MSA
위에서 말했듯이 MSA에서 Kafka를 사용하는 목적 중 하나는 서비스간의 데이터 동기화 상태를 유지 하기 위함이다.
이러한 상황의 대표적인 예시가 위에서 언급한 Product 서비스와 Order 서비스 간의 관계이다.
사용자가 주문을 시도하면, 해당 제품의 재고가 주문한 수량만큼 감소해야 한다.
대략적인 로직을 파악해보면 아래의 그림과 같다.
- 사용자가 한 제품을 주문한다.
- Spring Cloud Gateway는 사용자의 구매 요청을 Order Service로 전달한다.
- 해당 제품의 수량이 존재하는 경우, 주문정보를 저장하고 Kafka의 decrease-product-stock 토픽에 제품 정보와 수량을 전송한다.
- decrease-product-stock 토픽을 구독하고 있던 Product Service는 해당 메시지를 가져온 뒤, 제품의 수량이 충분한지 확인하고, 충분하다면 제품의 수량을 감소한다.
Kafka를 적용한 MSA - 구현
이전 글에선 Order Service의 Controller만 생성했는데, Kafka를 사용하기 위해선 Kafka 관련 클래스를 추가해야한다.
(메시지 발행자는 Order고, 메시지 소비자가 Product이므로 Order는 Produce 관련 코드만, Product는 Consume 관련 코드만 작성했습니다!)
product.KafkaConsumerConfig
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
private final KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 부트스트랩 서버(Kafka 브로커) 주소 설정
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dev-product"); // Kafka Consumer Group 설정
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 외부에서 내부로 들어오는 상황이기 때문에 역직렬화
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") : Kafka 브로커에 접근하기 위한 부트스트랩 서버의 주소를 지정함
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "dev") : 어떤 Consumer Group을 사용할 지 선택함
(참고) BOOTSTRAP_SERVERS 란?
여기서 사용되는 부트스트랩 서버는 Kafka 클러스터와 클라이언트(Producer, Consumer)가 처음 연결될 수 있는 Kafka 브로커들의 주소를 제공하는 초기 진입점을 나타낸다.
product.KafkaConsumer
@Slf4j
@Service
public class KafkaConsumer {
@KafkaListener(topics = "decrease-product-stock", groupId = "dev-product")
public void decreaseProductStock(String message) {
log.info("수신 메시지 = {}", message);
}
}
- @KafkaListener(topics = "decrease-product-stock", groupId = "dev-product") : Kafka 브로커에게 수신할 메시지 토픽, 그룹 ID를 설정함
order.KafkaProducerConfig
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
private final KafkaProperties kafkaProperties;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 내부에서 외부로 데이터를 내보내는 상황이기 때문에 직렬화
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
order.KafkaProducer
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void createOrder(String topic, OrderKafkaDto orderKafkaDto) {
ObjectMapper objectMapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = objectMapper.writeValueAsString(orderKafkaDto);
} catch(JsonProcessingException e) {
e.printStackTrace();
}
// 주문이 생성될 경우, 해당 주문의 제품 재고를 차감하기 위해 Kafka 를 사용해 ProductService 로 요청을 보냄
kafkaTemplate.send(topic, jsonInString);
}
}
--------------------------------------------------------------------------------
@Getter
@Builder
@AllArgsConstructor
public class OrderKafkaDto {
private Long productId;
private int count;
}
- createOrder(String topic, OrderKafkaDto orderKafkaDto) : 전달받은 토픽과 Dto를 사용해 Kafka 브로커로 전송되어 해당 토픽을 구독하고 있는 컨슈머(Product)가 소비할 수 있게 함
- Kafka가 메시지를 문자열 또는 바이트 형식으로 주고받기 때문에 전송하기 전에 Dto 객체를 문자열로 변환해야 함
위의 코드를 모두 구현하고, 기본적인 Controller, Service, Dto까지 구현하면 기본적인 코드 작성은 끝난다.
@RestController
@RequiredArgsConstructor
@RequestMapping("/orders")
public class OrderController {
private final OrderService orderService;
@PostMapping("/create")
public ResponseEntity<String> createOrder(@RequestBody OrderCreateRequestDto request) {
orderService.createOrder(request.toService());
return ResponseEntity.ok().body("주문 생성 완료");
}
}
--------------------------------------------------------------------------------
@Getter
@AllArgsConstructor
public class OrderCreateRequestDto {
private Long productId;
private int count;
public OrderCreateServiceRequest toService() {
return OrderCreateServiceRequest.builder()
.productId(productId)
.count(count)
.build();
}
}
@Service
@RequiredArgsConstructor
public class OrderServiceImpl implements OrderService {
private final KafkaProducer producer;
@Override
public void createOrder(OrderCreateServiceRequest request) {
// 1. OrderRepository 주문 정보 저장
// 2. 하나의 Order 가 생성되었으므로, Product 에 있는 재고를 감소시켜야 함
producer.createOrder("decrease-product-stock", request.toKafka());
}
}
--------------------------------------------------------------------------------
@Getter
@Builder
@AllArgsConstructor
public class OrderCreateServiceRequest {
private Long productId;
private int count;
public OrderKafkaDto toKafka() {
return OrderKafkaDto.builder()
.productId(productId)
.count(count)
.build();
}
}
다 만들었으면 API Gateway를 통해 Order 서비스로 주문 생성 요청을 보낸다.
이후 Product 서비스에 가보면 구매를 시도한 제품의 고유 ID와 구매한 수량이 넘어온 것을 볼 수 있다.
'Spring > MSA' 카테고리의 다른 글
[MSA] Spring Boot + Kafka를 사용한 채팅 (3) (Feat : Prometheus + Grafana) (0) | 2025.02.12 |
---|---|
[MSA] Spring Boot + Kafka를 사용한 채팅 (2) (1) | 2024.10.21 |
[MSA] Spring Boot + Kafka를 사용한 채팅 (1) (3) | 2024.10.05 |
[MSA] Spring Boot + MSA (1) (1) | 2024.09.24 |