Showing posts with label kafka. Show all posts
Showing posts with label kafka. Show all posts

Wednesday, June 18, 2025

Kafka vs RabbitMQ: 당신의 시스템에 적합한 메시지 브로커는?

현대적인 소프트웨어 아키텍처, 특히 마이크로서비스(MSA) 환경에서 비동기 통신은 시스템의 확장성과 안정성을 보장하는 핵심 요소입니다. 이러한 비동기 통신을 구현하기 위해 우리는 '메시지 브로커(Message Broker)'를 사용합니다. 수많은 메시지 브로커 솔루션 중에서 Apache Kafka와 RabbitMQ는 단연코 가장 유명하고 널리 사용되는 두 거인입니다.

많은 개발자와 아키텍트가 프로젝트 초기에 "카프카를 써야 할까, 아니면 래빗엠큐를 써야 할까?"라는 질문에 부딪힙니다. 이 질문에 대한 답은 간단하지 않습니다. 두 솔루션 모두 훌륭하지만, 서로 다른 철학과 아키텍처를 기반으로 설계되었기 때문에 특정 사용 사례에 더 적합한 쪽이 있습니다. 이 글에서는 Kafka와 RabbitMQ의 핵심적인 차이점을 심도 있게 분석하고, 어떤 시나리오에서 각각을 선택해야 하는지에 대한 명확한 가이드를 제공하고자 합니다.

RabbitMQ란 무엇인가? 전통적인 메시지 브로커의 강자

RabbitMQ는 AMQP(Advanced Message Queuing Protocol)라는 표준 프로토콜을 구현한 가장 대표적인 오픈소스 메시지 브로커입니다. 2007년에 처음 등장하여 오랜 시간 동안 안정성과 신뢰성을 인정받아 왔습니다. RabbitMQ의 핵심 철학은 '스마트 브로커 / 멍청한 컨슈머(Smart Broker / Dumb Consumer)' 모델에 기반합니다.

여기서 '스마트 브로커'란, 메시지를 어디로 어떻게 전달할지에 대한 복잡한 라우팅 로직을 브로커 자체가 책임진다는 의미입니다. 생산자(Producer)는 메시지를 Exchange라는 곳에 보내기만 하면, Exchange가 설정된 규칙(라우팅 키, 바인딩)에 따라 적절한 큐(Queue)에 메시지를 분배합니다. 그러면 소비자(Consumer)는 해당 큐에서 메시지를 가져와 처리합니다.

RabbitMQ의 주요 특징

  • 유연한 라우팅: Direct, Topic, Fanout, Headers 등 다양한 Exchange 타입을 제공하여 매우 복잡하고 정교한 메시지 라우팅 시나리오를 구현할 수 있습니다. 예를 들어, 특정 패턴의 라우팅 키를 가진 메시지만 특정 큐로 보내는 등의 작업이 가능합니다.
  • 메시지 확인(Acknowledgement): 컨슈머가 메시지를 성공적으로 처리했음을 브로커에게 알리는 기능을 기본으로 지원합니다. 이를 통해 메시지 유실을 방지하고 작업의 신뢰성을 보장할 수 있습니다.
  • 다양한 프로토콜 지원: 핵심인 AMQP 0-9-1 외에도 STOMP, MQTT 등을 플러그인 형태로 지원하여 다양한 클라이언트 환경과 통합이 용이합니다.
  • 작업 큐(Task Queues): 여러 컨슈머에게 작업을 분산하여 처리하는 '일 처리' 시나리오에 매우 강력합니다. 예를 들어, 이미지 리사이징, PDF 생성 등 시간이 오래 걸리는 작업을 백그라운드에서 처리하는 데 이상적입니다.

RabbitMQ 아키텍처의 핵심

RabbitMQ의 흐름은 Producer → Exchange → Binding → Queue → Consumer 순서로 이루어집니다.

  1. Producer: 메시지를 생성하고 Exchange에 발행(Publish)합니다.
  2. Exchange: Producer로부터 메시지를 받아 어떤 Queue로 보낼지 결정하는 라우터 역할을 합니다.
  3. Queue: 메시지가 Consumer에게 전달되기 전에 대기하는 저장소입니다.
  4. Consumer: Queue에 연결하여 메시지를 구독(Subscribe)하고 처리합니다.

이 구조 덕분에 RabbitMQ는 메시지 단위의 정교한 제어가 필요한 전통적인 메시징 시스템에 매우 적합합니다.

Apache Kafka란 무엇인가? 분산 이벤트 스트리밍 플랫폼

Apache Kafka는 LinkedIn에서 대규모 실시간 데이터 피드를 처리하기 위해 2011년에 개발한 분산 이벤트 스트리밍 플랫폼입니다. RabbitMQ가 '메시지 브로커'에 가깝다면, Kafka는 '분산 커밋 로그(Distributed Commit Log)'에 더 가깝습니다. Kafka의 철학은 RabbitMQ와 정반대인 '멍청한 브로커 / 똑똑한 컨슈머(Dumb Broker / Smart Consumer)' 모델입니다.

여기서 '멍청한 브로커'란, 브로커가 복잡한 라우팅 로직을 수행하지 않고 단순히 데이터를 받아 순서대로 로그에 저장하는 역할만 한다는 의미입니다. 대신 '똑똑한 컨슈머'가 자신이 어디까지 데이터를 읽었는지(오프셋)를 스스로 추적하고 관리합니다. 이 단순한 구조가 Kafka의 경이로운 처리량(Throughput)과 확장성의 비결입니다.

Kafka의 주요 특징

  • 높은 처리량: 디스크에 순차적으로 I/O를 수행하는 방식으로 설계되어 초당 수백만 건의 메시지를 처리할 수 있습니다. 대규모 로그 수집, IoT 데이터 스트리밍 등 대용량 데이터 처리에 독보적인 성능을 보입니다.
  • 데이터 지속성 및 재생(Replay): 메시지는 컨슈머가 읽어 가도 바로 삭제되지 않고, 설정된 보관 주기(Retention Period) 동안 디스크에 안전하게 보관됩니다. 덕분에 여러 다른 컨슈머 그룹이 각자의 필요에 따라 동일한 데이터를 여러 번 읽거나, 장애 발생 시 특정 시점부터 데이터를 다시 처리(Replay)하는 것이 가능합니다.
  • 확장성과 내결함성: 처음부터 분산 시스템을 염두에 두고 설계되었습니다. 토픽(Topic)을 여러 파티션(Partition)으로 나누고, 이를 여러 브로커 서버에 분산하여 저장함으로써 수평적 확장이 용이하고 일부 서버에 장애가 발생해도 서비스 중단 없이 운영이 가능합니다.
  • 스트림 처리: Kafka Streams 라이브러리나 Apache Flink, Spark Streaming 같은 외부 프레임워크와 결합하여 실시간 데이터 스트림을 변환하고 분석하는 강력한 스트림 처리 애플리케이션을 구축할 수 있습니다.

Kafka 아키텍처의 핵심

Kafka의 흐름은 Producer → Topic (Partition) → Consumer (Consumer Group) 순서로 이루어집니다.

  1. Producer: 이벤트를 생성하여 특정 Topic에 발행합니다.
  2. Topic: 이벤트가 저장되는 카테고리입니다. 각 토픽은 하나 이상의 파티션으로 나뉘어 분산 저장됩니다. 파티션 내에서는 데이터의 순서가 보장됩니다.
  3. Consumer Group: 하나 이상의 Consumer로 구성된 그룹입니다. 하나의 토픽을 구독할 때, 각 파티션은 컨슈머 그룹 내의 단 하나의 컨슈머에게만 할당됩니다. 이를 통해 병렬 처리가 가능해집니다. 컨슈머는 자신이 마지막으로 읽은 메시지의 위치(오프셋)를 스스로 기억합니다.

핵심 차이점 전격 비교: Kafka vs RabbitMQ

두 시스템의 철학과 아키텍처를 이해했다면, 이제 실질적인 차이점을 비교해 보겠습니다.

1. 아키텍처 모델: 스마트 브로커 vs 멍청한 브로커

  • RabbitMQ: 브로커가 메시지 라우팅, 전달 상태 추적 등 많은 일을 담당합니다(Smart Broker). 이 덕분에 컨슈머는 비교적 단순하게 구현할 수 있습니다.
  • Kafka: 브로커는 데이터를 파티션에 순서대로 쌓는 역할만 합니다(Dumb Broker). 메시지를 어디까지 읽었는지 추적하는 책임은 컨슈머에게 있습니다(Smart Consumer).

2. 메시지 소비 모델: Push vs Pull

  • RabbitMQ: 브로커가 컨슈머에게 메시지를 밀어주는 Push 방식을 사용합니다. 이는 낮은 지연 시간(Low Latency)이 중요한 시나리오에 유리할 수 있지만, 컨슈머의 처리 용량을 초과하는 메시지가 밀려오면 컨슈머가 과부하에 걸릴 수 있습니다.
  • Kafka: 컨슈머가 브로커로부터 메시지를 당겨오는 Pull 방식을 사용합니다. 컨슈머는 자신의 처리 능력에 맞춰 데이터를 가져올 수 있으므로, 데이터 폭주 상황에서도 안정적으로 운영할 수 있습니다.

3. 데이터 보관 및 재사용

  • RabbitMQ: 기본적으로 컨슈머가 메시지를 성공적으로 처리하고 확인(ack)하면 큐에서 삭제됩니다. 메시지는 일회성으로 소비되는 '작업'에 가깝습니다.
  • Kafka: 메시지는 소비 여부와 관계없이 설정된 기간 동안 디스크에 보관됩니다. 이는 단순한 메시징을 넘어 '이벤트 소싱(Event Sourcing)'이나 데이터 분석, 감사 로그 등 다양한 목적으로 데이터를 재사용할 수 있게 해주는 Kafka의 가장 강력한 특징입니다.

4. 성능 및 처리량

  • RabbitMQ: 복잡한 라우팅과 메시지 단위의 처리에 최적화되어 있어, 개별 메시지의 지연 시간은 매우 낮을 수 있습니다. 하지만 처리량 면에서는 Kafka에 비해 한계가 있습니다. 초당 수만 건 수준의 메시지를 처리합니다.
  • Kafka: 대용량 데이터의 순차 처리에 극도로 최적화되어 있습니다. 디스크 I/O를 효율적으로 사용하고 단순한 브로커 구조 덕분에 초당 수십만에서 수백만 건의 메시지를 처리하는 압도적인 처리량을 자랑합니다.

어떤 경우에 RabbitMQ를 선택해야 할까?

다음과 같은 시나리오에서는 RabbitMQ가 더 나은 선택일 수 있습니다.

  • 복잡한 라우팅이 필요할 때: 메시지 내용이나 속성에 따라 여러 다른 큐로 동적으로 라우팅해야 하는 경우.
  • 전통적인 작업 큐가 필요할 때: 이메일 발송, 보고서 생성, 이미지 처리 등 백그라운드에서 실행되어야 할 작업을 여러 워커(worker)에게 분산시키는 경우.
  • 개별 메시지의 빠른 전달과 처리가 중요할 때: 실시간 채팅이나 금융 거래처럼 낮은 지연 시간이 중요한 경우.
  • 레거시 시스템과의 연동: AMQP, STOMP 등 표준 프로토콜 지원이 필요한 경우.

간단한 Python 코드 예시 (pika 라이브러리 사용):


# Producer (발행자)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

message = 'Hello World!'
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(f" [x] Sent '{message}'")
connection.close()

# Consumer (소비자)
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
    # ... 작업 처리 ...
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

어떤 경우에 Kafka를 선택해야 할까?

다음과 같은 시나리오에서는 Kafka가 빛을 발합니다.

  • 대용량 실시간 데이터 파이프라인 구축: 웹사이트 클릭 스트림, 애플리케이션 로그, IoT 센서 데이터 등 방대한 양의 데이터를 안정적으로 수집하고 처리해야 하는 경우.
  • 이벤트 소싱(Event Sourcing) 아키텍처: 시스템의 모든 상태 변경을 이벤트의 연속으로 기록하고, 이를 기반으로 현재 상태를 재구성하거나 과거 상태를 추적해야 하는 경우.
  • 데이터의 재사용 및 다목적 활용: 하나의 데이터 스트림을 실시간 대시보드, 배치 분석, 머신러닝 모델 학습 등 여러 다른 목적을 가진 컨슈머들이 독립적으로 사용해야 하는 경우.
  • 실시간 스트림 처리: Kafka Streams, Flink 등과 연동하여 데이터가 들어오는 즉시 필터링, 집계, 변환 등의 분석을 수행해야 하는 경우.

간단한 Python 코드 예시 (kafka-python 라이브러리 사용):


# Producer (발행자)
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'log_topic'
message = b'User logged in successfully'

producer.send(topic, message)
producer.flush()
print(f"Sent: {message.decode()}")

# Consumer (소비자)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'log_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest', # 가장 처음부터 메시지를 읽음
    group_id='log-analyzer-group'
)

for message in consumer:
    print(f"Received: {message.value.decode()} at offset {message.offset}")

한눈에 보는 비교표

항목 RabbitMQ Apache Kafka
주요 패러다임 스마트 브로커 (메시지 큐) 멍청한 브로커 (분산 커밋 로그)
소비 모델 Push (브로커 → 컨슈머) Pull (컨슈머 → 브로커)
라우팅 매우 유연하고 복잡한 라우팅 가능 토픽과 파티션 기반의 단순한 라우팅
데이터 보관 소비 후 삭제 (일회성) 정책 기반 영구 보관 (재사용 가능)
처리량 높음 (초당 수만 건) 매우 높음 (초당 수십만~수백만 건)
주요 사용 사례 작업 큐, 복잡한 비즈니스 로직, 낮은 지연 시간의 메시징 로그 수집, 이벤트 소싱, 실시간 데이터 파이프라인, 스트림 처리

결론: '더 좋은 것'이 아니라 '더 적합한 것'을 찾아야

Kafka와 RabbitMQ를 둘러싼 논쟁은 종종 '어느 것이 더 우월한가'로 흐르기 쉽지만, 이는 올바른 접근이 아닙니다. 두 시스템은 서로 다른 문제를 해결하기 위해 태어났으며, 각각의 영역에서 최고의 솔루션입니다.

결정을 내리기 전에 스스로에게 다음과 같은 질문을 던져보세요:

  • "나는 일회성 작업을 안정적으로 분산 처리할 시스템이 필요한가, 아니면 발생한 모든 이벤트를 영구적으로 기록하고 여러 용도로 재사용할 플랫폼이 필요한가?"
  • "메시지 하나하나의 복잡한 라우팅 규칙이 중요한가, 아니면 초당 수백만 건의 데이터를 막힘없이 처리하는 능력이 중요한가?"

RabbitMQ는 복잡한 라우팅과 신뢰성 있는 작업 처리가 필요한 전통적인 메시징 시스템에 탁월한 선택입니다. 반면, Kafka는 대용량 데이터 스트림을 실시간으로 처리하고, 이벤트를 영구적인 기록으로 활용하는 현대적인 데이터 아키텍처의 심장 역할을 하기에 가장 적합합니다.

결국, 정답은 여러분의 프로젝트 요구사항 안에 있습니다. 이 글이 여러분의 시스템에 가장 적합한 메시지 브로커를 선택하는 데 훌륭한 나침반이 되기를 바랍니다.

Kafka vs. RabbitMQ: How to Choose the Right Message Broker for Your Needs

In modern software architecture, especially within microservices environments, asynchronous communication is a cornerstone for building scalable and resilient systems. To facilitate this, we rely on "message brokers." Among the myriad of available solutions, Apache Kafka and RabbitMQ stand out as the two undisputed leaders in the space.

A common question that developers and architects face early in a project is, "Should we use Kafka or RabbitMQ?" There's no simple answer. Both are excellent tools, but they were designed with different philosophies and architectural patterns, making each better suited for specific use cases. This article will provide a deep dive into the core differences between Kafka and RabbitMQ, offering a clear guide on when to choose one over the other.

What is RabbitMQ? The Powerhouse of Traditional Messaging

RabbitMQ is the most popular open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). First released in 2007, it has a long-standing reputation for stability and reliability. The core philosophy of RabbitMQ is based on a "Smart Broker / Dumb Consumer" model.

A "Smart Broker" means that the broker itself is responsible for the complex logic of how and where to route messages. A producer simply sends a message to an "Exchange," and the Exchange, based on predefined rules (bindings and routing keys), distributes the message to the appropriate "Queue." Consumers then fetch messages from these queues to process them.

Key Features of RabbitMQ

  • Flexible Routing: It offers various exchange types—Direct, Topic, Fanout, and Headers—enabling incredibly sophisticated and complex message routing scenarios. For instance, you can route messages to specific queues based on patterns in the routing key.
  • Message Acknowledgement: It natively supports acknowledgements, where a consumer informs the broker upon successful processing of a message. This prevents message loss and ensures operational reliability.
  • Multi-Protocol Support: In addition to its core AMQP 0-9-1 protocol, it supports others like STOMP and MQTT through plugins, facilitating easy integration with diverse client environments.
  • Task Queues: It excels at distributing time-consuming tasks among multiple workers. It's ideal for background jobs like image resizing, PDF generation, or sending emails.

The Core of RabbitMQ's Architecture

The flow in RabbitMQ follows this path: Producer → Exchange → Binding → Queue → Consumer.

  1. Producer: Creates and publishes a message to an Exchange.
  2. Exchange: Receives the message from the producer and acts as a router, deciding which Queue(s) should receive it.
  3. Queue: A buffer that stores messages before they are delivered to a consumer.
  4. Consumer: Connects to a Queue, subscribes to messages, and processes them.

This structure makes RabbitMQ an excellent fit for traditional messaging systems that require fine-grained control over individual messages.

What is Apache Kafka? The Distributed Event Streaming Platform

Apache Kafka was developed at LinkedIn in 2011 to handle high-volume, real-time data feeds. While RabbitMQ is more of a "message broker," Kafka is better described as a "distributed commit log" or an "event streaming platform." Kafka's philosophy is the opposite of RabbitMQ's: a "Dumb Broker / Smart Consumer" model.

A "Dumb Broker" means the broker doesn't perform complex routing. It simply appends data to a log in the order it's received. The "Smart Consumer" is responsible for keeping track of which messages it has read (known as the "offset"). This simple, streamlined architecture is the secret behind Kafka's phenomenal throughput and scalability.

Key Features of Kafka

  • High Throughput: Designed for sequential disk I/O, Kafka can handle millions of messages per second. It's unparalleled for use cases involving massive data volumes, such as log aggregation, IoT data streaming, and real-time analytics.
  • Data Persistence and Replayability: Messages are not deleted after being consumed. Instead, they are retained on disk for a configurable retention period. This allows multiple, independent consumer groups to read the same data stream for different purposes and enables re-processing of data from a specific point in time in case of failure.
  • Scalability and Fault Tolerance: Kafka was designed as a distributed system from the ground up. A "Topic" can be split into multiple "Partitions," which are distributed across a cluster of broker servers. This allows for horizontal scaling and ensures high availability; the system can tolerate server failures without service interruption.
  • Stream Processing: It integrates seamlessly with frameworks like Kafka Streams, Apache Flink, and Spark Streaming to build powerful applications that can transform and analyze data streams in real-time.

The Core of Kafka's Architecture

The flow in Kafka is: Producer → Topic (Partition) → Consumer (Consumer Group).

  1. Producer: Creates and publishes an event to a specific Topic.
  2. Topic: A category or feed name where events are stored. Each topic is divided into one or more Partitions, which are ordered, immutable sequences of records.
  3. Consumer Group: A group of one or more consumers. When a consumer group subscribes to a topic, each partition is assigned to exactly one consumer within that group, enabling parallel processing. The consumer is responsible for tracking its own position (offset) in each partition it reads from.

Core Differences: A Head-to-Head Comparison

Now that we understand their philosophies and architectures, let's compare them on key practical differences.

1. Architectural Model: Smart Broker vs. Dumb Broker

  • RabbitMQ: The broker is intelligent. It handles message routing, tracks delivery status, and more (Smart Broker). This simplifies the consumer's implementation.
  • Kafka: The broker is simple. It just stores data in partitions (Dumb Broker). The responsibility of tracking what has been read lies with the consumer (Smart Consumer).

2. Message Consumption Model: Push vs. Pull

  • RabbitMQ: Uses a Push model, where the broker actively pushes messages to consumers. This can be advantageous for low-latency scenarios but can overwhelm a consumer if messages arrive faster than it can process them.
  • Kafka: Uses a Pull model, where the consumer requests batches of messages from the broker. This allows consumers to control the rate of consumption, preventing them from being overloaded and leading to more stable processing under heavy load.

3. Data Retention and Reusability

  • RabbitMQ: By default, messages are deleted from the queue once they are consumed and acknowledged. They are treated as transient tasks to be completed.
  • Kafka: Messages are retained on disk for a configured period, regardless of whether they have been consumed. This is Kafka's most powerful feature, transforming it from a simple messaging system into a platform for event sourcing, data analysis, auditing, and more.

4. Performance and Throughput

  • RabbitMQ: Optimized for complex routing and per-message guarantees, which can result in very low latency for individual messages. However, its throughput is limited compared to Kafka, typically handling tens of thousands of messages per second.
  • Kafka: Highly optimized for sequential, high-volume data streams. Its efficient use of disk I/O and simple broker logic allow it to achieve massive throughput, often in the hundreds of thousands or even millions of messages per second.

When Should You Choose RabbitMQ?

RabbitMQ is likely the better choice in these scenarios:

  • For complex routing needs: When you need to dynamically route messages to different queues based on their content or attributes.
  • For traditional task queues: Distributing background jobs like sending emails, generating reports, or processing images across multiple workers.
  • When low latency for individual messages is critical: For applications like real-time chat or financial transaction processing.
  • For integration with legacy systems: When you need support for standard protocols like AMQP or STOMP.

A simple Python code example (using the `pika` library):


# Producer
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

message = 'Process this job!'
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(f" [x] Sent '{message}'")
connection.close()

# Consumer
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
    # ... process the job ...
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

When Should You Choose Kafka?

Kafka shines in the following scenarios:

  • For building high-throughput, real-time data pipelines: To reliably ingest and process massive streams of data from website clickstreams, application logs, or IoT sensors.
  • For event sourcing architectures: When you need to record every state change in your system as an immutable sequence of events, allowing you to reconstruct state at any point in time.
  • For data reuse and multi-purpose consumption: When a single data stream needs to be consumed independently by multiple applications for different purposes (e.g., real-time dashboards, batch analytics, machine learning).
  • For real-time stream processing: When you need to perform on-the-fly filtering, aggregation, or transformation of data streams using frameworks like Kafka Streams or Flink.

A simple Python code example (using the `kafka-python` library):


# Producer
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'event_stream'
event = b'user_id:123,action:click,page:home'

producer.send(topic, event)
producer.flush()
print(f"Sent event: {event.decode()}")

# Consumer
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'event_stream',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest', # Start reading from the beginning
    group_id='analytics-service'
)

for message in consumer:
    print(f"Consumed event: {message.value.decode()} at offset {message.offset}")

At-a-Glance Comparison Table

Aspect RabbitMQ Apache Kafka
Primary Paradigm Smart Broker (Message Queue) Dumb Broker (Distributed Commit Log)
Consumption Model Push (Broker → Consumer) Pull (Consumer → Broker)
Routing Highly flexible and complex routing Simple routing based on topics and partitions
Data Retention Deleted after consumption (transient) Policy-based retention (persistent & reusable)
Throughput High (tens of thousands/sec) Extremely High (hundreds of thousands+/sec)
Primary Use Cases Task queues, complex business logic, low-latency messaging Log aggregation, event sourcing, real-time data pipelines, stream processing

Conclusion: It's Not About "Better," It's About "Different"

The debate over Kafka vs. RabbitMQ often mistakenly frames it as a question of which is superior. This is the wrong approach. They are two different tools built to solve different problems, and both are best-in-class in their respective domains.

Before making a decision, ask yourself these critical questions:

  • "Do I need a system to reliably distribute transient jobs, or do I need a platform to store a permanent record of events that can be re-read for multiple purposes?"
  • "Is complex routing for individual messages a priority, or is the ability to process millions of events per second without failure the main concern?"

RabbitMQ is an outstanding choice for traditional messaging, where complex routing and reliable task processing are paramount. In contrast, Kafka is the ideal foundation for modern data architectures that treat events as a permanent source of truth and require the processing of massive data streams in real-time.

Ultimately, the right answer lies within the specific requirements of your project. We hope this guide serves as a valuable compass in helping you choose the most suitable message broker for your system.

Kafka vs RabbitMQ: プロジェクトに最適なメッセージブローカーの選び方

現代のソフトウェアアーキテクチャ、特にマイクロサービス(MSA)環境において、非同期通信はシステムの拡張性と安定性を確保するための中心的な要素です。この非同期通信を実現するために、私たちは「メッセージブローカー」を利用します。数あるソリューションの中でも、Apache KafkaとRabbitMQは、間違いなく最も有名で広く利用されている二大巨頭と言えるでしょう。

多くの開発者やアーキテクトが、プロジェクトの初期段階で「Kafkaを使うべきか、それともRabbitMQを使うべきか?」という問いに直面します。この問いに対する答えは単純ではありません。両者はどちらも優れたソリューションですが、異なる思想とアーキテクチャに基づいて設計されているため、特定のユースケースにおいて、それぞれに適性があります。この記事では、KafkaとRabbitMQの核心的な違いを深く掘り下げ、どのようなシナリオでどちらを選択すべきかについての明確な指針を提供します。

RabbitMQとは? 伝統的なメッセージブローカーの強者

RabbitMQは、AMQP (Advanced Message Queuing Protocol) という標準プロトコルを実装した、最も代表的なオープンソースのメッセージブローカーです。2007年に登場して以来、長年にわたりその安定性と信頼性が評価されてきました。RabbitMQの核心的な思想は、「スマートなブローカー / ダムなコンシューマー (Smart Broker / Dumb Consumer)」モデルに基づいています。

ここで言う「スマートなブローカー」とは、メッセージをどこに、どのように配信するかの複雑なルーティングロジックをブローカー自身が担うことを意味します。プロデューサー(Producer)はメッセージをExchangeという場所に送信するだけで、Exchangeが設定されたルール(ルーティングキー、バインディング)に従って適切なキュー(Queue)にメッセージを分配します。そして、コンシューマー(Consumer)はそのキューからメッセージを取得して処理します。

RabbitMQの主な特徴

  • 柔軟なルーティング: Direct, Topic, Fanout, Headersといった多様なExchangeタイプを提供し、非常に複雑で精巧なメッセージルーティングシナリオを実装できます。例えば、特定のパターンのルーティングキーを持つメッセージだけを特定のキューに送る、といった処理が可能です。
  • メッセージ確認応答 (Acknowledgement): コンシューマーがメッセージを正常に処理したことをブローカーに通知する機能を標準でサポートしています。これにより、メッセージの損失を防ぎ、タスクの信頼性を保証します。
  • 多様なプロトコルのサポート: 中核となるAMQP 0-9-1以外にも、STOMPやMQTTなどをプラグイン形式でサポートしており、様々なクライアント環境との統合が容易です。
  • タスクキュー (Task Queues): 複数のコンシューマーにタスクを分散して処理する「ワークキュー」のシナリオに非常に強力です。例えば、画像のリサイズやPDF生成など、時間のかかる処理をバックグラウンドで実行するのに最適です。

RabbitMQアーキテクチャの核心

RabbitMQのフローは Producer → Exchange → Binding → Queue → Consumer の順で構成されます。

  1. Producer: メッセージを生成し、Exchangeに発行(Publish)します。
  2. Exchange: Producerからメッセージを受け取り、どのQueueに送信するかを決定するルーターの役割を担います。
  3. Queue: メッセージがConsumerに配信される前に待機するストレージです。
  4. Consumer: Queueに接続し、メッセージを購読(Subscribe)して処理します。

この構造により、RabbitMQはメッセージ単位でのきめ細やかな制御が求められる伝統的なメッセージングシステムに非常に適しています。

Apache Kafkaとは? 分散イベントストリーミングプラットフォーム

Apache Kafkaは、LinkedInが大規模なリアルタイムデータフィードを処理するために2011年に開発した、分散イベントストリーミングプラットフォームです。RabbitMQが「メッセージブローカー」に近いとすれば、Kafkaは「分散コミットログ」と表現するのがより適切です。Kafkaの思想はRabbitMQとは正反対の、「ダムなブローカー / スマートなコンシューマー (Dumb Broker / Smart Consumer)」モデルです。

「ダムなブローカー」とは、ブローカーが複雑なルーティングロジックを実行せず、単にデータを受け取って順序通りにログに保存する役割しか持たないことを意味します。その代わり、「スマートなコンシューマー」が自身でどこまでデータを読み取ったか(オフセット)を追跡・管理します。このシンプルな構造こそが、Kafkaの驚異的なスループットとスケーラビリティの秘訣です。

Kafkaの主な特徴

  • 高スループット: ディスクへのシーケンシャルI/Oを行うように設計されており、1秒あたり数百万件のメッセージを処理できます。大量のログ収集、IoTデータストリーミングなど、大容量データ処理において圧倒的な性能を誇ります。
  • データの永続化と再生 (Replay): メッセージはコンシューマーに読み取られてもすぐには削除されず、設定された保持期間(Retention Period)の間、ディスクに安全に保管されます。これにより、複数の異なるコンシューマーグループがそれぞれの目的で同じデータを何度も読み直したり、障害発生時に特定の時点からデータを再処理(Replay)したりすることが可能です。
  • スケーラビリティと耐障害性: 最初から分散システムとして設計されています。トピック(Topic)を複数のパーティション(Partition)に分割し、それらを複数のブローカーサーバーに分散して保存することで、水平方向のスケーリングが容易であり、一部のサーバーに障害が発生してもサービスを中断することなく運用できます。
  • ストリーム処理: Kafka Streamsライブラリや、Apache Flink、Spark Streamingといった外部フレームワークと組み合わせることで、リアルタイムのデータストリームを変換・分析する強力なストリーム処理アプリケーションを構築できます。

Kafkaアーキテクチャの核心

Kafkaのフローは Producer → Topic (Partition) → Consumer (Consumer Group) の順で構成されます。

  1. Producer: イベントを生成し、特定のTopicに発行します。
  2. Topic: イベントが保存されるカテゴリです。各トピックは1つ以上のパーティションに分割されて分散保存されます。パーティション内ではデータの順序が保証されます。
  3. Consumer Group: 1つ以上のConsumerで構成されるグループです。1つのトピックを購読する際、各パーティションはコンシューマーグループ内のただ1つのコンシューマーにのみ割り当てられます。これにより並列処理が可能になります。コンシューマーは、自身が最後に読み取ったメッセージの位置(オフセット)を自己管理します。

核心的な違いを徹底比較: Kafka vs RabbitMQ

両システムの思想とアーキテクチャを理解したところで、実用的な違いを比較してみましょう。

1. アーキテクチャモデル: スマートブローカー vs ダムブローカー

  • RabbitMQ: ブローカーがメッセージのルーティングや配信状態の追跡など、多くの役割を担います(スマートブローカー)。これにより、コンシューマーの実装は比較的シンプルになります。
  • Kafka: ブローカーはデータをパーティションに順次書き込むだけのシンプルな役割です(ダムブローカー)。どこまでメッセージを読み取ったかを追跡する責任はコンシューマー側にあります(スマートコンシューマー)。

2. メッセージ消費モデル: Push vs Pull

  • RabbitMQ: ブローカーがコンシューマーにメッセージを押し出すPush方式を採用しています。これは低遅延(ローレイテンシー)が重要なシナリオで有利ですが、コンシューマーの処理能力を超えるメッセージが送られてくると、コンシューマーが過負荷に陥る可能性があります。
  • Kafka: コンシューマーがブローカーからメッセージを引いてくるPull方式を採用しています。コンシューマーは自身の処理能力に合わせてデータを取得できるため、データのバースト発生時にも安定して運用できます。

3. データの保持と再利用

  • RabbitMQ: 基本的に、コンシューマーがメッセージを正常に処理し、確認応答(ack)を返すとキューから削除されます。メッセージは一回限りの「タスク」として扱われます。
  • Kafka: メッセージは消費されたかどうかに関わらず、設定された期間ディスクに保持されます。これは単なるメッセージングを超え、「イベントソーシング」やデータ分析、監査ログなど、多様な目的でデータを再利用可能にする、Kafkaの最も強力な特徴です。

4. パフォーマンスとスループット

  • RabbitMQ: 複雑なルーティングとメッセージ単位の処理に最適化されているため、個々のメッセージの遅延は非常に低く抑えられます。しかし、スループットの面ではKafkaに比べて限界があり、1秒あたり数万件レベルの処理能力です。
  • Kafka: 大量データのシーケンシャル処理に極度に最適化されています。ディスクI/Oの効率的な利用とシンプルなブローカー構造により、1秒あたり数十万から数百万件のメッセージを処理する圧倒的なスループットを誇ります。

どのような場合にRabbitMQを選ぶべきか?

以下のようなシナリオでは、RabbitMQがより良い選択肢となるでしょう。

  • 複雑なルーティングが必要な場合: メッセージの内容や属性に応じて、動的に異なるキューへルーティングする必要があるケース。
  • 伝統的なタスクキューが必要な場合: メール送信、レポート生成、画像処理など、バックグラウンドで実行すべきタスクを複数のワーカーに分散させるケース。
  • 個々のメッセージの迅速な配信と処理が重要な場合: リアルタイムチャットや金融取引のように、低遅延が重視されるケース。
  • レガシーシステムとの連携: AMQPやSTOMPといった標準プロトコルのサポートが必要なケース。

Pythonによる簡単なコード例(pikaライブラリを使用):


# Producer (生産者)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

message = 'このタスクを処理してください'
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # メッセージを永続化
    ))
print(f" [x] Sent '{message}'")
connection.close()

# Consumer (消費者)
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode('utf-8')}")
    # ... タスク処理 ...
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

どのような場合にKafkaを選ぶべきか?

以下のようなシナリオでは、Kafkaがその真価を発揮します。

  • 大規模なリアルタイムデータパイプラインの構築: Webサイトのクリックストリーム、アプリケーションログ、IoTセンサーデータなど、膨大な量のデータを安定して収集・処理する必要がある場合。
  • イベントソーシングアーキテクチャ: システムの状態変更をすべてイベントの連続として記録し、それをもとに現在の状態を再構築したり、過去の状態を追跡したりする必要がある場合。
  • データの再利用と多目的活用: 一つのデータストリームを、リアルタイムダッシュボード、バッチ分析、機械学習モデルの学習など、複数の異なる目的を持つコンシューマーが独立して利用する必要がある場合。
  • リアルタイムストリーム処理: Kafka StreamsやFlinkなどと連携し、データが流入すると同時にフィルタリング、集計、変換などの分析を行う必要がある場合。

Pythonによる簡単なコード例(kafka-pythonライブラリを使用):


# Producer (生産者)
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
topic = 'user-activity'
event = {'user_id': 'xyz-123', 'action': 'login'}

producer.send(topic, event)
producer.flush()
print(f"Sent event: {event}")

# Consumer (消費者)
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-activity',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest', # 最も古いメッセージから読み込む
    group_id='activity-monitor-group',
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    print(f"Consumed event: {message.value} at offset {message.offset}")

一目でわかる比較表

項目 RabbitMQ Apache Kafka
主要パラダイム スマートブローカー(メッセージキュー) ダムブローカー(分散コミットログ)
消費モデル Push(ブローカー → コンシューマー) Pull(コンシューマー → ブローカー)
ルーティング 非常に柔軟で複雑なルーティングが可能 トピックとパーティションに基づく単純なルーティング
データ保持 消費後に削除(一時的) ポリシーに基づき永続保持(再利用可能)
スループット 高い(毎秒数万件) 極めて高い(毎秒数十万件以上)
主要なユースケース タスクキュー、複雑なビジネスロジック、低遅延メッセージング ログ収集、イベントソーシング、リアルタイムデータパイプライン、ストリーム処理

結論: 「どちらが良いか」ではなく「どちらが適しているか」

KafkaとRabbitMQをめぐる議論は、しばしば「どちらが優れているか」という方向に進みがちですが、それは正しいアプローチではありません。この二つのシステムは、異なる問題を解決するために生まれ、それぞれの領域で最高のソリューションです。

決定を下す前に、自分自身に次のような問いを投げかけてみてください。

  • 「一回限りのタスクを安定して分散処理するシステムが必要なのか、それとも発生したすべてのイベントを永続的に記録し、多目的に再利用できるプラットフォームが必要なのか?」
  • 「メッセージ一つひとつの複雑なルーティング規則が重要なのか、それとも毎秒数百万件のデータを滞りなく処理する能力が重要なのか?」

RabbitMQは、複雑なルーティングと信頼性の高いタスク処理が求められる伝統的なメッセージングシステムにおいて、卓越した選択肢です。一方、Kafkaは、イベントを永続的な真実の記録として扱い、大規模なデータストリームをリアルタイムで処理する必要がある現代的なデータアーキテクチャの心臓部として、最も適しています。

最終的に、答えはあなたのプロジェクトの要求事項の中にあります。この記事が、あなたのシステムに最も適したメッセージブローカーを選択する上で、価値ある羅針盤となることを願っています。

Wednesday, September 20, 2023

스프링 부트와 카프카로 구축하는 실전 마이크로서비스 아키텍처

서론: 왜 지금 마이크로서비스와 비동기 메시징인가?

현대의 소프트웨어 개발 패러다임은 거대한 단일 애플리케이션(Monolithic Architecture)에서 작고 독립적으로 배포 가능한 여러 서비스의 집합(Microservices Architecture)으로 빠르게 이동하고 있습니다. 이러한 변화의 중심에는 비즈니스 민첩성 향상, 기술 스택의 유연성, 그리고 서비스 단위의 확장성 확보라는 명확한 목표가 있습니다. 하지만 여러 서비스가 분산되어 동작하는 환경은 필연적으로 서비스 간의 통신이라는 새로운 복잡성을 야기합니다.

가장 단순한 통신 방식은 REST API를 이용한 동기(Synchronous) 호출입니다. 서비스 A가 서비스 B의 기능이 필요할 때, 서비스 B의 API를 호출하고 응답이 올 때까지 기다리는 방식이죠. 이 방식은 직관적이지만, 치명적인 약점을 가집니다. 만약 서비스 B가 일시적인 장애를 겪거나 응답이 지연되면, 이를 호출한 서비스 A 역시 장애가 전파되어 전체 시스템의 안정성을 해칠 수 있습니다. 이를 '결합(Coupling)'이 높다고 표현하며, 분산 시스템에서는 이러한 강한 결합을 피하는 것이 매우 중요합니다.

이 문제에 대한 효과적인 해결책이 바로 이벤트 기반 아키텍처(Event-Driven Architecture)비동기(Asynchronous) 메시징입니다. 서비스들은 직접 통신하는 대신, '이벤트'라는 메시지를 중앙 메시지 시스템에 발행(Publish)하고, 해당 이벤트에 관심 있는 다른 서비스들이 이를 구독(Subscribe)하여 처리합니다. 이 과정은 비동기적으로 일어나므로, 메시지를 발행한 서비스는 수신 서비스의 상태와 관계없이 자신의 작업을 계속할 수 있습니다. 서비스 간의 결합이 느슨해지고(Loosely Coupled), 개별 서비스의 장애가 전체 시스템으로 확산되는 것을 방지하여 시스템의 회복탄력성(Resilience)을 극적으로 향상시킵니다.

이러한 이벤트 기반 마이크로서비스 아키텍처를 구현하는 데 있어 스프링 부트(Spring Boot)아파치 카프카(Apache Kafka)는 현재 업계에서 가장 강력하고 대중적인 조합 중 하나입니다. 스프링 부트는 자바 생태계에서 마이크로서비스 개발을 위한 사실상의 표준으로, 개발자가 비즈니스 로직에만 집중할 수 있도록 수많은 편의 기능을 제공합니다. 아파치 카프카는 단순한 메시지 큐를 넘어, 대용량의 실시간 데이터 스트림을 안정적으로 처리하기 위해 설계된 분산 스트리밍 플랫폼입니다.

본 글에서는 스프링 부트와 카프카를 결합하여 어떻게 견고하고 확장 가능한 마이크로서비스를 구축할 수 있는지, 이론적 배경부터 실전 예제 코드, 그리고 운영 환경에서 고려해야 할 고급 주제까지 단계별로 상세하게 다룰 것입니다. 단순히 코드를 나열하는 것을 넘어, 각 기술의 핵심 원리를 이해하고 왜 이러한 방식으로 설계해야 하는지에 대한 깊이 있는 통찰을 제공하는 것을 목표로 합니다.

1장: 핵심 기반 다지기 - 스프링 부트와 아파치 카프카

본격적인 구현에 앞서, 우리가 사용할 두 가지 핵심 기술인 스프링 부트와 카프카의 본질을 정확히 이해하는 것이 중요합니다. 이들의 철학과 내부 동작 원리를 알면, 단순히 라이브러리를 사용하는 것을 넘어 발생하는 문제에 효과적으로 대처하고 시스템을 최적화할 수 있는 능력을 갖추게 됩니다.

1.1. 스프링 부트(Spring Boot)의 철학

스프링 부트는 기존 스프링 프레임워크(Spring Framework)의 복잡성을 해결하기 위해 탄생했습니다. 과거 스프링 기반 애플리케이션을 개발하기 위해서는 수많은 XML 설정과 라이브러리 버전 호환성 문제로 인해 개발 초기 단계부터 상당한 시간과 노력을 소모해야 했습니다. 스프링 부트는 다음 세 가지 핵심 철학을 통해 이러한 문제를 해결합니다.

  • Convention over Configuration (설정보다 관례): "개발자가 특별히 설정하지 않으면, 가장 보편적이고 합리적인 방식으로 동작해야 한다"는 원칙입니다. 예를 들어, H2 데이터베이스 라이브러리가 클래스패스에 존재하면, 스프링 부트는 별도의 설정 없이 자동으로 인메모리 데이터베이스 연결을 구성해줍니다. 개발자는 꼭 필요한 경우에만 설정을 변경하면 되므로, 설정 파일의 양이 획기적으로 줄어듭니다.
  • Auto-configuration (자동 설정): 스프링 부트는 프로젝트의 클래스패스에 포함된 라이브러리들을 분석하여, 해당 라이브러리들을 사용하는 데 필요한 스프링 빈(Bean)들을 자동으로 등록하고 설정합니다. 예를 들어, spring-boot-starter-web 의존성을 추가하면 내장 톰캣(Tomcat) 서버, 디스패처 서블릿(DispatcherServlet), Jackson 메시지 컨버터 등이 모두 자동으로 구성되어 즉시 웹 애플리케이션을 개발할 수 있습니다.
  • Starter Dependencies (스타터 의존성): 특정 기능을 개발하는 데 필요한 라이브러리들의 묶음을 '스타터'라는 이름으로 제공합니다. 예를 들어, 카프카 연동을 위해서는 spring-kafka 스타터를, JPA를 사용하기 위해서는 spring-boot-starter-data-jpa 스타터를 추가하기만 하면 됩니다. 이를 통해 개발자는 라이브러리 간의 복잡한 버전 호환성 문제를 고민할 필요 없이, 검증된 조합을 손쉽게 사용할 수 있습니다.

이러한 특징들 덕분에 스프링 부트는 개발자가 인프라 설정이 아닌 비즈니스 로직 구현에 온전히 집중할 수 있는 환경을 제공하며, 마이크로서비스 개발의 생산성을 극대화하는 강력한 도구로 자리매김했습니다.

1.2. 아파치 카프카(Apache Kafka)의 구조와 원리

카프카를 단순한 메시지 큐(Message Queue)로 생각하면 그 잠재력의 절반도 활용하지 못하는 것입니다. 카프카는 분산 커밋 로그(Distributed Commit Log)라는 핵심 아이디어를 기반으로 하는 분산 스트리밍 플랫폼입니다.

카프카의 핵심 구성 요소는 다음과 같습니다.

  • 브로커(Broker): 카프카 서버의 단일 인스턴스를 의미합니다. 일반적으로 3대 이상의 브로커가 클러스터(Cluster)를 구성하여 동작하며, 이를 통해 고가용성과 확장성을 보장합니다.
  • 주키퍼(Zookeeper) / KRaft: 카프카 클러스터의 메타데이터(브로커 정보, 토픽 설정, 리더 선출 등)를 관리하는 코디네이션 시스템입니다. 최신 버전의 카프카에서는 주키퍼 의존성을 제거하고 자체적인 합의 프로토콜인 KRaft(Kafka Raft) 모드를 도입하여 운영 복잡성을 낮추고 있습니다.
  • 토픽(Topic): 메시지를 구분하기 위한 논리적인 채널 또는 카테고리입니다. 마치 데이터베이스의 테이블과 유사한 개념으로, 프로듀서는 특정 토픽에 메시지를 발행하고, 컨슈머는 특정 토픽의 메시지를 구독합니다.
  • 파티션(Partition): 각 토픽은 하나 이상의 파티션으로 나뉘어 저장됩니다. 파티션은 메시지가 실제로 저장되는 물리적인 단위이며, 순서가 보장되는 불변의(immutable) 로그 파일입니다. 토픽을 여러 파티션으로 나누면, 여러 컨슈머가 각기 다른 파티션을 병렬로 처리할 수 있게 되어 처리량을 극대화할 수 있습니다. 하나의 파티션 내에서는 메시지의 순서가 보장되지만, 토픽 전체적으로는 메시지 순서가 보장되지 않습니다.
  • 오프셋(Offset): 파티션 내에서 각 메시지가 가지는 고유한 순번(ID)입니다. 컨슈머는 이 오프셋을 기준으로 자신이 어디까지 메시지를 읽었는지 추적합니다.
  • 프로듀서(Producer): 카프카 토픽으로 메시지(이벤트)를 발행하는 클라이언트 애플리케이션입니다.
  • 컨슈머(Consumer): 카프카 토픽의 메시지를 구독하여 가져와서 처리하는 클라이언트 애플리케이션입니다.
  • 컨슈머 그룹(Consumer Group): 하나 이상의 컨슈머가 모여 구성된 그룹입니다. 특정 토픽을 구독하는 컨슈머 그룹 내에서는 하나의 파티션이 오직 하나의 컨슈머에게만 할당됩니다. 만약 컨슈머 그룹에 새로운 컨슈머가 추가되거나 기존 컨슈머가 종료되면, 파티션의 소유권이 재조정되는 '리밸런싱(Rebalancing)' 과정이 발생합니다. 이 메커니즘을 통해 카프카는 장애 허용(Fault Tolerance)과 수평적 확장(Horizontal Scaling)을 동시에 달성합니다.

카프카는 메시지를 컨슈머가 읽어가도 바로 삭제하지 않고, 설정된 보관 주기(retention period) 동안 디스크에 안전하게 보관합니다. 이 특징 덕분에 여러 다른 컨슈머 그룹이 각자의 필요에 따라 동일한 데이터를 여러 번 읽어갈 수 있으며, 실시간 처리뿐만 아니라 배치(batch) 처리나 데이터 분석 파이프라인 구축에도 매우 유용합니다.

2장: 개발 환경 구축 및 초기 설정

이제 이론적 배경을 바탕으로 실제 개발을 위한 환경을 구축해 보겠습니다. 로컬 환경에서 카프카 클러스터를 손쉽게 실행하고, 스프링 부트 프로젝트를 생성하여 카프카 연동을 위한 기본 설정을 완료하는 과정을 단계별로 안내합니다.

2.1. 필수 준비물

개발을 시작하기 전에 다음 소프트웨어가 시스템에 설치되어 있는지 확인하세요.

  • Java Development Kit (JDK): 11 버전 이상 (17 LTS 권장)
  • Build Tool: Maven 3.6+ 또는 Gradle 6.8+
  • IDE: IntelliJ IDEA, Eclipse, VS Code 등 선호하는 Java IDE
  • Docker & Docker Compose: 로컬에서 카프카와 주키퍼를 컨테이너로 실행하기 위해 필요합니다.

2.2. Docker Compose를 활용한 카프카 클러스터 실행

카프카와 주키퍼를 직접 다운로드하여 설치하는 것은 번거로울 수 있습니다. Docker Compose를 사용하면 단 몇 줄의 설정 파일과 명령어 하나로 손쉽게 전체 클러스터를 실행할 수 있습니다.

프로젝트 루트 디렉터리에 docker-compose.yml 파일을 생성하고 아래 내용을 작성하세요.


version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    container_name: kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

이 설정 파일은 Confluent Platform 이미지를 사용하여 주키퍼와 단일 노드 카프카 브로커를 실행합니다. 특히 KAFKA_ADVERTISED_LISTENERS 설정이 중요합니다. localhost:9092는 Docker 호스트(우리의 PC)에서 카프카에 접근할 때 사용하는 주소이고, kafka:29092는 Docker 네트워크 내부에서 다른 컨테이너가 카프카에 접근할 때 사용하는 주소입니다.

이제 터미널에서 아래 명령어를 실행하여 카프카 클러스터를 시작합니다.


docker-compose up -d

-d 옵션은 컨테이너를 백그라운드에서 실행하도록 합니다. docker ps 명령어로 zookeeper와 kafka 컨테이너가 정상적으로 실행 중인지 확인할 수 있습니다.

2.3. Spring Initializr를 통한 프로젝트 생성

스프링 부트 프로젝트를 가장 쉽게 시작하는 방법은 Spring Initializr 웹사이트를 이용하는 것입니다. 다음 설정으로 프로젝트를 생성합니다.

  • Project: Gradle - Groovy (또는 Maven)
  • Language: Java
  • Spring Boot: 3.X.X 버전대 (안정적인 최신 버전 선택)
  • Project Metadata:
    • GroupId: com.example
    • ArtifactId: kafka-microservice
    • Name: kafka-microservice
    • Packaging: Jar
    • Java: 17 (또는 설치된 JDK 버전)
  • Dependencies:
    • Spring Web
    • Spring for Apache Kafka

'GENERATE' 버튼을 클릭하여 프로젝트 압축 파일을 다운로드하고, 원하는 위치에 압축을 해제한 후 IDE로 프로젝트를 엽니다.

2.4. application.yml 상세 설정 파헤치기

프로젝트의 src/main/resources/ 경로에 있는 application.properties 파일을 application.yml로 변경하고, 카프카 연동을 위한 설정을 추가합니다. YAML 형식은 계층 구조를 표현하기에 더 용이하여 복잡한 설정에 유리합니다.


spring:
  application:
    name: kafka-microservice-app

  kafka:
    # --- Producer Configurations ---
    producer:
      # 카프카 클러스터에 연결하기 위한 브로커 서버 목록
      bootstrap-servers: localhost:9092
      # 메시지 키를 직렬화(byte array로 변환)하는 클래스
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 메시지 값을 직렬화하는 클래스 (JSON 전송을 위해 추후 변경 예정)
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 프로듀서가 리더 파티션으로부터 응답을 기다리는 기준
      # all: ISR(In-Sync Replicas)에 포함된 모든 복제본에 쓰기가 완료되었을 때 응답을 받음. 데이터 유실 가능성 최소화.
      acks: all 
      properties:
        # Exactly-once 시맨틱스를 위한 설정 (6장에서 상세히 다룸)
        "enable.idempotence": "true" 
        # 재시도 사이에 발생하는 지연 시간(ms)
        "delivery.timeout.ms": "120000"
        # 재시도 횟수
        "retries": "3"

    # --- Consumer Configurations ---
    consumer:
      # 카프카 클러스터에 연결하기 위한 브로커 서버 목록
      bootstrap-servers: localhost:9092
      # 컨슈머가 속할 그룹 ID (동일 그룹 내에서는 파티션이 분배됨)
      group-id: my-group
      # 브로커에 초기 오프셋 정보가 없을 때 어디서부터 읽을지 결정
      # earliest: 가장 오래된 메시지부터, latest: 가장 최신 메시지부터
      auto-offset-reset: earliest
      # 메시지 키를 역직렬화(byte array에서 객체로 변환)하는 클래스
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 메시지 값을 역직렬화하는 클래스
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 오프셋을 자동으로 커밋할지 여부
      enable-auto-commit: false # false로 설정하여 수동 커밋 사용 권장 (4장에서 상세히 다룸)

    # --- Listener (Consumer) Configurations ---
    listener:
      # 컨슈머 리스너의 동작 방식 설정
      # MANUAL_IMMEDIATE: 수동으로 즉시 오프셋을 커밋
      ack-mode: manual_immediate

각 설정 항목에 대한 주석을 통해 그 의미를 파악하는 것이 중요합니다. 특히 프로듀서의 acks 설정은 데이터의 신뢰성과 성능 사이의 트레이드오프를 결정하는 핵심 요소이며, 컨슈머의 auto-offset-resetenable-auto-commit 설정은 메시지 처리 시맨틱스를 결정하는 데 매우 중요합니다.

이로써 우리는 카프카를 사용할 준비를 마쳤습니다. 다음 장에서는 이 설정을 바탕으로 메시지를 발행하는 프로듀서를 구현해 보겠습니다.

3장: 프로듀서 구현 - 안정적인 메시지 발행

카프카 프로듀서는 마이크로서비스 아키텍처에서 이벤트의 시작점입니다. 서비스에서 발생한 중요한 상태 변화(예: 주문 생성, 회원 가입)를 안정적으로 카프카 토픽에 전달하는 역할을 맡습니다. 이번 장에서는 스프링 카프카가 제공하는 KafkaTemplate을 활용하여 단순한 문자열 메시지부터 복잡한 객체(DTO)까지 전송하는 방법을 다룹니다.

3.1. Java Configuration을 통한 프로듀서 설정

application.yml을 통한 설정은 편리하지만, 더 세밀한 제어나 커스텀 로직이 필요한 경우 Java Configuration 클래스를 사용하는 것이 좋습니다. 여기서는 KafkaTemplate을 생성하는 데 필요한 ProducerFactory 빈을 직접 등록해 보겠습니다.

config 패키지를 생성하고 KafkaProducerConfig.java 클래스를 작성합니다.


package com.example.kafkamicorservice.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // application.yml에 정의한 다른 프로듀서 속성들도 필요에 따라 추가할 수 있습니다.
        // configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

이 설정 클래스는 application.ymlbootstrap-servers 값을 읽어와 프로듀서 설정을 구성합니다. 이렇게 Java 코드로 설정을 관리하면 컴파일 시점에 타입 체크가 가능하고, 프로그래밍적으로 설정을 동적으로 변경하는 등의 유연성을 확보할 수 있습니다.

3.2. DTO와 JSON을 활용한 구조화된 데이터 전송

실제 애플리케이션에서는 단순 문자열보다 구조화된 데이터를 주고받는 경우가 훨씬 많습니다. 이때 DTO(Data Transfer Object)를 정의하고, 이를 JSON 형식으로 직렬화하여 카프카에 전송하는 것이 일반적입니다.

먼저, 전송할 데이터를 담을 OrderEvent.java DTO 클래스를 생성합니다.


package com.example.kafkamicorservice.event;

// Lombok 어노테이션을 사용하면 Getter, Setter, 생성자 등을 자동으로 생성해줍니다.
// import lombok.AllArgsConstructor;
// import lombok.Data;
// import lombok.NoArgsConstructor;
//
// @Data
// @AllArgsConstructor
// @NoArgsConstructor
public class OrderEvent {
    private String orderId;
    private String product;
    private int quantity;
    private long timestamp;

    // Lombok을 사용하지 않을 경우
    public OrderEvent() {}

    public OrderEvent(String orderId, String product, int quantity, long timestamp) {
        this.orderId = orderId;
        this.product = product;
        this.quantity = quantity;
        this.timestamp = timestamp;
    }
    // Getters and Setters ...
}

이제 OrderEvent 객체를 JSON으로 직렬화하기 위해 스프링 카프카가 제공하는 JsonSerializer를 사용하도록 설정을 변경해야 합니다. 먼저, KafkaProducerConfig.java를 수정합니다.


// KafkaProducerConfig.java

import org.springframework.kafka.support.serializer.JsonSerializer;
// ... (다른 import문 생략)

@Configuration
public class KafkaProducerConfig {
    
    // ... (bootstrapServers 필드 생략)

    // DTO를 전송하기 위한 ProducerFactory 및 KafkaTemplate 빈을 추가
    @Bean
    public ProducerFactory<String, Object> producerFactoryWithJson() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value Serializer를 JsonSerializer로 변경
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplateWithJson() {
        return new KafkaTemplate<>(producerFactoryWithJson());
    }

    // 기존 문자열 전송을 위한 KafkaTemplate은 그대로 유지하거나 필요 없다면 삭제
    // @Bean
    // public ProducerFactory producerFactory() { ... }
    // @Bean
    // public KafkaTemplate kafkaTemplate() { ... }
}

이제 이 KafkaTemplate<String, Object>를 사용하여 OrderEvent 객체를 전송하는 서비스를 만들어 보겠습니다. service 패키지를 만들고 EventProducerService.java를 작성합니다.


package com.example.kafkamicorservice.service;

import com.example.kafkamicorservice.event.OrderEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class EventProducerService {
    private static final Logger logger = LoggerFactory.getLogger(EventProducerService.class);
    private static final String TOPIC_NAME = "order-events";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public EventProducerService(@Qualifier("kafkaTemplateWithJson") KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrderEvent(OrderEvent event) {
        logger.info("Producing message: {}", event);
        kafkaTemplate.send(TOPIC_NAME, event.getOrderId(), event);
    }
}

위 코드에서 kafkaTemplate.send() 메소드의 두 번째 인자로 event.getOrderId()를 전달한 것에 주목하세요. 이것이 바로 메시지 키(Key)입니다. 키를 지정하면, 카프카는 키의 해시값을 계산하여 특정 파티션에 메시지를 일관되게 보냅니다. 즉, 동일한 주문 ID(키)를 가진 이벤트들은 항상 같은 파티션으로 들어가게 되어, 해당 주문에 대한 이벤트 처리 순서를 보장하는 데 매우 중요한 역할을 합니다.

3.3. 비동기 전송과 콜백을 이용한 결과 처리

KafkaTemplate.send() 메소드는 기본적으로 비동기로 동작합니다. 즉, 메소드는 메시지를 내부 버퍼에 추가하고 즉시 리턴하며, 실제 전송은 백그라운드 스레드에서 이루어집니다. 이는 높은 처리량을 보장하지만, 전송이 성공했는지 실패했는지 바로 알 수는 없습니다.

전송 결과를 확인하고 싶다면 send 메소드가 반환하는 CompletableFuture(최신 스프링 카프카 버전) 또는 ListenableFuture(이전 버전)를 사용하여 콜백을 등록할 수 있습니다.

EventProducerService.java를 다음과 같이 수정해 보겠습니다.


package com.example.kafkamicorservice.service;

import com.example.kafkamicorservice.event.OrderEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class EventProducerService {
    private static final Logger logger = LoggerFactory.getLogger(EventProducerService.class);
    private static final String TOPIC_NAME = "order-events";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public EventProducerService(@Qualifier("kafkaTemplateWithJson") KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrderEvent(OrderEvent event) {
        CompletableFuture<SendResult<String, Object>> future = 
                kafkaTemplate.send(TOPIC_NAME, event.getOrderId(), event);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                // 전송 성공
                logger.info("Sent message=[{}] with offset=[{}] on partition=[{}]",
                        event,
                        result.getRecordMetadata().offset(),
                        result.getRecordMetadata().partition());
            } else {
                // 전송 실패
                logger.error("Unable to send message=[{}] due to : {}", event, ex.getMessage());
            }
        });
    }
}

이제 메시지 전송 후 성공 시에는 해당 메시지가 저장된 파티션과 오프셋 정보를 로그로 남기고, 실패 시에는 에러 로그를 남겨서 전송 상태를 명확하게 추적할 수 있습니다. 이는 장애 상황을 분석하고 디버깅하는 데 매우 중요합니다. 예를 들어, 브로커에 연결할 수 없거나 메시지 크기가 너무 커서 전송에 실패하는 경우, 이 콜백을 통해 즉시 문제를 인지하고 재시도 로직이나 관리자 알림 등의 후속 조치를 취할 수 있습니다.

4장: 컨슈머 구현 - 신뢰성 있는 메시지 소비

프로듀서가 발행한 이벤트를 받아 실질적인 비즈니스 로직을 수행하는 것이 컨슈머의 역할입니다. 컨슈머를 구현할 때는 메시지를 안정적으로 처리하고, 장애 발생 시 메시지 유실 없이 복구할 수 있도록 설계하는 것이 무엇보다 중요합니다. 이번 장에서는 @KafkaListener 어노테이션을 중심으로, JSON 데이터 처리, 컨슈머 그룹을 통한 확장, 그리고 에러 핸들링 전략에 대해 심도 있게 다룹니다.

4.1. Java Configuration을 통한 컨슈머 설정

프로듀서와 마찬가지로 컨슈머도 Java Configuration을 통해 더 상세하게 설정할 수 있습니다. 특히 @KafkaListener가 사용할 ConcurrentKafkaListenerContainerFactory 빈을 정의하여 컨슈머의 동작 방식을 제어합니다.

config 패키지에 KafkaConsumerConfig.java 클래스를 작성합니다.


package com.example.kafkamicorservice.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    
    // JSON 메시지를 수신하기 위한 설정은 다음 섹션에서 추가됩니다.

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 수동 커밋을 위한 AckMode 설정
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

여기서 가장 중요한 설정은 setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)입니다. 이는 application.ymlenable-auto-commit: false 설정과 함께, 컨슈머가 메시지를 언제 "처리 완료"로 표시할지(오프셋 커밋)를 애플리케이션 코드에서 직접 제어하겠다는 의미입니다. 메시지 처리가 완전히 성공했을 때만 커밋함으로써, 처리 도중 에러가 발생하더라도 메시지가 유실되는 것을 방지할 수 있습니다.

4.2. JSON 메시지 역직렬화 및 DTO 변환

프로듀서가 OrderEvent DTO를 JSON으로 보내므로, 컨슈머는 이 JSON 문자열을 다시 OrderEvent 객체로 변환(역직렬화)해야 합니다. 이를 위해 스프링 카프카의 JsonDeserializer를 사용합니다.

KafkaConsumerConfig.java에 JSON 역직렬화를 위한 팩토리와 리스너 컨테이너 팩토리를 추가합니다.


// KafkaConsumerConfig.java

import com.example.kafkamicorservice.event.OrderEvent;
import org.springframework.kafka.support.serializer.JsonDeserializer;
// ... (다른 import문 생략)

@Configuration
public class KafkaConsumerConfig {
    
    // ... (기존 필드 및 빈 설정 생략)

    @Bean
    public ConsumerFactory<String, OrderEvent> orderEventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // JsonDeserializer 설정
        JsonDeserializer<OrderEvent> deserializer = new JsonDeserializer<>(OrderEvent.class);
        // 신뢰할 수 없는 패키지로부터의 역직렬화를 방지하기 위한 설정
        // '*'는 모든 패키지를 허용. 보안상 특정 패키지만 명시하는 것이 좋음.
        deserializer.addTrustedPackages("com.example.kafkamicorservice.event");
        
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> orderEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderEventConsumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

JsonDeserializer를 설정할 때 addTrustedPackages를 통해 역직렬화할 DTO 클래스가 위치한 패키지를 명시해주는 것이 중요합니다. 이는 보안 취약점을 방지하기 위한 조치입니다.

이제 service 패키지에 EventConsumerService.java를 생성하고 @KafkaListener를 사용하여 메시지를 수신합니다.


package com.example.kafkamicorservice.service;

import com.example.kafkamicorservice.event.OrderEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class EventConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumerService.class);

    @KafkaListener(topics = "order-events",
                   groupId = "${spring.kafka.consumer.group-id}",
                   containerFactory = "orderEventKafkaListenerContainerFactory")
    public void consumeOrderEvent(@Payload OrderEvent event,
                                  @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                  @Header(KafkaHeaders.OFFSET) long offset,
                                  Acknowledgment acknowledgment) {
        
        logger.info("Received message: partition={}, offset={}, event={}", partition, offset, event.getOrderId());
        
        try {
            // 여기에 실제 비즈니스 로직을 구현합니다.
            // 예: 데이터베이스에 저장, 외부 API 호출, 알림 발송 등
            logger.info("Processing order for product: {}", event.getProduct());
            Thread.sleep(1000); // 로직 처리 시간 시뮬레이션

            // 처리가 성공적으로 완료되면 오프셋을 커밋
            acknowledgment.acknowledge();
            logger.info("Successfully processed and acknowledged the message.");

        } catch (Exception e) {
            logger.error("Error processing message: {}", e.getMessage());
            // 에러 발생 시 커밋하지 않음. 
            // 이 메시지는 나중에 다시 처리 시도됨 (에러 핸들링 전략에 따라 동작이 달라짐)
        }
    }
}

@KafkaListener 어노테이션의 containerFactory 속성을 통해 방금 설정한 orderEventKafkaListenerContainerFactory 빈을 사용하도록 지정했습니다. 리스너 메소드의 파라미터로 @Payload를 사용해 메시지 본문을 OrderEvent 객체로 바로 받을 수 있으며, Acknowledgment 객체를 받아 acknowledge()를 호출함으로써 수동으로 오프셋을 커밋합니다. 로직 수행 중 예외가 발생하면 acknowledge()가 호출되지 않으므로, 이 메시지는 다음 폴링(polling) 시에 다시 전달되어 재처리를 시도하게 됩니다.

4.3. 컨슈머 그룹과 파티션 리밸런싱을 통한 확장성 확보

카프카의 진정한 힘은 컨슈머 그룹을 통한 수평적 확장에서 나옵니다. 만약 `order-events` 토픽이 6개의 파티션으로 구성되어 있다고 가정해 보겠습니다.

  • 컨슈머 1개 실행: 해당 컨슈머가 6개 파티션의 모든 메시지를 처리합니다.
  • 컨슈머 2개 실행 (동일 그룹 ID): 카프카는 리밸런싱을 통해 각 컨슈머에게 3개의 파티션을 할당합니다. 전체 처리량이 이론적으로 2배가 됩니다.
  • 컨슈머 6개 실행 (동일 그룹 ID): 각 컨슈머가 1개의 파티션을 전담하여 처리합니다. 처리량이 최대로 확장됩니다.
  • 컨슈머 7개 실행 (동일 그룹 ID): 6개의 컨슈머가 각각 파티션을 할당받고, 1개의 컨슈머는 아무 일도 하지 않고 대기(idle)합니다. 파티션 수보다 많은 컨슈머는 낭비입니다.

스프링 부트 애플리케이션을 여러 인스턴스로 실행하기만 하면, 동일한 group-id를 가진 컨슈머들이 자동으로 파티션을 나누어 처리하게 되므로, 메시지 처리량이 많아질 경우 별도의 코드 수정 없이 서버 인스턴스를 늘리는 것만으로 손쉽게 시스템을 확장할 수 있습니다.

4.4. 견고한 시스템을 위한 컨슈머 에러 핸들링 전략

메시지 처리 중 예외가 발생했을 때, 무한정 재시도하는 것은 시스템에 큰 부담을 줄 수 있습니다. 예를 들어, 메시지 형식이 잘못되어 역직렬화에 계속 실패하는 경우, 해당 메시지는 계속해서 재처리되며 다른 정상적인 메시지들의 처리를 막는 '블로킹(blocking)' 현상을 유발합니다.

이를 해결하기 위해 스프링 카프카는 다양한 에러 핸들링 전략을 제공합니다. 그중 가장 널리 사용되는 것은 Dead Letter Topic (DLT) 패턴입니다.

DLT는 특정 횟수 이상 재시도에 실패한 메시지를 별도의 '죽은 메시지' 토픽으로 보내고, 원래 토픽에서는 해당 메시지를 커밋하여 다음 메시지 처리를 계속 진행하는 방식입니다. 이를 통해 문제의 메시지를 격리하고 시스템 전체의 흐름을 유지할 수 있습니다.

KafkaConsumerConfig.java에 DLT를 위한 에러 핸들러를 설정해 보겠습니다.


// KafkaConsumerConfig.java
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// ...

@Configuration
public class KafkaConsumerConfig {

    // ... (기존 설정 생략)

    // DLT로 메시지를 보내기 위해 KafkaTemplate이 필요합니다.
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) {
        // 재시도 실패 시 DeadLetterPublishingRecoverer를 호출합니다.
        // dlt-order-events 토픽으로 메시지를 보냅니다.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (rec, ex) -> new TopicPartition("dlt-order-events", rec.partition()));

        // 1초 간격으로 최대 3번 재시도합니다.
        FixedBackOff backOff = new FixedBackOff(1000L, 2L);
        return new DefaultErrorHandler(recoverer, backOff);
    }
    
    // orderEventKafkaListenerContainerFactory 빈 설정을 수정합니다.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> orderEventKafkaListenerContainerFactory(
            DefaultErrorHandler errorHandler) { // errorHandler를 주입받습니다.
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderEventConsumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 생성한 에러 핸들러를 컨테이너 팩토리에 설정합니다.
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

이제 컨슈머에서 처리할 수 없는 예외(예: NullPointerException)가 발생하면, 애플리케이션은 1초 간격으로 두 번 더 재시도합니다. 세 번의 시도 모두 실패하면 해당 메시지는 dlt-order-events라는 이름의 토픽으로 전송되고, 원래 메시지는 커밋 처리됩니다. 개발자는 나중에 DLT에 쌓인 메시지들을 분석하여 원인을 파악하고, 버그를 수정한 뒤 수동으로 재처리하는 등의 조치를 취할 수 있습니다. 이는 시스템의 안정성과 데이터 무결성을 보장하는 핵심적인 장치입니다.

5장: 실전 예제 - 이벤트 기반 주문/알림 시스템 구축

지금까지 배운 개념들을 종합하여 간단하지만 현실적인 마이크로서비스 아키텍처 예제를 구현해 보겠습니다. 사용자가 주문을 생성하면 '주문 서비스'가 이를 처리하고, 카프카를 통해 '주문 생성됨' 이벤트를 발행합니다. 그러면 '알림 서비스'가 이 이벤트를 구독하여 사용자에게 알림을 보내는 시나리오입니다.

실제로는 두 서비스를 별개의 스프링 부트 프로젝트로 구성해야 하지만, 여기서는 설명을 위해 하나의 프로젝트 내에 두 서비스의 역할을 모두 구현하겠습니다.

5.1. 시나리오 정의: 주문 서비스와 알림 서비스

  1. 사용자: HTTP POST 요청으로 새로운 주문을 생성합니다.
  2. 주문 서비스 (Order Service - Producer 역할):
    • REST API 엔드포인트를 통해 주문 요청을 받습니다.
    • 주문 데이터를 (가상) 데이터베이스에 저장합니다.
    • 주문이 성공적으로 생성되었음을 알리는 OrderEvent를 생성하여 order-events 카프카 토픽에 발행합니다.
  3. 알림 서비스 (Notification Service - Consumer 역할):
    • order-events 토픽을 구독하고 있습니다.
    • 새로운 OrderEvent 메시지를 수신하면, 해당 주문 정보를 바탕으로 사용자에게 이메일이나 SMS를 발송하는 로직을 수행합니다(여기서는 로그 출력으로 대체).

이 구조에서 주문 서비스와 알림 서비스는 서로를 전혀 알지 못합니다. 오직 카프카 토픽을 통해서만 상호작용하므로, 한쪽 서비스가 장애가 나거나 배포 중이더라도 다른 서비스는 영향을 받지 않는 느슨한 결합(Loose Coupling)이 달성됩니다.

5.2. 주문 서비스(Producer) 구현

먼저 주문 요청을 받을 REST 컨트롤러를 생성합니다. controller 패키지를 만들고 OrderController.java를 작성합니다.


package com.example.kafkamicorservice.controller;

import com.example.kafkamicorservice.event.OrderEvent;
import com.example.kafkamicorservice.service.EventProducerService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("/orders")
public class OrderController {

    private final EventProducerService producerService;

    public OrderController(EventProducerService producerService) {
        this.producerService = producerService;
    }

    // 주문 요청을 위한 DTO
    public static class CreateOrderRequest {
        private String product;
        private int quantity;
        
        // Getters and Setters
        public String getProduct() { return product; }
        public void setProduct(String product) { this.product = product; }
        public int getQuantity() { return quantity; }
        public void setQuantity(int quantity) { this.quantity = quantity; }
    }

    @PostMapping
    public ResponseEntity<String> createOrder(@RequestBody CreateOrderRequest request) {
        // 1. (가상) 데이터베이스에 주문 저장 로직
        String orderId = UUID.randomUUID().toString();
        System.out.println("Order saved to DB with ID: " + orderId);

        // 2. 카프카에 이벤트 발행
        OrderEvent event = new OrderEvent(
                orderId,
                request.getProduct(),
                request.getQuantity(),
                System.currentTimeMillis()
        );
        producerService.sendOrderEvent(event);

        return ResponseEntity.ok("Order created successfully with ID: " + orderId);
    }
}

이 컨트롤러는 /orders 경로로 POST 요청을 받습니다. 요청 본문(body)에 포함된 상품명과 수량을 바탕으로 고유한 주문 ID를 생성하고, 이를 포함한 OrderEvent를 만들어 EventProducerService를 통해 카프카로 전송합니다. EventProducerService는 3장에서 이미 구현했습니다.

5.3. 알림 서비스(Consumer) 구현

알림 서비스의 역할은 4장에서 구현한 EventConsumerService가 그대로 수행합니다. `order-events` 토픽을 리스닝하다가 OrderEvent가 들어오면 로그를 출력하는 로직이 이미 작성되어 있습니다. 실제 시스템이라면 로그 출력 대신 이메일 발송 라이브러리나 SMS 게이트웨이 연동 코드가 위치하게 될 것입니다.

다시 한번 EventConsumerService.java의 코드를 살펴보겠습니다.


package com.example.kafkamicorservice.service;

// ... (imports)

@Service
public class EventConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumerService.class);

    @KafkaListener(topics = "order-events",
                   groupId = "${spring.kafka.consumer.group-id}",
                   containerFactory = "orderEventKafkaListenerContainerFactory")
    public void consumeOrderEvent(@Payload OrderEvent event,
                                  // ... (other parameters)
                                  Acknowledgment acknowledgment) {
        
        logger.info("Received Order Event: orderId={}", event.getOrderId());
        
        try {
            // --- 알림 서비스의 핵심 비즈니스 로직 ---
            logger.info("Sending notification for order [{}]: Product '{}', Quantity '{}'", 
                event.getOrderId(), event.getProduct(), event.getQuantity());

            // 여기에 실제 이메일 발송 또는 SMS 전송 로직이 들어갑니다.
            // sendEmail(toUser, "Order Confirmation", createEmailBody(event));

            // 성공적으로 알림을 보냈다고 가정
            acknowledgment.acknowledge();
            logger.info("Notification sent and message acknowledged.");

        } catch (Exception e) {
            logger.error("Failed to send notification for order [{}]. Error: {}", event.getOrderId(), e.getMessage());
            // 에러 발생 시 커밋하지 않음 (재시도 또는 DLT로 이동)
        }
    }
}

5.4. End-to-End 테스트 및 동작 확인

이제 모든 준비가 끝났습니다. 애플리케이션을 실행하고 전체 흐름이 정상적으로 동작하는지 확인해 보겠습니다.

  1. 카프카 클러스터 실행: docker-compose up -d 명령으로 카프카와 주키퍼가 실행 중인지 확인합니다.
  2. 스프링 부트 애플리케이션 실행: IDE에서 메인 애플리케이션 클래스를 실행합니다.
  3. 주문 생성 요청: cURL이나 Postman과 같은 API 테스트 도구를 사용하여 주문 서비스에 요청을 보냅니다.

    cURL 명령어 예시:

    
        curl -X POST http://localhost:8080/orders \
        -H "Content-Type: application/json" \
        -d '{
            "product": "Kafka in Action Book",
            "quantity": 2
        }'
        
  4. 콘솔 로그 확인:

    요청을 보내면 애플리케이션의 콘솔 로그에 다음과 같은 순서로 로그가 출력되는 것을 확인할 수 있습니다.

    (1) 주문 서비스(OrderController) 로그:

    Order saved to DB with ID: 123e4567-e89b-12d3-a456-426614174000

    (2) 프로듀서(EventProducerService) 로그:

    Sent message=[OrderEvent(orderId=123...)] with offset=[0] on partition=[2]

    (3) 컨슈머(EventConsumerService) 로그:

    Received Order Event: orderId=123e4567-e89b-12d3-a456-426614174000
    Sending notification for order [123e...]: Product 'Kafka in Action Book', Quantity '2'
    Notification sent and message acknowledged.

이 로그를 통해 사용자의 HTTP 요청이 주문 서비스에 의해 처리되고, 카프카를 통해 이벤트가 비동기적으로 알림 서비스에 전달되어 독립적인 로직이 수행되었음을 명확하게 확인할 수 있습니다. 이것이 바로 이벤트 기반 마이크로서비스 아키텍처의 핵심 동작 방식입니다.

6장: 더 나아가기 - 고급 주제 및 운영 고려사항

지금까지 구현한 내용만으로도 기본적인 마이크로서비스를 구축할 수 있지만, 실제 운영 환경에서는 데이터 정합성, 성능, 안정성 등을 위해 더 많은 요소들을 고려해야 합니다. 이 장에서는 시스템을 한 단계 더 발전시킬 수 있는 몇 가지 고급 주제를 소개합니다.

6.1. 멱등성 프로듀서 (Idempotent Producer)

네트워크 문제 등으로 인해 프로듀서가 메시지를 보낸 후 브로커로부터 정상적인 응답(ACK)을 받지 못하는 경우가 발생할 수 있습니다. 이때 프로듀서는 메시지 전송이 실패했다고 판단하고 재시도를 하게 되는데, 만약 첫 번째 메시지가 실제로는 성공적으로 브로커에 저장되었다면 재시도로 인해 동일한 메시지가 중복으로 저장될 수 있습니다.

멱등성 프로듀서는 이러한 메시지 중복을 방지하기 위한 기능입니다. application.yml에서 enable.idempotence: true로 설정하면 (이미 2장에서 설정했습니다), 프로듀서는 각 메시지에 고유한 ID(PID와 시퀀스 번호)를 부여하여 브로커로 전송합니다. 브로커는 이 ID를 확인하여 이전에 이미 처리한 메시지라면 중복으로 저장하지 않고 성공 응답만 반환합니다. 이를 통해 'At-Least-Once' 전송 보장을 'Exactly-Once'에 가깝게 (정확히는 스트림 내 중복 없는 At-Least-Once) 만들어 줍니다.

6.2. 카프카 트랜잭션과 Exactly-Once 시맨틱스

주문 서비스 예제에서 "데이터베이스에 주문을 저장"하는 작업과 "카프카에 이벤트를 발행"하는 작업은 두 개의 분리된 단계입니다. 만약 DB 저장은 성공했는데, 카프카 발행 직전에 애플리케이션이 다운된다면 어떻게 될까요? 주문은 생성되었지만 이벤트는 발생하지 않아 알림이 가지 않는 데이터 불일치 상태가 됩니다.

이 문제를 해결하기 위해 카프카 트랜잭션을 사용할 수 있습니다. 스프링에서는 @Transactional 어노테이션을 사용하여 데이터베이스 트랜잭션과 카프카 메시지 발행을 하나의 원자적인 작업으로 묶을 수 있습니다.


// OrderService.java (가상 코드)
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {
    // ... (DB Repository, Kafka Producer 주입)

    @Transactional("kafkaTransactionManager") // Kafka 트랜잭션 매니저를 지정
    public void createOrderAndPublishEvent(OrderData data) {
        // 1. 데이터베이스에 주문 저장
        orderRepository.save(data);

        // 2. 카프카에 이벤트 발행
        producerService.sendOrderEvent(new OrderEvent(...));

        // 만약 이 지점에서 예외가 발생하면?
        // DB 작업은 롤백되고, 카프카 메시지 발행도 취소(abort)됩니다.
    }
}

이 방식을 사용하려면 KafkaTransactionManager 빈을 설정해야 합니다. 트랜잭션이 성공적으로 커밋되면 DB 변경 사항이 적용되고 카프카 메시지도 발행됩니다. 만약 어느 한쪽이라도 실패하면 모든 작업이 롤백되어 데이터 정합성을 보장할 수 있습니다. 이는 'Exactly-Once Semantics'를 달성하기 위한 핵심 기술입니다.

6.3. 스키마 레지스트리를 이용한 데이터 계약 관리

JSON은 사용하기 편리하지만, 데이터의 구조(스키마)를 강제하지 않는다는 단점이 있습니다. 만약 주문 서비스에서 OrderEvent에 새로운 필드를 추가했는데, 알림 서비스는 아직 업데이트되지 않았다면 역직렬화 오류나 NullPointerException이 발생할 수 있습니다. 여러 팀이 독립적으로 서비스를 개발하는 대규모 환경에서는 이러한 '데이터 계약' 파괴가 빈번하게 발생합니다.

스키마 레지스트리(Schema Registry)Avro와 같은 스키마 정의 포맷을 사용하면 이 문제를 해결할 수 있습니다.

  • Avro: 데이터를 바이너리 형태로 직렬화하며, 데이터 자체에 스키마 정보를 포함하지 않고 스키마는 별도로 관리합니다. JSON보다 훨씬 효율적인 직렬화/역직렬화 성능과 작은 메시지 크기를 가집니다.
  • 스키마 레지스트리: Avro 스키마를 중앙에서 관리하고 버저닝하는 서버입니다. 프로듀서는 메시지를 보낼 때 스키마 레지스트리에 스키마를 등록하고, 컨슈머는 메시지를 받을 때 스키마 레지스트리에서 해당 스키마를 가져와 역직렬화에 사용합니다.

스키마 레지스트리는 스키마 변경 시 하위 호환성(Backward Compatibility)을 검사하는 규칙을 강제할 수 있어, 한 서비스의 변경이 다른 서비스에 미치는 영향을 사전에 차단하고 안정적인 데이터 파이프라인을 유지하는 데 결정적인 역할을 합니다.

6.4. 모니터링: 컨슈머 랙(Lag)과 시스템 상태 추적

운영 환경에서는 우리 시스템이 정상적으로 동작하고 있는지 지속적으로 확인해야 합니다. 카프카 기반 시스템에서 가장 중요한 모니터링 지표 중 하나는 컨슈머 랙(Consumer Lag)입니다.

컨슈머 랙이란, 특정 파티션에 마지막으로 발행된 메시지의 오프셋과 특정 컨슈머 그룹이 마지막으로 커밋한 오프셋의 차이를 의미합니다. 즉, "컨슈머가 처리해야 할 메시지가 얼마나 밀려있는가"를 나타내는 지표입니다.

랙이 지속적으로 증가한다면, 이는 메시지 발행 속도를 컨슈머의 처리 속도가 따라가지 못하고 있다는 신호입니다. 이때는 컨슈머 인스턴스를 추가하여 처리량을 늘리거나, 컨슈머의 비즈니스 로직을 최적화하는 등의 조치가 필요합니다. 스프링 부트 액추에이터(Actuator), 프로메테우스(Prometheus), 그라파나(Grafana)와 같은 도구들을 연동하여 컨슈머 랙을 비롯한 각종 카프카 지표(메시지 처리량, 브로커 상태 등)를 시각화하고 알림을 설정하는 것이 일반적입니다.

결론: 진정한 분산 시스템을 향한 첫걸음

이 글을 통해 우리는 스프링 부트와 아파치 카프카라는 강력한 두 도구를 사용하여 현대적인 이벤트 기반 마이크로서비스를 구축하는 전 과정을 살펴보았습니다. 단순한 개념 소개를 넘어, 실용적인 개발 환경 구축부터 상세한 설정, 구조화된 데이터 처리, 비동기 콜백, 신뢰성 있는 컨슈머 설계, 에러 핸들링, 그리고 실제 시나리오를 바탕으로 한 통합 예제까지 깊이 있게 다루었습니다.

스프링 부트와 카프카의 조합은 단순히 두 기술을 함께 사용하는 것을 넘어, 시스템의 회복탄력성, 확장성, 유연성을 극대화하는 시너지를 창출합니다. 서비스 간의 강한 결합을 끊어내고 비동기적인 이벤트 스트림을 통해 상호작용하게 함으로써, 우리는 장애에 더 강하고 변화에 더 빠르게 대응할 수 있는 아키텍처를 만들 수 있습니다.

물론 오늘 다룬 내용이 전부는 아닙니다. 카프카 스트림즈(Kafka Streams)를 이용한 실시간 스트림 처리, 스키마 레지스트리를 통한 엄격한 데이터 관리, 쿠버네티스(Kubernetes) 환경에서의 운영 등 더 넓고 깊은 세계가 여러분을 기다리고 있습니다. 하지만 본 글에서 다진 탄탄한 기본기는 앞으로 마주할 더 복잡한 도전들을 해결해 나가는 훌륭한 초석이 될 것입니다.

이제 여러분은 이론을 넘어, 직접 코드를 작성하고 시스템을 구축하며 진정한 분산 시스템 개발자로서의 여정을 시작할 준비가 되었습니다. 이벤트의 흐름 속에서 더 나은 소프트웨어를 만들어 나가시길 바랍니다.

Spring BootとKafkaによるイベント駆動型マイクロサービスの構築

はじめに:なぜ今、イベント駆動型アーキテクチャなのか?

現代のソフトウェア開発は、モノリシックアーキテクチャからマイクロサービスアーキテクチャへの大規模な移行の真っ只中にあります。ビジネスの要求が複雑化し、市場への迅速な対応が求められる中で、単一の巨大なアプリケーション(モノリス)は、開発速度の低下、技術的負債の増大、スケーラビリティの限界といった課題を露呈させました。マイクロサービスは、アプリケーションを独立して開発、デプロイ、スケール可能な小さなサービスの集合体として構築することで、これらの課題を解決する有力なアプローチとして登場しました。

しかし、マイクロサービスアーキテクチャは新たな課題、特に「サービス間の通信」という複雑な問題を生み出します。従来多用されてきた同期的なREST APIによる通信は、シンプルで理解しやすい反面、サービス間の密結合を生み出し、一方がダウンすると他方も影響を受ける連鎖的な障害(カスケード障害)を引き起こすリスクを孕んでいます。システム全体がリクエスト/レスポンスの鎖で繋がれ、柔軟性や回復力が損なわれるのです。

ここで脚光を浴びるのが、イベント駆動型アーキテクチャ(Event-Driven Architecture, EDA)です。EDAでは、サービスは「イベント」と呼ばれる状態変化の通知を非同期に送受信します。例えば、「注文が作成された」「商品が発送された」といったビジネス上の出来事がイベントとなります。イベントを発行するサービス(プロデューサー)は、そのイベントを誰が受け取るかを知る必要がなく、イベントを受信するサービス(コンシューマー)も、誰が発行したかを意識する必要がありません。この疎結合な性質により、各サービスは独立性を保ち、システム全体の回復力とスケーラビリティが劇的に向上します。

このイベント駆動型アーキテクチャの心臓部となるのが、Apache Kafkaです。元々はLinkedInで開発されたKafkaは、単なるメッセージキューイングシステムに留まらない、高性能な分散ストリーミングプラットフォームです。膨大な量のイベントをリアルタイムで、かつ確実に処理する能力を持ち、現代のデータ集約型アプリケーションのバックボーンとしてデファクトスタンダードの地位を確立しています。

一方、マイクロサービスの開発現場では、JavaエコシステムにおけるSpring Bootが絶大な支持を得ています。Spring Bootは、複雑な設定を自動化し、開発者がビジネスロジックに集中できる環境を提供することで、生産性を飛躍的に向上させます。「規約大設定(Convention over Configuration)」の哲学に基づき、わずかなコードで本番品質のアプリケーションを迅速に立ち上げることが可能です。

本稿では、この強力な2つの技術、Spring BootとKafkaを組み合わせ、堅牢でスケーラブルなイベント駆動型マイクロサービスを構築するための実践的な知識を体系的に解説します。Kafkaの基本的な概念から始め、環境構築、シンプルなメッセージ送受信、より実用的なカスタムオブジェクトの利用、そして本番運用に不可欠な信頼性向上のための高度な設定やエラーハンドリング戦略まで、段階的に深く掘り下げていきます。単なるコードの断片的な紹介ではなく、各機能が「なぜ」必要なのか、どのような設計思想に基づいているのかを理解することに重点を置きます。

Kafkaの核心概念を深く理解する

Spring Bootでの実装に入る前に、Kafkaがどのように動作するのか、その根幹をなす概念を正確に理解することが不可欠です。これらの概念を把握することで、設定の一つ一つが持つ意味や、問題発生時のトラブルシューティングが格段に容易になります。

Kafkaアーキテクチャの全体像

Kafkaは単一のサーバーではなく、複数のサーバーが連携して動作する分散システムです。このサーバー群をKafkaクラスタと呼び、クラスタを構成する各サーバーをブローカー(Broker)と呼びます。

  • ブローカー(Broker): Kafkaクラスタを構成する個々のサーバーです。メッセージ(Kafkaではレコードと呼びます)の受信、ディスクへの永続化、そしてコンシューマーからのリクエストに応じたレコードの送信を担当します。ブローカーを複数台用意することで、負荷分散と耐障害性を実現します。1台のブローカーがダウンしても、他のブローカーが処理を引き継ぐことでサービスを継続できます。
  • クラスタ(Cluster): 複数のブローカーが集まって形成される論理的なグループです。クラスタ全体で膨大な量のデータを処理・保存する能力を持ちます。
  • Zookeeper / KRaft: かつてKafkaは、ブローカーの管理、リーダー選出、設定情報の保持などのメタデータ管理にApache Zookeeperを必須としていました。しかし、近年ではZookeeperへの依存をなくし、Kafka自体がメタデータを管理するKRaft(Kafka Raft metadata mode)プロトコルが導入され、主流となりつつあります。KRaftモードにより、アーキテクチャが簡素化され、起動時間の短縮や管理オーバーヘッドの削減が実現されています。

データフローの基本単位:トピック、パーティション、オフセット

Kafkaにおけるデータの流れは、3つの重要な概念によって構成されています。

  • トピック(Topic): メッセージを分類するためのカテゴリ名です。ファイルシステムにおけるフォルダのようなものと考えると理解しやすいでしょう。例えば、「orders」(注文情報)、「payments」(決済情報)、「user-activity」(ユーザー行動ログ)といったトピックを作成し、関連するメッセージをグループ化します。プロデューサーは特定のトピックにメッセージを送信し、コンシューマーは特定のトピックからメッセージを購読します。
  • パーティション(Partition): トピックを分割した単位であり、Kafkaの並列処理とスケーラビリティの核となる概念です。各トピックは1つ以上のパーティションを持つことができます。プロデューサーから送信されたメッセージは、いずれか1つのパーティションに追記されます。各パーティションは順序が保証されたログ(追記型ファイル)として機能します。つまり、パーティション内ではメッセージの順序は保証されますが、トピック全体で見ると順序は保証されません。パーティションの数を増やすことで、複数のコンシューマーが同時に処理できるようになり、システム全体のスループットを向上させることができます。
  • オフセット(Offset): 各パーティション内のメッセージに割り当てられる、一意で連番のIDです。パーティションの先頭から0, 1, 2, ...と採番されます。コンシューマーは、自身がどのオフセットまで読み込んだかを記録しており、これによりどこから処理を再開すればよいかを把握します。このオフセット管理が、Kafkaの信頼性の高いメッセージングを実現する上で重要な役割を果たします。
Kafka Topic, Partitions, and Offsets

図1: トピック、パーティション、オフセットの関係

メッセージの生産と消費:プロデューサーとコンシューマー

  • プロデューサー(Producer): Kafkaのトピックにメッセージ(レコード)を書き込むクライアントアプリケーションです。プロデューサーは、どのトピックのどのパーティションにメッセージを送信するかを決定します。特定のキーを指定して送信した場合、同じキーを持つメッセージは常に同じパーティションに送られることが保証されます(キーのハッシュ値に基づいてパーティションが決定されるため)。これにより、特定のエンティティ(例:特定のユーザーID)に関するイベントの順序性を保証することができます。
  • コンシューマー(Consumer): 1つ以上のトピックを購読し、そこに書き込まれたメッセージを読み出して処理するクライアントアプリケーションです。コンシューマーは、自身がどのパーティションからメッセージを読み込むかをKafkaクラスタと協調して決定し、読み込んだメッセージのオフセットを記録(コミット)します。

スケーラビリティと耐障害性の鍵:コンシューマーグループ

コンシューマーグループ(Consumer Group)は、同じ目的で同じトピックを購読するコンシューマーの集合です。これはKafkaの非常に強力な機能であり、2つの主要な目的を果たします。

  1. 負荷分散(Load Balancing): あるトピックのパーティションは、コンシューマーグループ内のただ1つのコンシューマーにのみ割り当てられます。例えば、4つのパーティションを持つトピックを、2つのコンシューマーからなるグループが購読する場合、各コンシューマーは2つのパーティションを担当します。もしコンシューマーを4つに増やせば、各コンシューマーが1つのパーティションを担当することになり、処理が並列化されスループットが向上します。
  2. 耐障害性(Fault Tolerance): グループ内のあるコンシューマーが障害で停止した場合、Kafkaは自動的に検知し、そのコンシューマーが担当していたパーティションをグループ内の他の生きているコンシューマーに再割り当てします。このプロセスをリバランス(Rebalance)と呼びます。リバランスにより、一部のコンシューマーがダウンしてもメッセージ処理が滞ることなく継続されます。
Kafka Consumer Group

図2: コンシューマーグループによる負荷分散

これらの基本概念を念頭に置くことで、次の章以降で登場するSpring Bootのコードや設定が、Kafkaのどの部分と連携しているのかを明確に理解できるようになります。

開発環境の準備とSpring Bootプロジェクトの初期設定

理論を学んだ後は、実際に手を動かして環境を構築します。ここでは、ローカル開発環境にDockerを使ってKafkaを立ち上げ、Spring Initializrでマイクロサービスの雛形を作成する手順を説明します。

DockerによるKafka環境の構築

Kafkaクラスタをローカルマシンに手動でインストールするのは手間がかかります。そこで、DockerとDocker Composeを利用して、必要なコンポーネントをコンテナとして簡単に起動する方法が推奨されます。

まず、プロジェクトのルートディレクトリに docker-compose.yml という名前のファイルを作成し、以下の内容を記述します。ここでは、Zookeeperと1台のKafkaブローカーを起動する最もシンプルな構成を採用します。


version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

このファイルを作成したら、ターミナルで以下のコマンドを実行してコンテナを起動します。


docker-compose up -d

-d オプションはバックグラウンドでの実行を意味します。docker ps コマンドで `zookeeper` と `kafka` の2つのコンテナが正常に起動していることを確認してください。これで、ローカルホストの9092ポート(localhost:9092)からアクセス可能なKafkaブローカーが準備できました。

Spring Initializrによるプロジェクト生成

次に、Spring Bootプロジェクトを作成します。Webブラウザで Spring Initializr にアクセスし、以下の設定でプロジェクトを生成します。今回は、メッセージを送信する「プロデューサーサービス」と、受信する「コンシューマーサービス」の2つのプロジェクトを作成します。

共通の設定:

  • Project: Maven Project
  • Language: Java
  • Spring Boot: 3.x.x (安定版を選択)
  • Group: com.example
  • Packaging: Jar
  • Java: 17 or later

プロデューサーサービス (`producer-service`) の設定:

  • Artifact: producer-service
  • Dependencies:
    • Spring Web
    • Spring for Apache Kafka

コンシューマーサービス (`consumer-service`) の設定:

  • Artifact: consumer-service
  • Dependencies:
    • Spring Web (動作確認用のAPIを設ける場合)
    • Spring for Apache Kafka

「GENERATE」ボタンをクリックして、それぞれのプロジェクトのzipファイルをダウンロードし、任意の場所に展開してIDE(IntelliJ IDEA, Eclipseなど)で開きます。

基本的なKafka接続設定

プロジェクトが作成できたら、次はSpring BootアプリケーションがKafkaブローカーに接続するための設定を行います。各プロジェクトの src/main/resources/application.properties ファイルに以下の設定を追記します。

`producer-service` の `application.properties`:


# サーバーのポート番号(コンシューマーと被らないように変更)
server.port=8080

# Kafkaブローカーの接続先
spring.kafka.bootstrap-servers=localhost:9092

# プロデューサー固有の設定
# 送信するメッセージのキーとバリューのシリアライザを指定
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

`consumer-service` の `application.properties`:


# サーバーのポート番号
server.port=8081

# Kafkaブローカーの接続先
spring.kafka.bootstrap-servers=localhost:9092

# コンシューマー固有の設定
spring.kafka.consumer.group-id=my-group
# 受信するメッセージのキーとバリューのデシリアライザを指定
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

各設定項目の意味は以下の通りです。

  • server.port: Webサーバーのポート番号です。2つのサービスを同時に起動するため、異なるポート番号を割り当てます。
  • spring.kafka.bootstrap-servers: 接続先のKafkaブローカーのアドレスリストです。Dockerで起動したKafkaを指定しています。
  • spring.kafka.producer.*-serializer: プロデューサーがメッセージをKafkaに送信する際に、Javaオブジェクトをバイト配列に変換(シリアライズ)するためのクラスを指定します。ここではキーもバリューも単純な文字列として扱うため、StringSerializer を使用します。
  • spring.kafka.consumer.group-id: このコンシューマーが属するコンシューマーグループのIDです。
  • spring.kafka.consumer.*-deserializer: コンシューマーがKafkaからメッセージを受信する際に、バイト配列をJavaオブジェクトに変換(デシリアライズ)するためのクラスを指定します。ここでは StringDeserializer を使用します。

これで、2つのマイクロサービスがKafkaと通信するための基本的な準備が整いました。次の章では、実際にメッセージを送受信するコードを実装していきます。

基本的なメッセージングの実装:プロデューサーとコンシューマー

環境設定が完了したので、いよいよKafkaを使ったメッセージングを実装します。ここでは、最も基本的な文字列メッセージの送受信を行います。

Kafkaプロデューサーの実装:メッセージを送信する

producer-serviceプロジェクトで、メッセージをKafkaに送信するロジックを実装します。Spring for Kafkaは、KafkaTemplateという便利なクラスを提供しており、これを使うことでメッセージ送信を非常に簡単に行うことができます。

まず、メッセージ送信を責務とするサービスクラスを作成します。

com.example.producerservice.service.KafkaProducerService.java:


package com.example.producerservice.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);
    private static final String TOPIC_NAME = "simple-messages";

    private final KafkaTemplate<String, String> kafkaTemplate;

    // コンストラクタインジェクションでKafkaTemplateを受け取る
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        log.info("Sending message to Kafka: {}", message);
        
        // KafkaTemplateのsendメソッドでメッセージを非同期に送信
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);

        // 送信結果をハンドリング (非同期)
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                // 送信成功
                log.info("Sent message=[{}] with offset=[{}] to partition=[{}]",
                        message,
                        result.getRecordMetadata().offset(),
                        result.getRecordMetadata().partition());
            } else {
                // 送信失敗
                log.error("Unable to send message=[{}] due to : {}", message, ex.getMessage());
            }
        });
    }
}

このコードのポイントは以下の通りです。

  • KafkaTemplate<String, String>: Spring Bootの自動設定機能により、application.propertiesの設定に基づいてKafkaTemplateのBeanが自動的に生成され、DIコンテナに登録されます。コンストラクタインジェクションでこれを受け取るだけで、すぐに使用できます。ジェネリクスは<Key, Value>の型を示します。
  • kafkaTemplate.send(topic, message): このメソッドを呼び出すことで、指定したトピックにメッセージを送信します。この処理は非同期で行われ、即座にCompletableFuture(以前のバージョンではListenableFuture)を返します。
  • 結果のハンドリング: whenComplete(またはaddCallback)メソッドを使って、送信が成功したか失敗したかを非同期に受け取ることができます。成功時には、メッセージが書き込まれたパーティションやオフセットといったメタデータが取得できます。本番環境では、失敗時のリトライ処理やエラー通知などをここで行うことが重要です。

次に、このサービスを呼び出すためのRESTコントローラーを作成します。

com.example.producerservice.controller.MessageController.java:


package com.example.producerservice.controller;

import com.example.producerservice.service.KafkaProducerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    private final KafkaProducerService producerService;

    public MessageController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping
    public String publishMessage(@RequestBody String message) {
        producerService.sendMessage(message);
        return "Message published successfully: " + message;
    }
}

これで、HTTP POSTリクエストを /api/messages に送信すると、リクエストボディの文字列がKafkaの `simple-messages` トピックに送信されるようになりました。

Kafkaコンシューマーの実装:メッセージを受信する

次に、consumer-serviceプロジェクトで、simple-messages トピックからメッセージを受信して処理するコンシューマーを実装します。

Spring for Kafkaでは、@KafkaListener アノテーションを使うことで、驚くほど簡単にコンシューマーを作成できます。

com.example.consumerservice.service.KafkaConsumerService.java:


package com.example.consumerservice.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    
    // @KafkaListenerアノテーションを付与したメソッドがコンシューマーとなる
    @KafkaListener(topics = "simple-messages", groupId = "my-group")
    public void consume(String message) {
        log.info("Consumed message: {}", message);
        // ここで受信したメッセージに対するビジネスロジックを実行する
        // (例: データベースへの保存、別のAPIの呼び出しなど)
    }
}

このコードのポイントは、@KafkaListenerアノテーションです。

  • topics = "simple-messages": どのトピックを購読するかを指定します。
  • groupId = "my-group": このコンシューマーが属するコンシューマーグループIDを指定します。これは application.properties で設定した spring.kafka.consumer.group-id と同じ値を指定することが一般的ですが、アノテーションで上書きすることも可能です。

Spring Bootアプリケーションが起動すると、Spring for Kafkaは @KafkaListener が付与されたメソッドをスキャンし、バックグラウンドでメッセージリスナーコンテナを起動します。このコンテナがKafkaブローカーへの接続、メッセージのポーリング、そしてメッセージ受信時のメソッド呼び出しをすべて自動的に行ってくれます。開発者は、メッセージを受信した後に何をするか、というビジネスロジックにのみ集中すればよいのです。

動作確認:マイクロサービス間の非同期通信

それでは、実装した2つのサービスを連携させて動作を確認しましょう。

  1. DockerのKafkaが起動していることを確認します。
  2. consumer-service を起動します。 IDEの実行機能またはターミナルで mvn spring-boot:run を実行します。コンソールログに、Kafkaに接続し、トピックの購読を開始する旨のログが表示されるはずです。
  3. producer-service を起動します。 同様に、別のターミナルまたはIDEの機能で起動します。
  4. プロデューサーにリクエストを送信します。 ターミナルから curl コマンドを使うか、PostmanなどのAPIクライアントツールを使って、producer-serviceにPOSTリクエストを送信します。

curl -X POST -H "Content-Type: text/plain" -d "Hello, Kafka from Spring Boot!" http://localhost:8080/api/messages

このコマンドを実行すると、以下のような流れで処理が行われます。

  1. producer-serviceMessageController がリクエストを受け取ります。
  2. KafkaProducerServicesendMessage メソッドが呼び出され、メッセージ "Hello, Kafka from Spring Boot!" がKafkaの `simple-messages` トピックに送信されます。
  3. producer-service のコンソールには "Sending message..." と "Sent message..." のログが出力されます。
  4. ほぼ同時に、Kafkaブローカーを介してメッセージが consumer-service に配信されます。
  5. consumer-serviceKafkaConsumerServiceconsume メソッドが呼び出され、コンソールに "Consumed message: Hello, Kafka from Spring Boot!" というログが出力されます。

これにより、HTTPリクエストを起点として、2つの独立したマイクロサービスがKafkaを介して非同期に通信できることが確認できました。プロデューサーはレスポンスを待つことなく処理を完了し、コンシューマーは自身のタイミングでメッセージを処理します。これがイベント駆動型アーキテクチャの基本的な動作です。

実用的なデータ交換:JSONとカスタムオブジェクトの利用

なぜ単純な文字列では不十分なのか?

前章では文字列メッセージの送受信を実装しましたが、実際のアプリケーションでは、より構造化されたデータを扱うことがほとんどです。例えば、注文情報を送信する場合、「注文ID」「商品ID」「数量」「顧客名」といった複数の属性を持つデータをやり取りする必要があります。これを単一の文字列として扱うのは非効率で、解析も面倒であり、エラーの温床となります。

そこで、JavaオブジェクトをJSON形式に変換(シリアライズ)して送信し、受信側でJSONからJavaオブジェクトに復元(デシリアライズ)する方法が一般的に用いられます。これにより、型安全性を保ちながら、構造化されたデータをサービス間で柔軟に交換できます。

Jacksonを利用したJSONシリアライズ/デシリアライズ

Spring Bootは、JSONの扱いに長けたライブラリであるJacksonを標準でサポートしています。Spring for KafkaもJacksonとシームレスに連携するためのシリアライザ/デシリアライザを提供しています。

1. 共通のデータ転送オブジェクト(DTO)の作成

まず、プロデューサーとコンシューマーの両方で利用するデータ構造を定義したクラス(DTO: Data Transfer Object)を作成します。両方のサービスで同じクラス定義が必要になるため、共通のライブラリとして管理するのが理想的ですが、ここでは簡単のため、各プロジェクトに同じクラスを作成します。

producer-serviceconsumer-service の両方に、以下の OrderEvent.java を作成します。

com.example.xxx.dto.OrderEvent.java (xxxはproducerserviceまたはconsumerservice):


package com.example.producerservice.dto; // パッケージ名は適宜変更

import java.math.BigDecimal;
import java.time.Instant;

// Lombokを使うとより簡潔に書けるが、ここでは手動で実装
public class OrderEvent {
    private String orderId;
    private String productId;
    private int quantity;
    private BigDecimal price;
    private String customerId;
    private Instant timestamp;

    // デシリアライズのためにデフォルトコンストラクタが必須
    public OrderEvent() {
    }

    // ゲッター、セッター、toStringなどを記述...
    // (ここでは省略)
}

注意: JacksonがJSONからオブジェクトにデシリアライズする際には、デフォルトコンストラクタが必要ですので、必ず定義してください。

2. application.propertiesの変更

次に、Kafkaが送受信するデータのシリアライザ/デシリアライザを、文字列用からJSON用に変更します。application.propertiesを以下のように更新します。

`producer-service` の `application.properties`:


# ... (server.port, spring.kafka.bootstrap-serversは同じ) ...
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# ValueのシリアライザをJsonSerializerに変更
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

`consumer-service` の `application.properties`:


# ... (server.port, spring.kafka.bootstrap-serversは同じ) ...
spring.kafka.consumer.group-id=order-events-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# ValueのデシリアライザをJsonDeserializerに変更
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

# JsonDeserializerがどのクラスに変換するかを指定
spring.kafka.consumer.properties.spring.json.trusted.packages=*
# デフォルトでヘッダーから型情報を推測するが、明示的に指定することも可能
# spring.kafka.consumer.properties.spring.json.value.default.type=com.example.consumerservice.dto.OrderEvent

コンシューマー側の設定で重要なのは spring.kafka.consumer.properties.spring.json.trusted.packages=* です。これはセキュリティ上の理由から、デシリアライズを許可するパッケージを指定するものです。開発中は * で全てのパッケージを許可しても問題ありませんが、本番環境では com.example.dto のように具体的なパッケージ名を指定することが推奨されます。

カスタムオブジェクトを送受信する実装例

設定を変更したら、次はコードを修正して OrderEvent オブジェクトを扱えるようにします。

プロデューサー側の修正

KafkaTemplate のジェネリクスを <String, OrderEvent> に変更し、コントローラーもJSONを受け取るように修正します。

com.example.producerservice.service.KafkaProducerService.java:


// ...
import com.example.producerservice.dto.OrderEvent;
// ...

@Service
public class KafkaProducerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);
    private static final String TOPIC_NAME = "order-events";

    // Valueの型をOrderEventに変更
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrderEvent(OrderEvent orderEvent) {
        log.info("Sending order event to Kafka: {}", orderEvent);
        // orderIdをキーとして送信することで、同じ注文IDのイベントは同じパーティションに送られる
        CompletableFuture<SendResult<String, OrderEvent>> future = kafkaTemplate.send(TOPIC_NAME, orderEvent.getOrderId(), orderEvent);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Sent order event=[{}] with offset=[{}]", orderEvent, result.getRecordMetadata().offset());
            } else {
                log.error("Unable to send order event=[{}] due to : {}", orderEvent, ex.getMessage());
            }
        });
    }
}

com.example.producerservice.controller.OrderController.java (新規作成):


package com.example.producerservice.controller;

import com.example.producerservice.dto.OrderEvent;
import com.example.producerservice.service.KafkaProducerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final KafkaProducerService producerService;

    public OrderController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping
    public String publishOrderEvent(@RequestBody OrderEvent orderEvent) {
        producerService.sendOrderEvent(orderEvent);
        return "Order event published successfully.";
    }
}

注目すべきは、kafkaTemplate.send メソッドで第2引数にキー(orderEvent.getOrderId())を指定している点です。これにより、Kafkaはキーのハッシュ値に基づいて送信先のパーティションを決定するため、同じ注文IDに関するイベントは必ず同じパーティションに送られ、処理の順序性が保証されます。

コンシューマー側の修正

リスナーメソッドの引数の型を OrderEvent に変更するだけです。

com.example.consumerservice.service.KafkaConsumerService.java:


// ...
import com.example.consumerservice.dto.OrderEvent;
// ...

@Service
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    @KafkaListener(topics = "order-events", groupId = "order-events-group")
    public void consumeOrderEvent(OrderEvent orderEvent) {
        log.info("Consumed order event: {}", orderEvent);
        // 在庫更新、通知送信などのビジネスロジックを実行
    }
}

Spring for KafkaとJacksonが連携し、受信したJSONバイト配列を自動的に OrderEvent オブジェクトにデシリアライズしてメソッドに渡してくれます。開発者は面倒な変換処理を意識することなく、型安全なオブジェクトとしてデータを扱うことができます。

動作確認

再度両方のサービスを起動し、今度は /api/orders エンドポイントにJSON形式でリクエストを送信します。


curl -X POST -H "Content-Type: application/json" -d '{
  "orderId": "ORD-12345",
  "productId": "PROD-ABC",
  "quantity": 2,
  "price": 25.50,
  "customerId": "CUST-001",
  "timestamp": "2023-10-27T10:00:00Z"
}' http://localhost:8080/api/orders

consumer-service のコンソールに、受信した OrderEvent オブジェクトの内容がログとして出力されれば成功です。これで、より実践的なマイクロサービス間通信の基盤が整いました。

信頼性の高いメッセージングのための詳細設定

これまでは基本的なメッセージ送受信を実装してきましたが、本番環境で稼働するシステムでは、「メッセージが失われないこと」「メッセージが重複処理されないこと」「エラーが発生した際に適切に対処できること」といった信頼性が極めて重要になります。この章では、Spring BootとKafkaで信頼性の高いメッセージングを実現するための詳細な設定について解説します。

プロデューサーの信頼性設定:メッセージ損失を防ぐ

プロデューサーが送信したメッセージが、ネットワーク障害やブローカーのダウンなどによって失われるリスクを低減するための設定です。

`producer-service` の `application.properties` に追記:


# ... (既存の設定) ...

# 1. 送信完了の確認レベル (acks)
# all: リーダーとすべてのISR(In-Sync Replica)に書き込まれたら完了とみなす。最も信頼性が高い。
# 1: リーダーに書き込まれたら完了とみなす(デフォルト)。
# 0: 確認を待たない。最も高速だがメッセージ損失のリスクが最も高い。
spring.kafka.producer.acks=all

# 2. リトライ設定
# 送信失敗時にリトライする回数。
# 一時的なネットワークエラーなどからの回復に有効。
spring.kafka.producer.retries=3

# 3. べき等プロデューサー (Idempotent Producer)
# これをtrueにすると、リトライによって同じメッセージが重複して送信されるのを防ぐ。
# 内部的にシーケンス番号を使って重複を排除する。
# acks=all, retries > 0 と一緒に使うことが必須。
spring.kafka.producer.properties.enable.idempotence=true
  • acks=all: プロデューサーは、メッセージがリーダーパーティションだけでなく、設定されたすべての同期レプリカ(ISR)にも書き込まれたことを確認してから送信完了とみなします。これにより、リーダーブローカーがクラッシュしてもメッセージが失われることはありません。ただし、レイテンシは若干増加します。
  • retries: ブローカーからエラーが返ってきた場合(例:リーダーが一時的に利用不能)、プロデューサーが自動的に再送を試みる回数です。
  • enable.idempotence=true: べき等性を有効にすると、プロデューサーは各メッセージに一意のIDを付与します。リトライによってメッセージが再送された場合でも、ブローカー側でIDを確認し、重複していればそれを破棄します。これにより、「At-Least-Once(少なくとも1回)」の配信セマンティクスに加えて、重複を排除した「Exactly-Once(厳密に1回)」に近い挙動(プロデューサーからブローカーまでの範囲で)を実現できます。

これらの設定を組み合わせることで、プロデューサー側のメッセージ損失や重複のリスクを大幅に削減できます。

コンシューマーのメッセージ処理保証:重複処理とデータ一貫性

コンシューマー側では、メッセージを確実に処理し、かつ障害発生時に処理が重複したりデータが失われたりしないようにすることが課題となります。その鍵となるのがオフセットコミットの管理です。

オフセットコミットの仕組みと課題

コンシューマーは、どこまでメッセージを読み込んだかをオフセットとしてKafkaに記録(コミット)します。デフォルト設定(enable.auto.commit=true)では、コンシューマーは一定間隔(auto.commit.interval.ms, デフォルト5秒)で自動的にオフセットをコミットします。

この自動コミットには問題があります。例えば、コンシューマーがメッセージを取得し、ビジネスロジック(例:DBへの書き込み)を実行している最中に、オフセットの自動コミットが発生したとします。その直後にコンシューマーがクラッシュした場合、DBへの書き込みは完了していませんが、オフセットはコミットされてしまっています。再起動後、コンシューマーはコミットされたオフセットの次から処理を再開するため、クラッシュ前に処理していたメッセージは失われてしまいます(At-Most-Once)

手動コミットによるAt-Least-Onceの実現

この問題を解決するには、オフセットコミットを手動で行います。ビジネスロジックが正常に完了したことを確認してから、アプリケーションのコード内で明示的にコミットするのです。

`consumer-service` の `application.properties` を変更:


# ... (既存の設定) ...

# オフセットの自動コミットを無効にする
spring.kafka.consumer.enable-auto-commit=false

# コンテナ管理の手動コミットモードを設定
# RECORD: メッセージを1件処理するごとにコミット
# BATCH: ポーリングで取得した全メッセージを処理し終えたらコミット(デフォルト)
# MANUAL_IMMEDIATE: Acknowledgment.acknowledge()を呼び出すと即座に同期コミット
spring.kafka.listener.ack-mode=RECORD

`KafkaConsumerService.java` を修正:


// ...
import org.springframework.kafka.support.Acknowledgment;
// ...

@Service
public class KafkaConsumerService {
    // ...
    @KafkaListener(topics = "order-events", groupId = "order-events-group")
    public void consumeOrderEvent(OrderEvent orderEvent, Acknowledgment ack) {
        try {
            log.info("Consumed order event: {}", orderEvent);
            // 重要なビジネスロジック
            // ...
            
            // 処理が正常に完了したら、手動でコミット
            ack.acknowledge();
            log.info("Offset committed successfully.");

        } catch (Exception e) {
            log.error("Error processing order event: {}", orderEvent, e);
            // エラーが発生した場合、コミットしないことでメッセージが再処理される
            // ここでリトライ処理やDLQへの送信などを検討する
        }
    }
}

ack-mode を設定し、リスナーメソッドの引数に Acknowledgment を追加することで、手動コミットが可能になります。ack.acknowledge() を呼び出すと、コンテナがオフセットをコミットします。もしビジネスロジックの途中で例外が発生して acknowledge() が呼ばれずにメソッドが終了した場合、オフセットはコミットされません。コンシューマーが再起動(またはリバランス後)すると、同じメッセージが再度配信されます。これにより、メッセージが失われることはなくなりますが、代わりに同じメッセージが重複して処理される可能性(At-Least-Once)が生まれます。そのため、ビジネスロジック側で重複処理を許容できるように(べき等に)設計することが重要になります。

高度なエラーハンドリングとデッドレターキュー(DLQ)

メッセージの処理中に回復不可能なエラー(例:データ形式の不正、関連データが存在しない)が発生した場合、単純にリトライを繰り返しても成功しません。このようなメッセージは、無限にリトライされ続け、後続の正常なメッセージの処理を妨げてしまいます(ブロッキング)。

この問題に対する一般的な解決策が、デッドレターキュー(Dead-Letter Queue, DLQ)パターンです。処理に失敗したメッセージを、通常のトピックとは別の専用トピック(DLQトピック)に転送し、後で開発者が原因を調査したり、手動で再処理したりできるようにする仕組みです。

Spring for Kafkaは、DLQパターンを簡単に実装するための機能を提供しています。

1. DLQ用のトピックを準備する

DLQメッセージを格納するためのトピック(例:order-events.DLT)をあらかじめ作成しておくか、後述するKafkaAdminで自動生成するように設定します。

2. エラーハンドラとDLQの設定

まず、application.propertiesでDLQトピック名を指定します。

`consumer-service` の `application.properties` に追記:


# ...
# @DltHandlerで利用するDLT(Dead Letter Topic)のサフィックスを指定
spring.kafka.listener.dead-letter-topic-suffix=.DLT

次に、特定のエラーが発生した場合にDLQに送るための設定をJava Configで行います。

com.example.consumerservice.config.KafkaConsumerConfig.java:


package com.example.consumerservice.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    
    @Bean
    public DefaultErrorHandler errorHandler(KafkaOperations<String, Object> template) {
        // リトライを2回(合計3回試行)行い、それでも失敗した場合はDLQに送信する
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                // DLQトピック名をカスタマイズするラムダ式
                (record, ex) -> {
                    log.error("Message failed after retries. Sending to DLT. Topic: {}, Partition: {}, Offset: {}",
                            record.topic(), record.partition(), record.offset());
                    return new TopicPartition(record.topic() + ".DLT", record.partition());
                }
        );

        // 1秒間隔で2回リトライ
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2));

        // リトライしない例外を指定
        errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
        
        return errorHandler;
    }
}

この設定では、DefaultErrorHandler をカスタマイズしています。

  • FixedBackOff(1000L, 2): 1秒間隔で最大2回リトライします。
  • DeadLetterPublishingRecoverer: リトライがすべて失敗した後に呼び出されるリカバリー処理で、メッセージをDLQに送信します。
  • addNotRetryableExceptions: 特定の例外(ここではIllegalArgumentException)が発生した場合は、リトライせずに即座にDLQに送信するように設定できます。データ不備などが原因の場合に有効です。

3. DLQを購読するリスナーの実装 (オプション)

DLQに送られたメッセージを監視し、アラートを発報したり、データベースに記録したりするためのリスナーを別途作成することもできます。


// in KafkaConsumerService.java
import org.springframework.kafka.annotation.DltHandler;

// ...
    @DltHandler
    public void dltListen(Object in) {
        log.warn("Received from DLT: {}", in);
        // 通知処理など
    }

@DltHandlerアノテーションを使うと、同じリスナーコンテナ内でDLQを処理する専用のメソッドを定義できます。

これで、コンシューマーが処理できないメッセージによってシステム全体が停止することなく、問題を隔離し、安定した運用を継続するための仕組みが整いました。

高度なトピックとコンシューマー管理

基本的な信頼性設定に加えて、Spring for Kafkaは運用を効率化し、パフォーマンスを向上させるための多くの高度な機能を提供しています。

`KafkaAdmin`によるトピックの自動生成と管理

これまでは、Kafkaトピックが事前に存在することを前提としていました。しかし、アプリケーションの起動時に必要なトピックを自動的に作成・設定できると、環境構築の手間が省け、設定ミスも防げます。

Spring Bootは、クラスパス上にspring-kafkaが存在すると自動的にKafkaAdmin Beanを登録します。このKafkaAdminは、DIコンテナ内にあるNewTopic型のBeanを検出し、対応するトピックがKafkaクラスタに存在しない場合に自動で作成してくれます。

例えば、producer-serviceに以下のConfigクラスを追加します。

com.example.producerservice.config.KafkaTopicConfig.java:


package com.example.producerservice.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic orderEventsTopic() {
        return TopicBuilder.name("order-events")
                .partitions(3)       // パーティション数を3に設定
                .replicas(1)         // レプリケーション係数を1に設定(開発環境用)
                .build();
    }
    
    @Bean
    public NewTopic orderEventsDlt() {
        return TopicBuilder.name("order-events.DLT")
                .partitions(1)
                .replicas(1)
                .build();
    }
}

この設定により、producer-serviceアプリケーションが起動する際に、order-eventsトピック(パーティション3)とorder-events.DLTトピック(パーティション1)が存在しない場合は自動で作成されます。パーティション数やレプリケーション係数、その他のトピック設定(ログの保持期間など)もコードで管理できるため、インフラの設定とアプリケーションのロジックを同期させやすくなります。

コンシューマーの並列処理とスループット向上

Kafkaのスループットは、トピックのパーティション数とコンシューマーの並列度によって決まります。パーティションが複数あっても、コンシューマーがシングルスレッドで動作していては、その性能を十分に引き出せません。

Spring for Kafkaでは、ConcurrentKafkaListenerContainerFactory を利用して、1つの@KafkaListenerに対して複数のコンシューマースレッドを割り当てることができます。これにより、コンシューマーグループ内の1つのアプリケーションインスタンスが、複数のパーティションを並行して処理できるようになります。

application.propertiesで並列度を設定します。

`consumer-service` の `application.properties` に追記:


# @KafkaListenerの並列度を設定
# トピックのパーティション数以下の値を設定する
spring.kafka.listener.concurrency=3

order-eventsトピックのパーティション数を3に設定した場合、concurrency=3とすることで、アプリケーションは3つのスレッドを起動し、それぞれが1つのパーティションを担当してメッセージを並列に処理します。これにより、コンシューマーのスループットが大幅に向上します。

注意: concurrencyの値は、購読するトピックのパーティション数より大きくしても意味がありません。パーティションはコンシューマーグループ内の1つのスレッドにしか割り当てられないため、余ったスレッドは遊休状態になります。

特定のメッセージのみを処理する:メッセージフィルタリング

場合によっては、トピック内のすべてのメッセージではなく、特定の条件を満たすメッセージのみを処理したいことがあります。例えば、ヘッダーに特定のフラグが付いているメッセージや、JSONペイロード内の特定の値を持つメッセージだけを処理するケースです。

@KafkaListenerにカスタムのRecordFilterStrategyを設定することで、メッセージを受信した時点でフィルタリングを行い、条件に合わないメッセージを破棄(オフセットはコミットされる)することができます。

まず、フィルタリングロジックを実装したRecordFilterStrategyのBeanを作成します。

com.example.consumerservice.config.KafkaConsumerConfig.java (追記):


// ...
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.example.consumerservice.dto.OrderEvent;
// ...
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, OrderEvent> consumerFactory,
            ObjectMapper objectMapper
    ) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // フィルターを設定
        factory.setRecordFilterStrategy(record -> {
            // 例: 注文金額が100未満のイベントは処理しない (破棄する)
            OrderEvent event = record.value();
            return event.getPrice().doubleValue() < 100.0; // trueを返すとメッセージは破棄される
        });
        
        // trueをセットすると、破棄されたメッセージのオフセットもコミットされる
        factory.setAckDiscarded(true);

        return factory;
    }
// ...

この例では、注文金額(price)が100未満のOrderEventは処理対象外とし、破棄するフィルターを定義しています。setAckDiscarded(true)により、破棄されたメッセージも処理済みとみなされ、再配信されることはありません。

このカスタムファクトリを@KafkaListenerで指定します。

com.example.consumerservice.service.KafkaConsumerService.java (修正):


    @KafkaListener(topics = "order-events", groupId = "order-events-group", containerFactory = "kafkaListenerContainerFactory")
    public void consumeOrderEvent(OrderEvent orderEvent, Acknowledgment ack) {
        // このメソッドは、注文金額が100以上のイベントでのみ呼び出される
        log.info("Processing high-value order: {}", orderEvent);
        // ...
        ack.acknowledge();
    }

これにより、リスナーメソッドは関心のあるメッセージの処理に専念でき、コードの可読性と保守性が向上します。

まとめと次のステップへ

本稿では、Spring BootとApache Kafkaを用いて、現代的なイベント駆動型マイクロサービスを構築するための道のりを、基礎から応用まで体系的に探求してきました。

まず、Kafkaの核心概念であるトピック、パーティション、プロデューサー、コンシューマー、そしてコンシューマーグループの役割を理解し、非同期メッセージングの基盤を固めました。次に、Dockerを使ってローカル開発環境を迅速に構築し、Spring Initializrで2つのマイクロサービスの雛形を作成しました。基本的な文字列メッセージの送受信から始め、より実践的なJSON形式のカスタムオブジェクトを扱う方法へとステップアップし、型安全で構造化されたデータ交換を実現しました。

さらに、本番運用を見据え、メッセージングの信頼性を高めるための重要な設定を学びました。プロデューサー側ではacks=allとべき等性(idempotence)を有効にすることでメッセージの損失と重複を防ぎ、コンシューマー側では手動オフセットコミットによって「At-Least-Once」の処理保証を実現しました。また、回復不能なエラーに対応するためのデッドレターキュー(DLQ)パターンを導入し、システムの安定性を向上させました。

最後に、KafkaAdminによるトピックの宣言的な管理、コンシューマーの並列度設定によるスループットの最適化、メッセージフィルタリングによる効率的な処理など、運用を高度化するためのテクニックについても触れました。

Spring Bootが提供する優れた抽象化と自動設定により、開発者はKafkaの複雑な内部動作を深く意識することなく、宣言的なアノテーションや設定ファイルを通じて、その強力な機能を最大限に活用できます。この組み合わせは、疎結合で、回復力があり、スケーラブルなマイクロサービスアーキテクチャを迅速に構築するための、まさに理想的な選択肢と言えるでしょう。

ここからさらに学びを深めるための次のステップとしては、以下のようなトピックが挙げられます。

  • Kafka Streams / ksqlDB: Kafka内で直接リアルタイムなストリーム処理(集計、結合、フィルタリングなど)を行うためのライブラリ/エンジン。
  • AvroとSchema Registry: JSONよりも効率的で、スキーマの進化を管理できるデータフォーマットであるAvroと、そのスキーマを一元管理するSchema Registryの導入。
  • Kafka Connect: データベースやS3、Elasticsearchなど、様々な外部システムとKafkaをノンコーディングで連携させるためのフレームワーク。
  • セキュリティ: SASLによる認証やSSL/TLSによる通信の暗号化。
  • 監視: PrometheusやGrafanaを用いたKafkaクラスタとクライアントアプリケーションのメトリクス監視。

本稿が、あなたのイベント駆動型システム設計の一助となれば幸いです。