クラスタリングされたリアルタイムなチャットサーバーをクリーンアーキテクチャ風に作るサンプル その1 設計 (全2篇) | Scala, Playframework, Akka, ZIO
目次:
その2:
はじめに
この記事ではScala1界隈ではおなじみPlayframework2, Akka3と、最近話題(?)の関数型非同期処理ライブラリのZIOを使ってクラスタリングされたチャットサーバーを試しに作ったのでその知見のまとめです。 なお、タイトルにAkkaを含んでいますがサンプルコードにはアクターの実装は一切ありません。zio-akka-clusterというサードパーティライブラリにアクター周りの実装を委譲しています。データの永続化にはSlick、データベースはMySQL (テストではH2のMySQLモード)を使用しています。 設計はクリーンアーキテクチャに擬えたようなものにしています。 サンプルコードは以下にあります。 github.com
動作のイメージ
以下は、簡単にサーバーの挙動をデモできるページを動かした映像です。 左右のブラウザはチャットサーバに対する2つのクライアントです。このクライアント1つにつき2つのサーバー(ノード)に対してWebSocketで接続しています。クライアントからサーバーに対してコメントを送ると、両方のサーバーからリアルタイムに更新情報(JSON)を受けとっています。ページ内の左下が1つ目のサーバー、右下が2つ目のサーバーからの更新情報です。 プロダクション環境ではサーバーとの間にロードバランサーを設置して各ノードを透明化し、トラフィックを分散します。
ネットワーク構成
プロダクション環境のネットワーク構成のイメージが以下です。 各アプリケーションサーバーはAkka Clusterによってクラスタリングされます。Domain Event Busは今回のサンプルコードで登場するモデルで、各ノードで発生したドメインイベントを伝播させる役割を担います。ドメインイベントはこのアプリのユースケースを実行する際に生じたモデル変更などのイベントを表現したものです。ドメインイベントの伝達はAkka Distributed PubSubをベースに構築しています。
Domain Event Busの定義は非常にシンプルなので先に紹介します。
trait DomainEventBus { def publish(domainEvent: DomainEvent): UIO[Unit] def subscribe(): UIO[Queue[DomainEvent]] }
DomainEventBus.scala - GitHub (実装: DomainEventBusByAkka.scala - GitHub)
ドメインイベントを配信するpublish
メソッドと、配信されるドメインイベントを受信するためのsubscribe
メソッドがあります。
UIO[_]
はZIOの非同期処理のモナドで、端的にいうとFuture[_]
のようなものです。
subscribe
メソッドはQueue[DomainEvent]
を返します。このQueue
もZIOのクラスで文字通りキューです。このキューはサブスクライブごとに独立して生成されます。コンシューマーがこのキューにたまったイベントを適宜処理します。今回の実装ではこのQueueに容量制限を設定していませんが、任意の削除ルールでサイズ制限を付与することもできます。今回は、後ほどこのQueueをZIOのStreamに変換する際に制限されたバッファーを設けることで、バックプレッシャー4を制御します。
モジュール・クラス設計
大枠はクリーンアーキテクチャに擬えて、所々もじっています。DDDで登場するAggregate(集約)などの用語も含めています。こういった文脈ではアプリケーションを複数の階層、例えばドメイン層、アプリケーション層、インターフェース・アダプター層、インフラストラクチャー層と区切り依存の方向を1つに制限することが多いです。
今回は大きくドメイン層とインターフェース層をSBTのSub-moduleとして分けました。依存性もインターフェース層 => ドメイン層
と方向付けています。
domainモジュール内ではUse Caseから依存が伸びるように自制してます。
ユースケース・エンドポイント
サンプルで実装したユースケースは2つのみです。それぞれにREST APIとしてのエンドポイントが対応しています。 このサンプルでは主要なドメインモデルとして、Conversation (会話)とそれに対するComment (コメント)があります。
ユースケース | エンドポイント | 対応するクラス |
---|---|---|
会話に対してコメントを送る | POST /v1/comments |
SendComment |
会話の最新情報を見張る | WebSocket /v1/conversation/{会話のキー}/updates |
WatchConversationUpdates |
会話の最新情報の取得はWebSocketですが、クライアントからのコメント送信は従来どおりPOSTリクエストにしています。また、サーバーからWebSocket経由で送信する更新情報は極力リソースID等の最小限のデータにしています。必要に応じてクライアントが完全なデータをGETリクエストなどで再取得する想定です。
上記の理由としては、
- WebSocket通信で送受信するデータは、通信規約を開発者自身が決めないといけないので、極力バリエーションを減らして簡素化したい
- クライアントの利用用途をあまり束縛しない意図もある
- WebSocketのような永続的なエンドポイントと、従来のHTTP通信のステートレスなエンドポイントを分けることでサーバーを分けることもでき、デプロイ戦略も柔軟になる
- リソースの参照は実際、様々なデータの集合体になりえるので、別途のクエリサービス等を実装しそれらにAPIを統一したい
このあたりは通信トラフィックと実装コストのトレードオフだと思うので状況次第で変わりそうです。
続き (実装について):
特に参考にさせていただいた文献
- creators-note.chatwork.com
- speakerdeck.com
- 上2つはサンプルコードも含めかなり参考にさせていただきました。
- ZIO Environment 〜 Tagless Final の後継? - Qiita
- 実装クリーンアーキテクチャ - Qiita
- ZIOのエラー・モデルとエラー処理 - Qiita
他多数
これらにいいねいただけると幸いです。