ストリームの手引き
このドキュメントはストリームを使用したnode.jsプログラムの基本的な書き方の手引書になります。
- 写本
- イントロダクション
- 何故ストリームを使用するべきなのか
- 基本
- 組み込みストリーム
- 制御ストリーム
- メタストリーム
- ステートストリーム
- HTTPストリーム
- IOストリーム
- パーサーストリーム
- ブラウザストリーム
- HTMLストリーム
- オーディオストリーム
- RPCストリーム
- テストストリーム
- パワーコンボ
写本
この手引書は、次のようにnpmのコマンドを実行することでインストールすることが可能です。
npm install -g stream-handbook
これでstream-handbook
コマンドを打てば、$PAGER
にこのReadmeファイルが開くようになりました。
もしくは、このままこのドキュメントを読み進めても構いません。
イントロダクション
"別の方式でデータを扱う必要が生じた際に、庭の水やりに使うホース・スクリューのように、 我々はもう1つのセグメントに接続する方法を持つべきである。 これはIOにも言えることである。" "We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also."
Doug McIlroy. October 11, 1964
ストリームはUNIXの初期の段階から導入されており、
これ自身が1つのことを上手に行う小さなコンポーネントが、
巨大なシステムを構築するうえで信頼できる方法であることを何十年にもわたって証明しています。
UNIXでは、ストリームは|
パイプのシェルによって実装されます。
nodeでは組み込みのStreamモジュールはコア・ライブラリによって使用されており、
またuser-spaceモジュールにも使用されています。
UNIXと類似して、nodeストリームの主な組み立て演算子は.pipe()
で呼び出されることで、
重い処理の書き出しの詰まりを開放するためのバック・プレッシャーの仕組みを利用することが出来ます。
ストリームは再利用可能な一定のインターフェースとして表面の実装を制限してくれるため、 関心の分離の手助けになります。 これにより、1つのストリーム出力を別の入力に繋げ、 ストリームで各々の処理を実行するライブラリを使用して、高階層のフロー制御を構築します。
ストリームは小規模プログラム設計とUNIX哲学における重要な要素ですが、 他にも検討に値する多くの重要な抽象化が存在します。 技術的負債は敵であり、 問題に対処するために最善の抽象化を手探りで模索することを忘れないで下さい。
何故ストリームを使用するべきなのか
nodeのI/Oは非同期であるため、ディスクとネットワークの相互通信はコールバック関数の受け渡しになります。 あなたは、ディスクからファイルを提供するコードを次のように書こうとするかもしれません。
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);
このコードは動作はするものの、コード自体が扱いづらく、
各リクエスト毎にクライアントに結果を返す前にdata.txt
ファイルを丸ごとメモリにバッファしていしまいます。
もし、data.txt
のサイズが非常に大きい場合、
ユーザーが増え始めると、このプログラムは多くのメモリを消費し始める可能性があり、
特に接続の遅いユーザーに対してそれが顕著に現れるでしょう。
コンテンツ受信開始前にサーバー上でファイルがメモリにバッファされるまで待たされることになるため、 ユーザー体験の損失は計り知れないものになるでしょう。
幸いなことに(req, res)
のパラメータは両方ともストリームです。
これは、我々がfs.readFile()
の代わりにfs.createReadStream()
を使用して、
より良い方法で書き直すことが出来ることを意味します。
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);
ここでの.pipe()
はfs.createReadStream()
からの、
'data'
と'end'
イベントをリッスンします。
このコードはただ綺麗になっただけではなく、
ディスクからdata.txt
ファイルを受け取ると、即座にクライアントへ書き込みます。
.pipe()
を使用するメリットはそれだけではなく、
自動的にバックプレッシャーのような扱いをするため、
リモートクライアントからの接続がとても遅く、遅延接続している際に、
不必要にメモリにバッファの固まりを入れるようなことはしません。
ファイルが圧縮したい場合は、それを行うためのストリーミング・モジュールを使用することが出来ます。
var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);
これで、gzipまたはdeflateをサポートしているブラウザでファイルが圧縮されるようになります。 oppressorに、 あらゆるエンコーディングを扱わせることが可能です。
一度ストリームAPIを習得してしまえば、不安定な非ストリーミング・カスタムAPIを通してデータをプッシュする方法を覚える代わりに、 これらストリーミング・モジュールをLEGOブロックや庭のホースのように組み立てることが可能になります。
ストリームはnodeのプログラミングをシンプルに、エレガントに、そして構成可能(コンポーサブル)なものにしてくれます。
基本
ストリームには、読み込み、書き込み、変換、二重(duplex)、"クラシック"(classic)の5種類が存在します。
pipe
全ての異なる種類のストリームで、.pipe()
は入力と出力のペアで使用されます。
.pipe()
は読み込み可能なソースのストリームであるsrc
を取得し、
遷移先の書き込み可能なストリームであるdst
に出力をフックします。
src.pipe(dst)
.pipe(dst)
はdst
を返すため、
複数の.pipe()
呼び出しを繋げることが可能です。
a.pipe(b).pipe(c).pipe(d)
これは、下記のようにしたのと同じになります。
a.pipe(b);
b.pipe(c);
c.pipe(d);
これは、シェルの代わりにノードを使用することを除いて、 コマンドライン上でプログラムをパイプすることと非常に似ているかもしれません。
a | b | c | d
読み込みストリーム
読み込みストリームは.pipe()
の呼び出しによって、
書き込み、変換、二重(duplex)の各ストリームが受け入れ可能なデータを生み出します。
readableStream.pipe(dst)
読み込みストリームの作成
それではここで読み込みストリームを作成してみましょう。
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
$ node read0.js
beep boop
rs.push(null)
は、rs
のデータ出力が完了したことを伝えます。
ここで気をつけなければいけないことは、process.stdout
をパイプする前に、
読み込みストリームへpushをしていますが、それでも完了メッセージが書き出されることです。
ただし、データがバッファされることを完全に回避し、要求された場合にのみデータを生成したいような状況では、 より好ましい挙動であると言えます。
._read
関数を定義することで、pushすることも可能です。
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);
$ node read1.js
abcdefghijklmnopqrstuvwxyz
ここでは、'a'
から'z'
までをpushしていますが、
それを行うのは読み込み準備が整ってからになります。
また、_read
関数は第1引数に一時的なサイズのパラメータも取得します。
これには読み込みに必要とされるバイト数を指定することになりますが、
読み込みストリームは必要に応じてそのサイズを無視する可能性もあります。
util.inherits()
を使用して、読み込みストリームのサブクラスを作ることも可能ですが、
このアプローチは理解しやすいものではありません。
リクエストがあった時にだけ_read
関数が呼び出されることを確認出来るように、
読み込みストリームのコードに遅延処理を追加しました。
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);
このプログラムを実行すると、5バイト出力をした時にだけ、
_read()
が5回だけ呼び出されることを確認できます。
$ node read2.js | head -c5
abcde
_read() called 5 times
パイプを閉じるために、OSが適切なシグナルを送信する時間が必要となるため、setTimeout
の遅延が必須となります。
また、head
がそれ以上プログラムの出力を行わない場合に、
OSがprocess.stdout
上でEPIPEエラーを発行するSIGPIPEをプロセスに送信するため、
process.stdout.on('error', fn)
ハンドラも必須となります。(翻訳に自信なし)
これらは外部OSとのパイプを連結する際の、避けては通れない問題ではありますが、 全てを直接nodeのストリームで接続していれば自動化されます。
もし、ただの文字列とバッファに代わる任意の値をpushする読み込みストリームを作成したいのであれば、
Readable({ objectMode: true })
を使用した読み込みストリームを作成してください。
読み込みストリームの消費
読み込みストリームを別の種類のストリーム、 またはthroughや、 concat-streamのようなモジュールで作成されたストリームにパイプするだけであれば、 ほとんどのケースで苦労することはありませんが、時折、読み込みストリームを直接消費する方が便利なケースも存在します。
process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null
データが利用可能であれば'readable'
イベントが発火するので、
.read()
を呼び出すことで、バッファからデータを取得することが可能です。
ストリームが完了すると、それ以上取得するバイトが存在しないため、.read()
はnull
を返します。
n
バイトのデータを返すように、.read(n)
と指定することも可能です。
指定するバイト数は単なる指針であり、オブジェクトのストリームに作用を及ぼすものではありませんが、
コア・ストリームの全てがそれをサポートします。
下記の例は、.read(n)
を使用して3バイトの固まりでstdinにバッファしています。
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});
この例を実行すると、不完全な結果を得ることになります。
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
これは余計なデータが内部バッファに残るためで、
nodeに対して既に読み込んだ3バイトのデータの他に読み込みデータがあり、
残っているものを"追い出す"ように指示する必要があります。
.read(0)
を使うと、簡単にこれを行うことができます。
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});
このようにすれば、期待通り3バイトの固まりで動作するようになります。
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>
.read()
が必要以上のデータを渡してきた場合に、
.unshift()
を使用してデータを引き戻すことも出来るので、
同じ読み込みロジックを実行します。
.unshift()
を使用することで、不要なバッファのコピーを防いでくれます。
下記は、改行を切り離す読み込みパーサーの例になります。
var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'
ただし、npmにはsplitのようなモジュールが存在するため、 自分でパーサーのロジックを作る代わりに、これらを使用するのが良いでしょう。
書き込みストリーム
書き込みストリームは、.pipe()
対象にすることが出来るストリームであり、
その逆はありません。
src.pipe(writableStream)
書き込みストリームの作成
._write(chunk, enc, next)
関数を定義することで、
読み込みストリームをパイプすることが出来ます。
var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
$ (echo beep; sleep 1; echo boop) | node write0.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
1つ目の引数は書き込みデータの固まりになります。
2つ目の引数のenc
はエンコーディングの文字列を指定しますが、
opts.decodeString
がfalseで、文字列書き込みの場合に限定されます。
3つ目の引数であるnext(err)
は、書き込むデータが他にもあることを伝えるコールバックになります。
任意でエラーオブジェクトであるerr
を渡すことが可能で、
ストリームのインスタンス上で'error'
イベントを発行します。
書き込むための文字列から読み込みストリームをパイプしている場合は、
Writable({ decodeStrings: false })
を使用した書き込みストリームの作成無しに、
Buffers
へ変換されます。
書き込み(write)オブジェクトから読み込みストリームをパイプしている場合は、
Writable({ objectMode: true })
を使用して書き込みストリームを作成してください。
書き込みストリームへの書き込み
書き込みストリームへ書き込むには、
データ(data)を.write(data)
として呼び出すだけです。
process.stdout.write('beep boop\n');
書き込みストリームに書き込みの完了を伝えるには、.end()
を呼び出します。
.end(data)
と指定することで、終了直前にデータを書き込むことも可能です。
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
$ node writing1.js
$ cat message.txt
beep boop
opts.highWaterMark
オプションがWritable()
に渡され、
入ってくるバッファがそれよりも大きいデータ際に、
高水準のパフォーマンスとバッファリングが求められるのであれば、
.write()
はfalseを返すようにしてください。
もし再びバッファが空になるまで待機したい場合は、'drain'
イベントをリッスンします。
変換(Transform)ストリーム
変換(Transform)ストリームは、二重のストリーム(読み込みと書き込みの両方)タイプです。 変換ストリームの特筆すべき点は、入力が何らかの算出を経て出力されるという点です。
あなたは既に"through streams"を通じて、変換ストリームという名を耳にしたことがあるかもしれません。
"through streams"はシンプルな読み込み/書き込みのフィルターで、 入力を変換して出力を生み出します。
二重(duplex)ストリーム
二重ストリームは読み込み/書き込みの役割を果たし、 電話のように前後にメッセージをやり取りして、 双方向の対話を噛み合わせます。 RPC通信は二重ストリームの分かりやすい例です。 あなたは、次のような処理を何度か見たことがあるでしょう。
a.pipe(b).pipe(a)
あなたは、おそらく既に二重ストリームを取り扱っているのではないでしょうか。
クラシックストリーム
クラシックストリームはnode 0.4で初めて実装された古いインターフェースです。 あなたは、おそらく長期間にわたり、この形式のストリームに遭遇することになるでしょう。 そのため、これらの挙動を把握しておくことは悪いことではありません。
ストリームが登録された"data"リスナーを持っていれば、 古いAPIの挙動に沿った"classic"モードに切り替えられます。
クラシック・読み込みストリーム
クラシック読み込みストリームは、対象(消費者)のためのデータを持つ際に"data"イベントを発行し、 対象(消費者)のためのデータを生成し終えた際に"end"イベントを発行する単なるイベントの発行者です。
.pipe()
は、stream.readable
の真偽値から、クラシックストリームが読み込み可能か否かを確認します。
下記はAからJまでを出力する、非常にシンプルな読み込みストリームの例になります。
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
$ node classic0.js
ABCDEFGHIJ
クラシック読み込みストリームから読み取るために、"data"と"end"リスナーを登録します。
下記は古い読み込みストリーム形式を使用して、process.stdin
を読み取るサンプルになります。
process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__
"data"リスナーが登録されていると、ストリームが互換性モードになるため、 新しいstreams2のAPIの利点を失うことに注意してください。
あなた自身が、今以上に"data"と"end"ハンドラを登録して増やすようなことは、絶対にするべきではありません。
もし、古いストリームと対話する必要があるのであれば、
可能な限り.pipe()
できるライブラリを使用するべきです。
例えばthroughを使用して、 明示的な"data"と"end"リスナーのセットアップを避けることができます。
var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
$ (echo beep; sleep 1; echo boop) | node through.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__
またはconcat-streamを使用して、 ストリームの内容全体をバッファします。
var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
$ echo '{"beep":"boop"}' | node concat.js
{ beep: 'boop' }
クラシック読み取りストリームは、一時的にストリームを停止するための.pause()
と.resume()
ロジックを持ちますが、
これは単なる警告(勧告)にすぎませんでした。
もしクラシック読み取りストリームで.pause()
と.resume()
を使用する場合は、
自身で書き込みをする代わりに、バッファを扱うthroughを使用するべきです。
クラシック書き込みストリーム
クラシック書き込みストリームは非常にシンプルです。
.write(buf)
、.end(buf)
、.destroy()
を定義するだけです。
.end(buf)
はbuf
を取得しても、しなくても構いませんが、
nodeユーザーはstream.end(buf)
はstream.write(buf)
を意味するだろうと期待するため、
stream.end()
はその期待を裏切るべきではありません。
read more
- core stream documentation
-
readable-streamモジュールを使用することで、
node 0.8とそれ以下のバージョンに準拠するstreams2のコードを作ることが可能です。
npm install readable-stream
を実行した後に、require('stream')
の代わりにrequire('readable-stream') と書くだけです。
組み込みストリーム
これらのストリームはnode自身に組み込まれています。
process
child_process.spawn()
fs
net
http
zlib
制御ストリーム
メタストリーム
ステートストリーム
HTTPストリーム
IOストリーム
パーサーストリーム
ブラウザストリーム
HTMLストリーム
オーディオストリーム
RPCストリーム
テストストリーム
パワーコンボ
Licensed under the Creative Commons Attribution License 3.0.
このページは、ページトップのリンク先のGitHubのsubstack氏のstream-handbookのページを翻訳した内容を基に構成されています。 下記の項目を確認し、必要に応じて公式のドキュメントをご確認ください。 もし、誤訳などの間違いを見つけましたら、 @tomofまで教えていただければ幸いです。
- 元のコンテンツと比べてドキュメントの情報が古くなっている可能性があります。
- "訳注:"などの断わりを入れた上で、日本人向けの情報やより分かり易くするための追記を行っている事があります。