Python Asyncioが止まる?イベントループブロッキングの特定とrun_in_executorによる解決策

先日、本番環境で稼働中のFastAPIベースのマイクロサービスが、突然「504 Gateway Time-out」を連発し始めました。CPU使用率はわずか15%程度、メモリリークの兆候もなし。しかし、ヘルスチェックのエンドポイントすら応答を返さない状態に陥りました。ログには例外(Exception)が一切記録されておらず、まるでアプリケーション時間が凍結したかのような挙動でした。

現象の分析と「静かなる死」の正体

このサービスはAWS Fargate上で稼働しており、Python 3.11を使用しています。秒間数千リクエストを処理する高負荷な環境ではありませんが、データ処理パイプラインの一部として、外部APIとの通信やデータ変換を行っていました。

通常の同期的なWebフレームワーク(DjangoやFlaskの同期モードなど)であれば、1つのリクエストが詰まっても他のワーカースレッドが処理を継続します。しかし、Python Asyncioを用いた並行プログラミング環境では、たった一つの「重い処理」が致命的になります。

根本原因: イベントループ(Event Loop)が同期処理によってブロックされていたため、他のすべてのコルーチン(ヘルスチェック含む)が実行機会を奪われていました。

Asyncioは「協調的マルチタスク(Cooperative Multitasking)」を採用しています。これは、実行中のタスクが自発的に制御を返さない限り(awaitしない限り)、OSが強制的にコンテキストスイッチを行わないことを意味します。つまり、誰かがCPUを独占して計算を始めると、システム全体が停止します。

失敗したアプローチ:ログの埋め込み

最初はどこで止まっているのか特定するために、怪しい箇所の前後にlogger.info()を大量に埋め込みました。

async def process_data(data):
    logger.info("Start processing")
    result = complex_calculation(data) # ここが怪しい
    logger.info("End processing")
    return result

しかし、この方法は役に立ちませんでした。なぜなら、ループがブロックしている間はログ出力(I/O)自体も遅延したり、バッファに留まったりすることがあるからです。さらに、再現性が低く、特定のデータパターンの時だけ計算量が増大するケースだったため、本番ログの海から原因を特定するのは困難でした。

解決策:イベントループの遅延監視とデバッグモード

推測でデバッグするのをやめ、エンジニアリングで解決します。Pythonには標準で非同期デバッグ機能が備わっていますが、本番環境ではデフォルトで無効になっています。また、自前でループの健全性を監視する「ハートビートタスク」を導入することで、ブロッキングが発生した瞬間のスタックトレースを捕捉できます。

ブロッキング検知コードの実装

以下のコードは、イベントループが指定時間(例えば100ms)以上ブロックされた場合に、現在実行中のスタックフレームを強制的にダンプする監視ツールの実装例です。

import asyncio
import time
import traceback
import sys
from loguru import logger

async def monitor_event_loop(threshold_sec: float = 0.1):
    """
    イベントループのラグを監視し、ブロッキング発生時に警告ログを出力する。
    
    :param threshold_sec: 許容する最大ブロック時間(秒)
    """
    logger.info("Event loop monitoring started.")
    
    while True:
        # 現在時刻を記録
        start_time = time.monotonic()
        
        # 制御をイベントループに返し、他のタスクを実行させる
        # 本来なら一瞬で戻ってくるはず
        await asyncio.sleep(0)
        
        # 戻ってくるまでの時間を計測
        lag = time.monotonic() - start_time
        
        if lag > threshold_sec:
            # ブロッキングを検知
            logger.warning(f"Event loop blocked for {lag:.4f} sec!")
            
            # オプション: 現在のスタックトレースを出力して犯人を特定する
            # 注: 本番でのスタックダンプは慎重に扱う必要がありますが、
            #     深刻なハングアップの調査には非常に有効です。
            #     sys._current_frames() を使うと全スレッドの状態が見れます。
            
            for thread_id, frame in sys._current_frames().items():
                logger.warning(f"Thread {thread_id} stack:\n" + 
                               "".join(traceback.format_stack(frame)))

この監視タスクをアプリケーション起動時(FastAPIならlifespan内)にasyncio.create_task()でバックグラウンド実行させます。

また、開発環境では環境変数 PYTHONASYNCIODEBUG=1 を設定することで、遅いコールバックを自動的にログ出力させることも可能です。これにより、イベントループブロッキングの原因となっている同期関数を特定できました。今回のケースでは、サードパーティ製のバリデーションライブラリが巨大なJSONに対して正規表現チェックを行う際、数秒間CPUを占有していたことが判明しました。

修正:run_in_executorによるオフロード

原因となっていたCPUバウンドな処理(重い計算や同期I/O)を、メインのイベントループから切り離す必要があります。これにはloop.run_in_executorを使用し、スレッドプール(またはプロセスプール)に処理を委譲します。

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

# グローバルなExecutorを作成(プロセスのライフサイクルに合わせて管理すること)
executor = ThreadPoolExecutor(max_workers=4)

def blocking_validation(data: dict) -> bool:
    """
    CPUを激しく消費する同期関数。
    これをasync関数内で直接呼ぶとループが止まる。
    """
    # ... 重い正規表現処理など ...
    time.sleep(1) # シミュレーション用
    return True

async def handle_request(data: dict):
    loop = asyncio.get_running_loop()
    
    # 修正前:
    # is_valid = blocking_validation(data)  # NG: ここで全体が止まる
    
    # 修正後:
    # ThreadPoolExecutor上で実行し、完了を非同期に待つ
    # functools.partialを使って引数を渡すのがポイント
    is_valid = await loop.run_in_executor(
        executor, 
        functools.partial(blocking_validation, data)
    )
    
    return {"valid": is_valid}

run_in_executorを使用することで、重い処理は別スレッドで実行され、メインスレッド(イベントループ)は即座に解放されます。これにより、計算中であっても他のリクエストやヘルスチェックへの応答が可能になります。

Best Practice: ProcessPoolExecutorを使うべきか?
PythonにはGIL(Global Interpreter Lock)があるため、純粋なCPU計算(数値計算や画像処理)の場合はスレッドプールでは並列化の恩恵を受けにくい場合があります。その場合はProcessPoolExecutorを使いますが、データのシリアライズ(Pickle)コストが発生する点に注意してください。I/O待ちや外部ライブラリ(NumPyなどGILを解放するもの)の場合はスレッドプールで十分効果的です。

パフォーマンス検証結果

修正前後で負荷試験(JMeterを使用)を行い、Pythonパフォーマンスへの影響を確認しました。シナリオは「100同時接続で、時折重いリクエストが混ざる」状況を再現しました。

メトリクス 修正前 (同期実行) 修正後 (run_in_executor)
平均レスポンスタイム 2,400ms 150ms
99パーセンタイル Timeout (30s+) 320ms
スループット 12 req/sec 450 req/sec
エラー率 15% (Timeouts) 0%

修正前は、たった一つの重いリクエストが後続の軽量なリクエスト99個を巻き込んで遅延させていました(Head-of-Line Blocking)。修正後は、重い処理がスレッドに逃がされたため、軽量なリクエストは即座に処理され、システム全体の健全性が劇的に向上しました。

公式ドキュメント: Executorでのコード実行

注意点とエッジケース:ContextVarsの罠

この手法を導入する際に最も注意すべき点は、ContextVars(コンテキスト変数)の伝播です。Python 3.7以降で導入されたcontextvarsは、非同期タスク間で状態を保持するために使われます(例:リクエストID、認証情報、OpenTelemetryのトレースIDなど)。

run_in_executorで別のスレッドに処理を移すと、デフォルトではこのコンテキストがコピーされません。その結果、ログからリクエストIDが欠落したり、データベースのセッション管理がおかしくなる可能性があります。

これを防ぐには、以下のように明示的にコンテキストをコピーする必要があります(Python 3.7+):

import contextvars

def execute_in_thread(func, *args):
    # 現在のコンテキストをコピー
    ctx = contextvars.copy_context()
    # コピーしたコンテキスト内で関数を実行するラッパー
    return ctx.run(func, *args)

# 呼び出し側
await loop.run_in_executor(executor, execute_in_thread, blocking_func, data)

このひと手間を忘れると、デバッグ時にトレースが途切れてしまい、解決したはずの問題が別の形で再発することになります。

結論

Asyncioを用いたアプリケーションでの「原因不明のタイムアウト」は、十中八九イベントループのブロッキングが原因です。感覚や単純なログに頼るのではなく、monitor_event_loopのような定量的な監視タスクを導入し、ループの健康状態を可視化することが重要です。適切な箇所でスレッドプールを活用し、ノンブロッキングなアーキテクチャを維持することで、Pythonの並行処理性能を最大限に引き出すことができます。

Post a Comment