ストリームの手引き

このドキュメントはストリームを使用したnode.jsプログラムの基本的な書き方の手引書になります。

翻訳元の記事はgulpのドキュメントからリンクされていたGitHub上のsubstack氏のstream-handbookであり、 あくまでgulpをはじめとしたタスクランナーの内部処理で重要な役割を果たすファイル・ストリームについて書かれた手引書(handbook)であり、 gulpのドキュメントの一部というわけではありません。

写本

この手引書は、次のようにnpmのコマンドを実行することでインストールすることが可能です。

npm install -g stream-handbook

これでstream-handbookコマンドを打てば、$PAGERにこのReadmeファイルが開くようになりました。 もしくは、このままこのドキュメントを読み進めても構いません。

イントロダクション

"別の方式でデータを扱う必要が生じた際に、庭の水やりに使うホース・スクリューのように、
我々はもう1つのセグメントに接続する方法を持つべきである。
これはIOにも言えることである。"

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964

Doug mcilroy

ストリームはUNIXの初期の段階から導入されており、 これ自身が1つのことを上手に行う小さなコンポーネントが、 巨大なシステムを構築するうえで信頼できる方法であることを何十年にもわたって証明しています。 UNIXでは、ストリームは|パイプのシェルによって実装されます。 nodeでは組み込みのStreamモジュールはコア・ライブラリによって使用されており、 またuser-spaceモジュールにも使用されています。 UNIXと類似して、nodeストリームの主な組み立て演算子は.pipe()で呼び出されることで、 重い処理の書き出しの詰まりを開放するためのバック・プレッシャーの仕組みを利用することが出来ます。

ストリームは再利用可能な一定のインターフェースとして表面の実装を制限してくれるため、 関心の分離の手助けになります。 これにより、1つのストリーム出力を別の入力に繋げ、 ストリームで各々の処理を実行するライブラリを使用して、高階層のフロー制御を構築します。

ストリームは小規模プログラム設計UNIX哲学における重要な要素ですが、 他にも検討に値する多くの重要な抽象化が存在します。 技術的負債は敵であり、 問題に対処するために最善の抽象化を手探りで模索することを忘れないで下さい。

Brian kernighan

何故ストリームを使用するべきなのか

nodeのI/Oは非同期であるため、ディスクとネットワークの相互通信はコールバック関数の受け渡しになります。 あなたは、ディスクからファイルを提供するコードを次のように書こうとするかもしれません。

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

このコードは動作はするものの、コード自体が扱いづらく、 各リクエスト毎にクライアントに結果を返す前にdata.txtファイルを丸ごとメモリにバッファしていしまいます。 もし、data.txtのサイズが非常に大きい場合、 ユーザーが増え始めると、このプログラムは多くのメモリを消費し始める可能性があり、 特に接続の遅いユーザーに対してそれが顕著に現れるでしょう。

コンテンツ受信開始前にサーバー上でファイルがメモリにバッファされるまで待たされることになるため、 ユーザー体験の損失は計り知れないものになるでしょう。

幸いなことに(req, res)のパラメータは両方ともストリームです。 これは、我々がfs.readFile()の代わりにfs.createReadStream()を使用して、 より良い方法で書き直すことが出来ることを意味します。

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

ここでの.pipe()fs.createReadStream()からの、 'data''end'イベントをリッスンします。 このコードはただ綺麗になっただけではなく、 ディスクからdata.txtファイルを受け取ると、即座にクライアントへ書き込みます。

.pipe()を使用するメリットはそれだけではなく、 自動的にバックプレッシャーのような扱いをするため、 リモートクライアントからの接続がとても遅く、遅延接続している際に、 不必要にメモリにバッファの固まりを入れるようなことはしません。

ファイルが圧縮したい場合は、それを行うためのストリーミング・モジュールを使用することが出来ます。

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

これで、gzipまたはdeflateをサポートしているブラウザでファイルが圧縮されるようになります。 oppressorに、 あらゆるエンコーディングを扱わせることが可能です。

一度ストリームAPIを習得してしまえば、不安定な非ストリーミング・カスタムAPIを通してデータをプッシュする方法を覚える代わりに、 これらストリーミング・モジュールをLEGOブロックや庭のホースのように組み立てることが可能になります。

ストリームはnodeのプログラミングをシンプルに、エレガントに、そして構成可能(コンポーサブル)なものにしてくれます。

基本

ストリームには、読み込み、書き込み、変換、二重(duplex)、"クラシック"(classic)の5種類が存在します。

pipe

全ての異なる種類のストリームで、.pipe()は入力と出力のペアで使用されます。

.pipe()は読み込み可能なソースのストリームであるsrcを取得し、 遷移先の書き込み可能なストリームであるdstに出力をフックします。

src.pipe(dst)

.pipe(dst)dstを返すため、 複数の.pipe()呼び出しを繋げることが可能です。

a.pipe(b).pipe(c).pipe(d)

これは、下記のようにしたのと同じになります。

a.pipe(b);
b.pipe(c);
c.pipe(d);

これは、シェルの代わりにノードを使用することを除いて、 コマンドライン上でプログラムをパイプすることと非常に似ているかもしれません。

a | b | c | d

読み込みストリーム

読み込みストリームは.pipe()の呼び出しによって、 書き込み、変換、二重(duplex)の各ストリームが受け入れ可能なデータを生み出します。

readableStream.pipe(dst)
読み込みストリームの作成

それではここで読み込みストリームを作成してみましょう。

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);
$ node read0.js
beep boop

rs.push(null)は、rsのデータ出力が完了したことを伝えます。

ここで気をつけなければいけないことは、process.stdoutをパイプする前に、 読み込みストリームへpushをしていますが、それでも完了メッセージが書き出されることです。

ただし、データがバッファされることを完全に回避し、要求された場合にのみデータを生成したいような状況では、 より好ましい挙動であると言えます。

._read関数を定義することで、pushすることも可能です。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);
$ node read1.js
abcdefghijklmnopqrstuvwxyz

ここでは、'a'から'z'までをpushしていますが、 それを行うのは読み込み準備が整ってからになります。

また、_read関数は第1引数に一時的なサイズのパラメータも取得します。 これには読み込みに必要とされるバイト数を指定することになりますが、 読み込みストリームは必要に応じてそのサイズを無視する可能性もあります。

util.inherits()を使用して、読み込みストリームのサブクラスを作ることも可能ですが、 このアプローチは理解しやすいものではありません。

リクエストがあった時にだけ_read関数が呼び出されることを確認出来るように、 読み込みストリームのコードに遅延処理を追加しました。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);

    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

このプログラムを実行すると、5バイト出力をした時にだけ、 _read()が5回だけ呼び出されることを確認できます。

$ node read2.js | head -c5
abcde
_read() called 5 times

パイプを閉じるために、OSが適切なシグナルを送信する時間が必要となるため、setTimeoutの遅延が必須となります。

また、headがそれ以上プログラムの出力を行わない場合に、 OSがprocess.stdout上でEPIPEエラーを発行するSIGPIPEをプロセスに送信するため、 process.stdout.on('error', fn)ハンドラも必須となります。(翻訳に自信なし)

これらは外部OSとのパイプを連結する際の、避けては通れない問題ではありますが、 全てを直接nodeのストリームで接続していれば自動化されます。

もし、ただの文字列とバッファに代わる任意の値をpushする読み込みストリームを作成したいのであれば、 Readable({ objectMode: true })を使用した読み込みストリームを作成してください。

読み込みストリームの消費

読み込みストリームを別の種類のストリーム、 またはthroughや、 concat-streamのようなモジュールで作成されたストリームにパイプするだけであれば、 ほとんどのケースで苦労することはありませんが、時折、読み込みストリームを直接消費する方が便利なケースも存在します。

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

データが利用可能であれば'readable'イベントが発火するので、 .read()を呼び出すことで、バッファからデータを取得することが可能です。

ストリームが完了すると、それ以上取得するバイトが存在しないため、.read()nullを返します。

nバイトのデータを返すように、.read(n)と指定することも可能です。 指定するバイト数は単なる指針であり、オブジェクトのストリームに作用を及ぼすものではありませんが、 コア・ストリームの全てがそれをサポートします。

下記の例は、.read(n)を使用して3バイトの固まりでstdinにバッファしています。

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

この例を実行すると、不完全な結果を得ることになります。

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

これは余計なデータが内部バッファに残るためで、 nodeに対して既に読み込んだ3バイトのデータの他に読み込みデータがあり、 残っているものを"追い出す"ように指示する必要があります。 .read(0)を使うと、簡単にこれを行うことができます。

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

このようにすれば、期待通り3バイトの固まりで動作するようになります。

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

.read()が必要以上のデータを渡してきた場合に、 .unshift()を使用してデータを引き戻すことも出来るので、 同じ読み込みロジックを実行します。

.unshift()を使用することで、不要なバッファのコピーを防いでくれます。 下記は、改行を切り離す読み込みパーサーの例になります。

var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

ただし、npmにはsplitのようなモジュールが存在するため、 自分でパーサーのロジックを作る代わりに、これらを使用するのが良いでしょう。

書き込みストリーム

書き込みストリームは、.pipe()対象にすることが出来るストリームであり、 その逆はありません。

src.pipe(writableStream)
書き込みストリームの作成

._write(chunk, enc, next)関数を定義することで、 読み込みストリームをパイプすることが出来ます。

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);
$ (echo beep; sleep 1; echo boop) | node write0.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>

1つ目の引数は書き込みデータの固まりになります。

2つ目の引数のencはエンコーディングの文字列を指定しますが、 opts.decodeStringがfalseで、文字列書き込みの場合に限定されます。

3つ目の引数であるnext(err)は、書き込むデータが他にもあることを伝えるコールバックになります。 任意でエラーオブジェクトであるerrを渡すことが可能で、 ストリームのインスタンス上で'error'イベントを発行します。

書き込むための文字列から読み込みストリームをパイプしている場合は、 Writable({ decodeStrings: false })を使用した書き込みストリームの作成無しに、 Buffersへ変換されます。

書き込み(write)オブジェクトから読み込みストリームをパイプしている場合は、 Writable({ objectMode: true })を使用して書き込みストリームを作成してください。

書き込みストリームへの書き込み

書き込みストリームへ書き込むには、 データ(data)を.write(data)として呼び出すだけです。

process.stdout.write('beep boop\n');

書き込みストリームに書き込みの完了を伝えるには、.end()を呼び出します。 .end(data)と指定することで、終了直前にデータを書き込むことも可能です。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);
$ node writing1.js
$ cat message.txt
beep boop

opts.highWaterMarkオプションがWritable()に渡され、 入ってくるバッファがそれよりも大きいデータ際に、 高水準のパフォーマンスとバッファリングが求められるのであれば、 .write()はfalseを返すようにしてください。

もし再びバッファが空になるまで待機したい場合は、'drain'イベントをリッスンします。

変換(Transform)ストリーム

変換(Transform)ストリームは、二重のストリーム(読み込みと書き込みの両方)タイプです。 変換ストリームの特筆すべき点は、入力が何らかの算出を経て出力されるという点です。

あなたは既に"through streams"を通じて、変換ストリームという名を耳にしたことがあるかもしれません。

"through streams"はシンプルな読み込み/書き込みのフィルターで、 入力を変換して出力を生み出します。

二重(duplex)ストリーム

二重ストリームは読み込み/書き込みの役割を果たし、 電話のように前後にメッセージをやり取りして、 双方向の対話を噛み合わせます。 RPC通信は二重ストリームの分かりやすい例です。 あなたは、次のような処理を何度か見たことがあるでしょう。

a.pipe(b).pipe(a)

あなたは、おそらく既に二重ストリームを取り扱っているのではないでしょうか。

クラシックストリーム

クラシックストリームはnode 0.4で初めて実装された古いインターフェースです。 あなたは、おそらく長期間にわたり、この形式のストリームに遭遇することになるでしょう。 そのため、これらの挙動を把握しておくことは悪いことではありません。

ストリームが登録された"data"リスナーを持っていれば、 古いAPIの挙動に沿った"classic"モードに切り替えられます。

クラシック・読み込みストリーム

クラシック読み込みストリームは、対象(消費者)のためのデータを持つ際に"data"イベントを発行し、 対象(消費者)のためのデータを生成し終えた際に"end"イベントを発行する単なるイベントの発行者です。

.pipe()は、stream.readableの真偽値から、クラシックストリームが読み込み可能か否かを確認します。

下記はAからJまでを出力する、非常にシンプルな読み込みストリームの例になります。

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);
$ node classic0.js
ABCDEFGHIJ

クラシック読み込みストリームから読み取るために、"data"と"end"リスナーを登録します。 下記は古い読み込みストリーム形式を使用して、process.stdinを読み取るサンプルになります。

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

"data"リスナーが登録されていると、ストリームが互換性モードになるため、 新しいstreams2のAPIの利点を失うことに注意してください。

あなた自身が、今以上に"data"と"end"ハンドラを登録して増やすようなことは、絶対にするべきではありません。 もし、古いストリームと対話する必要があるのであれば、 可能な限り.pipe()できるライブラリを使用するべきです。

例えばthroughを使用して、 明示的な"data"と"end"リスナーのセットアップを避けることができます。

var through = require('through');
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log('__END__');
}
$ (echo beep; sleep 1; echo boop) | node through.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

またはconcat-streamを使用して、 ストリームの内容全体をバッファします。

var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));
$ echo '{"beep":"boop"}' | node concat.js
{ beep: 'boop' }

クラシック読み取りストリームは、一時的にストリームを停止するための.pause().resume()ロジックを持ちますが、 これは単なる警告(勧告)にすぎませんでした。 もしクラシック読み取りストリームで.pause().resume()を使用する場合は、 自身で書き込みをする代わりに、バッファを扱うthroughを使用するべきです。

クラシック書き込みストリーム

クラシック書き込みストリームは非常にシンプルです。 .write(buf).end(buf).destroy()を定義するだけです。

.end(buf)bufを取得しても、しなくても構いませんが、 nodeユーザーはstream.end(buf)stream.write(buf)を意味するだろうと期待するため、 stream.end()はその期待を裏切るべきではありません。

read more

  • core stream documentation
  • readable-streamモジュールを使用することで、 node 0.8とそれ以下のバージョンに準拠するstreams2のコードを作ることが可能です。 npm install readable-streamを実行した後に、 require('stream')の代わりにrequire('readable-stream')と書くだけです。

組み込みストリーム

これらのストリームはnode自身に組み込まれています。

process

process.stdin

この読み込みストリームは、プログラム用のシステムの標準入力のストリームを含みます。

デフォルトでは停止(pause)されていますが、最初にそれを参照すると、 next tick.resume()が暗黙のうちに呼び出されます。

もしprocess.stdinがttyの場合(tty.isatty()で確認可)、 入力イベントはline-bufferedされます。 process.stdin.setRawMode(true)を呼び出すことで、line-bufferedをオフにすることが可能です。 ただし^C^Dのようなデフォルトのキー連携が削除されます。

process.stdout

この書き込みストリームは、プログラム用のシステムの標準出力を含みます。 標準出力にデータを送りたい場合は、それをwriteします。

process.stderr

この書き込みストリームは、プログラム用のシステムの標準エラー出力を含みます。 標準エラー出力にデータを送りたい場合は、それをwriteします。

child_process.spawn()

fs

fs.createReadStream()
fs.createWriteStream()

net

net.connect()

この関数はリモートホストへTCP越しに接続する[二重ストリーム]を返します。 すぐにストリームを書き込むことが可能で、書き込んだ内容は'connect'イベントが発火するまでバッファされます。

net.createServer()

http

http.request()
http.createServer()

zlib

zlib.createGzip()
zlib.createGunzip()
zlib.createDeflate()
zlib.createInflate()

制御ストリーム

through
from
pause-stream
concat-stream

concat-streamは単一のバッファにストリーム・コンテンツをバッファリングします。 concat(cb)は、ストリームが完了した際に、 バッファされたbodyと一緒に単一のコールバックcb(body)を取得します。

例えば、このプログラムはcs.end()が呼び出される度に、 文字列"beep boop"のbody文字列を持つconcatコールバックが発火されます。 このプログラムはbodyを所得してそれを大文字に変換し、BEEP BOOPを出力します。

var concat = require('concat-stream');

var cs = concat(function (body) {
    console.log(body.toUpperCase());
});
cs.write('beep ');
cs.write('boop.');
cs.end();
$ node concat.js
BEEP BOOP.

ここでのconcat-streamの使用例では、 受け取ったURLエンコード形式のデータを解析し、 フォームのパラメーターをJSON文字列化(stringify)形式で応答します。

var http = require('http');
var qs = require('querystring');
var concat = require('concat-stream');

var server = http.createServer(function (req, res) {
    req.pipe(concat(function (body) {
        var params = qs.parse(body.toString());
        res.end(JSON.stringify(params) + '\n');
    }));
});
server.listen(5005);
$ curl -X POST -d 'beep=boop&dinosaur=trex' http://localhost:5005
{"beep":"boop","dinosaur":"trex"}
duplex
duplexer
emit-stream
invert-stream
map-stream
remote-events
buffer-stream
event-stream
auth-stream

メタストリーム

mux-demux
stream-router
multi-channel-mdm

ステートストリーム

crdt
delta-stream
scuttlebutt

scuttlebuttは、 ノードが仲介者を通してのみ接続され、全データを信頼できるノードが存在しないメッシュトポロジー(P2P型ネットワーク)上で、 ピアツーピアの状態を同期するために使用されます。(翻訳に自信なし)

ネットワーク障壁が異なるノード間で、同じ状態を共有・更新する必要がある場合、 scuttlebuttが提供する分散型ピアツーピアのネットワークは特に便利です。 この種類のネットワーク例として、HTTPサーバーを通して各々にメッセージを送信するブラウザクライアントと、 ブラウザが直接接続することが出来ないバックエンドプロセスの組み合わせが該当するかもしれません。 もう一つのユースケースとして、IPv4アドレスが枯渇している内部ネットワークをまたがるようなシステムも該当するかもしれません。

scuttlebuttは接続されたノード間でメッセージを渡すのにゴシッププロトコルを使用するため、 全てのノード間の状態が、最終的に全ての場所で同じ値に収束する(eventually converge)ことになります。

scuttlebutt/modelインターフェースを使用して、 必要であればどのような種類のネットワークでも、ノードを作成して各々をパイプすることが出来ます。

var Model = require('scuttlebutt/model');
var am = new Model;
var as = am.createStream();

var bm = new Model;
var bs = bm.createStream();

var cm = new Model;
var cs = cm.createStream();

var dm = new Model;
var ds = dm.createStream();

var em = new Model;
var es = em.createStream();

as.pipe(bs).pipe(as);
bs.pipe(cs).pipe(bs);
bs.pipe(ds).pipe(bs);
ds.pipe(es).pipe(ds);

em.on('update', function (key, value, source) {
    console.log(key + ' => ' + value + ' from ' + source);
});

am.set('x', 555);

作成したネットワークは、下記の図のような明確な宛先のないネットワークになります。

a <-> b <-> c
      ^
      |
      v
      d <-> e

ノードaとノードeは直接は接続されていませんが、 このスクリプトを実行すると、

$ node model.js
x => 555 from 1347857300518

ノードbdを経由して見つけた値を、 ノードaが設定することに注目してください。 ここでは全てのノードが同じプロセス内にいますが、 scuttlebuttはシンプルなストリーミング・インターフェースを使用するため、 ノードはどのようなプロセス、サーバーにでも置くことが可能で、文字列データを扱うことが出来れば、 どのようなストリーミング通信でも接続します。(翻訳に自信なし)

下記に、より実践的な例として、ネットワーク越しに値をカウントする例を用意しました。

var Model = require('scuttlebutt/model');
var net = require('net');

var m = new Model;
m.set('count', '0');
m.on('update', function (key, value) {
    console.log(key + ' = ' + m.get('count'));
});

var server = net.createServer(function (stream) {
    stream.pipe(m.createStream()).pipe(stream);
});
server.listen(8888);

setInterval(function () {
    m.set('count', Number(m.get('count')) + 1);
}, 320);

ここでこのサーバーに接続して一定間隔でカウントを更新し、 受け取った更新の値を全て出力するクライアントを作成します。

var Model = require('scuttlebutt/model');
var net = require('net');

var m = new Model;
var s = m.createStream();

s.pipe(net.connect(8888, 'localhost')).pipe(s);

m.on('update', function cb (key) {
    // ネットワークから少なくとも1カウントを取得するまで待機
    if (key !== 'count') return;
    m.removeListener('update', cb);

    setInterval(function () {
        m.set('count', Number(m.get('count')) + 1);
    }, 100);
});

m.on('update', function (key, value) {
    console.log(key + ' = ' + value);
});

このクライアントは、自身のカウンターを開始するために別のものからの更新を待つか、 カウンターをゼロにあわせる必要があるため、少し扱いづらい面があります。

サーバーといずれかのクライアントが動作すると、次のようなシーケンスが表示されるはずです。

count = 183
count = 184
count = 185
count = 186
count = 187
count = 188
count = 189

時折、あるノードが下記のような重複したシーケンスを表示することがあるかもしれません。

count = 147
count = 148
count = 149
count = 149
count = 150
count = 151

これらの値は、scuttlebuttによる、 全てのノードの状態が最終的に一定になるように動作するように作られてきた、 歴史的な背景に基づく解決アルゴリズムの衝突が原因になっています。

この例にあるサーバーは、それに接続したクライアントと同じ権限を持つ、単なる別のノードであることに注意してください。 ここでの"クライアント"と"サーバー"という用語は、状態の同期がどのように進行するかに影響されるものではなく、 単に誰が接続の初期化を始めたのかにすぎません。 このプロパティを持つプロトコルは、よく対称(symmetric)プロトコルと呼ばれます。 dnodeで、 対称(symmetric)プロトコルの別の例を参照してください。

append-only

HTTPストリーム

request
oppressor
response-stream

IOストリーム

reconnect
kv
discovery-network

パーサーストリーム

tar
trumpet
JSONStream

ストリームからJSONデータを解析、文字列化(stringify)するには、このモジュールを使用します。

もし、遅い接続を通して大きなJSONデータを渡す必要がある、 またはゆっくりと取り込む必要のあるJSONオブジェクトがある場合、 このモジュールは到着した増加分のデータを解析できるようにしてくれます。

json-scrape
stream-serializer

ブラウザストリーム

shoe
domnode
sorta
graph-stream
arrow-keys
attribute
data-bind

HTMLストリーム

hyperstream

オーディオストリーム

baudio

RPCストリーム

substack/dnode

dnodeは、 あらゆる種類のストリームを通して、リモート関数を呼び出せるようにしてくれます。 下記は基本的なdnodeサーバーになります。

var dnode = require('dnode');
var net = require('net');

var server = net.createServer(function (c) {
    var d = dnode({
        transform : function (s, cb) {
            cb(s.replace(/[aeiou]{2,}/, 'oo').toUpperCase())
        }
    });
    c.pipe(d).pipe(c);
});

server.listen(5004);

次にサーバーの.transform()関数を呼び出すシンプルなクライアントを作ります。

var dnode = require('dnode');
var net = require('net');

var d = dnode();
d.on('remote', function (remote) {
    remote.transform('beep', function (s) {
        console.log('beep => ' + s);
        d.end();
    });
});

var c = net.connect(5004);
c.pipe(d).pipe(c);

サーバーを起動してクライアントを実行すると、次のように表示されるはずです。

$ node client.js
beep => BOOP

クライアントがサーバーのtransform()関数へ'beep'を送信すると、 サーバーはクライアントのコールバックを結果とともに呼び出してくれるのが確認できますね!

ここでdnodeが提供するストリーミング・インターフェースは二重(duplex)ストリームであるため、 クライアントとサーバーはどちらも、両側からのリクエストとレスポンスを、 お互いにパイプ(c.pipe(d).pipe(c))します。

あなたがコールバックに対して引数を渡せるようにしようとすると、 dnodeは滅茶苦茶なことになるでしょう。 下記は多段階でコールバックを渡す、先程のサーバーをバージョンアップしたものになります。

var dnode = require('dnode');
var net = require('net');

var server = net.createServer(function (c) {
    var d = dnode({
        transform : function (s, cb) {
            cb(function (n, fn) {
                var oo = Array(n+1).join('o');
                fn(s.replace(/[aeiou]{2,}/, oo).toUpperCase());
            });
        }
    });
    c.pipe(d).pipe(c);
});

server.listen(5004);

下記はクライアントのバージョンアップ版になります。

var dnode = require('dnode');
var net = require('net');

var d = dnode();
d.on('remote', function (remote) {
    remote.transform('beep', function (cb) {
        cb(10, function (s) {
            console.log('beep:10 => ' + s);
            d.end();
        });
    });
});

var c = net.connect(5004);
c.pipe(d).pipe(c);

バージョンアップしたサーバーを起動して、クライアントを実行すると、次のような結果を得ることができます。

$ node client.js
beep:10 => BOOOOOOOOOOP

It just works!™

基本的な考え方はオブジェクト内に関数を置いて、もう片方のストリームからそれらを呼び出し、 最後にはその関数は往復を終わらせるために最初に元の関数があった側で消されます。 関数を引数として、対となる関数に渡す際の最善の策は、 それらの関数がもう片方の側で消されることです。

この再帰的に引数の関数を対にしていくアプローチは、 今後"turtles all the way down"()の口火となって知られていくでしょう。(翻訳に自信なし)

訳注: "turtles all the way down"は、 下記サイトで説明されている内容が分かりやすいと考え、ここに引用させていただきました。

基本的な使い方 — deform 0.9.7 documentation

“It’s turtles all the way down” です (訳注: 親亀の上に子亀、孫亀が無数に乗っている様子。 上から下まですべて同じものでできていること)

いずれも関数の戻り値は無視され、オブジェクトの列挙型のプロパティのみがjsonスタイルで送信されます。

It's turtles all the way down!

Turtles all the way down

dnodeはノードまたはブラウザ上でストリームをまたいで動作するため、 どこで定義された関数であっても簡単に呼び出せることができます。 特にバルクデータストリームを並べて制御するために、 mux-demuxを使用して、複合的なrpcストリームを対にする際に便利です。 (翻訳に自信なし)

rpc-stream

テストストリーム

tap
stream-spec

パワーコンボ

分散型の分割耐性チャット(distributed partition-tolerant chat)

append-onlyモジュールは、 scuttlebuttの先頭に追加のみ可能(append-only)な便利な配列を提供し、 これは書き込みの最終的な一定化、別ノードの分散チャットの再現、ネットワーク分割の存続を非常に簡単にしてくれます。

TODO: the rest

roll your own socket.io

我々はsocket.io形式のイベントエミッター(event emitter)APIを、 このドキュメントの最初の方で紹介したライブラリを使用して構築することができます。

まず始めに、shoeを使用してサーバー側の新しいWebソケットのハンドラを作成し、 emit-streamを使用してイベントエミッター(event emitter)を、 ストリームに変換することができます。 オブジェクトのストリームは、オブジェクトのシリアライズをするためにJSONStreamに与えることが可能で、 そこでシリアライズされたストリームはリモートのブラウザにパイプすることが可能です。

var EventEmitter = require('events').EventEmitter;
var shoe = require('shoe');
var emitStream = require('emit-stream');
var JSONStream = require('JSONStream');

var sock = shoe(function (stream) {
    var ev = new EventEmitter;
    emitStream(ev)
        .pipe(JSONStream.stringify())
        .pipe(stream)
    ;
    ...
});

shoeコールバックの内部で、ev関数へイベントを発行することができます。 下記は、一定間隔で異なる種類のイベントを発行しています。

var intervals = [];

intervals.push(setInterval(function () {
    ev.emit('upper', 'abc');
}, 500));

intervals.push(setInterval(function () {
    ev.emit('lower', 'def');
}, 300));

stream.on('end', function () {
    intervals.forEach(clearInterval);
});

最終的にshoeインスタンスは、httpサーバーにバインドされる必要があります。

var http = require('http');
var server = http.createServer(require('ecstatic')(__dirname));
server.listen(8080);

sock.install(server, '/sock');

一方ブラウザ側ではjsonのshoeストリームを解析して、 その結果のオブジェクトのストリームをeventStream()に渡します。 eventStream()は、サーバー側のイベントを発行するイベントエミッター(event emitter)を返します。

var shoe = require('shoe');
var emitStream = require('emit-stream');
var JSONStream = require('JSONStream');

var parser = JSONStream.parse([true]);
var stream = parser.pipe(shoe('/sock')).pipe(parser);
var ev = emitStream(stream);

ev.on('lower', function (msg) {
    var div = document.createElement('div');
    div.textContent = msg.toLowerCase();
    document.body.appendChild(div);
});

ev.on('upper', function (msg) {
    var div = document.createElement('div');
    div.textContent = msg.toUpperCase();
    document.body.appendChild(div);
});

このブラウザのソースコードをビルドするために、 browserifyを使用してください。 ブラウザ側でこれら全ての素晴らしいモジュールのrequire()が可能になります。

$ browserify main.js -o bundle.js

次にHTMLへ<script src="/bundle.js"></script>を埋め込み、 サーバー側のイベントのストリームをブラウザを通して確認してみてください。

このストリーミングのアプローチを使用することで、 ストリームとの対話方法さえ知っていれば、再利用可能な小さなコンポーネントに依存することができます。 グローバルなイベントシステムであるsoket.ioスタイルを介するルーティングメッセージに代わって、 アプリケーションの機能を細かく分解し、「1つのことを上手に行う」ことにより集中することができます。

例えば、この例でのJSONStreamを、異なるシリアライズを行うstream-serializerに簡単に入れ替えることができます。(翻訳に自信なし) シンプルなストリーミングインターフェースを使用した再接続(reconnect)またはハートビートを扱うための層を、 shoeよりも上の層に固定することができるでしょう。(翻訳に自信なし) 中枢のEventEmitterの代わりに、 eventemitter2による名前空間が割り当てられたイベントを使用するために、 チェーンにストリームを追加することも可能です。

異なる方法で動作する異なるストリームが欲しいのであれば、 そのための分離チャンネルを作成するmux-demuxを通し、 この例でshoeストリームを実行したのと全く同じように行います。(翻訳に自信なし)

時間とともに進化することが必須となるシステムであれば、 独自のフレームワークのアプローチによって必然的に生じてしまう大きなリスクを何度も抱えることなく、 これら各ストリーミングの部品をそれぞれ必要なものに交換することができます。

ブラウザとサーバーのためのHTMLストリーム

クライアントとサーバー用に、同じHTMLレンダリングのロジックを再利用するストリーミング・モジュールを使用することが可能です。 この手法は、インデックス可能でSEOフレンドリーであり、リアルタイム更新を提供してくれます。

レンダラー(renderer)は入力としてjsonの行を取得し、その出力としてHTML文字列を返します。 テキストこそ、万能なインターフェースです!

render.js:

var through = require('through');
var hyperglue = require('hyperglue');
var fs = require('fs');
var html = fs.readFileSync(__dirname + '/static/row.html', 'utf8');

module.exports = function () {
    return through(function (line) {
        try { var row = JSON.parse(line) }
        catch (err) { return this.emit('error', err) }

        this.queue(hyperglue(html, {
            '.who': row.who,
            '.message': row.message
        }).outerHTML);
    });
};

brfsを使用してブラウザのコード用にfs.readFileSync()呼び出しをインライン化し、 hyperglueを使用してCSSセレクタを基にしたHTML更新を行うことが可能です。 ここでは、hyperglueの使用が必ずしも必要というわけではありません。 HTMLの文字列さえ返されれば、どのようなものでも構いません。

使用されるrow.htmlは、下記のように非常にシンプルなものとします。

row.html:

<div class="row">
  <div class="who"></div>
  <div class="message"></div>
</div>

このサーバーは、slice-fileを使用して、 全てがシンプルになるように維持します。 slice-fileは、 tail/tail -fよりも少しだけよく見せるAPIですが、 このインターフェースでは通常の結果とデータベースに上手にマッピングされ、 CouchDBのように変更が加えられます。(翻訳に自信なし)

server.js:

var http = require('http');
var fs = require('fs');
var hyperstream = require('hyperstream');
var ecstatic = require('ecstatic')(__dirname + '/static');

var sliceFile = require('slice-file');
var sf = sliceFile(__dirname + '/data.txt');

var render = require('./render');

var server = http.createServer(function (req, res) {
    if (req.url === '/') {
        var hs = hyperstream({
            '#rows': sf.slice(-5).pipe(render())
        });
        hs.pipe(res);
        fs.createReadStream(__dirname + '/static/index.html').pipe(hs);
    }
    else ecstatic(req, res)
});
server.listen(8000);

var shoe = require('shoe');
var sock = shoe(function (stream) {
    sf.follow(-1,0).pipe(stream);
});
sock.install(server, '/sock');

サーバーの最初の部分で/ルートを処理し、#rowsのdivの中にdata.txtの最後の5行をストリーミングします。

サーバーの2つめの部分では、シンプルなストリーミングWebソケットのPolyfillとなるshoeを使用した、 #rowsのリアルタイム更新の処理を行います。(翻訳に自信なし)

次にshoeからのリアルタイム更新を#rowのdiv内に取り込むシンプルなブラウザのコードを書いてみます。

var through = require('through');
var render = require('./render');

var shoe = require('shoe');
var stream = shoe('/sock');

var rows = document.querySelector('#rows');
stream.pipe(render()).pipe(through(function (html) {
    rows.innerHTML += html;
}));

これを、Browserifybrfsを使用してコンパイルします。

$ browserify -t brfs browser.js > static/bundle.js

これで完了です。それでは、data.txtにちょっとしたデータを入れてみましょう。

$ echo '{"who":"substack","message":"beep boop."}' >> data.txt
$ echo '{"who":"zoltar","message":"COWER PUNY HUMANS"}' >> data.txt

次にサーバーを起動します。

$ node server.js

次にlocalhost:8000を開いて、作成したコンテンツを確認してみましょう。 もし、更にコンテンツを追加すれば、

$ echo '{"who":"substack","message":"oh hello."}' >> data.txt
$ echo '{"who":"zoltar","message":"HEAR ME!"}' >> data.txt

ページがリアルタイム更新によって、自動的に更新されるでしょう。hooray(万歳)!

クライアントとサーバーの両方で全く同じロジックを使用して、 SEOフレンドリーでインデックス可能なリアルタイムでコンテンツを提供することが出来るサーバーを立てることができました。 Hooray(万歳)!

 Back to top

Licensed under the Creative Commons Attribution License 3.0.

このページは、ページトップのリンク先のGitHubのsubstack氏のstream-handbookのページを翻訳した内容を基に構成されています。 下記の項目を確認し、必要に応じて公式のドキュメントをご確認ください。 もし、誤訳などの間違いを見つけましたら、 @tomofまで教えていただければ幸いです。

  • 元のコンテンツと比べてドキュメントの情報が古くなっている可能性があります。
  • "訳注:"などの断わりを入れた上で、日本人向けの情報やより分かり易くするための追記を行っている事があります。