채팅 기능을 구현할 때 Spring Boot에서는 주로 STOMP나 Redis를 이용해 WebSocket 기능을 개발한다.
그런데 MSA를 학습하면서 Kafka를 사용하는 과정에서, Kafka의 동작 방식이 Redis를 이용한 채팅 기능 구현과 매우 유사하다는 느낌을 받았다.
이는 Kafka의 Producer와 Consumer가 Redis의 Publisher와 Subscriber 역할을 하고, Kafka는 Kafka 브로커, Redis는 메시지 브로커가 존재하는 형태가 매우 유사하게 느껴졌기 때문이다.
이후 관련 정보를 더 찾아보니 Kafka를 이용해 채팅 기능을 구현한 사례들을 발견했다. 그래서 이번에는 모놀리틱 아키텍처 기반의 채팅 서버가 아닌, MSA 기반의 채팅 서버를 직접 구현해보고자 결심하게 되었다.
그런데 이제 Kafka를 곁들인...
구조는 아래와 같다.
Kafka
Kafka 실행용 docker-compose
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "test-chat:3:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock
depends_on:
- zookeeper
Kafka를 사용하기 위해선 zookeeper가 필요하므로, docker compose를 사용해 함께 실행시킨다.
채팅 서버
build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
}
ext {
set('springCloudVersion', "2023.0.2")
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
application.yml
server:
port: 8081
spring:
application:
name: chat-service
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: dev1
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 이 아래엔 DB관련 설정
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/websocket?serverTimezone=Asia/Seoul
username: root
jpa:
hibernate:
ddl-auto: update
properties:
hibernate:
format_sql: true
- spring.application.name: chat-service 라는 이름의 서비스로 Eureka 서버에 등록한다.
WebConfig
@Configuration
public class WebConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("http://localhost:3000")
.allowedMethods("GET", "POST", "PUT", "DELETE")
.exposedHeaders("location")
.allowedHeaders("*")
.allowCredentials(true);
}
}
React를 사용해 테스트를 할 예정이므로, Spring 설정 파일에 CORS 관련 설정 코드를 추가한다.
WebSocketConfig
웹 소켓 통신에 필요한 설정을 정의하는 클래스이다.
@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/sub")
.setHeartbeatValue(new long[] {3000L, 3000L});
config.setApplicationDestinationPrefixes("/pub");
}
// STOMP 엔드포인트
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*");
}
}
- configureMessageBroker() : 웹 소켓 통신에 사용할 메시지 브로커를 설정한다.
- config.enableSimpleBroker("/sub") : 메시지를 구독(수신)하는 요청 엔드포인트
- config.setApplicationDestinationPrefixes("/pub") : 메시지를 발행(송신)하는 엔드포인트
- registerStompEndpoints() : 사용자가 서버와 WebSocket 연결을 맺기 위한 접점을 정의한다. (STOMP 엔드포인트 설정)
- registry.addEndpoint("/ws") : 엔드포인트를 /ws 로 설정
- 실제 요청 URL은 http://localhost:8081/ws
- registry.addEndpoint("/ws") : 엔드포인트를 /ws 로 설정
KafkaProducerConfig
Kafka 프로듀서용 설정파일이다.
@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
private final KafkaProperties kafkaProperties;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- @EnableKafka : 이 어노테이션은 Kafka 기능을 활성화한다.
- producerFactory() : Kafka에서 메시지를 생성하는 역할을 하는 프로듀서 객체의 생성 로직을 정의한다.
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: Kafka 브로커의 주소를 지정한다.
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG : 메시지 키와 값을 직렬화할 때 사용하는 클래스를 설정한다.
- kafkaTemplate() : 이 Spring Boot에서 Kafka 프로듀서 객체를 사용하기 위한 템플릿을 생성한다.
KafkaConsumerConfig
Kafka 컨슈머용 설정파일이다.
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
private final KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dev1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
- consumerFactory() : Kafka에서 메시지를 수신하는 역할을 하는 프로듀서 객체의 생성 로직을 정의한다.
- ConsumerConfig.GROUP_ID_CONFIG : 소비자가 속한 그룹을 정의한다. Kafka의 소비자 그룹은 같은 그룹 내의 소비자들끼리 특정 파티션을 공유하지 않고 메시지를 분산 처리하는데, 여기서는 "dev1" 그룹에 속하게 된다.
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG : Kafka가 새로운 소비자 그룹을 처음 실행할 때 읽어야 할 메시지의 오프셋 위치를 설정한다. "earliest" 로 설정하면 가장 처음부터 메시지를 읽는다.
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG : 메시지 키와 값을 역직렬화할 때 사용하는 클래스를 설정한다. 외부에서 내부로 오는 것이기 때문에 역직렬화 가 필요하다!
- kafkaListenerContainerFactory() : Kafka 메시지를 수신하고 처리하는 리스너 컨테이너를 생성합니다.
KafkaProducer
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMassage(String topic, ChatMessage chatMessage) {
ObjectMapper objectMapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = objectMapper.writeValueAsString(chatMessage);
} catch(JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
}
}
- kafkaTemplate : 이전 KafkaProducerConfig에서 만든 템플릿을 가져온다.
- sendMassage() : Kafka 서버(이전에 docker-compose를 사용해 실행시킨 서버)의 특정 토픽에 메시지를 전송한다. 이 프로젝트에선 chat-exchange 이란 토픽을 사용한다.
- 외부로 데이터를 전달하기 때문에 Dto 객체를 JSON 문자열 타입으로 직렬화해야한다.
KafkaConsumer
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final SimpMessagingTemplate template;
@KafkaListener(topics = "chat-exchange")
public void consume(String message) {
ObjectMapper objectMapper = new ObjectMapper();
try {
// String 메시지를 ChatMessage 객체로 변환
ChatMessage chatMessage = objectMapper.readValue(message, ChatMessage.class);
// WebSocket을 통해 해당 채팅방으로 메시지 전송
String destination = "/sub/chat/" + chatMessage.getId();
template.convertAndSend(destination, chatMessage);
} catch (Exception e) {
e.printStackTrace(); // 에러 로그 출력
}
}
}
- SimpMessagingTemplate : 엡 소켓을 통해 메시지를 전송할 때 사용된다.
- @KafkaListener(topics = "chat-exchange") : chat-exchange 라는 토픽을 구독했음을 의미한다. 이 토픽에 메시지가 도착하면, consume() 메서드가 동작한다.
- consume() : Kafka 서버에게 전달받은 JSON 문자열 형태의 데이터를 ChatMessage Dto 형태로 변환한다. 그리고 현재 사용자가 구독중인 주소(/sub/chat/{chatRoomId})에 채팅 내용을 전달한다.
ChatMessage
채팅에 사용할 Dto 클래스다.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {
private Long id;
private String name;
private String message;
}
- id : 채팅방 id (1번 채팅방이면 1, 2번 채팅방이면 2, …)
- name : 채팅을 입력한 사용자의 이름
- message : 입력한 채팅 내용
ChatController
사용자의 채팅 관련 요청을 처리할 컨트롤러이다.
@Slf4j
@RestController
@RequiredArgsConstructor
public class ChatController {
private final ChatService chatService;
private final KafkaProducer kafkaProducer;
@MessageMapping("/chat") // pub
public ResponseEntity<Void> greeting(@RequestBody ChatMessage chatMessage) throws Exception {
Long chatroomId = chatMessage.getId();
chatMessage.setId(chatroomId);
chatService.saveChat(chatMessage);
kafkaProducer.sendMassage("chat-exchange", chatMessage);
return ResponseEntity.ok().build();
}
@GetMapping("/chat/{chatroomId}")
public ResponseEntity<ChatHistory> loadChatHistory(@PathVariable Long chatroomId) {
ChatHistory chatHistory = chatService.loadChatHistory(chatroomId);
return ResponseEntity.ok().body(chatHistory);
}
}
- kafkaProducer : 사용자에게 채팅을 받으면, 해당 채팅방에 있는(채팅방을 구독한) 사용자들에게도 모두 채팅을 전달해야 하므로 KafkaProducer를 선언한다.
- @MessageMapping("/chat") : Publish를 의미한다. 만약 사용자가 채팅 내용을 입력하고 전송 버튼을 클릭하면, React에선 /pub/chat 이라고 적힌 경로에 요청을 보낸다. (아래의 채팅을 전송하는 React 코드에는 /chat 이 아니라 /pub가 앞에 붙는 이유는 WebSocketConfig 에서 메시지를 발행(송신)하는 엔드포인트를 /pub 로 설정했기 때문!!)
const sendMessage = () => {
if (stompClient.current && inputNameValue && inputMessageValue) {
const body = {
id : roomId,
name : inputNameValue,
message : inputMessageValue
};
stompClient.current.send(`/pub/chat`, {}, JSON.stringify(body));
setInputNameValue('');
setInputMessageValue('');
}
};
- kafkaProducer.sendMassage() : KafkaProducer에게 Kafka 서버에 chat-exchange 토픽을 구독한 서버에게 메시지를 전달해달라고 요청을 보낸다.
이게 첫번째 채팅 서버의 코드이다. 두번째, 세번째 서버는 정확하게 두 부분만 수정하면 된다.
두번째 채팅 서버
server:
port: 8082
포트를 8082로 변경한다.
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
private final KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dev2");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
...
}
ConsumerConfig.GROUP_ID_CONFIG 를 dev2 로 변경한다.
세번째 채팅 서버
server:
port: 8083
포트를 8083로 변경한다.
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
private final KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dev3");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
...
}
ConsumerConfig.GROUP_ID_CONFIG 를 dev3 로 변경한다.
총 3개의 채팅 서버(Spring Boot)를 실행한다. 그리고 3개의 프론트엔드(React) 서버를 실행한다.
(참고로 React 또한 3000, 3001, 3002 포트로 실행한다.)
프론트엔드
// 웹소켓 연결 설정
const connect = () => {
const socket = new WebSocket("ws://localhost:8081/ws");
stompClient.current = Stomp.over(socket);
stompClient.current.connect({}, () => {
stompClient.current.subscribe(`/sub/chat/`+roomId, (message) => {
const newMessage = JSON.parse(message.body);
console.log("newMessage : ", newMessage);
setMessages((prevMessages) => [...prevMessages, newMessage]);
});
});
};
// 웹소켓 연결 해제
const disconnect = () => {
if (stompClient.current) {
stompClient.current.disconnect();
}
};
// 기존 채팅 메시지를 서버로부터 가져오는 함수
const fetchMessages = () => {
return axios.get(`http://localhost:8081/chat/${roomId}`)
.then(response => {
setMessages(response.data.message);
});
};
useEffect(() => {
connect();
fetchMessages();
// 컴포넌트 언마운트 시 웹소켓 연결 해제
return () => disconnect();
}, []);
//메세지 전송
const sendMessage = () => {
if (stompClient.current && inputNameValue && inputMessageValue) {
const body = {
id : roomId,
name : inputNameValue,
message : inputMessageValue
};
stompClient.current.send(`/pub/chat`, {}, JSON.stringify(body));
setInputNameValue('');
setInputMessageValue('');
}
};
채팅 서버중, 8081 포트를 사용하는 서버와 통신한다.
두번째, 세번째 프론트엔드는 8082, 8083 포트를 사용하는 서버와 통신한다.
채팅 시연
그리고 현재 백엔드는 8081, 8082, 8083 이렇게 총 3개의 같은 서버를 띄우고, 프론트엔드 또한 3000, 3001, 3002 3개의 서버를 띄웠다..
이렇게 같은 서버를 3개씩 띄운 이유는 내가 아직까지 Spring Cloud Gateway를 사용한 웹 소켓 기능을 완벽하게 완성하지 못해서이다..
다음 글에선 Spring Cloud Gateway를 적용한 뒤 돌아오겠습니다!
'Spring > MSA' 카테고리의 다른 글
[MSA] Spring Boot + Kafka를 사용한 채팅 (3) (Feat : Prometheus + Grafana) (0) | 2025.02.12 |
---|---|
[MSA] Spring Boot + Kafka를 사용한 채팅 (2) (1) | 2024.10.21 |
[MSA] Spring Boot + MSA (2) (Feat : Kafka) (0) | 2024.09.25 |
[MSA] Spring Boot + MSA (1) (1) | 2024.09.24 |