Wednesday, September 20, 2023

스프링 부트와 카프카로 구축하는 실전 마이크로서비스 아키텍처

서론: 왜 지금 마이크로서비스와 비동기 메시징인가?

현대의 소프트웨어 개발 패러다임은 거대한 단일 애플리케이션(Monolithic Architecture)에서 작고 독립적으로 배포 가능한 여러 서비스의 집합(Microservices Architecture)으로 빠르게 이동하고 있습니다. 이러한 변화의 중심에는 비즈니스 민첩성 향상, 기술 스택의 유연성, 그리고 서비스 단위의 확장성 확보라는 명확한 목표가 있습니다. 하지만 여러 서비스가 분산되어 동작하는 환경은 필연적으로 서비스 간의 통신이라는 새로운 복잡성을 야기합니다.

가장 단순한 통신 방식은 REST API를 이용한 동기(Synchronous) 호출입니다. 서비스 A가 서비스 B의 기능이 필요할 때, 서비스 B의 API를 호출하고 응답이 올 때까지 기다리는 방식이죠. 이 방식은 직관적이지만, 치명적인 약점을 가집니다. 만약 서비스 B가 일시적인 장애를 겪거나 응답이 지연되면, 이를 호출한 서비스 A 역시 장애가 전파되어 전체 시스템의 안정성을 해칠 수 있습니다. 이를 '결합(Coupling)'이 높다고 표현하며, 분산 시스템에서는 이러한 강한 결합을 피하는 것이 매우 중요합니다.

이 문제에 대한 효과적인 해결책이 바로 이벤트 기반 아키텍처(Event-Driven Architecture)비동기(Asynchronous) 메시징입니다. 서비스들은 직접 통신하는 대신, '이벤트'라는 메시지를 중앙 메시지 시스템에 발행(Publish)하고, 해당 이벤트에 관심 있는 다른 서비스들이 이를 구독(Subscribe)하여 처리합니다. 이 과정은 비동기적으로 일어나므로, 메시지를 발행한 서비스는 수신 서비스의 상태와 관계없이 자신의 작업을 계속할 수 있습니다. 서비스 간의 결합이 느슨해지고(Loosely Coupled), 개별 서비스의 장애가 전체 시스템으로 확산되는 것을 방지하여 시스템의 회복탄력성(Resilience)을 극적으로 향상시킵니다.

이러한 이벤트 기반 마이크로서비스 아키텍처를 구현하는 데 있어 스프링 부트(Spring Boot)아파치 카프카(Apache Kafka)는 현재 업계에서 가장 강력하고 대중적인 조합 중 하나입니다. 스프링 부트는 자바 생태계에서 마이크로서비스 개발을 위한 사실상의 표준으로, 개발자가 비즈니스 로직에만 집중할 수 있도록 수많은 편의 기능을 제공합니다. 아파치 카프카는 단순한 메시지 큐를 넘어, 대용량의 실시간 데이터 스트림을 안정적으로 처리하기 위해 설계된 분산 스트리밍 플랫폼입니다.

본 글에서는 스프링 부트와 카프카를 결합하여 어떻게 견고하고 확장 가능한 마이크로서비스를 구축할 수 있는지, 이론적 배경부터 실전 예제 코드, 그리고 운영 환경에서 고려해야 할 고급 주제까지 단계별로 상세하게 다룰 것입니다. 단순히 코드를 나열하는 것을 넘어, 각 기술의 핵심 원리를 이해하고 왜 이러한 방식으로 설계해야 하는지에 대한 깊이 있는 통찰을 제공하는 것을 목표로 합니다.

1장: 핵심 기반 다지기 - 스프링 부트와 아파치 카프카

본격적인 구현에 앞서, 우리가 사용할 두 가지 핵심 기술인 스프링 부트와 카프카의 본질을 정확히 이해하는 것이 중요합니다. 이들의 철학과 내부 동작 원리를 알면, 단순히 라이브러리를 사용하는 것을 넘어 발생하는 문제에 효과적으로 대처하고 시스템을 최적화할 수 있는 능력을 갖추게 됩니다.

1.1. 스프링 부트(Spring Boot)의 철학

스프링 부트는 기존 스프링 프레임워크(Spring Framework)의 복잡성을 해결하기 위해 탄생했습니다. 과거 스프링 기반 애플리케이션을 개발하기 위해서는 수많은 XML 설정과 라이브러리 버전 호환성 문제로 인해 개발 초기 단계부터 상당한 시간과 노력을 소모해야 했습니다. 스프링 부트는 다음 세 가지 핵심 철학을 통해 이러한 문제를 해결합니다.

  • Convention over Configuration (설정보다 관례): "개발자가 특별히 설정하지 않으면, 가장 보편적이고 합리적인 방식으로 동작해야 한다"는 원칙입니다. 예를 들어, H2 데이터베이스 라이브러리가 클래스패스에 존재하면, 스프링 부트는 별도의 설정 없이 자동으로 인메모리 데이터베이스 연결을 구성해줍니다. 개발자는 꼭 필요한 경우에만 설정을 변경하면 되므로, 설정 파일의 양이 획기적으로 줄어듭니다.
  • Auto-configuration (자동 설정): 스프링 부트는 프로젝트의 클래스패스에 포함된 라이브러리들을 분석하여, 해당 라이브러리들을 사용하는 데 필요한 스프링 빈(Bean)들을 자동으로 등록하고 설정합니다. 예를 들어, spring-boot-starter-web 의존성을 추가하면 내장 톰캣(Tomcat) 서버, 디스패처 서블릿(DispatcherServlet), Jackson 메시지 컨버터 등이 모두 자동으로 구성되어 즉시 웹 애플리케이션을 개발할 수 있습니다.
  • Starter Dependencies (스타터 의존성): 특정 기능을 개발하는 데 필요한 라이브러리들의 묶음을 '스타터'라는 이름으로 제공합니다. 예를 들어, 카프카 연동을 위해서는 spring-kafka 스타터를, JPA를 사용하기 위해서는 spring-boot-starter-data-jpa 스타터를 추가하기만 하면 됩니다. 이를 통해 개발자는 라이브러리 간의 복잡한 버전 호환성 문제를 고민할 필요 없이, 검증된 조합을 손쉽게 사용할 수 있습니다.

이러한 특징들 덕분에 스프링 부트는 개발자가 인프라 설정이 아닌 비즈니스 로직 구현에 온전히 집중할 수 있는 환경을 제공하며, 마이크로서비스 개발의 생산성을 극대화하는 강력한 도구로 자리매김했습니다.

1.2. 아파치 카프카(Apache Kafka)의 구조와 원리

카프카를 단순한 메시지 큐(Message Queue)로 생각하면 그 잠재력의 절반도 활용하지 못하는 것입니다. 카프카는 분산 커밋 로그(Distributed Commit Log)라는 핵심 아이디어를 기반으로 하는 분산 스트리밍 플랫폼입니다.

카프카의 핵심 구성 요소는 다음과 같습니다.

  • 브로커(Broker): 카프카 서버의 단일 인스턴스를 의미합니다. 일반적으로 3대 이상의 브로커가 클러스터(Cluster)를 구성하여 동작하며, 이를 통해 고가용성과 확장성을 보장합니다.
  • 주키퍼(Zookeeper) / KRaft: 카프카 클러스터의 메타데이터(브로커 정보, 토픽 설정, 리더 선출 등)를 관리하는 코디네이션 시스템입니다. 최신 버전의 카프카에서는 주키퍼 의존성을 제거하고 자체적인 합의 프로토콜인 KRaft(Kafka Raft) 모드를 도입하여 운영 복잡성을 낮추고 있습니다.
  • 토픽(Topic): 메시지를 구분하기 위한 논리적인 채널 또는 카테고리입니다. 마치 데이터베이스의 테이블과 유사한 개념으로, 프로듀서는 특정 토픽에 메시지를 발행하고, 컨슈머는 특정 토픽의 메시지를 구독합니다.
  • 파티션(Partition): 각 토픽은 하나 이상의 파티션으로 나뉘어 저장됩니다. 파티션은 메시지가 실제로 저장되는 물리적인 단위이며, 순서가 보장되는 불변의(immutable) 로그 파일입니다. 토픽을 여러 파티션으로 나누면, 여러 컨슈머가 각기 다른 파티션을 병렬로 처리할 수 있게 되어 처리량을 극대화할 수 있습니다. 하나의 파티션 내에서는 메시지의 순서가 보장되지만, 토픽 전체적으로는 메시지 순서가 보장되지 않습니다.
  • 오프셋(Offset): 파티션 내에서 각 메시지가 가지는 고유한 순번(ID)입니다. 컨슈머는 이 오프셋을 기준으로 자신이 어디까지 메시지를 읽었는지 추적합니다.
  • 프로듀서(Producer): 카프카 토픽으로 메시지(이벤트)를 발행하는 클라이언트 애플리케이션입니다.
  • 컨슈머(Consumer): 카프카 토픽의 메시지를 구독하여 가져와서 처리하는 클라이언트 애플리케이션입니다.
  • 컨슈머 그룹(Consumer Group): 하나 이상의 컨슈머가 모여 구성된 그룹입니다. 특정 토픽을 구독하는 컨슈머 그룹 내에서는 하나의 파티션이 오직 하나의 컨슈머에게만 할당됩니다. 만약 컨슈머 그룹에 새로운 컨슈머가 추가되거나 기존 컨슈머가 종료되면, 파티션의 소유권이 재조정되는 '리밸런싱(Rebalancing)' 과정이 발생합니다. 이 메커니즘을 통해 카프카는 장애 허용(Fault Tolerance)과 수평적 확장(Horizontal Scaling)을 동시에 달성합니다.

카프카는 메시지를 컨슈머가 읽어가도 바로 삭제하지 않고, 설정된 보관 주기(retention period) 동안 디스크에 안전하게 보관합니다. 이 특징 덕분에 여러 다른 컨슈머 그룹이 각자의 필요에 따라 동일한 데이터를 여러 번 읽어갈 수 있으며, 실시간 처리뿐만 아니라 배치(batch) 처리나 데이터 분석 파이프라인 구축에도 매우 유용합니다.

2장: 개발 환경 구축 및 초기 설정

이제 이론적 배경을 바탕으로 실제 개발을 위한 환경을 구축해 보겠습니다. 로컬 환경에서 카프카 클러스터를 손쉽게 실행하고, 스프링 부트 프로젝트를 생성하여 카프카 연동을 위한 기본 설정을 완료하는 과정을 단계별로 안내합니다.

2.1. 필수 준비물

개발을 시작하기 전에 다음 소프트웨어가 시스템에 설치되어 있는지 확인하세요.

  • Java Development Kit (JDK): 11 버전 이상 (17 LTS 권장)
  • Build Tool: Maven 3.6+ 또는 Gradle 6.8+
  • IDE: IntelliJ IDEA, Eclipse, VS Code 등 선호하는 Java IDE
  • Docker & Docker Compose: 로컬에서 카프카와 주키퍼를 컨테이너로 실행하기 위해 필요합니다.

2.2. Docker Compose를 활용한 카프카 클러스터 실행

카프카와 주키퍼를 직접 다운로드하여 설치하는 것은 번거로울 수 있습니다. Docker Compose를 사용하면 단 몇 줄의 설정 파일과 명령어 하나로 손쉽게 전체 클러스터를 실행할 수 있습니다.

프로젝트 루트 디렉터리에 docker-compose.yml 파일을 생성하고 아래 내용을 작성하세요.


version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    container_name: kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

이 설정 파일은 Confluent Platform 이미지를 사용하여 주키퍼와 단일 노드 카프카 브로커를 실행합니다. 특히 KAFKA_ADVERTISED_LISTENERS 설정이 중요합니다. localhost:9092는 Docker 호스트(우리의 PC)에서 카프카에 접근할 때 사용하는 주소이고, kafka:29092는 Docker 네트워크 내부에서 다른 컨테이너가 카프카에 접근할 때 사용하는 주소입니다.

이제 터미널에서 아래 명령어를 실행하여 카프카 클러스터를 시작합니다.


docker-compose up -d

-d 옵션은 컨테이너를 백그라운드에서 실행하도록 합니다. docker ps 명령어로 zookeeper와 kafka 컨테이너가 정상적으로 실행 중인지 확인할 수 있습니다.

2.3. Spring Initializr를 통한 프로젝트 생성

스프링 부트 프로젝트를 가장 쉽게 시작하는 방법은 Spring Initializr 웹사이트를 이용하는 것입니다. 다음 설정으로 프로젝트를 생성합니다.

  • Project: Gradle - Groovy (또는 Maven)
  • Language: Java
  • Spring Boot: 3.X.X 버전대 (안정적인 최신 버전 선택)
  • Project Metadata:
    • GroupId: com.example
    • ArtifactId: kafka-microservice
    • Name: kafka-microservice
    • Packaging: Jar
    • Java: 17 (또는 설치된 JDK 버전)
  • Dependencies:
    • Spring Web
    • Spring for Apache Kafka

'GENERATE' 버튼을 클릭하여 프로젝트 압축 파일을 다운로드하고, 원하는 위치에 압축을 해제한 후 IDE로 프로젝트를 엽니다.

2.4. application.yml 상세 설정 파헤치기

프로젝트의 src/main/resources/ 경로에 있는 application.properties 파일을 application.yml로 변경하고, 카프카 연동을 위한 설정을 추가합니다. YAML 형식은 계층 구조를 표현하기에 더 용이하여 복잡한 설정에 유리합니다.


spring:
  application:
    name: kafka-microservice-app

  kafka:
    # --- Producer Configurations ---
    producer:
      # 카프카 클러스터에 연결하기 위한 브로커 서버 목록
      bootstrap-servers: localhost:9092
      # 메시지 키를 직렬화(byte array로 변환)하는 클래스
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 메시지 값을 직렬화하는 클래스 (JSON 전송을 위해 추후 변경 예정)
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 프로듀서가 리더 파티션으로부터 응답을 기다리는 기준
      # all: ISR(In-Sync Replicas)에 포함된 모든 복제본에 쓰기가 완료되었을 때 응답을 받음. 데이터 유실 가능성 최소화.
      acks: all 
      properties:
        # Exactly-once 시맨틱스를 위한 설정 (6장에서 상세히 다룸)
        "enable.idempotence": "true" 
        # 재시도 사이에 발생하는 지연 시간(ms)
        "delivery.timeout.ms": "120000"
        # 재시도 횟수
        "retries": "3"

    # --- Consumer Configurations ---
    consumer:
      # 카프카 클러스터에 연결하기 위한 브로커 서버 목록
      bootstrap-servers: localhost:9092
      # 컨슈머가 속할 그룹 ID (동일 그룹 내에서는 파티션이 분배됨)
      group-id: my-group
      # 브로커에 초기 오프셋 정보가 없을 때 어디서부터 읽을지 결정
      # earliest: 가장 오래된 메시지부터, latest: 가장 최신 메시지부터
      auto-offset-reset: earliest
      # 메시지 키를 역직렬화(byte array에서 객체로 변환)하는 클래스
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 메시지 값을 역직렬화하는 클래스
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 오프셋을 자동으로 커밋할지 여부
      enable-auto-commit: false # false로 설정하여 수동 커밋 사용 권장 (4장에서 상세히 다룸)

    # --- Listener (Consumer) Configurations ---
    listener:
      # 컨슈머 리스너의 동작 방식 설정
      # MANUAL_IMMEDIATE: 수동으로 즉시 오프셋을 커밋
      ack-mode: manual_immediate

각 설정 항목에 대한 주석을 통해 그 의미를 파악하는 것이 중요합니다. 특히 프로듀서의 acks 설정은 데이터의 신뢰성과 성능 사이의 트레이드오프를 결정하는 핵심 요소이며, 컨슈머의 auto-offset-resetenable-auto-commit 설정은 메시지 처리 시맨틱스를 결정하는 데 매우 중요합니다.

이로써 우리는 카프카를 사용할 준비를 마쳤습니다. 다음 장에서는 이 설정을 바탕으로 메시지를 발행하는 프로듀서를 구현해 보겠습니다.

3장: 프로듀서 구현 - 안정적인 메시지 발행

카프카 프로듀서는 마이크로서비스 아키텍처에서 이벤트의 시작점입니다. 서비스에서 발생한 중요한 상태 변화(예: 주문 생성, 회원 가입)를 안정적으로 카프카 토픽에 전달하는 역할을 맡습니다. 이번 장에서는 스프링 카프카가 제공하는 KafkaTemplate을 활용하여 단순한 문자열 메시지부터 복잡한 객체(DTO)까지 전송하는 방법을 다룹니다.

3.1. Java Configuration을 통한 프로듀서 설정

application.yml을 통한 설정은 편리하지만, 더 세밀한 제어나 커스텀 로직이 필요한 경우 Java Configuration 클래스를 사용하는 것이 좋습니다. 여기서는 KafkaTemplate을 생성하는 데 필요한 ProducerFactory 빈을 직접 등록해 보겠습니다.

config 패키지를 생성하고 KafkaProducerConfig.java 클래스를 작성합니다.


package com.example.kafkamicorservice.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // application.yml에 정의한 다른 프로듀서 속성들도 필요에 따라 추가할 수 있습니다.
        // configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

이 설정 클래스는 application.ymlbootstrap-servers 값을 읽어와 프로듀서 설정을 구성합니다. 이렇게 Java 코드로 설정을 관리하면 컴파일 시점에 타입 체크가 가능하고, 프로그래밍적으로 설정을 동적으로 변경하는 등의 유연성을 확보할 수 있습니다.

3.2. DTO와 JSON을 활용한 구조화된 데이터 전송

실제 애플리케이션에서는 단순 문자열보다 구조화된 데이터를 주고받는 경우가 훨씬 많습니다. 이때 DTO(Data Transfer Object)를 정의하고, 이를 JSON 형식으로 직렬화하여 카프카에 전송하는 것이 일반적입니다.

먼저, 전송할 데이터를 담을 OrderEvent.java DTO 클래스를 생성합니다.


package com.example.kafkamicorservice.event;

// Lombok 어노테이션을 사용하면 Getter, Setter, 생성자 등을 자동으로 생성해줍니다.
// import lombok.AllArgsConstructor;
// import lombok.Data;
// import lombok.NoArgsConstructor;
//
// @Data
// @AllArgsConstructor
// @NoArgsConstructor
public class OrderEvent {
    private String orderId;
    private String product;
    private int quantity;
    private long timestamp;

    // Lombok을 사용하지 않을 경우
    public OrderEvent() {}

    public OrderEvent(String orderId, String product, int quantity, long timestamp) {
        this.orderId = orderId;
        this.product = product;
        this.quantity = quantity;
        this.timestamp = timestamp;
    }
    // Getters and Setters ...
}

이제 OrderEvent 객체를 JSON으로 직렬화하기 위해 스프링 카프카가 제공하는 JsonSerializer를 사용하도록 설정을 변경해야 합니다. 먼저, KafkaProducerConfig.java를 수정합니다.


// KafkaProducerConfig.java

import org.springframework.kafka.support.serializer.JsonSerializer;
// ... (다른 import문 생략)

@Configuration
public class KafkaProducerConfig {
    
    // ... (bootstrapServers 필드 생략)

    // DTO를 전송하기 위한 ProducerFactory 및 KafkaTemplate 빈을 추가
    @Bean
    public ProducerFactory<String, Object> producerFactoryWithJson() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value Serializer를 JsonSerializer로 변경
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplateWithJson() {
        return new KafkaTemplate<>(producerFactoryWithJson());
    }

    // 기존 문자열 전송을 위한 KafkaTemplate은 그대로 유지하거나 필요 없다면 삭제
    // @Bean
    // public ProducerFactory producerFactory() { ... }
    // @Bean
    // public KafkaTemplate kafkaTemplate() { ... }
}

이제 이 KafkaTemplate<String, Object>를 사용하여 OrderEvent 객체를 전송하는 서비스를 만들어 보겠습니다. service 패키지를 만들고 EventProducerService.java를 작성합니다.


package com.example.kafkamicorservice.service;

import com.example.kafkamicorservice.event.OrderEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class EventProducerService {
    private static final Logger logger = LoggerFactory.getLogger(EventProducerService.class);
    private static final String TOPIC_NAME = "order-events";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public EventProducerService(@Qualifier("kafkaTemplateWithJson") KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrderEvent(OrderEvent event) {
        logger.info("Producing message: {}", event);
        kafkaTemplate.send(TOPIC_NAME, event.getOrderId(), event);
    }
}

위 코드에서 kafkaTemplate.send() 메소드의 두 번째 인자로 event.getOrderId()를 전달한 것에 주목하세요. 이것이 바로 메시지 키(Key)입니다. 키를 지정하면, 카프카는 키의 해시값을 계산하여 특정 파티션에 메시지를 일관되게 보냅니다. 즉, 동일한 주문 ID(키)를 가진 이벤트들은 항상 같은 파티션으로 들어가게 되어, 해당 주문에 대한 이벤트 처리 순서를 보장하는 데 매우 중요한 역할을 합니다.

3.3. 비동기 전송과 콜백을 이용한 결과 처리

KafkaTemplate.send() 메소드는 기본적으로 비동기로 동작합니다. 즉, 메소드는 메시지를 내부 버퍼에 추가하고 즉시 리턴하며, 실제 전송은 백그라운드 스레드에서 이루어집니다. 이는 높은 처리량을 보장하지만, 전송이 성공했는지 실패했는지 바로 알 수는 없습니다.

전송 결과를 확인하고 싶다면 send 메소드가 반환하는 CompletableFuture(최신 스프링 카프카 버전) 또는 ListenableFuture(이전 버전)를 사용하여 콜백을 등록할 수 있습니다.

EventProducerService.java를 다음과 같이 수정해 보겠습니다.


package com.example.kafkamicorservice.service;

import com.example.kafkamicorservice.event.OrderEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class EventProducerService {
    private static final Logger logger = LoggerFactory.getLogger(EventProducerService.class);
    private static final String TOPIC_NAME = "order-events";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public EventProducerService(@Qualifier("kafkaTemplateWithJson") KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrderEvent(OrderEvent event) {
        CompletableFuture<SendResult<String, Object>> future = 
                kafkaTemplate.send(TOPIC_NAME, event.getOrderId(), event);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                // 전송 성공
                logger.info("Sent message=[{}] with offset=[{}] on partition=[{}]",
                        event,
                        result.getRecordMetadata().offset(),
                        result.getRecordMetadata().partition());
            } else {
                // 전송 실패
                logger.error("Unable to send message=[{}] due to : {}", event, ex.getMessage());
            }
        });
    }
}

이제 메시지 전송 후 성공 시에는 해당 메시지가 저장된 파티션과 오프셋 정보를 로그로 남기고, 실패 시에는 에러 로그를 남겨서 전송 상태를 명확하게 추적할 수 있습니다. 이는 장애 상황을 분석하고 디버깅하는 데 매우 중요합니다. 예를 들어, 브로커에 연결할 수 없거나 메시지 크기가 너무 커서 전송에 실패하는 경우, 이 콜백을 통해 즉시 문제를 인지하고 재시도 로직이나 관리자 알림 등의 후속 조치를 취할 수 있습니다.

4장: 컨슈머 구현 - 신뢰성 있는 메시지 소비

프로듀서가 발행한 이벤트를 받아 실질적인 비즈니스 로직을 수행하는 것이 컨슈머의 역할입니다. 컨슈머를 구현할 때는 메시지를 안정적으로 처리하고, 장애 발생 시 메시지 유실 없이 복구할 수 있도록 설계하는 것이 무엇보다 중요합니다. 이번 장에서는 @KafkaListener 어노테이션을 중심으로, JSON 데이터 처리, 컨슈머 그룹을 통한 확장, 그리고 에러 핸들링 전략에 대해 심도 있게 다룹니다.

4.1. Java Configuration을 통한 컨슈머 설정

프로듀서와 마찬가지로 컨슈머도 Java Configuration을 통해 더 상세하게 설정할 수 있습니다. 특히 @KafkaListener가 사용할 ConcurrentKafkaListenerContainerFactory 빈을 정의하여 컨슈머의 동작 방식을 제어합니다.

config 패키지에 KafkaConsumerConfig.java 클래스를 작성합니다.


package com.example.kafkamicorservice.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    
    // JSON 메시지를 수신하기 위한 설정은 다음 섹션에서 추가됩니다.

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 수동 커밋을 위한 AckMode 설정
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

여기서 가장 중요한 설정은 setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)입니다. 이는 application.ymlenable-auto-commit: false 설정과 함께, 컨슈머가 메시지를 언제 "처리 완료"로 표시할지(오프셋 커밋)를 애플리케이션 코드에서 직접 제어하겠다는 의미입니다. 메시지 처리가 완전히 성공했을 때만 커밋함으로써, 처리 도중 에러가 발생하더라도 메시지가 유실되는 것을 방지할 수 있습니다.

4.2. JSON 메시지 역직렬화 및 DTO 변환

프로듀서가 OrderEvent DTO를 JSON으로 보내므로, 컨슈머는 이 JSON 문자열을 다시 OrderEvent 객체로 변환(역직렬화)해야 합니다. 이를 위해 스프링 카프카의 JsonDeserializer를 사용합니다.

KafkaConsumerConfig.java에 JSON 역직렬화를 위한 팩토리와 리스너 컨테이너 팩토리를 추가합니다.


// KafkaConsumerConfig.java

import com.example.kafkamicorservice.event.OrderEvent;
import org.springframework.kafka.support.serializer.JsonDeserializer;
// ... (다른 import문 생략)

@Configuration
public class KafkaConsumerConfig {
    
    // ... (기존 필드 및 빈 설정 생략)

    @Bean
    public ConsumerFactory<String, OrderEvent> orderEventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // JsonDeserializer 설정
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        // 신뢰할 수 없는 패키지로부터의 역직렬화를 방지하기 위한 설정
        // '*'는 모든 패키지를 허용. 보안상 특정 패키지만 명시하는 것이 좋음.
        deserializer.addTrustedPackages("com.example.kafkamicorservice.event");
        
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> orderEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderEventConsumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

JsonDeserializer를 설정할 때 addTrustedPackages를 통해 역직렬화할 DTO 클래스가 위치한 패키지를 명시해주는 것이 중요합니다. 이는 보안 취약점을 방지하기 위한 조치입니다.

이제 service 패키지에 EventConsumerService.java를 생성하고 @KafkaListener를 사용하여 메시지를 수신합니다.


package com.example.kafkamicorservice.service;

import com.example.kafkamicorservice.event.OrderEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class EventConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumerService.class);

    @KafkaListener(topics = "order-events",
                   groupId = "${spring.kafka.consumer.group-id}",
                   containerFactory = "orderEventKafkaListenerContainerFactory")
    public void consumeOrderEvent(@Payload OrderEvent event,
                                  @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                  @Header(KafkaHeaders.OFFSET) long offset,
                                  Acknowledgment acknowledgment) {
        
        logger.info("Received message: partition={}, offset={}, event={}", partition, offset, event.getOrderId());
        
        try {
            // 여기에 실제 비즈니스 로직을 구현합니다.
            // 예: 데이터베이스에 저장, 외부 API 호출, 알림 발송 등
            logger.info("Processing order for product: {}", event.getProduct());
            Thread.sleep(1000); // 로직 처리 시간 시뮬레이션

            // 처리가 성공적으로 완료되면 오프셋을 커밋
            acknowledgment.acknowledge();
            logger.info("Successfully processed and acknowledged the message.");

        } catch (Exception e) {
            logger.error("Error processing message: {}", e.getMessage());
            // 에러 발생 시 커밋하지 않음. 
            // 이 메시지는 나중에 다시 처리 시도됨 (에러 핸들링 전략에 따라 동작이 달라짐)
        }
    }
}

@KafkaListener 어노테이션의 containerFactory 속성을 통해 방금 설정한 orderEventKafkaListenerContainerFactory 빈을 사용하도록 지정했습니다. 리스너 메소드의 파라미터로 @Payload를 사용해 메시지 본문을 OrderEvent 객체로 바로 받을 수 있으며, Acknowledgment 객체를 받아 acknowledge()를 호출함으로써 수동으로 오프셋을 커밋합니다. 로직 수행 중 예외가 발생하면 acknowledge()가 호출되지 않으므로, 이 메시지는 다음 폴링(polling) 시에 다시 전달되어 재처리를 시도하게 됩니다.

4.3. 컨슈머 그룹과 파티션 리밸런싱을 통한 확장성 확보

카프카의 진정한 힘은 컨슈머 그룹을 통한 수평적 확장에서 나옵니다. 만약 `order-events` 토픽이 6개의 파티션으로 구성되어 있다고 가정해 보겠습니다.

  • 컨슈머 1개 실행: 해당 컨슈머가 6개 파티션의 모든 메시지를 처리합니다.
  • 컨슈머 2개 실행 (동일 그룹 ID): 카프카는 리밸런싱을 통해 각 컨슈머에게 3개의 파티션을 할당합니다. 전체 처리량이 이론적으로 2배가 됩니다.
  • 컨슈머 6개 실행 (동일 그룹 ID): 각 컨슈머가 1개의 파티션을 전담하여 처리합니다. 처리량이 최대로 확장됩니다.
  • 컨슈머 7개 실행 (동일 그룹 ID): 6개의 컨슈머가 각각 파티션을 할당받고, 1개의 컨슈머는 아무 일도 하지 않고 대기(idle)합니다. 파티션 수보다 많은 컨슈머는 낭비입니다.

스프링 부트 애플리케이션을 여러 인스턴스로 실행하기만 하면, 동일한 group-id를 가진 컨슈머들이 자동으로 파티션을 나누어 처리하게 되므로, 메시지 처리량이 많아질 경우 별도의 코드 수정 없이 서버 인스턴스를 늘리는 것만으로 손쉽게 시스템을 확장할 수 있습니다.

4.4. 견고한 시스템을 위한 컨슈머 에러 핸들링 전략

메시지 처리 중 예외가 발생했을 때, 무한정 재시도하는 것은 시스템에 큰 부담을 줄 수 있습니다. 예를 들어, 메시지 형식이 잘못되어 역직렬화에 계속 실패하는 경우, 해당 메시지는 계속해서 재처리되며 다른 정상적인 메시지들의 처리를 막는 '블로킹(blocking)' 현상을 유발합니다.

이를 해결하기 위해 스프링 카프카는 다양한 에러 핸들링 전략을 제공합니다. 그중 가장 널리 사용되는 것은 Dead Letter Topic (DLT) 패턴입니다.

DLT는 특정 횟수 이상 재시도에 실패한 메시지를 별도의 '죽은 메시지' 토픽으로 보내고, 원래 토픽에서는 해당 메시지를 커밋하여 다음 메시지 처리를 계속 진행하는 방식입니다. 이를 통해 문제의 메시지를 격리하고 시스템 전체의 흐름을 유지할 수 있습니다.

KafkaConsumerConfig.java에 DLT를 위한 에러 핸들러를 설정해 보겠습니다.


// KafkaConsumerConfig.java
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// ...

@Configuration
public class KafkaConsumerConfig {

    // ... (기존 설정 생략)

    // DLT로 메시지를 보내기 위해 KafkaTemplate이 필요합니다.
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) {
        // 재시도 실패 시 DeadLetterPublishingRecoverer를 호출합니다.
        // dlt-order-events 토픽으로 메시지를 보냅니다.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (rec, ex) -> new TopicPartition("dlt-order-events", rec.partition()));

        // 1초 간격으로 최대 3번 재시도합니다.
        FixedBackOff backOff = new FixedBackOff(1000L, 2L);
        return new DefaultErrorHandler(recoverer, backOff);
    }
    
    // orderEventKafkaListenerContainerFactory 빈 설정을 수정합니다.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> orderEventKafkaListenerContainerFactory(
            DefaultErrorHandler errorHandler) { // errorHandler를 주입받습니다.
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderEventConsumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 생성한 에러 핸들러를 컨테이너 팩토리에 설정합니다.
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

이제 컨슈머에서 처리할 수 없는 예외(예: NullPointerException)가 발생하면, 애플리케이션은 1초 간격으로 두 번 더 재시도합니다. 세 번의 시도 모두 실패하면 해당 메시지는 dlt-order-events라는 이름의 토픽으로 전송되고, 원래 메시지는 커밋 처리됩니다. 개발자는 나중에 DLT에 쌓인 메시지들을 분석하여 원인을 파악하고, 버그를 수정한 뒤 수동으로 재처리하는 등의 조치를 취할 수 있습니다. 이는 시스템의 안정성과 데이터 무결성을 보장하는 핵심적인 장치입니다.

5장: 실전 예제 - 이벤트 기반 주문/알림 시스템 구축

지금까지 배운 개념들을 종합하여 간단하지만 현실적인 마이크로서비스 아키텍처 예제를 구현해 보겠습니다. 사용자가 주문을 생성하면 '주문 서비스'가 이를 처리하고, 카프카를 통해 '주문 생성됨' 이벤트를 발행합니다. 그러면 '알림 서비스'가 이 이벤트를 구독하여 사용자에게 알림을 보내는 시나리오입니다.

실제로는 두 서비스를 별개의 스프링 부트 프로젝트로 구성해야 하지만, 여기서는 설명을 위해 하나의 프로젝트 내에 두 서비스의 역할을 모두 구현하겠습니다.

5.1. 시나리오 정의: 주문 서비스와 알림 서비스

  1. 사용자: HTTP POST 요청으로 새로운 주문을 생성합니다.
  2. 주문 서비스 (Order Service - Producer 역할):
    • REST API 엔드포인트를 통해 주문 요청을 받습니다.
    • 주문 데이터를 (가상) 데이터베이스에 저장합니다.
    • 주문이 성공적으로 생성되었음을 알리는 OrderEvent를 생성하여 order-events 카프카 토픽에 발행합니다.
  3. 알림 서비스 (Notification Service - Consumer 역할):
    • order-events 토픽을 구독하고 있습니다.
    • 새로운 OrderEvent 메시지를 수신하면, 해당 주문 정보를 바탕으로 사용자에게 이메일이나 SMS를 발송하는 로직을 수행합니다(여기서는 로그 출력으로 대체).

이 구조에서 주문 서비스와 알림 서비스는 서로를 전혀 알지 못합니다. 오직 카프카 토픽을 통해서만 상호작용하므로, 한쪽 서비스가 장애가 나거나 배포 중이더라도 다른 서비스는 영향을 받지 않는 느슨한 결합(Loose Coupling)이 달성됩니다.

5.2. 주문 서비스(Producer) 구현

먼저 주문 요청을 받을 REST 컨트롤러를 생성합니다. controller 패키지를 만들고 OrderController.java를 작성합니다.


package com.example.kafkamicorservice.controller;

import com.example.kafkamicorservice.event.OrderEvent;
import com.example.kafkamicorservice.service.EventProducerService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("/orders")
public class OrderController {

    private final EventProducerService producerService;

    public OrderController(EventProducerService producerService) {
        this.producerService = producerService;
    }

    // 주문 요청을 위한 DTO
    public static class CreateOrderRequest {
        private String product;
        private int quantity;
        
        // Getters and Setters
        public String getProduct() { return product; }
        public void setProduct(String product) { this.product = product; }
        public int getQuantity() { return quantity; }
        public void setQuantity(int quantity) { this.quantity = quantity; }
    }

    @PostMapping
    public ResponseEntity<String> createOrder(@RequestBody CreateOrderRequest request) {
        // 1. (가상) 데이터베이스에 주문 저장 로직
        String orderId = UUID.randomUUID().toString();
        System.out.println("Order saved to DB with ID: " + orderId);

        // 2. 카프카에 이벤트 발행
        OrderEvent event = new OrderEvent(
                orderId,
                request.getProduct(),
                request.getQuantity(),
                System.currentTimeMillis()
        );
        producerService.sendOrderEvent(event);

        return ResponseEntity.ok("Order created successfully with ID: " + orderId);
    }
}

이 컨트롤러는 /orders 경로로 POST 요청을 받습니다. 요청 본문(body)에 포함된 상품명과 수량을 바탕으로 고유한 주문 ID를 생성하고, 이를 포함한 OrderEvent를 만들어 EventProducerService를 통해 카프카로 전송합니다. EventProducerService는 3장에서 이미 구현했습니다.

5.3. 알림 서비스(Consumer) 구현

알림 서비스의 역할은 4장에서 구현한 EventConsumerService가 그대로 수행합니다. `order-events` 토픽을 리스닝하다가 OrderEvent가 들어오면 로그를 출력하는 로직이 이미 작성되어 있습니다. 실제 시스템이라면 로그 출력 대신 이메일 발송 라이브러리나 SMS 게이트웨이 연동 코드가 위치하게 될 것입니다.

다시 한번 EventConsumerService.java의 코드를 살펴보겠습니다.


package com.example.kafkamicorservice.service;

// ... (imports)

@Service
public class EventConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumerService.class);

    @KafkaListener(topics = "order-events",
                   groupId = "${spring.kafka.consumer.group-id}",
                   containerFactory = "orderEventKafkaListenerContainerFactory")
    public void consumeOrderEvent(@Payload OrderEvent event,
                                  // ... (other parameters)
                                  Acknowledgment acknowledgment) {
        
        logger.info("Received Order Event: orderId={}", event.getOrderId());
        
        try {
            // --- 알림 서비스의 핵심 비즈니스 로직 ---
            logger.info("Sending notification for order [{}]: Product '{}', Quantity '{}'", 
                event.getOrderId(), event.getProduct(), event.getQuantity());

            // 여기에 실제 이메일 발송 또는 SMS 전송 로직이 들어갑니다.
            // sendEmail(toUser, "Order Confirmation", createEmailBody(event));

            // 성공적으로 알림을 보냈다고 가정
            acknowledgment.acknowledge();
            logger.info("Notification sent and message acknowledged.");

        } catch (Exception e) {
            logger.error("Failed to send notification for order [{}]. Error: {}", event.getOrderId(), e.getMessage());
            // 에러 발생 시 커밋하지 않음 (재시도 또는 DLT로 이동)
        }
    }
}

5.4. End-to-End 테스트 및 동작 확인

이제 모든 준비가 끝났습니다. 애플리케이션을 실행하고 전체 흐름이 정상적으로 동작하는지 확인해 보겠습니다.

  1. 카프카 클러스터 실행: docker-compose up -d 명령으로 카프카와 주키퍼가 실행 중인지 확인합니다.
  2. 스프링 부트 애플리케이션 실행: IDE에서 메인 애플리케이션 클래스를 실행합니다.
  3. 주문 생성 요청: cURL이나 Postman과 같은 API 테스트 도구를 사용하여 주문 서비스에 요청을 보냅니다.

    cURL 명령어 예시:

    
        curl -X POST http://localhost:8080/orders \
        -H "Content-Type: application/json" \
        -d '{
            "product": "Kafka in Action Book",
            "quantity": 2
        }'
        
  4. 콘솔 로그 확인:

    요청을 보내면 애플리케이션의 콘솔 로그에 다음과 같은 순서로 로그가 출력되는 것을 확인할 수 있습니다.

    (1) 주문 서비스(OrderController) 로그:

    Order saved to DB with ID: 123e4567-e89b-12d3-a456-426614174000

    (2) 프로듀서(EventProducerService) 로그:

    Sent message=[OrderEvent(orderId=123...)] with offset=[0] on partition=[2]

    (3) 컨슈머(EventConsumerService) 로그:

    Received Order Event: orderId=123e4567-e89b-12d3-a456-426614174000
    Sending notification for order [123e...]: Product 'Kafka in Action Book', Quantity '2'
    Notification sent and message acknowledged.

이 로그를 통해 사용자의 HTTP 요청이 주문 서비스에 의해 처리되고, 카프카를 통해 이벤트가 비동기적으로 알림 서비스에 전달되어 독립적인 로직이 수행되었음을 명확하게 확인할 수 있습니다. 이것이 바로 이벤트 기반 마이크로서비스 아키텍처의 핵심 동작 방식입니다.

6장: 더 나아가기 - 고급 주제 및 운영 고려사항

지금까지 구현한 내용만으로도 기본적인 마이크로서비스를 구축할 수 있지만, 실제 운영 환경에서는 데이터 정합성, 성능, 안정성 등을 위해 더 많은 요소들을 고려해야 합니다. 이 장에서는 시스템을 한 단계 더 발전시킬 수 있는 몇 가지 고급 주제를 소개합니다.

6.1. 멱등성 프로듀서 (Idempotent Producer)

네트워크 문제 등으로 인해 프로듀서가 메시지를 보낸 후 브로커로부터 정상적인 응답(ACK)을 받지 못하는 경우가 발생할 수 있습니다. 이때 프로듀서는 메시지 전송이 실패했다고 판단하고 재시도를 하게 되는데, 만약 첫 번째 메시지가 실제로는 성공적으로 브로커에 저장되었다면 재시도로 인해 동일한 메시지가 중복으로 저장될 수 있습니다.

멱등성 프로듀서는 이러한 메시지 중복을 방지하기 위한 기능입니다. application.yml에서 enable.idempotence: true로 설정하면 (이미 2장에서 설정했습니다), 프로듀서는 각 메시지에 고유한 ID(PID와 시퀀스 번호)를 부여하여 브로커로 전송합니다. 브로커는 이 ID를 확인하여 이전에 이미 처리한 메시지라면 중복으로 저장하지 않고 성공 응답만 반환합니다. 이를 통해 'At-Least-Once' 전송 보장을 'Exactly-Once'에 가깝게 (정확히는 스트림 내 중복 없는 At-Least-Once) 만들어 줍니다.

6.2. 카프카 트랜잭션과 Exactly-Once 시맨틱스

주문 서비스 예제에서 "데이터베이스에 주문을 저장"하는 작업과 "카프카에 이벤트를 발행"하는 작업은 두 개의 분리된 단계입니다. 만약 DB 저장은 성공했는데, 카프카 발행 직전에 애플리케이션이 다운된다면 어떻게 될까요? 주문은 생성되었지만 이벤트는 발생하지 않아 알림이 가지 않는 데이터 불일치 상태가 됩니다.

이 문제를 해결하기 위해 카프카 트랜잭션을 사용할 수 있습니다. 스프링에서는 @Transactional 어노테이션을 사용하여 데이터베이스 트랜잭션과 카프카 메시지 발행을 하나의 원자적인 작업으로 묶을 수 있습니다.


// OrderService.java (가상 코드)
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {
    // ... (DB Repository, Kafka Producer 주입)

    @Transactional("kafkaTransactionManager") // Kafka 트랜잭션 매니저를 지정
    public void createOrderAndPublishEvent(OrderData data) {
        // 1. 데이터베이스에 주문 저장
        orderRepository.save(data);

        // 2. 카프카에 이벤트 발행
        producerService.sendOrderEvent(new OrderEvent(...));

        // 만약 이 지점에서 예외가 발생하면?
        // DB 작업은 롤백되고, 카프카 메시지 발행도 취소(abort)됩니다.
    }
}

이 방식을 사용하려면 KafkaTransactionManager 빈을 설정해야 합니다. 트랜잭션이 성공적으로 커밋되면 DB 변경 사항이 적용되고 카프카 메시지도 발행됩니다. 만약 어느 한쪽이라도 실패하면 모든 작업이 롤백되어 데이터 정합성을 보장할 수 있습니다. 이는 'Exactly-Once Semantics'를 달성하기 위한 핵심 기술입니다.

6.3. 스키마 레지스트리를 이용한 데이터 계약 관리

JSON은 사용하기 편리하지만, 데이터의 구조(스키마)를 강제하지 않는다는 단점이 있습니다. 만약 주문 서비스에서 OrderEvent에 새로운 필드를 추가했는데, 알림 서비스는 아직 업데이트되지 않았다면 역직렬화 오류나 NullPointerException이 발생할 수 있습니다. 여러 팀이 독립적으로 서비스를 개발하는 대규모 환경에서는 이러한 '데이터 계약' 파괴가 빈번하게 발생합니다.

스키마 레지스트리(Schema Registry)Avro와 같은 스키마 정의 포맷을 사용하면 이 문제를 해결할 수 있습니다.

  • Avro: 데이터를 바이너리 형태로 직렬화하며, 데이터 자체에 스키마 정보를 포함하지 않고 스키마는 별도로 관리합니다. JSON보다 훨씬 효율적인 직렬화/역직렬화 성능과 작은 메시지 크기를 가집니다.
  • 스키마 레지스트리: Avro 스키마를 중앙에서 관리하고 버저닝하는 서버입니다. 프로듀서는 메시지를 보낼 때 스키마 레지스트리에 스키마를 등록하고, 컨슈머는 메시지를 받을 때 스키마 레지스트리에서 해당 스키마를 가져와 역직렬화에 사용합니다.

스키마 레지스트리는 스키마 변경 시 하위 호환성(Backward Compatibility)을 검사하는 규칙을 강제할 수 있어, 한 서비스의 변경이 다른 서비스에 미치는 영향을 사전에 차단하고 안정적인 데이터 파이프라인을 유지하는 데 결정적인 역할을 합니다.

6.4. 모니터링: 컨슈머 랙(Lag)과 시스템 상태 추적

운영 환경에서는 우리 시스템이 정상적으로 동작하고 있는지 지속적으로 확인해야 합니다. 카프카 기반 시스템에서 가장 중요한 모니터링 지표 중 하나는 컨슈머 랙(Consumer Lag)입니다.

컨슈머 랙이란, 특정 파티션에 마지막으로 발행된 메시지의 오프셋과 특정 컨슈머 그룹이 마지막으로 커밋한 오프셋의 차이를 의미합니다. 즉, "컨슈머가 처리해야 할 메시지가 얼마나 밀려있는가"를 나타내는 지표입니다.

랙이 지속적으로 증가한다면, 이는 메시지 발행 속도를 컨슈머의 처리 속도가 따라가지 못하고 있다는 신호입니다. 이때는 컨슈머 인스턴스를 추가하여 처리량을 늘리거나, 컨슈머의 비즈니스 로직을 최적화하는 등의 조치가 필요합니다. 스프링 부트 액추에이터(Actuator), 프로메테우스(Prometheus), 그라파나(Grafana)와 같은 도구들을 연동하여 컨슈머 랙을 비롯한 각종 카프카 지표(메시지 처리량, 브로커 상태 등)를 시각화하고 알림을 설정하는 것이 일반적입니다.

결론: 진정한 분산 시스템을 향한 첫걸음

이 글을 통해 우리는 스프링 부트와 아파치 카프카라는 강력한 두 도구를 사용하여 현대적인 이벤트 기반 마이크로서비스를 구축하는 전 과정을 살펴보았습니다. 단순한 개념 소개를 넘어, 실용적인 개발 환경 구축부터 상세한 설정, 구조화된 데이터 처리, 비동기 콜백, 신뢰성 있는 컨슈머 설계, 에러 핸들링, 그리고 실제 시나리오를 바탕으로 한 통합 예제까지 깊이 있게 다루었습니다.

스프링 부트와 카프카의 조합은 단순히 두 기술을 함께 사용하는 것을 넘어, 시스템의 회복탄력성, 확장성, 유연성을 극대화하는 시너지를 창출합니다. 서비스 간의 강한 결합을 끊어내고 비동기적인 이벤트 스트림을 통해 상호작용하게 함으로써, 우리는 장애에 더 강하고 변화에 더 빠르게 대응할 수 있는 아키텍처를 만들 수 있습니다.

물론 오늘 다룬 내용이 전부는 아닙니다. 카프카 스트림즈(Kafka Streams)를 이용한 실시간 스트림 처리, 스키마 레지스트리를 통한 엄격한 데이터 관리, 쿠버네티스(Kubernetes) 환경에서의 운영 등 더 넓고 깊은 세계가 여러분을 기다리고 있습니다. 하지만 본 글에서 다진 탄탄한 기본기는 앞으로 마주할 더 복잡한 도전들을 해결해 나가는 훌륭한 초석이 될 것입니다.

이제 여러분은 이론을 넘어, 직접 코드를 작성하고 시스템을 구축하며 진정한 분산 시스템 개발자로서의 여정을 시작할 준비가 되었습니다. 이벤트의 흐름 속에서 더 나은 소프트웨어를 만들어 나가시길 바랍니다.


0 개의 댓글:

Post a Comment