Wednesday, September 20, 2023

Spring BootとKafkaによるイベント駆動型マイクロサービスの構築

はじめに:なぜ今、イベント駆動型アーキテクチャなのか?

現代のソフトウェア開発は、モノリシックアーキテクチャからマイクロサービスアーキテクチャへの大規模な移行の真っ只中にあります。ビジネスの要求が複雑化し、市場への迅速な対応が求められる中で、単一の巨大なアプリケーション(モノリス)は、開発速度の低下、技術的負債の増大、スケーラビリティの限界といった課題を露呈させました。マイクロサービスは、アプリケーションを独立して開発、デプロイ、スケール可能な小さなサービスの集合体として構築することで、これらの課題を解決する有力なアプローチとして登場しました。

しかし、マイクロサービスアーキテクチャは新たな課題、特に「サービス間の通信」という複雑な問題を生み出します。従来多用されてきた同期的なREST APIによる通信は、シンプルで理解しやすい反面、サービス間の密結合を生み出し、一方がダウンすると他方も影響を受ける連鎖的な障害(カスケード障害)を引き起こすリスクを孕んでいます。システム全体がリクエスト/レスポンスの鎖で繋がれ、柔軟性や回復力が損なわれるのです。

ここで脚光を浴びるのが、イベント駆動型アーキテクチャ(Event-Driven Architecture, EDA)です。EDAでは、サービスは「イベント」と呼ばれる状態変化の通知を非同期に送受信します。例えば、「注文が作成された」「商品が発送された」といったビジネス上の出来事がイベントとなります。イベントを発行するサービス(プロデューサー)は、そのイベントを誰が受け取るかを知る必要がなく、イベントを受信するサービス(コンシューマー)も、誰が発行したかを意識する必要がありません。この疎結合な性質により、各サービスは独立性を保ち、システム全体の回復力とスケーラビリティが劇的に向上します。

このイベント駆動型アーキテクチャの心臓部となるのが、Apache Kafkaです。元々はLinkedInで開発されたKafkaは、単なるメッセージキューイングシステムに留まらない、高性能な分散ストリーミングプラットフォームです。膨大な量のイベントをリアルタイムで、かつ確実に処理する能力を持ち、現代のデータ集約型アプリケーションのバックボーンとしてデファクトスタンダードの地位を確立しています。

一方、マイクロサービスの開発現場では、JavaエコシステムにおけるSpring Bootが絶大な支持を得ています。Spring Bootは、複雑な設定を自動化し、開発者がビジネスロジックに集中できる環境を提供することで、生産性を飛躍的に向上させます。「規約大設定(Convention over Configuration)」の哲学に基づき、わずかなコードで本番品質のアプリケーションを迅速に立ち上げることが可能です。

本稿では、この強力な2つの技術、Spring BootとKafkaを組み合わせ、堅牢でスケーラブルなイベント駆動型マイクロサービスを構築するための実践的な知識を体系的に解説します。Kafkaの基本的な概念から始め、環境構築、シンプルなメッセージ送受信、より実用的なカスタムオブジェクトの利用、そして本番運用に不可欠な信頼性向上のための高度な設定やエラーハンドリング戦略まで、段階的に深く掘り下げていきます。単なるコードの断片的な紹介ではなく、各機能が「なぜ」必要なのか、どのような設計思想に基づいているのかを理解することに重点を置きます。

Kafkaの核心概念を深く理解する

Spring Bootでの実装に入る前に、Kafkaがどのように動作するのか、その根幹をなす概念を正確に理解することが不可欠です。これらの概念を把握することで、設定の一つ一つが持つ意味や、問題発生時のトラブルシューティングが格段に容易になります。

Kafkaアーキテクチャの全体像

Kafkaは単一のサーバーではなく、複数のサーバーが連携して動作する分散システムです。このサーバー群をKafkaクラスタと呼び、クラスタを構成する各サーバーをブローカー(Broker)と呼びます。

  • ブローカー(Broker): Kafkaクラスタを構成する個々のサーバーです。メッセージ(Kafkaではレコードと呼びます)の受信、ディスクへの永続化、そしてコンシューマーからのリクエストに応じたレコードの送信を担当します。ブローカーを複数台用意することで、負荷分散と耐障害性を実現します。1台のブローカーがダウンしても、他のブローカーが処理を引き継ぐことでサービスを継続できます。
  • クラスタ(Cluster): 複数のブローカーが集まって形成される論理的なグループです。クラスタ全体で膨大な量のデータを処理・保存する能力を持ちます。
  • Zookeeper / KRaft: かつてKafkaは、ブローカーの管理、リーダー選出、設定情報の保持などのメタデータ管理にApache Zookeeperを必須としていました。しかし、近年ではZookeeperへの依存をなくし、Kafka自体がメタデータを管理するKRaft(Kafka Raft metadata mode)プロトコルが導入され、主流となりつつあります。KRaftモードにより、アーキテクチャが簡素化され、起動時間の短縮や管理オーバーヘッドの削減が実現されています。

データフローの基本単位:トピック、パーティション、オフセット

Kafkaにおけるデータの流れは、3つの重要な概念によって構成されています。

  • トピック(Topic): メッセージを分類するためのカテゴリ名です。ファイルシステムにおけるフォルダのようなものと考えると理解しやすいでしょう。例えば、「orders」(注文情報)、「payments」(決済情報)、「user-activity」(ユーザー行動ログ)といったトピックを作成し、関連するメッセージをグループ化します。プロデューサーは特定のトピックにメッセージを送信し、コンシューマーは特定のトピックからメッセージを購読します。
  • パーティション(Partition): トピックを分割した単位であり、Kafkaの並列処理とスケーラビリティの核となる概念です。各トピックは1つ以上のパーティションを持つことができます。プロデューサーから送信されたメッセージは、いずれか1つのパーティションに追記されます。各パーティションは順序が保証されたログ(追記型ファイル)として機能します。つまり、パーティション内ではメッセージの順序は保証されますが、トピック全体で見ると順序は保証されません。パーティションの数を増やすことで、複数のコンシューマーが同時に処理できるようになり、システム全体のスループットを向上させることができます。
  • オフセット(Offset): 各パーティション内のメッセージに割り当てられる、一意で連番のIDです。パーティションの先頭から0, 1, 2, ...と採番されます。コンシューマーは、自身がどのオフセットまで読み込んだかを記録しており、これによりどこから処理を再開すればよいかを把握します。このオフセット管理が、Kafkaの信頼性の高いメッセージングを実現する上で重要な役割を果たします。
Kafka Topic, Partitions, and Offsets

図1: トピック、パーティション、オフセットの関係

メッセージの生産と消費:プロデューサーとコンシューマー

  • プロデューサー(Producer): Kafkaのトピックにメッセージ(レコード)を書き込むクライアントアプリケーションです。プロデューサーは、どのトピックのどのパーティションにメッセージを送信するかを決定します。特定のキーを指定して送信した場合、同じキーを持つメッセージは常に同じパーティションに送られることが保証されます(キーのハッシュ値に基づいてパーティションが決定されるため)。これにより、特定のエンティティ(例:特定のユーザーID)に関するイベントの順序性を保証することができます。
  • コンシューマー(Consumer): 1つ以上のトピックを購読し、そこに書き込まれたメッセージを読み出して処理するクライアントアプリケーションです。コンシューマーは、自身がどのパーティションからメッセージを読み込むかをKafkaクラスタと協調して決定し、読み込んだメッセージのオフセットを記録(コミット)します。

スケーラビリティと耐障害性の鍵:コンシューマーグループ

コンシューマーグループ(Consumer Group)は、同じ目的で同じトピックを購読するコンシューマーの集合です。これはKafkaの非常に強力な機能であり、2つの主要な目的を果たします。

  1. 負荷分散(Load Balancing): あるトピックのパーティションは、コンシューマーグループ内のただ1つのコンシューマーにのみ割り当てられます。例えば、4つのパーティションを持つトピックを、2つのコンシューマーからなるグループが購読する場合、各コンシューマーは2つのパーティションを担当します。もしコンシューマーを4つに増やせば、各コンシューマーが1つのパーティションを担当することになり、処理が並列化されスループットが向上します。
  2. 耐障害性(Fault Tolerance): グループ内のあるコンシューマーが障害で停止した場合、Kafkaは自動的に検知し、そのコンシューマーが担当していたパーティションをグループ内の他の生きているコンシューマーに再割り当てします。このプロセスをリバランス(Rebalance)と呼びます。リバランスにより、一部のコンシューマーがダウンしてもメッセージ処理が滞ることなく継続されます。
Kafka Consumer Group

図2: コンシューマーグループによる負荷分散

これらの基本概念を念頭に置くことで、次の章以降で登場するSpring Bootのコードや設定が、Kafkaのどの部分と連携しているのかを明確に理解できるようになります。

開発環境の準備とSpring Bootプロジェクトの初期設定

理論を学んだ後は、実際に手を動かして環境を構築します。ここでは、ローカル開発環境にDockerを使ってKafkaを立ち上げ、Spring Initializrでマイクロサービスの雛形を作成する手順を説明します。

DockerによるKafka環境の構築

Kafkaクラスタをローカルマシンに手動でインストールするのは手間がかかります。そこで、DockerとDocker Composeを利用して、必要なコンポーネントをコンテナとして簡単に起動する方法が推奨されます。

まず、プロジェクトのルートディレクトリに docker-compose.yml という名前のファイルを作成し、以下の内容を記述します。ここでは、Zookeeperと1台のKafkaブローカーを起動する最もシンプルな構成を採用します。


version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

このファイルを作成したら、ターミナルで以下のコマンドを実行してコンテナを起動します。


docker-compose up -d

-d オプションはバックグラウンドでの実行を意味します。docker ps コマンドで `zookeeper` と `kafka` の2つのコンテナが正常に起動していることを確認してください。これで、ローカルホストの9092ポート(localhost:9092)からアクセス可能なKafkaブローカーが準備できました。

Spring Initializrによるプロジェクト生成

次に、Spring Bootプロジェクトを作成します。Webブラウザで Spring Initializr にアクセスし、以下の設定でプロジェクトを生成します。今回は、メッセージを送信する「プロデューサーサービス」と、受信する「コンシューマーサービス」の2つのプロジェクトを作成します。

共通の設定:

  • Project: Maven Project
  • Language: Java
  • Spring Boot: 3.x.x (安定版を選択)
  • Group: com.example
  • Packaging: Jar
  • Java: 17 or later

プロデューサーサービス (`producer-service`) の設定:

  • Artifact: producer-service
  • Dependencies:
    • Spring Web
    • Spring for Apache Kafka

コンシューマーサービス (`consumer-service`) の設定:

  • Artifact: consumer-service
  • Dependencies:
    • Spring Web (動作確認用のAPIを設ける場合)
    • Spring for Apache Kafka

「GENERATE」ボタンをクリックして、それぞれのプロジェクトのzipファイルをダウンロードし、任意の場所に展開してIDE(IntelliJ IDEA, Eclipseなど)で開きます。

基本的なKafka接続設定

プロジェクトが作成できたら、次はSpring BootアプリケーションがKafkaブローカーに接続するための設定を行います。各プロジェクトの src/main/resources/application.properties ファイルに以下の設定を追記します。

`producer-service` の `application.properties`:


# サーバーのポート番号(コンシューマーと被らないように変更)
server.port=8080

# Kafkaブローカーの接続先
spring.kafka.bootstrap-servers=localhost:9092

# プロデューサー固有の設定
# 送信するメッセージのキーとバリューのシリアライザを指定
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

`consumer-service` の `application.properties`:


# サーバーのポート番号
server.port=8081

# Kafkaブローカーの接続先
spring.kafka.bootstrap-servers=localhost:9092

# コンシューマー固有の設定
spring.kafka.consumer.group-id=my-group
# 受信するメッセージのキーとバリューのデシリアライザを指定
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

各設定項目の意味は以下の通りです。

  • server.port: Webサーバーのポート番号です。2つのサービスを同時に起動するため、異なるポート番号を割り当てます。
  • spring.kafka.bootstrap-servers: 接続先のKafkaブローカーのアドレスリストです。Dockerで起動したKafkaを指定しています。
  • spring.kafka.producer.*-serializer: プロデューサーがメッセージをKafkaに送信する際に、Javaオブジェクトをバイト配列に変換(シリアライズ)するためのクラスを指定します。ここではキーもバリューも単純な文字列として扱うため、StringSerializer を使用します。
  • spring.kafka.consumer.group-id: このコンシューマーが属するコンシューマーグループのIDです。
  • spring.kafka.consumer.*-deserializer: コンシューマーがKafkaからメッセージを受信する際に、バイト配列をJavaオブジェクトに変換(デシリアライズ)するためのクラスを指定します。ここでは StringDeserializer を使用します。

これで、2つのマイクロサービスがKafkaと通信するための基本的な準備が整いました。次の章では、実際にメッセージを送受信するコードを実装していきます。

基本的なメッセージングの実装:プロデューサーとコンシューマー

環境設定が完了したので、いよいよKafkaを使ったメッセージングを実装します。ここでは、最も基本的な文字列メッセージの送受信を行います。

Kafkaプロデューサーの実装:メッセージを送信する

producer-serviceプロジェクトで、メッセージをKafkaに送信するロジックを実装します。Spring for Kafkaは、KafkaTemplateという便利なクラスを提供しており、これを使うことでメッセージ送信を非常に簡単に行うことができます。

まず、メッセージ送信を責務とするサービスクラスを作成します。

com.example.producerservice.service.KafkaProducerService.java:


package com.example.producerservice.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);
    private static final String TOPIC_NAME = "simple-messages";

    private final KafkaTemplate<String, String> kafkaTemplate;

    // コンストラクタインジェクションでKafkaTemplateを受け取る
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        log.info("Sending message to Kafka: {}", message);
        
        // KafkaTemplateのsendメソッドでメッセージを非同期に送信
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);

        // 送信結果をハンドリング (非同期)
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                // 送信成功
                log.info("Sent message=[{}] with offset=[{}] to partition=[{}]",
                        message,
                        result.getRecordMetadata().offset(),
                        result.getRecordMetadata().partition());
            } else {
                // 送信失敗
                log.error("Unable to send message=[{}] due to : {}", message, ex.getMessage());
            }
        });
    }
}

このコードのポイントは以下の通りです。

  • KafkaTemplate<String, String>: Spring Bootの自動設定機能により、application.propertiesの設定に基づいてKafkaTemplateのBeanが自動的に生成され、DIコンテナに登録されます。コンストラクタインジェクションでこれを受け取るだけで、すぐに使用できます。ジェネリクスは<Key, Value>の型を示します。
  • kafkaTemplate.send(topic, message): このメソッドを呼び出すことで、指定したトピックにメッセージを送信します。この処理は非同期で行われ、即座にCompletableFuture(以前のバージョンではListenableFuture)を返します。
  • 結果のハンドリング: whenComplete(またはaddCallback)メソッドを使って、送信が成功したか失敗したかを非同期に受け取ることができます。成功時には、メッセージが書き込まれたパーティションやオフセットといったメタデータが取得できます。本番環境では、失敗時のリトライ処理やエラー通知などをここで行うことが重要です。

次に、このサービスを呼び出すためのRESTコントローラーを作成します。

com.example.producerservice.controller.MessageController.java:


package com.example.producerservice.controller;

import com.example.producerservice.service.KafkaProducerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    private final KafkaProducerService producerService;

    public MessageController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping
    public String publishMessage(@RequestBody String message) {
        producerService.sendMessage(message);
        return "Message published successfully: " + message;
    }
}

これで、HTTP POSTリクエストを /api/messages に送信すると、リクエストボディの文字列がKafkaの `simple-messages` トピックに送信されるようになりました。

Kafkaコンシューマーの実装:メッセージを受信する

次に、consumer-serviceプロジェクトで、simple-messages トピックからメッセージを受信して処理するコンシューマーを実装します。

Spring for Kafkaでは、@KafkaListener アノテーションを使うことで、驚くほど簡単にコンシューマーを作成できます。

com.example.consumerservice.service.KafkaConsumerService.java:


package com.example.consumerservice.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    
    // @KafkaListenerアノテーションを付与したメソッドがコンシューマーとなる
    @KafkaListener(topics = "simple-messages", groupId = "my-group")
    public void consume(String message) {
        log.info("Consumed message: {}", message);
        // ここで受信したメッセージに対するビジネスロジックを実行する
        // (例: データベースへの保存、別のAPIの呼び出しなど)
    }
}

このコードのポイントは、@KafkaListenerアノテーションです。

  • topics = "simple-messages": どのトピックを購読するかを指定します。
  • groupId = "my-group": このコンシューマーが属するコンシューマーグループIDを指定します。これは application.properties で設定した spring.kafka.consumer.group-id と同じ値を指定することが一般的ですが、アノテーションで上書きすることも可能です。

Spring Bootアプリケーションが起動すると、Spring for Kafkaは @KafkaListener が付与されたメソッドをスキャンし、バックグラウンドでメッセージリスナーコンテナを起動します。このコンテナがKafkaブローカーへの接続、メッセージのポーリング、そしてメッセージ受信時のメソッド呼び出しをすべて自動的に行ってくれます。開発者は、メッセージを受信した後に何をするか、というビジネスロジックにのみ集中すればよいのです。

動作確認:マイクロサービス間の非同期通信

それでは、実装した2つのサービスを連携させて動作を確認しましょう。

  1. DockerのKafkaが起動していることを確認します。
  2. consumer-service を起動します。 IDEの実行機能またはターミナルで mvn spring-boot:run を実行します。コンソールログに、Kafkaに接続し、トピックの購読を開始する旨のログが表示されるはずです。
  3. producer-service を起動します。 同様に、別のターミナルまたはIDEの機能で起動します。
  4. プロデューサーにリクエストを送信します。 ターミナルから curl コマンドを使うか、PostmanなどのAPIクライアントツールを使って、producer-serviceにPOSTリクエストを送信します。

curl -X POST -H "Content-Type: text/plain" -d "Hello, Kafka from Spring Boot!" http://localhost:8080/api/messages

このコマンドを実行すると、以下のような流れで処理が行われます。

  1. producer-serviceMessageController がリクエストを受け取ります。
  2. KafkaProducerServicesendMessage メソッドが呼び出され、メッセージ "Hello, Kafka from Spring Boot!" がKafkaの `simple-messages` トピックに送信されます。
  3. producer-service のコンソールには "Sending message..." と "Sent message..." のログが出力されます。
  4. ほぼ同時に、Kafkaブローカーを介してメッセージが consumer-service に配信されます。
  5. consumer-serviceKafkaConsumerServiceconsume メソッドが呼び出され、コンソールに "Consumed message: Hello, Kafka from Spring Boot!" というログが出力されます。

これにより、HTTPリクエストを起点として、2つの独立したマイクロサービスがKafkaを介して非同期に通信できることが確認できました。プロデューサーはレスポンスを待つことなく処理を完了し、コンシューマーは自身のタイミングでメッセージを処理します。これがイベント駆動型アーキテクチャの基本的な動作です。

実用的なデータ交換:JSONとカスタムオブジェクトの利用

なぜ単純な文字列では不十分なのか?

前章では文字列メッセージの送受信を実装しましたが、実際のアプリケーションでは、より構造化されたデータを扱うことがほとんどです。例えば、注文情報を送信する場合、「注文ID」「商品ID」「数量」「顧客名」といった複数の属性を持つデータをやり取りする必要があります。これを単一の文字列として扱うのは非効率で、解析も面倒であり、エラーの温床となります。

そこで、JavaオブジェクトをJSON形式に変換(シリアライズ)して送信し、受信側でJSONからJavaオブジェクトに復元(デシリアライズ)する方法が一般的に用いられます。これにより、型安全性を保ちながら、構造化されたデータをサービス間で柔軟に交換できます。

Jacksonを利用したJSONシリアライズ/デシリアライズ

Spring Bootは、JSONの扱いに長けたライブラリであるJacksonを標準でサポートしています。Spring for KafkaもJacksonとシームレスに連携するためのシリアライザ/デシリアライザを提供しています。

1. 共通のデータ転送オブジェクト(DTO)の作成

まず、プロデューサーとコンシューマーの両方で利用するデータ構造を定義したクラス(DTO: Data Transfer Object)を作成します。両方のサービスで同じクラス定義が必要になるため、共通のライブラリとして管理するのが理想的ですが、ここでは簡単のため、各プロジェクトに同じクラスを作成します。

producer-serviceconsumer-service の両方に、以下の OrderEvent.java を作成します。

com.example.xxx.dto.OrderEvent.java (xxxはproducerserviceまたはconsumerservice):


package com.example.producerservice.dto; // パッケージ名は適宜変更

import java.math.BigDecimal;
import java.time.Instant;

// Lombokを使うとより簡潔に書けるが、ここでは手動で実装
public class OrderEvent {
    private String orderId;
    private String productId;
    private int quantity;
    private BigDecimal price;
    private String customerId;
    private Instant timestamp;

    // デシリアライズのためにデフォルトコンストラクタが必須
    public OrderEvent() {
    }

    // ゲッター、セッター、toStringなどを記述...
    // (ここでは省略)
}

注意: JacksonがJSONからオブジェクトにデシリアライズする際には、デフォルトコンストラクタが必要ですので、必ず定義してください。

2. application.propertiesの変更

次に、Kafkaが送受信するデータのシリアライザ/デシリアライザを、文字列用からJSON用に変更します。application.propertiesを以下のように更新します。

`producer-service` の `application.properties`:


# ... (server.port, spring.kafka.bootstrap-serversは同じ) ...
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# ValueのシリアライザをJsonSerializerに変更
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

`consumer-service` の `application.properties`:


# ... (server.port, spring.kafka.bootstrap-serversは同じ) ...
spring.kafka.consumer.group-id=order-events-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# ValueのデシリアライザをJsonDeserializerに変更
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

# JsonDeserializerがどのクラスに変換するかを指定
spring.kafka.consumer.properties.spring.json.trusted.packages=*
# デフォルトでヘッダーから型情報を推測するが、明示的に指定することも可能
# spring.kafka.consumer.properties.spring.json.value.default.type=com.example.consumerservice.dto.OrderEvent

コンシューマー側の設定で重要なのは spring.kafka.consumer.properties.spring.json.trusted.packages=* です。これはセキュリティ上の理由から、デシリアライズを許可するパッケージを指定するものです。開発中は * で全てのパッケージを許可しても問題ありませんが、本番環境では com.example.dto のように具体的なパッケージ名を指定することが推奨されます。

カスタムオブジェクトを送受信する実装例

設定を変更したら、次はコードを修正して OrderEvent オブジェクトを扱えるようにします。

プロデューサー側の修正

KafkaTemplate のジェネリクスを <String, OrderEvent> に変更し、コントローラーもJSONを受け取るように修正します。

com.example.producerservice.service.KafkaProducerService.java:


// ...
import com.example.producerservice.dto.OrderEvent;
// ...

@Service
public class KafkaProducerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);
    private static final String TOPIC_NAME = "order-events";

    // Valueの型をOrderEventに変更
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrderEvent(OrderEvent orderEvent) {
        log.info("Sending order event to Kafka: {}", orderEvent);
        // orderIdをキーとして送信することで、同じ注文IDのイベントは同じパーティションに送られる
        CompletableFuture<SendResult<String, OrderEvent>> future = kafkaTemplate.send(TOPIC_NAME, orderEvent.getOrderId(), orderEvent);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Sent order event=[{}] with offset=[{}]", orderEvent, result.getRecordMetadata().offset());
            } else {
                log.error("Unable to send order event=[{}] due to : {}", orderEvent, ex.getMessage());
            }
        });
    }
}

com.example.producerservice.controller.OrderController.java (新規作成):


package com.example.producerservice.controller;

import com.example.producerservice.dto.OrderEvent;
import com.example.producerservice.service.KafkaProducerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final KafkaProducerService producerService;

    public OrderController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping
    public String publishOrderEvent(@RequestBody OrderEvent orderEvent) {
        producerService.sendOrderEvent(orderEvent);
        return "Order event published successfully.";
    }
}

注目すべきは、kafkaTemplate.send メソッドで第2引数にキー(orderEvent.getOrderId())を指定している点です。これにより、Kafkaはキーのハッシュ値に基づいて送信先のパーティションを決定するため、同じ注文IDに関するイベントは必ず同じパーティションに送られ、処理の順序性が保証されます。

コンシューマー側の修正

リスナーメソッドの引数の型を OrderEvent に変更するだけです。

com.example.consumerservice.service.KafkaConsumerService.java:


// ...
import com.example.consumerservice.dto.OrderEvent;
// ...

@Service
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    @KafkaListener(topics = "order-events", groupId = "order-events-group")
    public void consumeOrderEvent(OrderEvent orderEvent) {
        log.info("Consumed order event: {}", orderEvent);
        // 在庫更新、通知送信などのビジネスロジックを実行
    }
}

Spring for KafkaとJacksonが連携し、受信したJSONバイト配列を自動的に OrderEvent オブジェクトにデシリアライズしてメソッドに渡してくれます。開発者は面倒な変換処理を意識することなく、型安全なオブジェクトとしてデータを扱うことができます。

動作確認

再度両方のサービスを起動し、今度は /api/orders エンドポイントにJSON形式でリクエストを送信します。


curl -X POST -H "Content-Type: application/json" -d '{
  "orderId": "ORD-12345",
  "productId": "PROD-ABC",
  "quantity": 2,
  "price": 25.50,
  "customerId": "CUST-001",
  "timestamp": "2023-10-27T10:00:00Z"
}' http://localhost:8080/api/orders

consumer-service のコンソールに、受信した OrderEvent オブジェクトの内容がログとして出力されれば成功です。これで、より実践的なマイクロサービス間通信の基盤が整いました。

信頼性の高いメッセージングのための詳細設定

これまでは基本的なメッセージ送受信を実装してきましたが、本番環境で稼働するシステムでは、「メッセージが失われないこと」「メッセージが重複処理されないこと」「エラーが発生した際に適切に対処できること」といった信頼性が極めて重要になります。この章では、Spring BootとKafkaで信頼性の高いメッセージングを実現するための詳細な設定について解説します。

プロデューサーの信頼性設定:メッセージ損失を防ぐ

プロデューサーが送信したメッセージが、ネットワーク障害やブローカーのダウンなどによって失われるリスクを低減するための設定です。

`producer-service` の `application.properties` に追記:


# ... (既存の設定) ...

# 1. 送信完了の確認レベル (acks)
# all: リーダーとすべてのISR(In-Sync Replica)に書き込まれたら完了とみなす。最も信頼性が高い。
# 1: リーダーに書き込まれたら完了とみなす(デフォルト)。
# 0: 確認を待たない。最も高速だがメッセージ損失のリスクが最も高い。
spring.kafka.producer.acks=all

# 2. リトライ設定
# 送信失敗時にリトライする回数。
# 一時的なネットワークエラーなどからの回復に有効。
spring.kafka.producer.retries=3

# 3. べき等プロデューサー (Idempotent Producer)
# これをtrueにすると、リトライによって同じメッセージが重複して送信されるのを防ぐ。
# 内部的にシーケンス番号を使って重複を排除する。
# acks=all, retries > 0 と一緒に使うことが必須。
spring.kafka.producer.properties.enable.idempotence=true
  • acks=all: プロデューサーは、メッセージがリーダーパーティションだけでなく、設定されたすべての同期レプリカ(ISR)にも書き込まれたことを確認してから送信完了とみなします。これにより、リーダーブローカーがクラッシュしてもメッセージが失われることはありません。ただし、レイテンシは若干増加します。
  • retries: ブローカーからエラーが返ってきた場合(例:リーダーが一時的に利用不能)、プロデューサーが自動的に再送を試みる回数です。
  • enable.idempotence=true: べき等性を有効にすると、プロデューサーは各メッセージに一意のIDを付与します。リトライによってメッセージが再送された場合でも、ブローカー側でIDを確認し、重複していればそれを破棄します。これにより、「At-Least-Once(少なくとも1回)」の配信セマンティクスに加えて、重複を排除した「Exactly-Once(厳密に1回)」に近い挙動(プロデューサーからブローカーまでの範囲で)を実現できます。

これらの設定を組み合わせることで、プロデューサー側のメッセージ損失や重複のリスクを大幅に削減できます。

コンシューマーのメッセージ処理保証:重複処理とデータ一貫性

コンシューマー側では、メッセージを確実に処理し、かつ障害発生時に処理が重複したりデータが失われたりしないようにすることが課題となります。その鍵となるのがオフセットコミットの管理です。

オフセットコミットの仕組みと課題

コンシューマーは、どこまでメッセージを読み込んだかをオフセットとしてKafkaに記録(コミット)します。デフォルト設定(enable.auto.commit=true)では、コンシューマーは一定間隔(auto.commit.interval.ms, デフォルト5秒)で自動的にオフセットをコミットします。

この自動コミットには問題があります。例えば、コンシューマーがメッセージを取得し、ビジネスロジック(例:DBへの書き込み)を実行している最中に、オフセットの自動コミットが発生したとします。その直後にコンシューマーがクラッシュした場合、DBへの書き込みは完了していませんが、オフセットはコミットされてしまっています。再起動後、コンシューマーはコミットされたオフセットの次から処理を再開するため、クラッシュ前に処理していたメッセージは失われてしまいます(At-Most-Once)

手動コミットによるAt-Least-Onceの実現

この問題を解決するには、オフセットコミットを手動で行います。ビジネスロジックが正常に完了したことを確認してから、アプリケーションのコード内で明示的にコミットするのです。

`consumer-service` の `application.properties` を変更:


# ... (既存の設定) ...

# オフセットの自動コミットを無効にする
spring.kafka.consumer.enable-auto-commit=false

# コンテナ管理の手動コミットモードを設定
# RECORD: メッセージを1件処理するごとにコミット
# BATCH: ポーリングで取得した全メッセージを処理し終えたらコミット(デフォルト)
# MANUAL_IMMEDIATE: Acknowledgment.acknowledge()を呼び出すと即座に同期コミット
spring.kafka.listener.ack-mode=RECORD

`KafkaConsumerService.java` を修正:


// ...
import org.springframework.kafka.support.Acknowledgment;
// ...

@Service
public class KafkaConsumerService {
    // ...
    @KafkaListener(topics = "order-events", groupId = "order-events-group")
    public void consumeOrderEvent(OrderEvent orderEvent, Acknowledgment ack) {
        try {
            log.info("Consumed order event: {}", orderEvent);
            // 重要なビジネスロジック
            // ...
            
            // 処理が正常に完了したら、手動でコミット
            ack.acknowledge();
            log.info("Offset committed successfully.");

        } catch (Exception e) {
            log.error("Error processing order event: {}", orderEvent, e);
            // エラーが発生した場合、コミットしないことでメッセージが再処理される
            // ここでリトライ処理やDLQへの送信などを検討する
        }
    }
}

ack-mode を設定し、リスナーメソッドの引数に Acknowledgment を追加することで、手動コミットが可能になります。ack.acknowledge() を呼び出すと、コンテナがオフセットをコミットします。もしビジネスロジックの途中で例外が発生して acknowledge() が呼ばれずにメソッドが終了した場合、オフセットはコミットされません。コンシューマーが再起動(またはリバランス後)すると、同じメッセージが再度配信されます。これにより、メッセージが失われることはなくなりますが、代わりに同じメッセージが重複して処理される可能性(At-Least-Once)が生まれます。そのため、ビジネスロジック側で重複処理を許容できるように(べき等に)設計することが重要になります。

高度なエラーハンドリングとデッドレターキュー(DLQ)

メッセージの処理中に回復不可能なエラー(例:データ形式の不正、関連データが存在しない)が発生した場合、単純にリトライを繰り返しても成功しません。このようなメッセージは、無限にリトライされ続け、後続の正常なメッセージの処理を妨げてしまいます(ブロッキング)。

この問題に対する一般的な解決策が、デッドレターキュー(Dead-Letter Queue, DLQ)パターンです。処理に失敗したメッセージを、通常のトピックとは別の専用トピック(DLQトピック)に転送し、後で開発者が原因を調査したり、手動で再処理したりできるようにする仕組みです。

Spring for Kafkaは、DLQパターンを簡単に実装するための機能を提供しています。

1. DLQ用のトピックを準備する

DLQメッセージを格納するためのトピック(例:order-events.DLT)をあらかじめ作成しておくか、後述するKafkaAdminで自動生成するように設定します。

2. エラーハンドラとDLQの設定

まず、application.propertiesでDLQトピック名を指定します。

`consumer-service` の `application.properties` に追記:


# ...
# @DltHandlerで利用するDLT(Dead Letter Topic)のサフィックスを指定
spring.kafka.listener.dead-letter-topic-suffix=.DLT

次に、特定のエラーが発生した場合にDLQに送るための設定をJava Configで行います。

com.example.consumerservice.config.KafkaConsumerConfig.java:


package com.example.consumerservice.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    
    @Bean
    public DefaultErrorHandler errorHandler(KafkaOperations<String, Object> template) {
        // リトライを2回(合計3回試行)行い、それでも失敗した場合はDLQに送信する
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                // DLQトピック名をカスタマイズするラムダ式
                (record, ex) -> {
                    log.error("Message failed after retries. Sending to DLT. Topic: {}, Partition: {}, Offset: {}",
                            record.topic(), record.partition(), record.offset());
                    return new TopicPartition(record.topic() + ".DLT", record.partition());
                }
        );

        // 1秒間隔で2回リトライ
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2));

        // リトライしない例外を指定
        errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
        
        return errorHandler;
    }
}

この設定では、DefaultErrorHandler をカスタマイズしています。

  • FixedBackOff(1000L, 2): 1秒間隔で最大2回リトライします。
  • DeadLetterPublishingRecoverer: リトライがすべて失敗した後に呼び出されるリカバリー処理で、メッセージをDLQに送信します。
  • addNotRetryableExceptions: 特定の例外(ここではIllegalArgumentException)が発生した場合は、リトライせずに即座にDLQに送信するように設定できます。データ不備などが原因の場合に有効です。

3. DLQを購読するリスナーの実装 (オプション)

DLQに送られたメッセージを監視し、アラートを発報したり、データベースに記録したりするためのリスナーを別途作成することもできます。


// in KafkaConsumerService.java
import org.springframework.kafka.annotation.DltHandler;

// ...
    @DltHandler
    public void dltListen(Object in) {
        log.warn("Received from DLT: {}", in);
        // 通知処理など
    }

@DltHandlerアノテーションを使うと、同じリスナーコンテナ内でDLQを処理する専用のメソッドを定義できます。

これで、コンシューマーが処理できないメッセージによってシステム全体が停止することなく、問題を隔離し、安定した運用を継続するための仕組みが整いました。

高度なトピックとコンシューマー管理

基本的な信頼性設定に加えて、Spring for Kafkaは運用を効率化し、パフォーマンスを向上させるための多くの高度な機能を提供しています。

`KafkaAdmin`によるトピックの自動生成と管理

これまでは、Kafkaトピックが事前に存在することを前提としていました。しかし、アプリケーションの起動時に必要なトピックを自動的に作成・設定できると、環境構築の手間が省け、設定ミスも防げます。

Spring Bootは、クラスパス上にspring-kafkaが存在すると自動的にKafkaAdmin Beanを登録します。このKafkaAdminは、DIコンテナ内にあるNewTopic型のBeanを検出し、対応するトピックがKafkaクラスタに存在しない場合に自動で作成してくれます。

例えば、producer-serviceに以下のConfigクラスを追加します。

com.example.producerservice.config.KafkaTopicConfig.java:


package com.example.producerservice.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic orderEventsTopic() {
        return TopicBuilder.name("order-events")
                .partitions(3)       // パーティション数を3に設定
                .replicas(1)         // レプリケーション係数を1に設定(開発環境用)
                .build();
    }
    
    @Bean
    public NewTopic orderEventsDlt() {
        return TopicBuilder.name("order-events.DLT")
                .partitions(1)
                .replicas(1)
                .build();
    }
}

この設定により、producer-serviceアプリケーションが起動する際に、order-eventsトピック(パーティション3)とorder-events.DLTトピック(パーティション1)が存在しない場合は自動で作成されます。パーティション数やレプリケーション係数、その他のトピック設定(ログの保持期間など)もコードで管理できるため、インフラの設定とアプリケーションのロジックを同期させやすくなります。

コンシューマーの並列処理とスループット向上

Kafkaのスループットは、トピックのパーティション数とコンシューマーの並列度によって決まります。パーティションが複数あっても、コンシューマーがシングルスレッドで動作していては、その性能を十分に引き出せません。

Spring for Kafkaでは、ConcurrentKafkaListenerContainerFactory を利用して、1つの@KafkaListenerに対して複数のコンシューマースレッドを割り当てることができます。これにより、コンシューマーグループ内の1つのアプリケーションインスタンスが、複数のパーティションを並行して処理できるようになります。

application.propertiesで並列度を設定します。

`consumer-service` の `application.properties` に追記:


# @KafkaListenerの並列度を設定
# トピックのパーティション数以下の値を設定する
spring.kafka.listener.concurrency=3

order-eventsトピックのパーティション数を3に設定した場合、concurrency=3とすることで、アプリケーションは3つのスレッドを起動し、それぞれが1つのパーティションを担当してメッセージを並列に処理します。これにより、コンシューマーのスループットが大幅に向上します。

注意: concurrencyの値は、購読するトピックのパーティション数より大きくしても意味がありません。パーティションはコンシューマーグループ内の1つのスレッドにしか割り当てられないため、余ったスレッドは遊休状態になります。

特定のメッセージのみを処理する:メッセージフィルタリング

場合によっては、トピック内のすべてのメッセージではなく、特定の条件を満たすメッセージのみを処理したいことがあります。例えば、ヘッダーに特定のフラグが付いているメッセージや、JSONペイロード内の特定の値を持つメッセージだけを処理するケースです。

@KafkaListenerにカスタムのRecordFilterStrategyを設定することで、メッセージを受信した時点でフィルタリングを行い、条件に合わないメッセージを破棄(オフセットはコミットされる)することができます。

まず、フィルタリングロジックを実装したRecordFilterStrategyのBeanを作成します。

com.example.consumerservice.config.KafkaConsumerConfig.java (追記):


// ...
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.example.consumerservice.dto.OrderEvent;
// ...
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, OrderEvent> consumerFactory,
            ObjectMapper objectMapper
    ) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // フィルターを設定
        factory.setRecordFilterStrategy(record -> {
            // 例: 注文金額が100未満のイベントは処理しない (破棄する)
            OrderEvent event = record.value();
            return event.getPrice().doubleValue() < 100.0; // trueを返すとメッセージは破棄される
        });
        
        // trueをセットすると、破棄されたメッセージのオフセットもコミットされる
        factory.setAckDiscarded(true);

        return factory;
    }
// ...

この例では、注文金額(price)が100未満のOrderEventは処理対象外とし、破棄するフィルターを定義しています。setAckDiscarded(true)により、破棄されたメッセージも処理済みとみなされ、再配信されることはありません。

このカスタムファクトリを@KafkaListenerで指定します。

com.example.consumerservice.service.KafkaConsumerService.java (修正):


    @KafkaListener(topics = "order-events", groupId = "order-events-group", containerFactory = "kafkaListenerContainerFactory")
    public void consumeOrderEvent(OrderEvent orderEvent, Acknowledgment ack) {
        // このメソッドは、注文金額が100以上のイベントでのみ呼び出される
        log.info("Processing high-value order: {}", orderEvent);
        // ...
        ack.acknowledge();
    }

これにより、リスナーメソッドは関心のあるメッセージの処理に専念でき、コードの可読性と保守性が向上します。

まとめと次のステップへ

本稿では、Spring BootとApache Kafkaを用いて、現代的なイベント駆動型マイクロサービスを構築するための道のりを、基礎から応用まで体系的に探求してきました。

まず、Kafkaの核心概念であるトピック、パーティション、プロデューサー、コンシューマー、そしてコンシューマーグループの役割を理解し、非同期メッセージングの基盤を固めました。次に、Dockerを使ってローカル開発環境を迅速に構築し、Spring Initializrで2つのマイクロサービスの雛形を作成しました。基本的な文字列メッセージの送受信から始め、より実践的なJSON形式のカスタムオブジェクトを扱う方法へとステップアップし、型安全で構造化されたデータ交換を実現しました。

さらに、本番運用を見据え、メッセージングの信頼性を高めるための重要な設定を学びました。プロデューサー側ではacks=allとべき等性(idempotence)を有効にすることでメッセージの損失と重複を防ぎ、コンシューマー側では手動オフセットコミットによって「At-Least-Once」の処理保証を実現しました。また、回復不能なエラーに対応するためのデッドレターキュー(DLQ)パターンを導入し、システムの安定性を向上させました。

最後に、KafkaAdminによるトピックの宣言的な管理、コンシューマーの並列度設定によるスループットの最適化、メッセージフィルタリングによる効率的な処理など、運用を高度化するためのテクニックについても触れました。

Spring Bootが提供する優れた抽象化と自動設定により、開発者はKafkaの複雑な内部動作を深く意識することなく、宣言的なアノテーションや設定ファイルを通じて、その強力な機能を最大限に活用できます。この組み合わせは、疎結合で、回復力があり、スケーラブルなマイクロサービスアーキテクチャを迅速に構築するための、まさに理想的な選択肢と言えるでしょう。

ここからさらに学びを深めるための次のステップとしては、以下のようなトピックが挙げられます。

  • Kafka Streams / ksqlDB: Kafka内で直接リアルタイムなストリーム処理(集計、結合、フィルタリングなど)を行うためのライブラリ/エンジン。
  • AvroとSchema Registry: JSONよりも効率的で、スキーマの進化を管理できるデータフォーマットであるAvroと、そのスキーマを一元管理するSchema Registryの導入。
  • Kafka Connect: データベースやS3、Elasticsearchなど、様々な外部システムとKafkaをノンコーディングで連携させるためのフレームワーク。
  • セキュリティ: SASLによる認証やSSL/TLSによる通信の暗号化。
  • 監視: PrometheusやGrafanaを用いたKafkaクラスタとクライアントアプリケーションのメトリクス監視。

本稿が、あなたのイベント駆動型システム設計の一助となれば幸いです。


0 개의 댓글:

Post a Comment