情報アイランド

「情報を制する者は世界を制す」をモットーに様々な情報を提供することを目指すブログです。現在はプログラミング関連情報が多めですが、投資関連情報も取り扱っていきたいです。

Node.jsのストリームの使い方

2016/07/30

ストリームとは

サイズが非常に大きなデータを取り扱う際などには、データの全てを一度にメモリに読み込んでしまうとメモリが逼迫し、プログラムのパフォーマンスを低下させてしまう可能性があります。

そのような場合にはストリームを使用します。

ストリームとはデータを徐々に読み込んだり、書き込んだり、変換したりすることができるものです。

Node.jsのストリーム

Node.jsのストリームはstreamモジュールでストリームの種類毎に下のような別々のクラスとして提供されています。

  • stream.Readableクラス・・・読み込み専用のものです。
  • stream.Writableクラス・・・書き込み専用のものです。
  • stream.Transformクラス・・・変換専用のものです。変換前のデータの書き込みと変換後のデータの読み込みが可能です。
  • stream.Duplexクラス・・・読み込みと書き込みが可能なものです。読み込むデータと書き込むデータは独立しています。

しかし、これらのクラスはストリームの枠組みを提供する抽象的なものでしかありません。

具象的なストリームはこれらのクラスの中で適切なものを継承し、派生クラスとして適切に実装することによって定義します。そして、具象的なストリームを使用する際には派生クラスのインスタンスを作成して使用することになります。

Node.jsで具象的なストリームとして提供されているものにはサードパーティが提供しているものも含めれば様々なものがあります。

勿論自作することもできます。

書き込みストリーム(writable)

書き込みストリームにはデータを書き込むことができます。また、データの書き込みを完了することもできます

以後stream.Writableクラスのインスタンスをwritableと表記します。

文字コード

書き込みストリームのデータの文字コードを設定するにはwritable.setDefaultEncoding関数を使用します。

var writable = writable.setDefaultEncoding('xxx');

第1引数に文字コードを指定します。

ストリームが対応している文字コードはバッファが対応している文字コードと同じですので、ストリームが対応している文字コードに関しては下の記事の「文字コード」の項を参照してください。

返り値として書き込みストリームが得られます。

書き込み

書き込みストリームにデータを書き込むにはwritable.write関数を使用します。

var ready = writable.write('xxx', 'utf-8', function () {
});
var ready = writable.write(buf, function () {
});

第1引数にストリームに書き込むデータを指定します。文字列かバッファを指定します。ただし、ストリームによっては文字列やバッファ以外のデータを指定しなければならない場合もあります。

第2引数にデータの文字コードを指定します。この引数は指定しなくても構いません。デフォルトはutf-8です。ただし、データが文字列でない場合には無視されます。

第3引数にデータが全て処理された時に呼び出される関数を指定します。この引数は指定しなくても構いません。

返り値として真偽値が得られます。

この真偽値は未処理のデータがバッファに溜まり過ぎていないかを表します。

trueの場合、未処理のデータは溜まり過ぎていませんので、更にwritable.write関数を使用してストリームにデータを書き込むことができます。

falseの場合、未処理のデータが溜まり過ぎていますので、更にwritable.write関数を使用してストリームにデータを書き込むべきではありません。未処理のデータが十分に処理されるまで書き込みを待機すべきです。

未処理のデータが十分に処理されるとwritabledrainイベントが発生します。

そのため、drainイベントが発生してから書き込みを再開すべきです。

書き込みの完了

書き込みストリームへのデータの書き込みを完了するにはwritable.end関数を使用します。

writable.end('xxx', 'utf-8', function () {
});
writable.end(buf, function () {
});

この関数の引数はwritable.write関数の引数と同じです。ただし、第1引数は指定しなくても構いません。

書き込みストリームを作成し、ストリームが開いている状態では何度でもwritable.write関数を使用してストリームへのデータの書き込みを行うことができますが、全ての書き込みが終わったらストリームを閉じなければなりません。これを行うのがこの関数です。

なお、writable.end関数が呼び出され、バッファに格納されている未処理のデータが全て処理されるとwritablefinishイベントが発生します。

エラー処理

writableでエラーが発生した場合にはerrorイベントが発生します。

このイベントのイベントハンドラでエラー処理を行います。

writable.on('error', function (err) {
});

読み込みストリーム(readable)

読み込みストリームでは読み込まれたデータを取得することができます。

以後stream.Readableクラスのインスタンスをreadableと表記します。

文字コード

読み込みストリームのデータの文字コードを設定するにはreadable.setEncoding関数を使用します。

var readable = readable.setEncoding('xxx');

第1引数に文字コードを指定します。

返り値として読み込みストリームが得られます。

データの取得

読み込みストリームに読み込まれたデータを取得するには下の3つの方法があります。

  • フロー(flowing)モード
  • ポーズ(paused)モード
  • ストリームの接続(pipe)

フロー(flowing)モード

フローモードの場合、読み込みストリームに読み込まれたデータはイベントの発生により自動的に提供されます。

つまり、読み込みストリームに読み込まれたデータを取得する側は読み込みストリームのイベントにイベントハンドラを登録することにより読み込みストリームのタイミングでデータを取得します。

フローモードでデータを取得するにはreadabledataイベントにイベントハンドラを登録します。

readable.on('data', function (chunk) {
});

このイベントのイベントハンドラの第1引数はデータです。このデータは文字列かバッファです。読み込みストリームの文字コードが設定されている場合には文字列となり、設定されていない場合にはバッファとなります。ただし、ストリームによっては文字列やバッファ以外のデータである場合もあります。

また、どれだけのデータが読み込まれてからdataイベントが発生するかもストリームの実装に依存しています。

データ取得の完了を検知するにはreadableendイベントにイベントハンドラを登録します。

readable.on('end', function () {
});

ポーズ(paused)モード

ポーズモードの場合、読み込みストリームに読み込まれたデータはメソッドの呼び出しにより能動的に取得されなければなりません。

つまり、読み込みストリームに読み込まれたデータを取得する側は読み込みストリームのメソッドを呼び出すことにより取得する側のタイミングでデータを取得します。

ポーズモードで未取得のデータを取得するにはreadable.read関数を使用します。

var chunk = readable.read(16);

第1引数にデータを取得するバイト数を指定します。この引数は指定しなくても構いません。

返り値としてデータが得られます。このデータは文字列かバッファです。読み込みストリームの文字コードが設定されている場合には文字列となり、設定されていない場合にはバッファとなります。ただし、ストリームによっては文字列やバッファ以外のデータである場合もあります。

また、読み込みストリームに読み込まれた未取得のデータが存在しない場合にはnullが返ります読み込まれた未取得のデータの長さが第1引数のバイト数に満たない場合にもnullが返りますデータ取得が完了している場合にもnullが返ります

ただし、読み込みストリームに全てのデータが読み込まれた後にこの関数を初めて呼び出した場合には全ての未取得のデータが返りますので注意してください。

データ取得が可能となったことを検知するにはreadablereadableイベントにイベントハンドラを登録します。

readable.on('readable', function () {
});

データ取得の完了を検知するにはreadableendイベントにイベントハンドラを登録します。

readable.on('end', function () {
});

ストリームの接続(pipe)

読み込みストリームは書き込みストリームに接続することができます。この場合、読み込みストリームに読み込まれたデータがそのまま書き込みストリームに書き込まれることになります。

なお、読み込みストリームは複数の書き込みストリームに接続することもできます。

ストリームを接続するにはreadable.pipe関数を使用します。

var writable = readable.pipe(writable);

第1引数に書き込みストリームを指定します。

返り値として書き込みストリームが得られます。

なお、readable.pipe関数が呼び出され、読み込みストリームが書き込みストリームに接続されると書き込みストリームにおいてpipeイベントが発生します。このイベントのイベントハンドラの第1引数は読み込みストリームです。

ストリームを接続解除するにはreadable.unpipe関数を使用します。

readable.unpipe(writable);

第1引数に書き込みストリームを指定します。

なお、readable.unpipe関数が呼び出され、読み込みストリームが書き込みストリームから接続解除されると書き込みストリームにおいてunpipeイベントが発生します。このイベントのイベントハンドラの第1引数は読み込みストリームです。

エラー処理

readableでエラーが発生した場合にはerrorイベントが発生します。

このイベントのイベントハンドラでエラー処理を行います。

readable.on('error', function (err) {
});

関連

pizyumi
プログラミング歴19年のベテランプログラマー。業務システム全般何でも作れます。現在はWeb系の技術を勉強中。
スポンサーリンク

-Node.js