実際に動いているチャットサービス(小規模)のインフラ構成を公開 | AWS イベントソーシング Auth0
はじめに
個人開発で制作し先日(2020/11/08)に初めのバージョンを公開したチャットサービス Someteria - Webサイト横断チャット (Chrome拡張) のインフラ構成を掲載します。 現状の総インストール数は8人(2020/11/10 時点)、生まれたてのサービスなので本格的な商業的サービスで適応できるかは保証しかねます。しかし、ミニマムにリアルタイムなメッセージの送受信基盤を作る際の一例として参考になれば幸いです。
インフラ構成図
概略
- AWSのサーバレスフレームワークのSAMを利用
- Webサーバーの管理は基本しない
- RESTfulなHTTP APIとしてのエンドポイントとWebSocketのエンドポイントがある
- メッセージPush通知にWebSocketを利用
- データストレージはDynamoDBとElasticsearchを利用
- イベントソーシングの考えをベースにしている (後述)
- 認証処理はAuth0 (IDaaSの一つ)を利用
DBまわり, イベントソーシング
書き込み要件と読み込み要件を分離するためにイベントソーシングのパターンを適応し、データソースを分けています。
流れ:
- API Gatewayから書き込み系の処理のLambdaを起動
- Lambdaから書き込みイベントをDynamoDBに保存
- DynamoDB Streamsをトリガーとして以下のLambdaをそれぞれ起動
- Elasticsearchに表示用の非正規化されたデータを反映・構築
- ブラウザに対してWebSocket経由で更新メッセージを送信
- 後ほど、読み込み系の処理のLambdaが表示用データにアクセス
少し回りくどくなりますが、1.の書き込み系の処理を行うLambdaと、3.の読み込み要件を担うLambdaとで関心を分離できます。そのおかげで実際の経験として、当初はブラウザからポーリングして取得していたデータを、サーバーからのプッシュにするようにする変更があったのですが、その際に1.の書き込み処理用のLambdaの変更は一切不要でした。
認証・ユーザ情報周り
認証にはAuth0 (IDaaS)を利用し、Google, TwitterなどのプロバイダーとOpenID Connectのフローにより連携しています。 フロントのアプリケーションが直接Auth0のエンドポイントとやりとりし認証しトークン(JWT)を受け取ります。そのJWTをサーバーサイドに送りアクセス可否を判断します。
Elasticsearchへユーザー情報の同期
Auth0に保存されるユーザー情報(ユーザ名, アイコンURLなど)は、Auth0のログイン時に起動できるフックによってElasticsearchに同期しています。 Auth0上のユーザー情報はAuth0のAPIから直接アクセスもできます。しかしユーザー情報は頻繁にアクセスするのでAPIのレートリミットを超えないようにするためそうしています。
サーバープッシュ WebSocket
サーバーからのプッシュ通知はAPI GatewayのWebSocket APIを使用しています。クライアントがWebSocket APIに接続した際に得られるコネクションIDとメッセージサブスクリプションの情報をDynamoDBに保存します。後ほど、関連する更新イベントが起きた場合にコネクションIDを取得してクライアントに通知できます。
その他特徴
- AWS Lambdaをベースとしたサーバレスアプリなので、サーバーのスケーリングをAWSに委ねられる
- AWS SAMはCloudFormationによってコードベースでインフラを定義できるので、再現性が高い
- ステージング環境などの複製環境をターミナルのコマンドから構築できる
まとめ
いかがだったでしょうか。チャットに限らず、リアルタイムなメッセージパッシングを実装する機会があれば、参考にしていただければと思います。
Web横断チャットサービスSometeriaも是非お試しください。
コマンド1発でAWSに極小構成のWebサーバー(SSL/TLS対応済み)を構築したい
「とりあえず動けばいい。お金払いたくない。ドメインとかSSL証明書設定めんどい。ロードバランサーとかいらん。」という忙しくてケチな人向け。
なにする
SSL/TLS対応済みのWebサーバーの極小の構成をささっと作ります。CloudFormationを使います。
大体の流れをつかめばDockerなどとの組み合わせで、コマンド1発でWebサーバーとそのインフラを構築、削除できます。
今回書いたコード等:
GitHub - ponsea/aws-minimal-webserver: Minimal Web server with SSL/TLS on AWS
インフラ仕様
- サーバーはEC2インスタンス1個のみ
- WebサーバーのURLのホスト名は適当になるがHTTPSで接続できる。(自前でドメインや証明書の用意をする必要無し)
- このようなURLになる =>
https://xxxxxxx.execute-api.ap-northeast-1.amazonaws.com/
- このようなURLになる =>
- サーバーにSSHできる
- 課金は 任意のEC2インスタンスの料金 + Route53のホストゾーン(月50円) + CloudMapのNamespace (月10円) ほど
- 料金面は適宜、公式ページでご確認ください
インフラ構成
最終的なインフラ構成です。
API GatewayのVPCリンクと、プライベート統合機能を用いてリクエストをEC2にバイパスします。 Cloud MapはApiGatewayがEC2インスタンスのIPアドレス/ポート番号を解決するために用います。
前提
今回はCloudFormationを使います。CloudFormationとは端的に言うと、前述のインフラ構成を定義したファイル(yamlまたはjson形式)を用意して実行すると、そのインフラ構成を実際に構築してくれる優れものです。
以下に今回のCloudFormationのテンプレートファイル(インフラ構成を定義したファイル)があります。
GitHub - ponsea/aws-minimal-webserver: Minimal Web server with SSL/TLS on AWS
template.yaml
がインフラ構成の定義ファイルです。
template.yaml
の中ではすでに、自動でDockerのhttpdイメージが80番ポートで実行される様に設定されています。(詳細は後述)
docker run -d -p 80:80 httpd
コマンド1発でインフラを構築する
template.yaml
があるディレクトリで以下のコマンドを実行します。すると実際にインフラ構成が構築されます。
aws cloudformation create-stack \ --stack-name minimal-webserver \ 任意のスタック名 (実際に作成されるAWSリソースの総称のようなもの) --template-body file://template.yaml \ --parameters \ ParameterKey=Ec2InstanceType,ParameterValue=t2.micro \ インスタンスタイプはお好みで(t2.microの部分) ParameterKey=SshKeyName,ParameterValue=<YOUR SSH KEY NAME> \ 予め作っておいたEC2のSSHキーペア名を指定 ParameterKey=SshLocation,ParameterValue=<YOUR SSH LOCATION> SSH元のIPをx.x.x.x/x形式で指定し制限することをお勧めします。省略すれば0.0.0.0/0(制限なし)となります。
構築の様子はAWSのCloudFormationのコンソールで確認できます。ステータスがCREATE_COMPLETE
になれば構築完了です。
構築が完了したあと、EC2インスタンスのパブリックIPとWebサーバーのURL(エンドポイント)を確認します。
aws cloudformation describe-stacks --stack-name minimal-webserver { "Stacks": [ { ... "StackStatus": "CREATE_COMPLETE" ... "Outputs": [ { "OutputKey": "ApiGatewayUrl", ↓ WebサーバーのURL "OutputValue": "https://xxxxxx.execute-api.ap-northeast-1.amazonaws.com/", "Description": "API Gateway endpoint URL for $default stage" }, { "OutputKey": "Ec2InstancePublicIp", ↓ EC2 インスタンスのパブリックIP "OutputValue": "xxx.xxx.xxx.xxx", "Description": "EC2 Instance Public IP" } ], ... ] }
"OutputValue": "https://xxxxxx.execute-api.ap-northeast-1.amazonaws.com/",
のURLにアクセスすれば、Webページが見れます。
現状ではDockerのhttpdイメージの「It works!」というページが表示されるはずです。
SSH接続したい場合
ssh -i "/path/to/your-ssh-key.pem" ec2-user@<YOUR EC2 PUBLIC IP>
全部全部まとめて削除したい場合:
aws cloudformation delete-stack --stack-name minimal-webserver
template.yaml 分析
今回事前に用意したtemplate.yamlで定義したEC2インスタンスは、予めDocker, docker-compose, gitをインストールし、Dockerのhttpdイメージを実行するスクリプトを埋め込んでいます。このスクリプトはEC2インスタンスの初回起動時に実行されます。
UserData: Fn::Base64: !Sub | #!/bin/bash -xe # docker, gitインストール yum install -y docker git # docker-composeインストール curl -L "https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose chmod +x /usr/local/bin/docker-compose systemctl start docker # Dockerデーモン起動 systemctl enable docker # Dockerデーモン自動起動設定 usermod -a -G docker ec2-user # dockerコマンドをec2-userで実行できる様に docker run -d -p 80:80 httpd # httpdイメージを実行
aws-minimal-webserver/template.yaml at master · ponsea/aws-minimal-webserver · GitHub
用途に合わせて、ここでDockerのコンテナ構成を定義すれば、すぐにシングルノードの小さいWebサービス建てれそうですね。
CloudFormation便利!
Playframework (Scala) でRedisにさくっとアクセスする。non-blockingで。
はじめに
ScalaでRedisにアクセスする例です。ScalaでのRedisクライアントライブラリはいくつか選択肢はあるようですが、ここではJava製のLettuceを使います。
サンプルコードは以下にあります github.com
一見
先にPlayframeworkで実際にRedisにアクセスしているコードをお見せします。 Redisに値をSETしてからGETするだけのControllerのActionです。
@Singleton class RedisController @Inject()( cc: ControllerComponents, redisConnection: StatefulRedisConnection[String, String] // RedisのコネクションをDI )(implicit ec: ExecutionContext) extends AbstractController(cc) { def redis = Action.async { import scala.jdk.FutureConverters._ // JavaのCompletableFutureをScalaのFutureにするコンバータ val asyncCommands = redisConnection.async() // Redisのコマンドの非同期API for { _ <- asyncCommands.set("key", "Hello, Redis!").asScala // Redisに値をセット message <- asyncCommands.get("key").asScala // Redisから値をゲット } yield Ok(message) } }
RedisController.scala - GitHub
アクセスすると "Hello, Redis!" と表示されます。
DI設定等は後述
Lettuceとは
公式ページ: https://lettuce.io/
特徴:
- Java製 Redisクライアントライブラリ
- ノンブロッキングIO (netty NIOベース)
- 3種類のAPIを提供している
- 同期API (blocking IO)
- 非同期API (javaのCompletableFutureベース)
- Reactive API (Reactor coreベース)
- Redis ClusterやSentinelにも対応している。
- Redis Pub/Subも使える
JavaのCompletableFutureはScalaのFutureに簡単に変換できます。 また、Reactive APIはReactive Streamsの実装なのでAkka Streamなどとも相互変換可能です。
DI設定
Playframework標準のGoogle Guiceを使っている場合の設定例です。
まず、LettuceのRedisClientやStatefulRedisConnection (Redisへのコネクション)のプロバイダーを定義します。
// RedisClientのプロバイダー @Singleton class RedisClientProvider @Inject()( config: Configuration, lifecycle: ApplicationLifecycle ) extends Provider[RedisClient] { private val redisClient = RedisClient.create(config.get[String]("lettuce.redis-uri")) // RedisへのURIはapplication.confで設定する // アプリケーションの終了時のクリーンアップ lifecycle.addStopHook { () => redisClient.shutdownAsync().asScala } override val get: RedisClient = redisClient } // StatefulRedisConnectionのプロバイダー @Singleton class StatefulRedisConnectionProvider { // 省略。RedisClientProviderと同様です }
RedisClientProvider, StatefulRedisConnectionProvider - GitHub
次に、Google GuiceのModule設定をします。PlayframeworkはデフォルトではルートパッケージにModule
という名前のクラスを配置すると読み込まれます。
class Module extends AbstractModule { override def configure() = { bind(classOf[RedisClient]) .toProvider(classOf[RedisClientProvider]) bind(new TypeLiteral[StatefulRedisConnection[String, String]] {}) .toProvider(classOf[StatefulRedisConnectionProvider]) } }
型パラメーター付きのクラスのDIを設定するときは、new TypeLiteral[< DIしたい型 >]
とする必要があります。
Webページでマウスのシェイクイベント(ブルブルっと動かした)の検知 | TypeScript, RxJS
はじめに
ユーザーがマウスをブルブルっと動かした(シェイクした)時になんらかのタスクを実行したいと思ったことはありますか?ありませんよね。
こんなイメージです。マウスシェイク検知したらその旨をコンソールに出力します。
RxJSを使えばそこまで難しくはありません。
ちなみにRxJSとは非同期処理やストリーム処理のライブラリです。
実装コード
今回はマウスのシェイクイベントを、このように噛み砕きました。
"500ミリ秒以内に4回、マウスのX軸方向の動きが反転した"
それをRxJSで表現します。
// ↓最終的なマウスシェイクイベントのストリーム const mouseShakeEvents$ = fromEvent<MouseEvent>(document, 'mousemove').pipe( // 上流はmousemoveイベントのストリーム // ↓マウスのx軸が動いたイベントだけにフィルタ filter(e => e.movementX !== 0), // マウスのx軸の動きが反転した時だけ流すように distinctUntilChanged((e1, e2) => Math.sign(e1.movementX) === Math.sign(e2.movementX)), // 直近の4件のイベントのまとまりで流すように bufferCount(4, 1), // 直近4件のイベントの、最初と最後の間の時間が500ms以下の場合に流すように filter(events => events.length === 4 && events[3].timeStamp - events[0].timeStamp < 500) );
要点・補足:
- RxJSのfromEventで、document (DOM)上のmousemoveイベントを、
Observable<MouseEvent>
に変換します。- Observableは端的に言うとストリームのようなオブジェクトです。
- mousemoveイベントにはどの方向にどれだけ動いたかを示すmovementX, movementYプロパティがあります
- RxJSのdistinctUntilChangedオペレーターは、流れてきた値と前回の値の等価性を検証する関数を取り、true(等価とみなした)場合はフィルタします。
Math.sign(e1.movementX)
は、movementXの値がマイナスなら-1, プラスなら1を返します。
- RxJSのbufferCountは値を指定個数分バッファして配列で流すオペレータです。
bufferCount(3, 2)
とした時、1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7
という値の流れは[1, 2, 3] -> [3, 4, 5] -> [5, 6, 7]
のように変換されます。 (3つのまとまりで2つごとに流れる)
RxJSとっても便利です!
外部リンクを新規タブで開くようにするだけ (プレーンなJavaScript)
したいこと
ブログなどのWebサイトでリンクをクリックした時、特にそのリンクが外部のWebサイトのものだと新規タブで開いた方が見る側として楽だと思います(たぶん)。
また、コンテンツ提供者側としても、自分のサイトに長く滞在してくれることになり嬉しいです。とは言ったものの1つ1つの<a>
タグにtarget="_blank"
と書くのは面倒なので、共通のスクリプト書きました。jQueryなど無しのプレーンJavaScriptです(ECMA 2015 (ES6) を前提としていますが。)
スクリプト
<script> window.addEventListener("DOMContentLoaded", () => { document .querySelectorAll(`a[href^="http"]:not([href*="${location.hostname}"])`) .forEach(link => link.setAttribute("target", "_blank")); }); </script>
処理はシンプルです。DOMが読み込まれた時に
querySelectorAll
で外部リンクのaタグを取得。条件は- aタグのhrefが
http
で始まる絶対URL かつ - 現在のサイトのホスト名(このブログで言うと
ponsea.hatenablog.com
)を含まない
- aタグのhrefが
forEach
でそれぞれのリンクにtarget="_blank"
HTML属性をつける
スクリプトの影響範囲は大きいのですこし注意です。
はてなブログで設定したい場合
はてなブログの場合は、デザイン => カスタム => フッタ
で上記のスクリプトコピペして「変更を保存する」で、ブログ内全体で適用されます。
Slick (Scala) のテーブル定義からSQLのCREATE TABLE文を標準出力するだけ (MySQLでのカラム型一覧あり)
目次:
なにがしたい
Scalaのデータベース処理のライブラリであるSlickで、Slickのテーブル定義(クラス定義)からSQLのCREATE TABLE文を標準出力したいです。 Slickのテーブル定義(クラス定義)を書いて、それからまた同じDBマイグレーションのためのSQLファイルを書くケースでは、何度も1から書くのは避けたいですよね。 あと、Slickがサポートしているカラムの型が結局DBでなにの型になるのかよくわからないので、それを一覧で確認したい次第です。
コード:
↓ データベースのテーブルスキーマからSlickのテーブル定義を生成する(今回と逆)のツールはありますが、設定とかややこしそうで手を出しづらいです😅
Schema Code Generation — Slick 3.2.0 documentation
コードを書く時点ですでにデータベース上にスキーマがある場合は便利だと思います。
テーブル定義書いて出力
テーブル定義その1 (カラム型列挙テーブル)
ColumnTypesTable.scala - GitHub
Slickのテーブル定義です。Slickのテーブル定義ではJdbcProfile
が必要になりますが、DIで取ってくることが多いと思うのでそのようにいています。(それか直接MySQLProfile
などを参照するか。)
今回はせっかくなのでサンプルのテーブル定義はSlickのJDBCProfileでサポートされている型全てカラムに加えました。このテーブル定義はColumnTypesTable
と名付けています。
テーブル定義その2 (テーブル制約列挙テーブル)
TableConstraintsTable.scala - GitHub
もう1つ、テーブル定義を書いています。Slickのテーブル定義では複合主キーや外部キー、インデックスの設定などRDBにある多くのテーブル制約もかけるので、それらを書いたTableConstraintsTable
です。
標準出力するスクリプト
以下は、上記2つのテーブル定義のCREATE TABLE文とDROP TABLE文を出力するスクリプトです。ScalaTestで書いてsbt testOnly クラスパス
で実行する想定です。今はデータベースはMySQLを指定しています。
PrintSlickTableSchemas.scala - GitHub
今回はScalaTestを使いましたが、こう言ったスクリプトをまとめたsbtのsub-moduleを作ってそこに置いてもいいかもしれません。
Slickのカラム型 => MySQLのカラム型対応
ColumnTypesTableのCREATE TABLEの結果を最後に載せます。Slickのバージョンは3.3.2
です。
ColumnTypesTable.scala - GitHub
CREATE TABLE `column_types`( `int` INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, `long` BIGINT NOT NULL, `float` REAL NOT NULL, `double` DOUBLE NOT NULL, `big_decimal` DECIMAL(21, 2) NOT NULL, `string` TEXT NOT NULL, `char` CHAR(1) NOT NULL, `boolean` BOOLEAN NOT NULL, `byte` TINYINT NOT NULL, `byte_array` BLOB NOT NULL, `blob` BLOB NOT NULL, `clob` CLOB NOT NULL, `uuid` BINARY(16) NOT NULL, `date` DATE NOT NULL, `time` TIME NOT NULL, `timestamp` TIMESTAMP NOT NULL, `local_time` TEXT NOT NULL, `local_date` DATE NOT NULL, `local_date_time` TEXT NOT NULL, `offset_date_time` TEXT NOT NULL, `zoned_date_time` TEXT NOT NULL, `instant` TEXT NOT NULL )
一方TableConstraintsTable.scalaの出力結果はGitHubのリポジトリのREADMEで載せています。
クラスタリングされたリアルタイムなチャットサーバーをクリーンアーキテクチャ風に作るサンプル その2 実装 (全2篇) | Scala, Playframework, Akka, ZIO
※ この記事は以下の記事の続きです
その2(この記事)ではコード上の実装のポイントをいくつか紹介します。ZIOに関する内容が多めです。
目次:
- ストリーミング実装
- DI (依存性注入) の戦略 ・コンパイルタイムDI
- Slickによるデータベース処理
- エラー表現・ハンドリング
- コントローラー・ActionBuilder (Playframework)
- 所感
- 特に参考にさせていただいた文献
サンプルコード:
ストリーミング実装
以下は、Conversation (会話)の更新を見張るユースケースで、ドメインイベントをサブスクライブ・フィルタして更新情報のStream変換する部分です。 ConversationUpdateSubscriber.scala
class ConversationUpdateSubscriber(domainEventBus: DomainEventBus) { def subscribeByConversationKey( targetConversationKey: Conversation.Key ): UIO[Stream[Nothing, ConversationUpdate]] = { // ↓ドメインイベントをサブスクライブしてQueue[DomainEvent]を取得 domainEventBus.subscribe().map { domainEvents: Queue[DomainEvent] => ZStream // ↓QueueをStreamへ変換。コンシューマーの終了時はQueueはシャットダウンされる .fromQueueWithShutdown(domainEvents) .collect { // ↓関心のあるドメインイベントを拾ってクライアントへの通知用の更新情報に変換 case CommentEvent.Sent(commentId, commentContent, authorId, conversationKey, occurredAt) => ConversationUpdate.CommentAdded(commentId, commentContent, authorId, conversationKey, occurredAt) // ↑ 今回はこの変換しか実際使ってない。↓2つの変換(case句)は例えばのもの case CommentEvent.Modified(commentId, authorId, conversationKey, occurredAt) => ConversationUpdate.CommentUpdated(commentId, authorId, conversationKey, occurredAt) case CommentEvent.Removed(commentId, authorId, conversationKey, occurredAt) => ConversationUpdate.CommentDeleted(commentId, authorId, conversationKey, occurredAt) } .filter(_.conversationKey == targetConversationKey) // クライアントの処理が遅くバックプレッシャーがかかった場合は16件までバッファリングし、 // 超過した場合は古いものから削除する .bufferSliding(16) } } }
ここのStreamはZIOのクラスで、バッファーなどを設けてバックプレッシャーの制御もできます。最終的にこのZIOのStreamはコントローラー側でAkka Streamに変換されます。変換処理は
- ZIOのStream
- Reactive Streams1のPublisher
- Akka StreamのSource
- Akka StreamのFlow (PlayframeworkのWebSocketのレスポンスとして返す値)
という手順を踏みます。
DI (依存性注入) の戦略 ・コンパイルタイムDI
依存性の解決はPlayframework標準のGoogle GuiceではなくZIOのZLayerというものを試してみました。ZLayerは依存性の解決を表現する型です。ZLayer[RIn, E, ROut]というシグネチャを持ちます。RInは依存するモジュールです。もし、あるモジュールがAとBに依存するなら、RInはHas[A] with Has[B]
と表現します。対してROut
は依存を解決した結果となるモジュールです。Eは依存性解決の際に生じるエラーの型です。
今回は各クラスにこのZLayerを定義して、最後にこのZLayerを組み合わせて依存性を解決しました。以下は依存性を解決しているコードの抜粋で、Use CaseのInteractorたちに依存性注入しています。
// ↓解決後はZLayer[Any, Nothing, Has[SendCommentUseCase.Interactor] with Has[WatchConversationUpdatesUseCase.Interactor]] def interactors = // ↓この行のZLayerたちのROutを合算して、以下のInteractorのRInに注入する (repositories ++ utils ++ factories ++ services ++ domainEventBus) >>> { // ↓このレイヤーはZLayer[Has[CommentRepository] with Has[...] with etc...., Nothing, Has[SendCommentUseCase.Interactor]] SendCommentUseCase.Interactor.layer ++ WatchConversationUpdatesUseCase.Interactor.layer // その他のInteractorを追加していく... }
ここで出てくるrepositories
は依存性を解決済みの全てのリポジトリのZLayerです。ZLayerは、++
でRInとROutを合算します。そして、>>>
で左辺のZLayerのROutを右辺のZLayerのRInに依存注入します。
ZLayer[Has[A], _, Has[B]] ++ ZLayer[Has[C], _, Has[D]]
の結果はZLayer[Has[A] with Has[C], _, Has[B] with Has[D]]
ですZLayer[Any, _, Has[A]] >>> ZLayer[Has[A], _, Has[B]]
の結果はZLayer[Any, _, Has[B]]
です
RInがAnyの場合、依存するものが無いということです。
Slickによるデータベース処理
DBIORunnerというものを独自で定義しました。これはSlickのDBタスクであるDBIOAction
を実行し、ZIOの値で返します。DBIORunnerはDatabasePairというものに依存しています。
case class DatabasePair( master: BasicProfile#Backend#Database, readonly: BasicProfile#Backend#Database )
BasicProfile#Backend#Database
というのは、SlickのDBIOActionを実行できる型です。データベースを冗長化する場合、書き込み可能なマスターデータベースと、読み込み専用のリードレプリカでデータベースのホストが別れるケースを鑑み、その2つを切り分けています。DBIORunnerは、書き込み処理を含むDBIOActionを実行できるrun
メソッドと、読み込み専用のrunReadonly
メソッドを持ちます。
エラー表現・ハンドリング
エラーの表現はZIO[R, E, A]の値が内部的に保持するCause[E]というモデルをベースに構築します。
Cause[E]にはフィールドとしてList[E]型のfailure
とList[Throwable]型のdefects
があります。Cause[E]のEはZIO[R, E, A]のEと対応しています。ZIOは並行で実行できる値なので、同時に複数のエラーが発生することがあるためList型で表現されます。以下はZIO[R, E, A]のEとして扱う予定のDomainErrorの定義です。
sealed trait DomainError { val errorCode: String val message: String } // システム上の復帰不可なエラー型は定義しない、例えば // `case class SystemError(throwable: Throwable)` // こういったエラーは、ZIO値の`Cause`内にある`defects`として保持する case class ValidationError(resource: String, override val message: String) extends DomainError { override val errorCode = "validation-error" } case class ResourceNotFound(resource: String, condition: Any) extends DomainError { override val errorCode = "validation-error" override val message = s"$resource ($condition) is not found" }
コメントで示す通り復帰不可能なエラー(DB接続エラーなどのThrowable値)はDomainErrorとして保持せず、ZIO[R, E, A]の型表現に登場させていません。こう言ったエラーはドメイン上の処理でリカバリーできないことが多いのでZIO値のCauseが持つdefectsで保持します。ZIO[_, Throwable, _]という値はorDie
メソッドによってZIO[_, Nothing, _]に変換できます。エラーは型に現れることなくこのタスクは失敗し終了します。
以下はデータベースタスクの実行部分の抜粋です。発生しうるエラーをorDie
によってZIOの型情報から消しています。
def run[R](action: DBIOAction[R, NoStream, Effect.All]): UIO[R] = { // FutureをZIO[Any, Throwable, R]に変換。そしてorDieによってZIO[Any, Nothing, R]に変換。(UIOはZIO[Any, Nothing, _]のエイリアス) ZIO.fromFuture(_ => masterDb.run(action)).orDie }
エラーハンドリング (ログ出力等) の例は以下です。
def handleErrors(cause: Cause[DomainError]): UIO[Result] = { for { // 復帰不可能なdefectsをログ出力 _ <- ZIO.foreach(cause.defects)(logger.throwable("Non-recoverable error occurred.", _)) result = (cause.failures, cause.defects) match { // failuresはList[DomainError]型 // failures(バリデーションエラーなど)がある場合は適切なHTTPエラーレスポンスを返す case (fail :: _, _) => Status(statusCodeOf(fail))(errorsToJson(cause.failures)) // `defects`(復帰不可なシステムエラー)のみの場合はInternalServerエラー case _ => InternalServerError } } yield result }
ちなみに、ZIOのorDie
でZIO[_, Throwable, _]をZIO[_, Nothing, _]にした後でも、absorb
というメソッドでもう一度ZIO[_, Throwable, _]へ変換を試みることができます。
データベース等のエラーが発生しても、別のデータベースに接続して復帰したいといった用件が生まれたときに便利かもしれません。
コントローラー・ActionBuilder (Playframework)
Playframeworkのコントローラー周りについてです。ZIO
のタスクを実行するためのActionBuilderとしてZioActionBuilderを独自で定義しました。以下はコントローラーで使用する例です。(PlayframeworkのAction定義)
def invoke = { ZioAction andThen // ZioActionBuilderはandThenで繋げられる loggingAction andThen authAction // Requestの`attachments`に認証情報(UserCredential)を付与する // ↓ unsafeRunによってZIO[_, _, Result]をPlayのActionに (zio.Runtimeを暗黙の引数でとる) }.unsafeRun(parse.json[Input]) { request => // このrequestは`RequestWithAttachments`型 // Option値ではなくそのまま型安全にUserCredentialを取得できる val credential = request.attachments.get[UserCredential] val input = request.body val result = interactor.execute(input)(credential) result.foldCauseM( cause => errorHandler.handleErrors(cause), output => UIO(Created(Json.toJson(output.createdComment))) ) }
今回はRequestWithAttachmentsという独自のRequest型を定義しています。これは、attachments
というAT <: Has[_]
型を保持します。Has[_]はZIOの型で、例えばHas[A] with Has[B]
の値の場合はget[A]
メソッドでA型の値を取得できます。ZioActionFunctionがこのattachmentsに追加情報を付与します。attachments
の型はHas[XXX] with Has[YYY] ...
と型情報が増え最終的にget[任意の型]
で値を取得できます。Play備え付けのRequest
にはattr
という似たようなフィールドがありますが、こちらはget[任意の型]
で取り出すとOptionが返るので少し不便です。
所感
ZIO独自のテストライブラリ、テストランナーが一長一短
ZIOには独自のテストライブラリ、テストランナーが存在します。ScalaTestではなくこちらを使うと、ZIOのテスト用のサポートが簡単に使えます。例えば、TestClock
というものを使えば、システムのクロック時刻を簡単に操作でき、並行処理の過程を細かくテストできます。ZIO独自のモックライブラリもあり、モック用のコードを自動生成するマクロもあります。
ただマクロはZIOで推奨されているモジュールのコーディング方法に沿っていないと使えないようで、今回は使えませんでした。また、現時点で使った限りでは、「モックのメソッドを呼び出さなかった」という検証をしようとしても対応するものが無かったりしました。普段ScalaTest等になれている場合はまた新しいものを覚えないといけないので学習コストもかさみます。
ZLayerを使ったコンパイルタイムDI、やはり面倒さは残る。あと分かりにくいかもしれない
すこし大雑把にDI設定をかけますがやはり手間です。ZLayerについて知らないと奇抜なコードに見えると思います。型注釈がめちゃくちゃ長くなりがちで書くか迷います。あとコンパイルタイムかなり遅くなっているのではと思います。
依存性解決している部分: AppLayers.scala
実装上の懸念点: ユーザーのコネクションごとに全てのドメインイベントのフィルター処理が走る
スリーミング実装の項で紹介しているDomainEventBusのサブスクライブですが、現時点ではユーザーのサブスクライブごとに全てのドメインイベントのフィルター処理が走ります。共通化できるストリームをキャッシュするなどしないと処理コストがかさみそうです。 また、基本的にドメインイベントは全てのノードに毎度ブロードキャストされます。ネットワーク帯域が心配です。
特に参考にさせていただいた文献
- creators-note.chatwork.com
- speakerdeck.com
- 上2つはサンプルコードも含めかなり参考にさせていただきました。
- ZIO Environment 〜 Tagless Final の後継? - Qiita
- 実装クリーンアーキテクチャ - Qiita
- ZIOのエラー・モデルとエラー処理 - Qiita
他多数
これらにいいねいただけると幸いです。