Node.jsで10GB超のデータを処理したらOOM?StreamとBackpressureでメモリを50MBに抑えた話

先週、本番環境のログ集計バッチが突如として死にました。エラーログには馴染み深すぎるあのメッセージが刻まれていました:FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory。このバッチ処理は、AWS Fargate(メモリ制限1GB)上で動作しており、これまでは数百MB程度のCSVファイルを処理していましたが、その日はキャンペーンの影響でファイルサイズが12GBに膨れ上がっていたのです。

Node.jsを使っていると、「非同期I/Oだから速い」という漠然とした認識でコードを書きがちですが、大容量ファイル処理においては、単なる非同期処理ではなく、データの「流量制御(Flow Control)」を理解していないと、いとも簡単にメモリを食いつぶします。今回は、私が実際に遭遇したこのメモリリーク問題を解決するために行った、Node.js StreamsのBackpressure(バックプレッシャー)制御と適切なパイプライン構築について、泥臭いデバッグ過程を含めて共有します。

OOMの原因分析:なぜパイプラインは詰まるのか

当時の環境は Node.js v18 (LTS)、EC2 t3.small 相当のスペックでした。問題のコードは、S3から巨大なCSVをストリームで読み込み、行ごとにJSONへ変換し、多少のデータ加工を行ってからDBへバルクインサートするための前処理を行うものでした。

Node.jsのStreamは、データを「チャンク」と呼ばれる小さな塊で扱います。通常、読み込み速度(Source)と書き込み速度(Destination)が一致していれば問題ありません。しかし、今回のケースでは「読み込み(ファイルRead)」の方が、「処理&書き込み(変換処理)」よりも圧倒的に高速でした。

現場での失敗: 書き込み側が処理しきれないデータを、読み込み側が容赦なくメモリ上のバッファ(Internal Buffer)にプッシュし続けました。結果、バッファが上限(highWaterMark)を超えて肥大化し、V8エンジンのGC(ガベージコレクション)が追いつかずにプロセスがクラッシュしたのです。

これがBackpressure(背圧)の問題です。下流の処理が詰まっているなら、上流の蛇口を閉めなければなりません。しかし、多くのエンジニア(過去の私も含めて)は、単純に .pipe() を繋げばNode.jsが勝手にやってくれると過信しがちです。

失敗したアプローチ:自作Transformストリームの罠

最初に修正を試みた際、私は以下のような安直な実装を行いました。data イベントをリッスンし、手動で処理を行う方式です。

// ❌ 推奨されない実装例
// Backpressureを無視しており、読み込みが止まらない
const fs = require('fs');
const readStream = fs.createReadStream('huge-data.csv');

readStream.on('data', async (chunk) => {
    // 非同期処理をしている間に、次の 'data' イベントが発火しまくる
    await heavyProcess(chunk); 
});

readStream.on('end', () => console.log('Done'));

このコードの致命的な欠陥は、heavyProcess が完了するのを待たずに、次の data イベントが同期的に次々と発火することです。await を書いたとしても、イベントループはストリームの読み込みを一時停止(pause)してくれません。これにより、未処理のPromiseがメモリ上に数万件スタックし、瞬く間にメモリ管理の破綻を招きました。

解決策:stream.pipelineとAsync Generatorsの活用

この問題を解決するための正解は、手動のイベントリスナー管理をやめ、Node.jsの stream.pipeline ユーティリティと、ES2018から導入された「非同期ジェネレータ(Async Generators)」を組み合わせることです。これにより、BackpressureのハンドリングをNode.jsの内部機構に委譲できます。

以下は、10GBのファイルを安定して処理し、メモリ使用量を一定(約50MB〜100MB)に保つための最適化されたコードです。

// ✅ 最適化された実装
// stream/promises APIを使用(Node.js v15+)
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const { Transform } = require('node:stream');

async function runEfficientProcessing() {
  const inputPath = './huge-data.csv';
  const outputPath = './cleaned-data.jsonl';

  console.time('Processing Time');

  // 1. ソースストリーム(読み込み)
  // highWaterMarkを調整してバッファサイズを明示的に制御
  const source = fs.createReadStream(inputPath, { 
    encoding: 'utf8', 
    highWaterMark: 64 * 1024 // 64KB chunk
  });

  // 2. 変換処理(Async Generatorを使用)
  // ここで yield することで、下流が受け取るまで処理が一時停止し
  // 自動的にBackpressureが効くようになる
  async function* transformData(sourceStream) {
    let buffer = '';
    
    for await (const chunk of sourceStream) {
      buffer += chunk;
      const lines = buffer.split('\n');
      
      // 最後の行は不完全な可能性があるためバッファに残す
      buffer = lines.pop(); 

      for (const line of lines) {
        if (!line.trim()) continue;
        
        // 重い同期/非同期処理のシミュレーション
        const processed = processLine(line); 
        
        // オブジェクトを文字列化して流す
        yield JSON.stringify(processed) + '\n';
      }
    }
    
    // 残ったバッファを処理
    if (buffer.trim()) {
      yield JSON.stringify(processLine(buffer)) + '\n';
    }
  }

  // 3. 宛先ストリーム(書き込み)
  const destination = fs.createWriteStream(outputPath);

  try {
    // pipelineはエラーハンドリングとストリームの破棄を自動化する
    await pipeline(
      source,
      transformData,
      destination
    );
    console.log('Pipeline succeeded.');
  } catch (err) {
    console.error('Pipeline failed:', err);
  } finally {
    console.timeEnd('Processing Time');
  }
}

function processLine(line) {
  // 実践的なデータ変換ロジック
  const columns = line.split(',');
  return { id: columns[0], value: Number(columns[1]) * 1.1 };
}

runEfficientProcessing();

このコードの肝は、async function*(非同期ジェネレータ)をTransformストリームとして挟んでいる点です。for await (const chunk of sourceStream) ループ内では、内部的にストリームの readable 状態が監視されます。下流の書き込みが遅延すると、ジェネレータの実行が待機状態になり、それに応じて上流の source ストリームも読み込みを一時停止します。これがNode.jsパフォーマンス最適化の核心です。

指標修正前(Naive Approach)修正後(Pipeline + Generator)
メモリ使用量 (RSS)1.2 GB (Crash)48 MB (Stable)
CPU負荷スパイク状に100%張り付き平準化され安定
エラー時の挙動プロセスゾンビ化正常にCatch&リソース解放

表の結果を見てください。修正後のコードでは、ファイルサイズがどれだけ大きくなろうとも、メモリ使用量は highWaterMark と現在処理中のチャンク分に限定されるため、ほぼ横ばいで推移します。これは、限られたリソース環境(FargateやLambdaなど)で非常に重要です。

Node.js公式ドキュメント: stream.pipeline

エッジケースと注意点

この手法は強力ですが、万能ではありません。いくつかのエッジケースには注意が必要です。

  1. 非同期並列数の制限: 上記の例では、1行ずつ直列に処理しています。もし1行ごとの処理にAPIリクエストが含まれる場合、スループットが低下する可能性があります。その場合は、stream.Transform を継承し、内部で Promise.all を使ってバッチ処理(例:100行まとめて処理)を行うカスタムクラスの実装を検討してください。
  2. CSVパースの厳密性: 上記の簡易的な split('\n') は、改行コードを含むCSVセルに対応していません。本番環境では、csv-parser などの信頼できるストリーム対応ライブラリをパイプラインに組み込むべきです。
  3. エラー伝播: 古い .pipe() メソッドは、途中のストリームでエラーが発生しても、パイプライン全体を自動でクローズしません。必ず stream.pipeline を使用して、リソースリークを防いでください。
成果: この改修により、バッチ処理は12GBのデータを安定して処理できるようになり、インフラコストを上げることなくOOMエラーを完全に根絶しました。

結論

Node.jsにおける大容量ファイル処理の要は、メモリにデータを溜め込まないことです。fs.readFile を使うのはプロトタイプまでとし、本番コードでは必ず Stream API を採用してください。そして何より、Backpressure の存在を意識し、stream.pipeline と非同期イテレータを活用することで、堅牢でスケーラブルなアプリケーションを構築できます。

もしメモリリークに悩まされているなら、ヒープダンプを取る前に、まずはデータフローの「栓」が適切に機能しているか、コードを見直してみてください。

Post a Comment