先週、本番環境のログ集計バッチが突如として死にました。エラーログには馴染み深すぎるあのメッセージが刻まれていました:FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory。このバッチ処理は、AWS Fargate(メモリ制限1GB)上で動作しており、これまでは数百MB程度のCSVファイルを処理していましたが、その日はキャンペーンの影響でファイルサイズが12GBに膨れ上がっていたのです。
Node.jsを使っていると、「非同期I/Oだから速い」という漠然とした認識でコードを書きがちですが、大容量ファイル処理においては、単なる非同期処理ではなく、データの「流量制御(Flow Control)」を理解していないと、いとも簡単にメモリを食いつぶします。今回は、私が実際に遭遇したこのメモリリーク問題を解決するために行った、Node.js StreamsのBackpressure(バックプレッシャー)制御と適切なパイプライン構築について、泥臭いデバッグ過程を含めて共有します。
OOMの原因分析:なぜパイプラインは詰まるのか
当時の環境は Node.js v18 (LTS)、EC2 t3.small 相当のスペックでした。問題のコードは、S3から巨大なCSVをストリームで読み込み、行ごとにJSONへ変換し、多少のデータ加工を行ってからDBへバルクインサートするための前処理を行うものでした。
Node.jsのStreamは、データを「チャンク」と呼ばれる小さな塊で扱います。通常、読み込み速度(Source)と書き込み速度(Destination)が一致していれば問題ありません。しかし、今回のケースでは「読み込み(ファイルRead)」の方が、「処理&書き込み(変換処理)」よりも圧倒的に高速でした。
これがBackpressure(背圧)の問題です。下流の処理が詰まっているなら、上流の蛇口を閉めなければなりません。しかし、多くのエンジニア(過去の私も含めて)は、単純に .pipe() を繋げばNode.jsが勝手にやってくれると過信しがちです。
失敗したアプローチ:自作Transformストリームの罠
最初に修正を試みた際、私は以下のような安直な実装を行いました。data イベントをリッスンし、手動で処理を行う方式です。
// ❌ 推奨されない実装例
// Backpressureを無視しており、読み込みが止まらない
const fs = require('fs');
const readStream = fs.createReadStream('huge-data.csv');
readStream.on('data', async (chunk) => {
// 非同期処理をしている間に、次の 'data' イベントが発火しまくる
await heavyProcess(chunk);
});
readStream.on('end', () => console.log('Done'));
このコードの致命的な欠陥は、heavyProcess が完了するのを待たずに、次の data イベントが同期的に次々と発火することです。await を書いたとしても、イベントループはストリームの読み込みを一時停止(pause)してくれません。これにより、未処理のPromiseがメモリ上に数万件スタックし、瞬く間にメモリ管理の破綻を招きました。
解決策:stream.pipelineとAsync Generatorsの活用
この問題を解決するための正解は、手動のイベントリスナー管理をやめ、Node.jsの stream.pipeline ユーティリティと、ES2018から導入された「非同期ジェネレータ(Async Generators)」を組み合わせることです。これにより、BackpressureのハンドリングをNode.jsの内部機構に委譲できます。
以下は、10GBのファイルを安定して処理し、メモリ使用量を一定(約50MB〜100MB)に保つための最適化されたコードです。
// ✅ 最適化された実装
// stream/promises APIを使用(Node.js v15+)
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const { Transform } = require('node:stream');
async function runEfficientProcessing() {
const inputPath = './huge-data.csv';
const outputPath = './cleaned-data.jsonl';
console.time('Processing Time');
// 1. ソースストリーム(読み込み)
// highWaterMarkを調整してバッファサイズを明示的に制御
const source = fs.createReadStream(inputPath, {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunk
});
// 2. 変換処理(Async Generatorを使用)
// ここで yield することで、下流が受け取るまで処理が一時停止し
// 自動的にBackpressureが効くようになる
async function* transformData(sourceStream) {
let buffer = '';
for await (const chunk of sourceStream) {
buffer += chunk;
const lines = buffer.split('\n');
// 最後の行は不完全な可能性があるためバッファに残す
buffer = lines.pop();
for (const line of lines) {
if (!line.trim()) continue;
// 重い同期/非同期処理のシミュレーション
const processed = processLine(line);
// オブジェクトを文字列化して流す
yield JSON.stringify(processed) + '\n';
}
}
// 残ったバッファを処理
if (buffer.trim()) {
yield JSON.stringify(processLine(buffer)) + '\n';
}
}
// 3. 宛先ストリーム(書き込み)
const destination = fs.createWriteStream(outputPath);
try {
// pipelineはエラーハンドリングとストリームの破棄を自動化する
await pipeline(
source,
transformData,
destination
);
console.log('Pipeline succeeded.');
} catch (err) {
console.error('Pipeline failed:', err);
} finally {
console.timeEnd('Processing Time');
}
}
function processLine(line) {
// 実践的なデータ変換ロジック
const columns = line.split(',');
return { id: columns[0], value: Number(columns[1]) * 1.1 };
}
runEfficientProcessing();
このコードの肝は、async function*(非同期ジェネレータ)をTransformストリームとして挟んでいる点です。for await (const chunk of sourceStream) ループ内では、内部的にストリームの readable 状態が監視されます。下流の書き込みが遅延すると、ジェネレータの実行が待機状態になり、それに応じて上流の source ストリームも読み込みを一時停止します。これがNode.jsパフォーマンス最適化の核心です。
| 指標 | 修正前(Naive Approach) | 修正後(Pipeline + Generator) |
|---|---|---|
| メモリ使用量 (RSS) | 1.2 GB (Crash) | 48 MB (Stable) |
| CPU負荷 | スパイク状に100%張り付き | 平準化され安定 |
| エラー時の挙動 | プロセスゾンビ化 | 正常にCatch&リソース解放 |
表の結果を見てください。修正後のコードでは、ファイルサイズがどれだけ大きくなろうとも、メモリ使用量は highWaterMark と現在処理中のチャンク分に限定されるため、ほぼ横ばいで推移します。これは、限られたリソース環境(FargateやLambdaなど)で非常に重要です。
エッジケースと注意点
この手法は強力ですが、万能ではありません。いくつかのエッジケースには注意が必要です。
- 非同期並列数の制限: 上記の例では、1行ずつ直列に処理しています。もし1行ごとの処理にAPIリクエストが含まれる場合、スループットが低下する可能性があります。その場合は、
stream.Transformを継承し、内部でPromise.allを使ってバッチ処理(例:100行まとめて処理)を行うカスタムクラスの実装を検討してください。 - CSVパースの厳密性: 上記の簡易的な
split('\n')は、改行コードを含むCSVセルに対応していません。本番環境では、csv-parser などの信頼できるストリーム対応ライブラリをパイプラインに組み込むべきです。 - エラー伝播: 古い
.pipe()メソッドは、途中のストリームでエラーが発生しても、パイプライン全体を自動でクローズしません。必ずstream.pipelineを使用して、リソースリークを防いでください。
結論
Node.jsにおける大容量ファイル処理の要は、メモリにデータを溜め込まないことです。fs.readFile を使うのはプロトタイプまでとし、本番コードでは必ず Stream API を採用してください。そして何より、Backpressure の存在を意識し、stream.pipeline と非同期イテレータを活用することで、堅牢でスケーラブルなアプリケーションを構築できます。
もしメモリリークに悩まされているなら、ヒープダンプを取る前に、まずはデータフローの「栓」が適切に機能しているか、コードを見直してみてください。
Post a Comment