デザインについての学習メモブログ

Node.js入門(4):ストリーム処理をマスターする

記事内に広告が含まれています。

Node.js入門(4):ストリーム処理をマスターする

ストリーム(Stream)は、Node.jsで大容量データを効率的に扱うための強力な仕組みです。

メモリを節約しながら、ファイルやネットワークデータを処理できます。

💡この記事でわかる事

以下の内容を習得することができます:

  1. ストリームとは何か
  2. 4種類のストリーム
  3. Readableストリーム(読み込み)
  4. Writableストリーム(書き込み)
  5. Transformストリーム(変換)
  6. パイプとチェーン
  7. バックプレッシャーの理解
  8. 実践例:大容量ファイル処理

1. ストリームとは何か

1.1 ストリームの必要性

通常のファイル読み込みでは、全データをメモリに読み込む必要があります。

JavaScript
const fs = require('fs');

// 悪い例: 1GBのファイルを一度に読み込む
fs.readFile('large-file.txt', (err, data) => {
  if (err) throw err;
  console.log(data); // 1GB分のメモリを消費!
});

ストリームを使えば、データを小さな塊(チャンク)に分けて処理できます。

JavaScript
const fs = require('fs');

// 良い例: データを少しずつ読み込む
const stream = fs.createReadStream('large-file.txt');

stream.on('data', (chunk) => {
  console.log(`${chunk.length}バイト受信`);// 少量のメモリで処理
});

1.2 ストリームのメリット

  • メモリ効率: データ全体をメモリに載せる必要がない
  • 時間効率: データが到着次第、すぐに処理を開始できる
  • コンポーザビリティ: 複数の処理を連結(パイプ)できる

2. 4種類のストリーム

Node.jsには4種類のストリームがあります。

  • Readable(読み込み専用)
  • Writable(書き込み専用)
  • Duplex(読み書き両方)
  • Transform(変換)

2.1 ストリームの種類

Readable(読み込み専用)

説明: データを読み込む
用途例: ファイル読み込み、HTTPリクエスト受信

Writable(書き込み専用)

説明: データを書き込む
用途例: ファイル書き込み、HTTPレスポンス送信

Duplex(読み書き両方)

説明: 読み書き両方可能
用途例: TCPソケット、WebSocket

Transform(変換)

説明: データを変換しながら転送
用途例: 圧縮、暗号化、データ整形

2.2 ストリームの基本構造

すべてのストリームはEventEmitterを継承しており、イベントで制御します。

JavaScript
const { Readable } = require('stream');

const readable = new Readable();

// データが利用可能になったとき
readable.on('data', (chunk) => {
  console.log('データ受信:', chunk);
});

// ストリームが終了したとき
readable.on('end', () => {
  console.log('完了');
});

// エラーが発生したとき
readable.on('error', (err) => {
  console.error('エラー:', err);
});

3. Readableストリーム(読み込み)

Readableストリームは、データソースからデータを読み込みます。

3.1 ファイルから読み込む

fs.createReadStream()は最も一般的なReadableストリームです。

JavaScript
const fs = require('fs');

const readStream = fs.createReadStream('input.txt', {
  encoding: 'utf8',
  highWaterMark: 16 * 1024// 16KB ずつ読み込む(デフォルトは64KB)
});

let totalBytes = 0;

readStream.on('data', (chunk) => {
  totalBytes += chunk.length;
  console.log(`読み込み中: ${totalBytes} バイト`);
});

readStream.on('end', () => {
  console.log(`完了: 合計 ${totalBytes} バイト`);
});

readStream.on('error', (err) => {
  console.error('エラー:', err);
});

3.2 カスタムReadableストリームの作成

独自のデータソースからストリームを作成できます。

JavaScript
const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(max, options) {
    super(options);
    this.max = max;
    this.current = 0;
  }

  _read() {
    // データを生成してプッシュ
    if (this.current < this.max) {
      this.current++;
      this.push(`数値: ${this.current}\n`);
    } else {
      // データがなくなったらnullをプッシュして終了
      this.push(null);
    }
  }
}

// 使用例
const counter = new CounterStream(5);

counter.on('data', (chunk) => {
  console.log(chunk.toString());
});

counter.on('end', () => {
  console.log('カウント完了');
});

3.3 Readableモード

Readableストリームには2つのモードがあります。

Flowing モード(自動): データが自動的に読み込まれる

JavaScript
const stream = fs.createReadStream('file.txt');

stream.on('data', (chunk) => {
  console.log(chunk);// 自動的にデータが流れてくる
});

Paused モード(手動): 明示的に読み込む

JavaScript
const stream = fs.createReadStream('file.txt');

stream.on('readable', () => {
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log(chunk);// 手動でread()を呼ぶ
  }
});

4. Writableストリーム(書き込み)

Writableストリームは、データを書き込み先に送ります。

4.1 ファイルへの書き込み

JavaScript
const fs = require('fs');

const writeStream = fs.createWriteStream('output.txt', {
  encoding: 'utf8'
});

// データを書き込む
writeStream.write('1行目\n');
writeStream.write('2行目\n');
writeStream.write('3行目\n');

// ストリームを終了
writeStream.end('最終行\n');

writeStream.on('finish', () => {
  console.log('書き込み完了');
});

writeStream.on('error', (err) => {
  console.error('エラー:', err);
});

4.2 カスタムWritableストリームの作成

データを受け取って処理するカスタムストリームを作成できます。

JavaScript
const { Writable } = require('stream');

class LogStream extends Writable {
  constructor(options) {
    super(options);
    this.lineCount = 0;
  }

  _write(chunk, encoding, callback) {
    // データを処理
    const lines = chunk.toString().split('\n').filter(line => line.length > 0);
    this.lineCount += lines.length;
    
    lines.forEach(line => {
      console.log(`[ログ ${this.lineCount}] ${line}`);
    });
    
    // 処理完了を通知
    callback();
  }

  _final(callback) {
    // ストリーム終了時の処理
    console.log(`\n合計 ${this.lineCount} 行を処理しました`);
    callback();
  }
}

// 使用例
const logger = new LogStream();

logger.write('最初のメッセージ\n');
logger.write('2番目のメッセージ\n');
logger.write('3番目のメッセージ\n');
logger.end('最後のメッセージ\n');

4.3 バッファ管理

write()メソッドは、バッファが満杯かどうかを示すブール値を返します。

JavaScript
const fs = require('fs');

const writeStream = fs.createWriteStream('output.txt');

function writeMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // 最後のデータ
        writer.write(data, encoding, callback);
      } else {
        // バッファに空きがあるか確認
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    
    if (i > 0) {
      // バッファが満杯なので、空くまで待つ
      writer.once('drain', write);
    }
  }
}

writeMillionTimes(writeStream, 'データ\n', 'utf8', () => {
  console.log('書き込み完了');
});

5. Transformストリーム(変換)

Transformストリームは、データを読み込み、変換して書き出します。

5.1 大文字変換ストリーム

データを受け取って変換する例です。

JavaScript
const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // データを大文字に変換
    const upperCased = chunk.toString().toUpperCase();
    this.push(upperCased);
    callback();
  }
}

// 使用例</em>
const fs = require('fs');
const upperCase = new UpperCaseTransform();

fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));

5.2 CSVパーサー

実用的な例として、CSV形式のデータをオブジェクトに変換するストリームです。

JavaScript
const { Transform } = require('stream');

class CSVParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });// オブジェクトモード
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    // バッファにデータを追加
    this.buffer += chunk.toString();
    
    // 行ごとに処理
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();// 最後の不完全な行は残す
    
    lines.forEach(line => {
      if (!line.trim()) return;
      
      const values = line.split(',');
      
      if (!this.headers) {
        // 最初の行はヘッダー
        this.headers = values;
      } else {
        // データ行をオブジェクトに変換
        const obj = {};
        this.headers.forEach((header, i) => {
          obj[header.trim()] = values[i]?.trim() || '';
        });
        this.push(obj);
      }
    });
    
    callback();
  }

  _flush(callback) {
    // 残りのバッファを処理
    if (this.buffer.trim() && this.headers) {
      const values = this.buffer.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i]?.trim() || '';
      });
      this.push(obj);
    }
    callback();
  }
}

// 使用例
const fs = require('fs');
const parser = new CSVParser();

fs.createReadStream('data.csv')
  .pipe(parser)
  .on('data', (obj) => {
    console.log('パース結果:', obj);
  })
  .on('end', () => {
    console.log('CSV解析完了');
  });

5.3 データ圧縮

zlibモジュールを使った圧縮・解凍の例です。

JavaScript
const fs = require('fs');
const zlib = require('zlib');

// ファイルを圧縮
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'))
  .on('finish', () => {
    console.log('圧縮完了');
  });

// ファイルを解凍
fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'))
  .on('finish', () => {
    console.log('解凍完了');
  });

6. パイプとチェーン

pipe()メソッドを使うと、ストリームを簡単に連結できます。

6.1 基本的なパイプ

パイプは、あるストリームの出力を別のストリームの入力に接続します。

JavaScript
const fs = require('fs');

// ファイルをコピー
fs.createReadStream('source.txt')
  .pipe(fs.createWriteStream('destination.txt'));

// エラーハンドリング付き
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('destination.txt');

readStream
  .on('error', (err) => console.error('読み込みエラー:', err))
  .pipe(writeStream)
  .on('error', (err) => console.error('書き込みエラー:', err))
  .on('finish', () => console.log('コピー完了'));

6.2 複数のストリームをチェーン

複数のTransformストリームを連結できます。

JavaScript
const fs = require('fs');
const zlib = require('zlib');
const { Transform } = require('stream');

// 行番号を追加するTransform
class LineNumberTransform extends Transform {
  constructor(options) {
    super(options);
    this.lineNumber = 0;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();
    
    lines.forEach(line => {
      this.lineNumber++;
      this.push(`${this.lineNumber}: ${line}\n`);
    });
    
    callback();
  }

  _flush(callback) {
    if (this.buffer) {
      this.lineNumber++;
      this.push(`${this.lineNumber}: ${this.buffer}\n`);
    }
    callback();
  }
}

// パイプラインを構築
fs.createReadStream('input.txt')
  .pipe(new LineNumberTransform())      <em>// 行番号を追加</em>
  .pipe(zlib.createGzip())               <em>// 圧縮</em>
  .pipe(fs.createWriteStream('output.txt.gz'))
  .on('finish', () => {
    console.log('処理完了: 行番号追加 → 圧縮 → 保存');
  });

6.3 pipeline()でエラーハンドリングを簡単に

Node.js 10以降では、pipeline()を使うとエラーハンドリングが簡単になります。

JavaScript
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz'),
  (err) => {
    if (err) {
      console.error('パイプラインエラー:', err);
    } else {
      console.log('パイプライン完了');
    }
  }
);

7. バックプレッシャーの理解

バックプレッシャーは、読み込み速度と書き込み速度のバランスを取る仕組みです。

7.1 バックプレッシャーとは

書き込み側が遅い場合、読み込み側が待つ必要があります。

JavaScript
const fs = require('fs');

const readStream = fs.createReadStream('large-file.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.on('data', (chunk) => {
  // write() は、バッファに空きがあるかを返す
  const canContinue = writeStream.write(chunk);
  
  if (!canContinue) {
    console.log('バッファ満杯 - 読み込み一時停止');
    readStream.pause(); // 読み込みを停止
  }
});

writeStream.on('drain', () => {
  console.log('バッファに空き - 読み込み再開');
  readStream.resume(); // 読み込みを再開
});

readStream.on('end', () => {
  writeStream.end();
});

7.2 pipe()は自動的にバックプレッシャーを処理

pipe()を使えば、バックプレッシャーは自動的に処理されます。

JavaScript
const fs = require('fs');

// pipe()が自動的にpause/resumeを管理
fs.createReadStream('large-file.txt')
  .pipe(fs.createWriteStream('output.txt'));

8. 実践例:大容量ファイル処理

8.1 ログファイル分析ツール

大容量のログファイルを行ごとに処理する例です。

JavaScript
const fs = require('fs');
const { Transform } = require('stream');
const readline = require('readline');

class LogAnalyzer extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.stats = {
      totalLines: 0,
      errorCount: 0,
      warningCount: 0,
      infoCount: 0
    };
  }

  _transform(line, encoding, callback) {
    this.stats.totalLines++;
    
    if (line.includes('ERROR')) {
      this.stats.errorCount++;
      this.push({ type: 'error', line });
    } else if (line.includes('WARN')) {
      this.stats.warningCount++;
      this.push({ type: 'warning', line });
    } else if (line.includes('INFO')) {
      this.stats.infoCount++;
    }
    
    callback();
  }

  _flush(callback) {
    console.log('\n=== ログ分析結果 ===');
    console.log(`総行数: ${this.stats.totalLines}`);
    console.log(`エラー: ${this.stats.errorCount}`);
    console.log(`警告: ${this.stats.warningCount}`);
    console.log(`情報: ${this.stats.infoCount}`);
    callback();
  }
}

// 使用例
const rl = readline.createInterface({
  input: fs.createReadStream('application.log'),
  crlfDelay: Infinity
});

const analyzer = new LogAnalyzer();
const errorLog = fs.createWriteStream('errors.log');

rl.on('line', (line) => {
  analyzer.write(line);
});

rl.on('close', () => {
  analyzer.end();
});

analyzer.on('data', (item) => {
  if (item.type === 'error') {
    errorLog.write(item.line + '\n');
  }
});

analyzer.on('finish', () => {
  errorLog.end();
  console.log('\nエラーログを errors.log に保存しました');
});

8.2 HTTP ストリーミングサーバー

動画ファイルなどをストリーミング配信する例です。

JavaScript
const http = require('http');
const fs = require('fs');
const path = require('path');

const server = http.createServer((req, res) => {
  const filePath = path.join(__dirname, 'video.mp4');
  
  fs.stat(filePath, (err, stats) => {
    if (err) {
      res.writeHead(404);
      res.end('File not found');
      return;
    }
    
    const fileSize = stats.size;
    const range = req.headers.range;
    
    if (range) {
      // 範囲リクエスト(シーク対応)
      const parts = range.replace(/bytes=/, '').split('-');
      const start = parseInt(parts[0], 10);
      const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
      const chunkSize = (end - start) + 1;
      
      res.writeHead(206, {
        'Content-Range': `bytes ${start}-${end}/${fileSize}`,
        'Accept-Ranges': 'bytes',
        'Content-Length': chunkSize,
        'Content-Type': 'video/mp4'
      });
      
      const stream = fs.createReadStream(filePath, { start, end });
      stream.pipe(res);
    } else {
      // 通常のストリーミング
      res.writeHead(200, {
        'Content-Length': fileSize,
        'Content-Type': 'video/mp4'
      });
      
      const stream = fs.createReadStream(filePath);
      stream.pipe(res);
    }
  });
});

server.listen(3000, () => {
  console.log('ストリーミングサーバー起動: http://localhost:3000');
});

8.3 データ処理パイプライン

複数のステップを組み合わせたデータ処理パイプラインです。

JavaScript
const { pipeline, Transform } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// ステップ1: JSON行を解析
class JSONLineParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();
    
    lines.forEach(line => {
      if (line.trim()) {
        try {
          this.push(JSON.parse(line));
        } catch (e) {
          console.error('JSON解析エラー:', line);
        }
      }
    });
    
    callback();
  }
}

// ステップ2: データをフィルタリング
class DataFilter extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _transform(obj, encoding, callback) {
    // 条件に合うデータのみ通過
    if (obj.value > 50) {
      this.push(obj);
    }
    callback();
  }
}

// ステップ3: データを整形
class DataFormatter extends Transform {
  constructor(options) {
    super({ ...options, objectMode: false });
  }

  _transform(obj, encoding, callback) {
    const formatted = `ID: ${obj.id}, Value: ${obj.value}, Date: ${obj.date}\n`;
    this.push(formatted);
    callback();
  }
}

// パイプラインを実行
pipeline(
  fs.createReadStream('data.jsonl'),
  zlib.createGunzip(),                    // 解凍
  new JSONLineParser(),                   // JSON解析
  new DataFilter(),                       // フィルタリング
  new DataFormatter(),                    // 整形
  fs.createWriteStream('filtered.txt'),  // 保存
  (err) => {
    if (err) {
      console.error('パイプラインエラー:', err);
    } else {
      console.log('データ処理完了');
    }
  }
);

まとめ

ストリーム処理は、Node.jsで効率的なデータ処理を実現する重要な技術です。

重要なポイント:

  • 4種類のストリーム: Readable、Writable、Duplex、Transform
  • メモリ効率: データを小さなチャンクで処理し、メモリ使用量を抑える
  • pipe(): ストリームを簡単に連結できる
  • バックプレッシャー: 自動的に読み書き速度のバランスを取る
  • カスタムストリーム: _read(), _write(), _transform()を実装して独自のストリームを作成
  • pipeline(): エラーハンドリングが簡単で、推奨される方法

ベストプラクティス:

✅ 大容量ファイルは必ずストリームで処理
✅ エラーハンドリングを忘れずに実装
pipeline()を使うと安全で簡潔
✅ オブジェクトモードで構造化データを扱う
✅ バックプレッシャーはpipe()に任せる

次回予告:
第4回では、堅牢なエラーハンドリングについて解説します。try-catch、Promise、async/awaitでのエラー処理から、未処理エラーの対策まで学びましょう!