ストリーム(Stream)は、Node.jsで大容量データを効率的に扱うための強力な仕組みです。
メモリを節約しながら、ファイルやネットワークデータを処理できます。
💡この記事でわかる事
以下の内容を習得することができます:
- ストリームとは何か
- 4種類のストリーム
- Readableストリーム(読み込み)
- Writableストリーム(書き込み)
- Transformストリーム(変換)
- パイプとチェーン
- バックプレッシャーの理解
- 実践例:大容量ファイル処理
Contents
1. ストリームとは何か
1.1 ストリームの必要性
通常のファイル読み込みでは、全データをメモリに読み込む必要があります。
const fs = require('fs');
// 悪い例: 1GBのファイルを一度に読み込む
fs.readFile('large-file.txt', (err, data) => {
if (err) throw err;
console.log(data); // 1GB分のメモリを消費!
});ストリームを使えば、データを小さな塊(チャンク)に分けて処理できます。
const fs = require('fs');
// 良い例: データを少しずつ読み込む
const stream = fs.createReadStream('large-file.txt');
stream.on('data', (chunk) => {
console.log(`${chunk.length}バイト受信`);// 少量のメモリで処理
});1.2 ストリームのメリット
- メモリ効率: データ全体をメモリに載せる必要がない
- 時間効率: データが到着次第、すぐに処理を開始できる
- コンポーザビリティ: 複数の処理を連結(パイプ)できる
2. 4種類のストリーム
Node.jsには4種類のストリームがあります。
- Readable(読み込み専用)
- Writable(書き込み専用)
- Duplex(読み書き両方)
- Transform(変換)
2.1 ストリームの種類
Readable(読み込み専用)
説明: データを読み込む
用途例: ファイル読み込み、HTTPリクエスト受信
Writable(書き込み専用)
説明: データを書き込む
用途例: ファイル書き込み、HTTPレスポンス送信
Duplex(読み書き両方)
説明: 読み書き両方可能
用途例: TCPソケット、WebSocket
Transform(変換)
説明: データを変換しながら転送
用途例: 圧縮、暗号化、データ整形
2.2 ストリームの基本構造
すべてのストリームはEventEmitterを継承しており、イベントで制御します。
const { Readable } = require('stream');
const readable = new Readable();
// データが利用可能になったとき
readable.on('data', (chunk) => {
console.log('データ受信:', chunk);
});
// ストリームが終了したとき
readable.on('end', () => {
console.log('完了');
});
// エラーが発生したとき
readable.on('error', (err) => {
console.error('エラー:', err);
});3. Readableストリーム(読み込み)
Readableストリームは、データソースからデータを読み込みます。
3.1 ファイルから読み込む
fs.createReadStream()は最も一般的なReadableストリームです。
const fs = require('fs');
const readStream = fs.createReadStream('input.txt', {
encoding: 'utf8',
highWaterMark: 16 * 1024// 16KB ずつ読み込む(デフォルトは64KB)
});
let totalBytes = 0;
readStream.on('data', (chunk) => {
totalBytes += chunk.length;
console.log(`読み込み中: ${totalBytes} バイト`);
});
readStream.on('end', () => {
console.log(`完了: 合計 ${totalBytes} バイト`);
});
readStream.on('error', (err) => {
console.error('エラー:', err);
});3.2 カスタムReadableストリームの作成
独自のデータソースからストリームを作成できます。
const { Readable } = require('stream');
class CounterStream extends Readable {
constructor(max, options) {
super(options);
this.max = max;
this.current = 0;
}
_read() {
// データを生成してプッシュ
if (this.current < this.max) {
this.current++;
this.push(`数値: ${this.current}\n`);
} else {
// データがなくなったらnullをプッシュして終了
this.push(null);
}
}
}
// 使用例
const counter = new CounterStream(5);
counter.on('data', (chunk) => {
console.log(chunk.toString());
});
counter.on('end', () => {
console.log('カウント完了');
});3.3 Readableモード
Readableストリームには2つのモードがあります。
Flowing モード(自動): データが自動的に読み込まれる
const stream = fs.createReadStream('file.txt');
stream.on('data', (chunk) => {
console.log(chunk);// 自動的にデータが流れてくる
});Paused モード(手動): 明示的に読み込む
const stream = fs.createReadStream('file.txt');
stream.on('readable', () => {
let chunk;
while ((chunk = stream.read()) !== null) {
console.log(chunk);// 手動でread()を呼ぶ
}
});4. Writableストリーム(書き込み)
Writableストリームは、データを書き込み先に送ります。
4.1 ファイルへの書き込み
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt', {
encoding: 'utf8'
});
// データを書き込む
writeStream.write('1行目\n');
writeStream.write('2行目\n');
writeStream.write('3行目\n');
// ストリームを終了
writeStream.end('最終行\n');
writeStream.on('finish', () => {
console.log('書き込み完了');
});
writeStream.on('error', (err) => {
console.error('エラー:', err);
});4.2 カスタムWritableストリームの作成
データを受け取って処理するカスタムストリームを作成できます。
const { Writable } = require('stream');
class LogStream extends Writable {
constructor(options) {
super(options);
this.lineCount = 0;
}
_write(chunk, encoding, callback) {
// データを処理
const lines = chunk.toString().split('\n').filter(line => line.length > 0);
this.lineCount += lines.length;
lines.forEach(line => {
console.log(`[ログ ${this.lineCount}] ${line}`);
});
// 処理完了を通知
callback();
}
_final(callback) {
// ストリーム終了時の処理
console.log(`\n合計 ${this.lineCount} 行を処理しました`);
callback();
}
}
// 使用例
const logger = new LogStream();
logger.write('最初のメッセージ\n');
logger.write('2番目のメッセージ\n');
logger.write('3番目のメッセージ\n');
logger.end('最後のメッセージ\n');4.3 バッファ管理
write()メソッドは、バッファが満杯かどうかを示すブール値を返します。
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');
function writeMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// 最後のデータ
writer.write(data, encoding, callback);
} else {
// バッファに空きがあるか確認
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// バッファが満杯なので、空くまで待つ
writer.once('drain', write);
}
}
}
writeMillionTimes(writeStream, 'データ\n', 'utf8', () => {
console.log('書き込み完了');
});5. Transformストリーム(変換)
Transformストリームは、データを読み込み、変換して書き出します。
5.1 大文字変換ストリーム
データを受け取って変換する例です。
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
// データを大文字に変換
const upperCased = chunk.toString().toUpperCase();
this.push(upperCased);
callback();
}
}
// 使用例</em>
const fs = require('fs');
const upperCase = new UpperCaseTransform();
fs.createReadStream('input.txt')
.pipe(upperCase)
.pipe(fs.createWriteStream('output.txt'));5.2 CSVパーサー
実用的な例として、CSV形式のデータをオブジェクトに変換するストリームです。
const { Transform } = require('stream');
class CSVParser extends Transform {
constructor(options) {
super({ ...options, objectMode: true });// オブジェクトモード
this.headers = null;
this.buffer = '';
}
_transform(chunk, encoding, callback) {
// バッファにデータを追加
this.buffer += chunk.toString();
// 行ごとに処理
const lines = this.buffer.split('\n');
this.buffer = lines.pop();// 最後の不完全な行は残す
lines.forEach(line => {
if (!line.trim()) return;
const values = line.split(',');
if (!this.headers) {
// 最初の行はヘッダー
this.headers = values;
} else {
// データ行をオブジェクトに変換
const obj = {};
this.headers.forEach((header, i) => {
obj[header.trim()] = values[i]?.trim() || '';
});
this.push(obj);
}
});
callback();
}
_flush(callback) {
// 残りのバッファを処理
if (this.buffer.trim() && this.headers) {
const values = this.buffer.split(',');
const obj = {};
this.headers.forEach((header, i) => {
obj[header.trim()] = values[i]?.trim() || '';
});
this.push(obj);
}
callback();
}
}
// 使用例
const fs = require('fs');
const parser = new CSVParser();
fs.createReadStream('data.csv')
.pipe(parser)
.on('data', (obj) => {
console.log('パース結果:', obj);
})
.on('end', () => {
console.log('CSV解析完了');
});5.3 データ圧縮
zlibモジュールを使った圧縮・解凍の例です。
const fs = require('fs');
const zlib = require('zlib');
// ファイルを圧縮
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'))
.on('finish', () => {
console.log('圧縮完了');
});
// ファイルを解凍
fs.createReadStream('input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('output.txt'))
.on('finish', () => {
console.log('解凍完了');
});6. パイプとチェーン
pipe()メソッドを使うと、ストリームを簡単に連結できます。
6.1 基本的なパイプ
パイプは、あるストリームの出力を別のストリームの入力に接続します。
const fs = require('fs');
// ファイルをコピー
fs.createReadStream('source.txt')
.pipe(fs.createWriteStream('destination.txt'));
// エラーハンドリング付き
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('destination.txt');
readStream
.on('error', (err) => console.error('読み込みエラー:', err))
.pipe(writeStream)
.on('error', (err) => console.error('書き込みエラー:', err))
.on('finish', () => console.log('コピー完了'));6.2 複数のストリームをチェーン
複数のTransformストリームを連結できます。
const fs = require('fs');
const zlib = require('zlib');
const { Transform } = require('stream');
// 行番号を追加するTransform
class LineNumberTransform extends Transform {
constructor(options) {
super(options);
this.lineNumber = 0;
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
lines.forEach(line => {
this.lineNumber++;
this.push(`${this.lineNumber}: ${line}\n`);
});
callback();
}
_flush(callback) {
if (this.buffer) {
this.lineNumber++;
this.push(`${this.lineNumber}: ${this.buffer}\n`);
}
callback();
}
}
// パイプラインを構築
fs.createReadStream('input.txt')
.pipe(new LineNumberTransform()) <em>// 行番号を追加</em>
.pipe(zlib.createGzip()) <em>// 圧縮</em>
.pipe(fs.createWriteStream('output.txt.gz'))
.on('finish', () => {
console.log('処理完了: 行番号追加 → 圧縮 → 保存');
});6.3 pipeline()でエラーハンドリングを簡単に
Node.js 10以降では、pipeline()を使うとエラーハンドリングが簡単になります。
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('パイプラインエラー:', err);
} else {
console.log('パイプライン完了');
}
}
);7. バックプレッシャーの理解
バックプレッシャーは、読み込み速度と書き込み速度のバランスを取る仕組みです。
7.1 バックプレッシャーとは
書き込み側が遅い場合、読み込み側が待つ必要があります。
const fs = require('fs');
const readStream = fs.createReadStream('large-file.txt');
const writeStream = fs.createWriteStream('output.txt');
readStream.on('data', (chunk) => {
// write() は、バッファに空きがあるかを返す
const canContinue = writeStream.write(chunk);
if (!canContinue) {
console.log('バッファ満杯 - 読み込み一時停止');
readStream.pause(); // 読み込みを停止
}
});
writeStream.on('drain', () => {
console.log('バッファに空き - 読み込み再開');
readStream.resume(); // 読み込みを再開
});
readStream.on('end', () => {
writeStream.end();
});7.2 pipe()は自動的にバックプレッシャーを処理
pipe()を使えば、バックプレッシャーは自動的に処理されます。
const fs = require('fs');
// pipe()が自動的にpause/resumeを管理
fs.createReadStream('large-file.txt')
.pipe(fs.createWriteStream('output.txt'));8. 実践例:大容量ファイル処理
8.1 ログファイル分析ツール
大容量のログファイルを行ごとに処理する例です。
const fs = require('fs');
const { Transform } = require('stream');
const readline = require('readline');
class LogAnalyzer extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this.stats = {
totalLines: 0,
errorCount: 0,
warningCount: 0,
infoCount: 0
};
}
_transform(line, encoding, callback) {
this.stats.totalLines++;
if (line.includes('ERROR')) {
this.stats.errorCount++;
this.push({ type: 'error', line });
} else if (line.includes('WARN')) {
this.stats.warningCount++;
this.push({ type: 'warning', line });
} else if (line.includes('INFO')) {
this.stats.infoCount++;
}
callback();
}
_flush(callback) {
console.log('\n=== ログ分析結果 ===');
console.log(`総行数: ${this.stats.totalLines}`);
console.log(`エラー: ${this.stats.errorCount}`);
console.log(`警告: ${this.stats.warningCount}`);
console.log(`情報: ${this.stats.infoCount}`);
callback();
}
}
// 使用例
const rl = readline.createInterface({
input: fs.createReadStream('application.log'),
crlfDelay: Infinity
});
const analyzer = new LogAnalyzer();
const errorLog = fs.createWriteStream('errors.log');
rl.on('line', (line) => {
analyzer.write(line);
});
rl.on('close', () => {
analyzer.end();
});
analyzer.on('data', (item) => {
if (item.type === 'error') {
errorLog.write(item.line + '\n');
}
});
analyzer.on('finish', () => {
errorLog.end();
console.log('\nエラーログを errors.log に保存しました');
});8.2 HTTP ストリーミングサーバー
動画ファイルなどをストリーミング配信する例です。
const http = require('http');
const fs = require('fs');
const path = require('path');
const server = http.createServer((req, res) => {
const filePath = path.join(__dirname, 'video.mp4');
fs.stat(filePath, (err, stats) => {
if (err) {
res.writeHead(404);
res.end('File not found');
return;
}
const fileSize = stats.size;
const range = req.headers.range;
if (range) {
// 範囲リクエスト(シーク対応)
const parts = range.replace(/bytes=/, '').split('-');
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
const chunkSize = (end - start) + 1;
res.writeHead(206, {
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
'Accept-Ranges': 'bytes',
'Content-Length': chunkSize,
'Content-Type': 'video/mp4'
});
const stream = fs.createReadStream(filePath, { start, end });
stream.pipe(res);
} else {
// 通常のストリーミング
res.writeHead(200, {
'Content-Length': fileSize,
'Content-Type': 'video/mp4'
});
const stream = fs.createReadStream(filePath);
stream.pipe(res);
}
});
});
server.listen(3000, () => {
console.log('ストリーミングサーバー起動: http://localhost:3000');
});8.3 データ処理パイプライン
複数のステップを組み合わせたデータ処理パイプラインです。
const { pipeline, Transform } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// ステップ1: JSON行を解析
class JSONLineParser extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
lines.forEach(line => {
if (line.trim()) {
try {
this.push(JSON.parse(line));
} catch (e) {
console.error('JSON解析エラー:', line);
}
}
});
callback();
}
}
// ステップ2: データをフィルタリング
class DataFilter extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
}
_transform(obj, encoding, callback) {
// 条件に合うデータのみ通過
if (obj.value > 50) {
this.push(obj);
}
callback();
}
}
// ステップ3: データを整形
class DataFormatter extends Transform {
constructor(options) {
super({ ...options, objectMode: false });
}
_transform(obj, encoding, callback) {
const formatted = `ID: ${obj.id}, Value: ${obj.value}, Date: ${obj.date}\n`;
this.push(formatted);
callback();
}
}
// パイプラインを実行
pipeline(
fs.createReadStream('data.jsonl'),
zlib.createGunzip(), // 解凍
new JSONLineParser(), // JSON解析
new DataFilter(), // フィルタリング
new DataFormatter(), // 整形
fs.createWriteStream('filtered.txt'), // 保存
(err) => {
if (err) {
console.error('パイプラインエラー:', err);
} else {
console.log('データ処理完了');
}
}
);まとめ
ストリーム処理は、Node.jsで効率的なデータ処理を実現する重要な技術です。
重要なポイント:
- 4種類のストリーム: Readable、Writable、Duplex、Transform
- メモリ効率: データを小さなチャンクで処理し、メモリ使用量を抑える
- pipe(): ストリームを簡単に連結できる
- バックプレッシャー: 自動的に読み書き速度のバランスを取る
- カスタムストリーム:
_read(),_write(),_transform()を実装して独自のストリームを作成 - pipeline(): エラーハンドリングが簡単で、推奨される方法
ベストプラクティス:
✅ 大容量ファイルは必ずストリームで処理
✅ エラーハンドリングを忘れずに実装
✅ pipeline()を使うと安全で簡潔
✅ オブジェクトモードで構造化データを扱う
✅ バックプレッシャーはpipe()に任せる
次回予告:
第4回では、堅牢なエラーハンドリングについて解説します。try-catch、Promise、async/awaitでのエラー処理から、未処理エラーの対策まで学びましょう!


























