JavaScript

Fetch Upload Streaming でチャットアプリを作ってみる

Fetch Upload Streaming とは、ブラウザの JavaScript の POST リクエストで HTTP のストリーミングを行える機能です。より具体的には、Fetch API の `body` に ReadableStream を渡せるようになります。

Fetch Upload Streaming とは

Fetch Upload Streaming とは、ブラウザの JavaScript の POST リクエストで HTTP のストリーミングを行える機能です。より具体的には、Fetch APIbodyReadableStream を渡せるようになります。

const stream = new ReadableStream({})
 
fetch(`/send?room=${roomId}`, {
  method: "POST",
  body: stream,
  duplex: "half",
});

かねてより Fetch API ではレスポンスを ReadableStream で取得できたのですが、反対にリクエスト時に body に Stream データを渡すことはできませんでした。Chrome 105 から有効となります。Fetch Upload Streaming を使用すれば、WebSocket のようなリアルタイムなデータのやり取りも Fetch API だけで可能となります。今回は Fetch API を利用してリアルタイムなチャットアプリを作成してみます。

fetch-chat

コードの内容は以下を参照してください。

クライアントサイド

まずはクライアント側のコードから見ていきましょう。クライアント側のコードは static 配下に作成します。はじめに input の入力値やフォームの submit イベントを捕捉するために document.querySelector で DOM 要素を取得しています。チャットアプリなので roomId も必要です。roomId はクエリパラーメタから取得して存在しない場合にはアラートを出して処理を終了しています。

const input = document.querySelector("#input");
const form = document.querySelector("#form");
const messages = document.querySelector("#messages");
const quit = document.querySelector("#quit");
 
const roomId = new URLSearchParams(window.location.search).get("roomId");
if (!roomId) {
  alert("No roomId given");
  return;
}

stream をアップロード

stream をアップロードするために ReadableStream インスタンスを作成します。ReadableStream() コンストラクターstart メソッドを利用して stream を生成します。

const stream = new ReadableStream({
  start(controller) {
    form.addEventListener("submit", (event) => {
      event.preventDefault();
      const message = input.value;
      input.value = "";
 
      controller.enqueue(message);
    });
 
    quit.addEventListener("click", () => controller.close());
  },
}).pipeThrough(new TextEncoderStream());

form が submit されたとき、input の入力値を取得します。controller.enqueue メソッドで stream に文字列を追加します。stream の生成を停止するためには controller.close() を呼び出します。これは「quit」ボタンをクリックしたときに呼び出すようにしています。

stream は最終的に ReadableStream.pipeThrough() により別の形式に変換されます。ここでは TextEncoderStream() により文字列の stream を UTF=8 エンコーディングでバイトに変換されます。

作成した ReadableStream インスタンスは fetchbody パラメータとして設定されます。

fetch(`/send?room=${roomId}`, {
  method: "POST",
  headers: { "Content-Type": "text/plain" },
  body: stream,
  duplex: "half",
});

stream を使用するためには duplex: "half" を設定する必要があります。HTTP の機能では「リクエストを送信している間にレスポンスを受信し始めることができるかどうか」というものがあります。duplex: "half" はリクエストボディが完全に送信されるまでレスポンスは受信できません。これはブラウザのデフォルトのパターンです。

しかし、例えば Deno の Fetch などの実装はリクエストが庵寮する間にレスポンスが利用可能となる deplex: "full" がデフォルトとなっています。

このような互換性の問題を回避するために duplex: "half" をリクエストに必ず設定する必要があるのです。

stream を読み込む

Fetch API で res.bodyReadableStream として取得します。

fetch(`/receive?room=${roomId}`).then(async (res) => {
  const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) return;
    const newMessage = document.createElement("li");
    messages.appendChild(newMessage);
    newMessage.innerText = `${newMessage.innerText}${value}`;
  }
});

stream を読むためにはリーダーを取り付ける必要があります。はじめに res.body.pipeThrough(new TextDecoderStream()) で文字列の stream を UTF=8 エンコーディングでバイトに変換します。変換後のデータは getReader() を呼び出す音でリーダーが作成され、stream にロックされます。このリーダーが開放されるまで、他のリーダーはこの stream を読み取ることができません。

リーダーを取り付けたら ReadableStreamDefaultReader.read() メソッドを使用して stream から chunk を読み取ることができます。chunk は value プロパティから取得できます。donetrue を返した場合には、stream が閉じられたことを意味します。

取得した chunk を元に li 要素を作成して新たに DOM に追加することで取得したメッセージを表示できます。

Fetch Upload Streaming が有効かどうかの判定

Fetch Upload Streamig をブラウザがサポートしているかどうか判定するために、ちょっぴり奇妙なコードを使用します。

const supportsRequestStreams = (() => {
  let duplexAccessed = false;
 
  const hasContentType = new Request('', {
    body: new ReadableStream(),
    method: 'POST',
    get duplex() {
      duplexAccessed = true;
      return 'half';
    },
  }).headers.has('Content-Type');
 
  return duplexAccessed && !hasContentType;
})();
 
if (!supportsRequestStreams) {
  // …
}

もしブラウザが Fetch Upload Streaming をサポートしていない場合、bodyReadableStream のインスタンスを渡した際に ReadableStream のインスタンスの toString() メソッドが呼ばれます。その結果 body には '[object ReadableStream]' という string 型が指定されることになります。bodystring 型が渡された場合には、Content-Type ヘッダーに text/plain;charset=UTF-8 という値が自動的に設定されます。つまりは、Content-Type ヘッダーが設定されている(headers.has('Content-Type')true)ときにはブラウザが Fetch Upload Streaming をサポートしていないと判定できます。

Safari は少々やっかいで、body に stream を指定することをサポートしているものの、Fetch API において使用することは許可されていません。そのため、現在 Safari がサポートしていない duplex オプションが有効かどうかで判定しています。

HTML

HTML コードはシンプルなので、特に説明する必要もないでしょう。

<h1>Chat via fetch upload streaming</h1>
<form id="form">
  <input type="text" id="input" placeholder="Message">
  <button type="submit" id="send">Send</button>
</form>
<ul id="messages"></ul>
<button id="quit">Quit</button>

サーバーサイド

Fetch Upload Streaming には制限があり HTTP/1.x で利用しようとすると ERR_H2_OR_QUIC_REQUIRED というエラーで失敗します。これは HTTP/1.1 の規則ではリクエストとレスポンスのボディは Content-Length ヘッダーを送信して相手側が受け取るデータの量を知るか、メッセージのフォーマットを変更して chunked エンコードを使用する必要があるためです。

HTTP/1x での利用を有効にする allowHTTP1ForStreamingUpload フラグは Chrome の実験的なバージョンでのみ利用可能です。

https://web.dev/i18n/zh/fetch-upload-streaming/#%E9%BB%98%E8%AE%A4%E4%BB%85%E9%99%90-http2

つまりはサーバーのコードは HTTP/2 または HTTP/3 で実装する必要があります。

プロジェクトのセットアップ

サーバー側のコードは Express で実装します。spdy は Node.js で HTTP/2 サーバーを実装するためのモジュールです。

npm install express spdy

spdy を使用するためにはサーバー証明書が必要なので openssl コマンドで作成します。

openssl req -x509 -sha256 -nodes -days 365 -newkey rsa:2048 -keyout server.key -out server.crt

package.json にサーバーを起動するコマンドを追加します。

{
  "scripts": {
    "start": "node server.js"
  },
}

サーバーコードの実装

server.js ファイルを作成して次のようにコードを実装します。

import express from "express";
import spdy from "spdy";
import fs from "fs";
 
const app = express();
const receivers = new Map();
 
app.post("/send", (req, res) => {
  const room = req.query.room;
  if (!room) {
    res.status(400).send("No room given");
    return;
  }
 
  res.status(200);
 
  req.on("data", (chunk) => {
    const set = receivers.get(room);
    if (!set) return;
    for (const res of set) res.write(chunk);
  });
 
  req.on("end", (chunk) => {
    if (res.writableEnded) return;
    res.send("Ended");
  });
});
 
app.get("/receive", (req, res) => {
  const room = req.query.room;
  if (!room) {
    res.status(400).send("No room given");
    return;
  }
 
  if (!receivers.has(room)) {
    receivers.set(room, new Set());
  }
 
  receivers.get(room).add(res);
 
  res.on("close", () => {
    const set = receivers.get(room);
    set.delete(res);
    if (set.size === 0) receivers.delete(room);
  });
 
  res.status(200);
  res.set("Content-Type", "text/plain");
});
 
app.use(express.static("static"));
 
const port = process.env.PORT || 3000;
spdy
  .createServer(
    {
      key: fs.readFileSync("./server.key"),
      cert: fs.readFileSync("./server.crt"),
    },
    app
  )
  .listen(port, (err) => {
    if (err) {
      console.error(err);
      return;
    }
    console.log(`Listening on port ${port}`);
  });

リクエストの送信

まずはリクエストを送信する /send パスのコードを見ていきます。クエリパラメータから room ID を取得します。

const room = req.query.room;
if (!room) {
  res.status(400).send("No room given");
  return;
}
 
res.status(200);

req.on("data") で stream data イベントを購読します。クライアントから controller.close() で接続が閉じられた場合には req.on("end") イベントがコールされます。

stream data を受信したとき、receivers Map オブジェクトから room ID を使用して対応する /revice パスの res オブジェクトに対して受信した chunk データを送信します。

req.on("data", (chunk) => {
  const set = receivers.get(room);
  if (!set) return;
  for (const res of set) res.write(chunk);
});
 
req.on("end", (chunk) => {
  if (res.writableEnded) return;
  res.send("Ended");
});

リクエストの受信

/receive パスにクライアントから接続してリクエストの受信を実施します。/send パスと同様にクエリパラメータから room ID を取得します。

const room = req.query.room;
if (!room) {
  res.status(400).send("No room given");
  return;
}

receivers Map オブジェクトの対応する room ID に res オブジェクトをセットします。res オブジェクトは /send パスから chunk データを送信するために使用されます。

if (!receivers.has(room)) {
  receivers.set(room, new Set());
}
 
receivers.get(room).add(res);

HTTP/2 サーバーの作成

最後に spdy モジュールを使用して HTTP/2 サーバーを作成します。createServer メソッドにはプロジェクトのセットアップ時に作成したサーバー証明書と Express の app オブジェクトを設定します。後は listen メソッドで 3000 ポートを指定しています。

spdy
  .createServer(
    {
      key: fs.readFileSync("./server.key"),
      cert: fs.readFileSync("./server.crt"),
    },
    app
  )
 .listen(port, (err) => {
    if (err) {
      console.error(err);
      return;
    }
    console.log(`Listening on port ${port}`);
  });

感想

やっていること自体は WebSocket で実現できることとほとんど変わらないのですが、HTTP の技術のみで実装でいるのが嬉しいですね。WebSocket 用のサーバー建てるのって結構面倒だったりしますしね。

参考


Contributors

> GitHub で修正を提案する
この記事をシェアする
はてなブックマークに追加

関連記事