[MSA] Spring Boot + Kafka를 사용한 채팅 (2)

2024. 10. 21. 20:10·Spring/MSA

 

이전 글에서 여러개의 서버에서도 Kafka를 사용한다면 채팅기능을 충분히 구현할 수 있다는 것을 증명했다.

 

 

React 애플리케이션이 각각 3000, 3001 등의 포트를 사용하고, 백엔드 서버는 8081, 8082 등의 포트에서 실행될 경우, 각 프론트엔드에서 직접 해당 백엔드로 요청을 보내는 구조를 구현할 수 있다.

 

 

그러나 Spring Cloud Gateway를 사용하면, 프론트엔드에서 게이트웨이로 요청을 전송하고, 게이트웨이가 각 백엔드 서비스로 적절히 분산 처리해주는 방식으로 관리할 수 있다.

→ 더 유연한 아키텍처 설계 가능!

 

 

그리고 이전엔 하나의 EC2에서 채팅 서버를 실행했지만, 이번엔 MSA 구조로 변경해보기로 했다.

 

 

아키텍처 구조는 아래와 같다.

 

위의 구조를 보면 알 수 있듯이 프론트엔드에서 백엔드로 요청을 보낼 때, 로드밸런싱을 Spring Cloud Gateway가 담당한다.

 

 

결국 사용자의 요청이 처음으로 마주치는 것이 이 게이트웨이인 것이다.

 

 

그럼 이 요청을 백엔드로 전달하기 위해 백엔드에 해당하는 서비스를 모두 게이트웨이의 설정파일에 작성해야 한다.

 

 

application.yml

server:
  port: 9000

eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: <http://localhost:8761/eureka>

spring:
  application:
    name: apigateway-service
  cloud:
    gateway:
      mvc:
        routes:
          - id: chat-service
            uri: lb://CHAT-SERVICE
            predicates:
              - Path=/chat/*, /*/chat/**

이렇게 application.yml 을 작성하면 /chat/*, /*/chat/** 에 해당하는 요청을 모두 Eureka 서버에 등록된 chat-service 로 보낸다.

→ 보통 HTTP 요청에 대한 로드밸런싱은 저렇게 application.yml 에서도 충분히 설정이 가능하다.

 

 

하지만 지금 내가 구현하고자 하는건 웹 소켓 통신 이다.

 

 

HTTP는 요청, 응답이 끝나면 연결이 종료되지만, 웹 소켓은 아니다.

 

 

그러므로 application.yml의 routes 부분도 수정해줘야 한다.

 

 

물론 application.yml에서 설정해도 되지만, 자바 코드로 설정하는 방법도 있다.

 

 

그래서 이번엔 자바 코드로 설정해봤다.

 

 

참고로 Eureka 서버는 Spring Cloud Gateway나 주요 백엔드 서버를 실행하기 전에 먼저 실행해야 합니다!!

 

만약 Eureka 서버를 아직 구현하지 않은 경우 https://gudtjr2949.tistory.com/36 의 “서비스 등록 센터 - Eureka Sever” 부분을 보면 됩니다.

 

 

 

Spring Cloud Gateway


 

build.gradle

...

ext {
    set('springCloudVersion', "2023.0.3")
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
    implementation 'io.netty:netty-resolver-dns-native-macos:4.1.68.Final:osx-aarch_64'
    implementation 'org.springframework.cloud:spring-cloud-starter-gateway'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

...

 

 

 

application.yml

server:
  port: 9000

eureka:
  instance:
    hostname: localhost
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://localhost:8761/eureka

spring:
  application:
    name: apigateway-service
  • Eureka 서버에 게이트웨이를 등록하기 위해 Eureka 서버가 실행중인 인스턴스 IP, 실행중인 Spring Boot URL을 각각 hostname과 service-url:defaultZone 에 입력한다.
  • Eureka 서버에 apigateway-service 라는 이름으로 서비스를 등록한다.
  • 참고로 hostname과 service-url:defaultZone이 localhost인 이유는 하나의 EC2에 Gateway와 Eureka를 배포했기 때문이다. 만약 서로 다른 EC2에 위치한다면 해당 IP에 맞게 변경해줘야 한다.

 

 

 

main

@SpringBootApplication
@EnableDiscoveryClient
public class ApiGatewayApplication {

    public static void main(String[] args) {
        SpringApplication.run(ApiGatewayApplication.class, args);
    }
}
  • @EnableDiscoveryClient : Eureka 서버에 해당 프로젝트를 등록한다.

 

 

 

WebConfig

@Configuration
public class WebConfig implements WebFluxConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("프론트엔드 URL")
                .allowedMethods("GET", "POST", "PUT", "DELETE")
                .exposedHeaders("location")
                .allowedHeaders("*")
                .allowCredentials(true);
    }
}
  • WebFluxConfigurer : 보통 CORS 설정을 다룰 때, WebMvcConfigurer을 상속받지만, 여기선 WebFluxConfigurer을 상속받아야한다. 그 이유는 Spring Cloud Gateway는 비동기적으로 동작하는 리액티브(Reactive) 기반으로 만들어졌기 때문이다.
  • addCorsMappings() : CORS 설정 메서드이다.

 

 

 

GatewayConfig

@Configuration
public class GatewayConfig {

    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
                .route("chat-service", r -> r
                        .path("/chat/*", "/*/chat/**", "/ws/**")
                        .uri("lb://CHAT-SERVICE"))
                .build();
    }
}
  • customRouteLocator() : 라우팅 규칙을 정의하는 부분이다.
    • chat-service : 라우트 명 정의
    • /chat/*, /*/chat/**, /ws/** : 요청 경로 (웹 소켓 요청에 해당하는 /ws/** 가 추가됨)
    • lb://CHAT-SERVICE : 요청을 보낼 서비스명

 

 

채팅 서버


 

build.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.mysql:mysql-connector-j'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

    // Spring Cloud Config
    implementation 'org.springframework.cloud:spring-cloud-starter-config'
    implementation 'org.springframework.cloud:spring-cloud-config-server'

    // AWS Parameter Store
    implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.1.0")
    implementation 'io.awspring.cloud:spring-cloud-aws-starter-parameter-store'
}

ext {
    set('springCloudVersion', "2023.0.2")
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
  • 만약 AWS Parameter Store 을 사용하지 않는다면 아래의 의존성 코드는 지워도 된다.
implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.1.0")
implementation 'io.awspring.cloud:spring-cloud-aws-starter-parameter-store'

 

 

 

application.yml

config:
  type: aws-parameterstore:/chatback/parameter/

eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: ${eureka_url}

spring:
  application:
    name: chat-service
  config:
    import: ${config.type}
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://${db_url}?serverTimezone=Asia/Seoul
    username: ${db_username}
    password: ${db_password}
  jpa:
    hibernate:
      ddl-auto: update
    properties:
      hibernate:
        format_sql: true
  kafka:
    consumer:
      bootstrap-servers: ${bootstrap_server_url}
      group-id: ${group_id}
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: ${bootstrap_server_url}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • ${eureka_url} : 배포된 Eureka 서버 URL
  • ${config.type} : AWS Parameter Store 환경변수 경로
  • ${db_url}, ${db_username}, ${db_password} : DB URL, 사용자 이름, DB 패스워드
  • ${bootstrap_server_url} : 실행중인 Kafka 서버
  • ${group_id} : Kafka Consumer 그룹ID

 

 

KafkaConsumerConfig

@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    private final KafkaProperties kafkaProperties;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServerUrl;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl); // Kafka 브로커 서버 설정
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // Kafka Consumer Group 의 고유한 식별자를 설정
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // 메시지를 수신하는 KafkaListenerContainerFactory 빈 정의
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        // Kafka 메시지를 병렬로 처리하기 위한 설정을 추가할 수 있음(스레드 풀 크기, 에러 핸들링..)
        return kafkaListenerContainerFactory;
    }
}

  • bootstrapServerUrl, groupId에 application.yml에 있는 것을 가져온다.

 

 

 

KafkaProducerConfig

@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
    
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServerUrl;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • bootstrapServerUrl에 application.yml에 있는 것을 가져온다.

 

 

나머지 코드는 이전 글과 동일합니다!! (https://gudtjr2949.tistory.com/40)

 

 

 

Kafka 서버


 

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: Kafka가 외부에서 접근할 수 있는 IP
      KAFKA_CREATE_TOPICS: "chat-exchange:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock
    depends_on:
      - zookeeper
  • KAFKA_ADVERTISED_HOST_NAME: Kafka가 외부에서 접근할 수 있는 IP를 명시한다. 만약 내가 Kafka를 배포한 EC2의 IP가 43.202.53.20일 경우, 이 IP로 설정한다.
  • KAFKA_CREATE_TOPICS: "chat-exchange:3:1"
    • chat-exchange : 토픽 명
    • 3 : 파티션 수 -> 파티션의 수가 2개 이상일 경우 병렬 처리가 가능하다.
    • 1 : 복제 인덱스 수 -> 각 파티션의 데이터가 몇 개의 복제본을 가질지를 나타낸다. 여기서는 복제본이 1개이다.

 

 

 

과부화 테스트 결과


 

 

모놀리틱

→ 1000명의 사용자가 10번의 채팅을 시도할 경우,

  • 평균 응답 시간 (Average): 213ms (0.213초)
  • 에러율 (Error %): 0.00%
  • 처리량 (Throughput): 1424.5/sec → 1초당 약 1425개 요청 처리 가능

 

 

MSA

→ 1000명의 사용자가 10번의 채팅을 시도할 경우,

  • 평균 응답 시간 (Average): 11ms (0.011초)
  • 에러율 (Error %): 0.00%
  • 처리량 (Throughput): 2069.9/sec → 1초당 약 2070개 요청 처리 가능

 

모놀리틱에서 MSA로 변경한 뒤, 테스트한 결과 평균 응답 시간에서 약 94.84% 감소하고, 처리량은 45.31% 증가하여 MSA가 더 많은 요청을 처리할 수 있다는 것을 보여준다.

 

'Spring > MSA' 카테고리의 다른 글

[MSA] Spring Boot + Kafka를 사용한 채팅 (3) (Feat : Prometheus + Grafana)  (0) 2025.02.12
[MSA] Spring Boot + Kafka를 사용한 채팅 (1)  (3) 2024.10.05
[MSA] Spring Boot + MSA (2) (Feat : Kafka)  (0) 2024.09.25
[MSA] Spring Boot + MSA (1)  (1) 2024.09.24
'Spring/MSA' 카테고리의 다른 글
  • [MSA] Spring Boot + Kafka를 사용한 채팅 (3) (Feat : Prometheus + Grafana)
  • [MSA] Spring Boot + Kafka를 사용한 채팅 (1)
  • [MSA] Spring Boot + MSA (2) (Feat : Kafka)
  • [MSA] Spring Boot + MSA (1)
오도형석
오도형석
  • 오도형석
    형석이의 성장일기
    오도형석
  • 전체
    오늘
    어제
    • 분류 전체보기 N
      • MSA 모니터링 서비스
        • DB
      • 스파르타 코딩클럽
        • SQL
        • Spring
      • 백엔드
        • Internet
        • Java
        • DB
      • 캡스톤
        • Django
        • 자연어처리
      • Spring
        • JPA
        • MSA
      • ETC
        • ERROR
      • 개발 일기 N
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 인기 글

  • 태그

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
오도형석
[MSA] Spring Boot + Kafka를 사용한 채팅 (2)
상단으로

티스토리툴바