Fetch Upload Streaming でチャットアプリを作ってみる
Fetch Upload Streaming とは、ブラウザの JavaScript の POST リクエストで HTTP のストリーミングを行える機能です。より具体的には、Fetch API の `body` に ReadableStream を渡せるようになります。
Fetch Upload Streaming とは
Fetch Upload Streaming とは、ブラウザの JavaScript の POST リクエストで HTTP のストリーミングを行える機能です。より具体的には、Fetch API の body
に ReadableStream を渡せるようになります。
const stream = new ReadableStream({})
fetch(`/send?room=${roomId}`, {
method: "POST",
body: stream,
duplex: "half",
});
かねてより Fetch API ではレスポンスを ReadableStream
で取得できたのですが、反対にリクエスト時に body
に Stream データを渡すことはできませんでした。Chrome 105 から有効となります。Fetch Upload Streaming を使用すれば、WebSocket のようなリアルタイムなデータのやり取りも Fetch API
だけで可能となります。今回は Fetch API
を利用してリアルタイムなチャットアプリを作成してみます。
コードの内容は以下を参照してください。
クライアントサイド
まずはクライアント側のコードから見ていきましょう。クライアント側のコードは static
配下に作成します。はじめに input
の入力値やフォームの submit
イベントを捕捉するために document.querySelector
で DOM 要素を取得しています。チャットアプリなので roomId
も必要です。roomId
はクエリパラーメタから取得して存在しない場合にはアラートを出して処理を終了しています。
const input = document.querySelector("#input");
const form = document.querySelector("#form");
const messages = document.querySelector("#messages");
const quit = document.querySelector("#quit");
const roomId = new URLSearchParams(window.location.search).get("roomId");
if (!roomId) {
alert("No roomId given");
return;
}
stream をアップロード
stream をアップロードするために ReadableStream
インスタンスを作成します。ReadableStream() コンストラクター の start
メソッドを利用して stream を生成します。
const stream = new ReadableStream({
start(controller) {
form.addEventListener("submit", (event) => {
event.preventDefault();
const message = input.value;
input.value = "";
controller.enqueue(message);
});
quit.addEventListener("click", () => controller.close());
},
}).pipeThrough(new TextEncoderStream());
form
が submit されたとき、input
の入力値を取得します。controller.enqueue
メソッドで stream に文字列を追加します。stream の生成を停止するためには controller.close()
を呼び出します。これは「quit」ボタンをクリックしたときに呼び出すようにしています。
stream は最終的に ReadableStream.pipeThrough() により別の形式に変換されます。ここでは TextEncoderStream() により文字列の stream を UTF=8 エンコーディングでバイトに変換されます。
作成した ReadableStream
インスタンスは fetch
の body
パラメータとして設定されます。
fetch(`/send?room=${roomId}`, {
method: "POST",
headers: { "Content-Type": "text/plain" },
body: stream,
duplex: "half",
});
stream を使用するためには duplex: "half"
を設定する必要があります。HTTP の機能では「リクエストを送信している間にレスポンスを受信し始めることができるかどうか」というものがあります。duplex: "half"
はリクエストボディが完全に送信されるまでレスポンスは受信できません。これはブラウザのデフォルトのパターンです。
しかし、例えば Deno の Fetch などの実装はリクエストが庵寮する間にレスポンスが利用可能となる deplex: "full"
がデフォルトとなっています。
このような互換性の問題を回避するために duplex: "half"
をリクエストに必ず設定する必要があるのです。
stream を読み込む
Fetch API で res.body
を ReadableStream
として取得します。
fetch(`/receive?room=${roomId}`).then(async (res) => {
const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
while (true) {
const { done, value } = await reader.read();
if (done) return;
const newMessage = document.createElement("li");
messages.appendChild(newMessage);
newMessage.innerText = `${newMessage.innerText}${value}`;
}
});
stream を読むためにはリーダーを取り付ける必要があります。はじめに res.body.pipeThrough(new TextDecoderStream())
で文字列の stream を UTF=8 エンコーディングでバイトに変換します。変換後のデータは getReader()
を呼び出す音でリーダーが作成され、stream にロックされます。このリーダーが開放されるまで、他のリーダーはこの stream を読み取ることができません。
リーダーを取り付けたら ReadableStreamDefaultReader.read() メソッドを使用して stream から chunk を読み取ることができます。chunk は value
プロパティから取得できます。done
が true
を返した場合には、stream が閉じられたことを意味します。
取得した chunk を元に li
要素を作成して新たに DOM に追加することで取得したメッセージを表示できます。
Fetch Upload Streaming が有効かどうかの判定
Fetch Upload Streamig をブラウザがサポートしているかどうか判定するために、ちょっぴり奇妙なコードを使用します。
const supportsRequestStreams = (() => {
let duplexAccessed = false;
const hasContentType = new Request('', {
body: new ReadableStream(),
method: 'POST',
get duplex() {
duplexAccessed = true;
return 'half';
},
}).headers.has('Content-Type');
return duplexAccessed && !hasContentType;
})();
if (!supportsRequestStreams) {
// …
}
もしブラウザが Fetch Upload Streaming をサポートしていない場合、body
に ReadableStream
のインスタンスを渡した際に ReadableStream
のインスタンスの toString()
メソッドが呼ばれます。その結果 body
には '[object ReadableStream]'
という string
型が指定されることになります。body
に string
型が渡された場合には、Content-Type
ヘッダーに text/plain;charset=UTF-8
という値が自動的に設定されます。つまりは、Content-Type
ヘッダーが設定されている(headers.has('Content-Type')
が true
)ときにはブラウザが Fetch Upload Streaming をサポートしていないと判定できます。
Safari は少々やっかいで、body
に stream を指定することをサポートしているものの、Fetch API において使用することは許可されていません。そのため、現在 Safari がサポートしていない duplex
オプションが有効かどうかで判定しています。
HTML
HTML コードはシンプルなので、特に説明する必要もないでしょう。
<h1>Chat via fetch upload streaming</h1>
<form id="form">
<input type="text" id="input" placeholder="Message">
<button type="submit" id="send">Send</button>
</form>
<ul id="messages"></ul>
<button id="quit">Quit</button>
サーバーサイド
Fetch Upload Streaming には制限があり HTTP/1.x で利用しようとすると ERR_H2_OR_QUIC_REQUIRED
というエラーで失敗します。これは HTTP/1.1 の規則ではリクエストとレスポンスのボディは Content-Length
ヘッダーを送信して相手側が受け取るデータの量を知るか、メッセージのフォーマットを変更して chunked エンコードを使用する必要があるためです。
HTTP/1x での利用を有効にする allowHTTP1ForStreamingUpload
フラグは Chrome の実験的なバージョンでのみ利用可能です。
https://web.dev/i18n/zh/fetch-upload-streaming/#%E9%BB%98%E8%AE%A4%E4%BB%85%E9%99%90-http2
つまりはサーバーのコードは HTTP/2 または HTTP/3 で実装する必要があります。
プロジェクトのセットアップ
サーバー側のコードは Express で実装します。spdy は Node.js で HTTP/2 サーバーを実装するためのモジュールです。
npm install express spdy
spdy
を使用するためにはサーバー証明書が必要なので openssl
コマンドで作成します。
openssl req -x509 -sha256 -nodes -days 365 -newkey rsa:2048 -keyout server.key -out server.crt
package.json
にサーバーを起動するコマンドを追加します。
{
"scripts": {
"start": "node server.js"
},
}
サーバーコードの実装
server.js
ファイルを作成して次のようにコードを実装します。
import express from "express";
import spdy from "spdy";
import fs from "fs";
const app = express();
const receivers = new Map();
app.post("/send", (req, res) => {
const room = req.query.room;
if (!room) {
res.status(400).send("No room given");
return;
}
res.status(200);
req.on("data", (chunk) => {
const set = receivers.get(room);
if (!set) return;
for (const res of set) res.write(chunk);
});
req.on("end", (chunk) => {
if (res.writableEnded) return;
res.send("Ended");
});
});
app.get("/receive", (req, res) => {
const room = req.query.room;
if (!room) {
res.status(400).send("No room given");
return;
}
if (!receivers.has(room)) {
receivers.set(room, new Set());
}
receivers.get(room).add(res);
res.on("close", () => {
const set = receivers.get(room);
set.delete(res);
if (set.size === 0) receivers.delete(room);
});
res.status(200);
res.set("Content-Type", "text/plain");
});
app.use(express.static("static"));
const port = process.env.PORT || 3000;
spdy
.createServer(
{
key: fs.readFileSync("./server.key"),
cert: fs.readFileSync("./server.crt"),
},
app
)
.listen(port, (err) => {
if (err) {
console.error(err);
return;
}
console.log(`Listening on port ${port}`);
});
リクエストの送信
まずはリクエストを送信する /send
パスのコードを見ていきます。クエリパラメータから room
ID を取得します。
const room = req.query.room;
if (!room) {
res.status(400).send("No room given");
return;
}
res.status(200);
req.on("data")
で stream data イベントを購読します。クライアントから controller.close()
で接続が閉じられた場合には req.on("end")
イベントがコールされます。
stream data を受信したとき、receivers
Map オブジェクトから room
ID を使用して対応する /revice
パスの res
オブジェクトに対して受信した chunk データを送信します。
req.on("data", (chunk) => {
const set = receivers.get(room);
if (!set) return;
for (const res of set) res.write(chunk);
});
req.on("end", (chunk) => {
if (res.writableEnded) return;
res.send("Ended");
});
リクエストの受信
/receive
パスにクライアントから接続してリクエストの受信を実施します。/send
パスと同様にクエリパラメータから room
ID を取得します。
const room = req.query.room;
if (!room) {
res.status(400).send("No room given");
return;
}
receivers
Map オブジェクトの対応する room
ID に res
オブジェクトをセットします。res
オブジェクトは /send
パスから chunk データを送信するために使用されます。
if (!receivers.has(room)) {
receivers.set(room, new Set());
}
receivers.get(room).add(res);
HTTP/2 サーバーの作成
最後に spdy
モジュールを使用して HTTP/2 サーバーを作成します。createServer
メソッドにはプロジェクトのセットアップ時に作成したサーバー証明書と Express
の app
オブジェクトを設定します。後は listen
メソッドで 3000 ポートを指定しています。
spdy
.createServer(
{
key: fs.readFileSync("./server.key"),
cert: fs.readFileSync("./server.crt"),
},
app
)
.listen(port, (err) => {
if (err) {
console.error(err);
return;
}
console.log(`Listening on port ${port}`);
});
感想
やっていること自体は WebSocket で実現できることとほとんど変わらないのですが、HTTP の技術のみで実装でいるのが嬉しいですね。WebSocket 用のサーバー建てるのって結構面倒だったりしますしね。