Web開発のしおりRepository

Webエンジニアリング関連の技術記事を掲載させていただいております。

実際に動いているチャットサービス(小規模)のインフラ構成を公開 | AWS イベントソーシング Auth0

はじめに

個人開発で制作し先日(2020/11/08)に初めのバージョンを公開したチャットサービス Someteria - Webサイト横断チャット (Chrome拡張) のインフラ構成を掲載します。 現状の総インストール数は8人(2020/11/10 時点)、生まれたてのサービスなので本格的な商業的サービスで適応できるかは保証しかねます。しかし、ミニマムにリアルタイムなメッセージの送受信基盤を作る際の一例として参考になれば幸いです。

インフラ構成図

Someteriaインフラ構成図

概略

  • AWSのサーバレスフレームワークのSAMを利用
    • Webサーバーの管理は基本しない
  • RESTfulなHTTP APIとしてのエンドポイントとWebSocketのエンドポイントがある
    • メッセージPush通知にWebSocketを利用
  • データストレージはDynamoDBとElasticsearchを利用
    • イベントソーシングの考えをベースにしている (後述)
  • 認証処理はAuth0 (IDaaSの一つ)を利用

DBまわり, イベントソーシング

書き込み要件と読み込み要件を分離するためにイベントソーシングのパターンを適応し、データソースを分けています。

流れ:

  1. API Gatewayから書き込み系の処理のLambdaを起動
  2. Lambdaから書き込みイベントをDynamoDBに保存
  3. DynamoDB Streamsをトリガーとして以下のLambdaをそれぞれ起動
    • Elasticsearchに表示用の非正規化されたデータを反映・構築
    • ブラウザに対してWebSocket経由で更新メッセージを送信
  4. 後ほど、読み込み系の処理のLambdaが表示用データにアクセス

少し回りくどくなりますが、1.の書き込み系の処理を行うLambdaと、3.の読み込み要件を担うLambdaとで関心を分離できます。そのおかげで実際の経験として、当初はブラウザからポーリングして取得していたデータを、サーバーからのプッシュにするようにする変更があったのですが、その際に1.の書き込み処理用のLambdaの変更は一切不要でした。

認証・ユーザ情報周り

Someteria Auth0

認証にはAuth0 (IDaaS)を利用し、Google, TwitterなどのプロバイダーとOpenID Connectのフローにより連携しています。 フロントのアプリケーションが直接Auth0のエンドポイントとやりとりし認証しトークン(JWT)を受け取ります。そのJWTをサーバーサイドに送りアクセス可否を判断します。

Elasticsearchへユーザー情報の同期

Auth0に保存されるユーザー情報(ユーザ名, アイコンURLなど)は、Auth0のログイン時に起動できるフックによってElasticsearchに同期しています。 Auth0上のユーザー情報はAuth0のAPIから直接アクセスもできます。しかしユーザー情報は頻繁にアクセスするのでAPIのレートリミットを超えないようにするためそうしています。

サーバープッシュ WebSocket

サーバープッシュ WebSocket

サーバーからのプッシュ通知はAPI GatewayのWebSocket APIを使用しています。クライアントがWebSocket APIに接続した際に得られるコネクションIDとメッセージサブスクリプションの情報をDynamoDBに保存します。後ほど、関連する更新イベントが起きた場合にコネクションIDを取得してクライアントに通知できます。

その他特徴

  • AWS Lambdaをベースとしたサーバレスアプリなので、サーバーのスケーリングをAWSに委ねられる
  • AWS SAMはCloudFormationによってコードベースでインフラを定義できるので、再現性が高い
    • ステージング環境などの複製環境をターミナルのコマンドから構築できる

まとめ

いかがだったでしょうか。チャットに限らず、リアルタイムなメッセージパッシングを実装する機会があれば、参考にしていただければと思います。

Web横断チャットサービスSometeriaも是非お試しください。

www.someteria.com

コマンド1発でAWSに極小構成のWebサーバー(SSL/TLS対応済み)を構築したい

「とりあえず動けばいい。お金払いたくない。ドメインとかSSL証明書設定めんどい。ロードバランサーとかいらん。」という忙しくてケチな人向け。

なにする

SSL/TLS対応済みのWebサーバーの極小の構成をささっと作ります。CloudFormationを使います。

大体の流れをつかめばDockerなどとの組み合わせで、コマンド1発でWebサーバーとそのインフラを構築、削除できます。

今回書いたコード等:

GitHub - ponsea/aws-minimal-webserver: Minimal Web server with SSL/TLS on AWS

インフラ仕様

  • サーバーはEC2インスタンス1個のみ
  • WebサーバーのURLのホスト名は適当になるがHTTPSで接続できる。(自前でドメインや証明書の用意をする必要無し)
    • このようなURLになる => https://xxxxxxx.execute-api.ap-northeast-1.amazonaws.com/
  • サーバーにSSHできる
  • 課金は 任意のEC2インスタンスの料金 + Route53のホストゾーン(月50円) + CloudMapのNamespace (月10円) ほど
    • 料金面は適宜、公式ページでご確認ください

インフラ構成

最終的なインフラ構成です。

f:id:ponsea:20200905135551p:plain
インフラ構成

API GatewayVPCリンクと、プライベート統合機能を用いてリクエストをEC2にバイパスします。 Cloud MapはApiGatewayがEC2インスタンスIPアドレス/ポート番号を解決するために用います。

前提

  • アカウント登録してAWS CLIをインストール済み
  • 前述のインフラ構成を構築する権限がある
  • EC2のキーペアを1個作っている

今回はCloudFormationを使います。CloudFormationとは端的に言うと、前述のインフラ構成を定義したファイル(yamlまたはjson形式)を用意して実行すると、そのインフラ構成を実際に構築してくれる優れものです。

以下に今回のCloudFormationのテンプレートファイル(インフラ構成を定義したファイル)があります。

GitHub - ponsea/aws-minimal-webserver: Minimal Web server with SSL/TLS on AWS

template.yaml がインフラ構成の定義ファイルです。

template.yamlの中ではすでに、自動でDockerのhttpdイメージが80番ポートで実行される様に設定されています。(詳細は後述)

docker run -d -p 80:80 httpd

コマンド1発でインフラを構築する

template.yamlがあるディレクトリで以下のコマンドを実行します。すると実際にインフラ構成が構築されます。

aws cloudformation create-stack \
  --stack-name minimal-webserver \ 任意のスタック名 (実際に作成されるAWSリソースの総称のようなもの)
  --template-body file://template.yaml \
  --parameters \
      ParameterKey=Ec2InstanceType,ParameterValue=t2.micro \ インスタンスタイプはお好みで(t2.microの部分)
      ParameterKey=SshKeyName,ParameterValue=<YOUR SSH KEY NAME> \ 予め作っておいたEC2のSSHキーペア名を指定
      ParameterKey=SshLocation,ParameterValue=<YOUR SSH LOCATION> SSH元のIPをx.x.x.x/x形式で指定し制限することをお勧めします。省略すれば0.0.0.0/0(制限なし)となります。

構築の様子はAWSのCloudFormationのコンソールで確認できます。ステータスがCREATE_COMPLETEになれば構築完了です。

構築が完了したあと、EC2インスタンスのパブリックIPとWebサーバーのURL(エンドポイント)を確認します。

aws cloudformation describe-stacks --stack-name minimal-webserver

{
    "Stacks": [
        {
            ...

            "StackStatus": "CREATE_COMPLETE"

            ...

            "Outputs": [
                { 
                    "OutputKey": "ApiGatewayUrl",
                    ↓ WebサーバーのURL
                    "OutputValue": "https://xxxxxx.execute-api.ap-northeast-1.amazonaws.com/",
                    "Description": "API Gateway endpoint URL for $default stage"
                },
                {
                    "OutputKey": "Ec2InstancePublicIp",
                    ↓ EC2 インスタンスのパブリックIP
                    "OutputValue": "xxx.xxx.xxx.xxx",
                    "Description": "EC2 Instance Public IP"
                }
            ],

            ...
    ]
}

"OutputValue": "https://xxxxxx.execute-api.ap-northeast-1.amazonaws.com/", のURLにアクセスすれば、Webページが見れます。

現状ではDockerのhttpdイメージの「It works!」というページが表示されるはずです。

SSH接続したい場合

ssh -i "/path/to/your-ssh-key.pem" ec2-user@<YOUR EC2 PUBLIC IP>

全部全部まとめて削除したい場合:

aws cloudformation delete-stack --stack-name minimal-webserver

template.yaml 分析

今回事前に用意したtemplate.yamlで定義したEC2インスタンスは、予めDocker, docker-compose, gitをインストールし、Dockerのhttpdイメージを実行するスクリプトを埋め込んでいます。このスクリプトはEC2インスタンスの初回起動時に実行されます。

      UserData:
        Fn::Base64: !Sub |
          #!/bin/bash -xe
          # docker, gitインストール
          yum install -y docker git
          # docker-composeインストール
          curl -L "https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
          chmod +x /usr/local/bin/docker-compose
          systemctl start docker # Dockerデーモン起動
          systemctl enable docker # Dockerデーモン自動起動設定
          usermod -a -G docker ec2-user # dockerコマンドをec2-userで実行できる様に
          docker run -d -p 80:80 httpd # httpdイメージを実行

aws-minimal-webserver/template.yaml at master · ponsea/aws-minimal-webserver · GitHub

用途に合わせて、ここでDockerのコンテナ構成を定義すれば、すぐにシングルノードの小さいWebサービス建てれそうですね。

CloudFormation便利!

Playframework (Scala) でRedisにさくっとアクセスする。non-blockingで。

はじめに

ScalaでRedisにアクセスする例です。ScalaでのRedisクライアントライブラリはいくつか選択肢はあるようですが、ここではJava製のLettuceを使います。

サンプルコードは以下にあります github.com

一見

先にPlayframeworkで実際にRedisにアクセスしているコードをお見せします。 Redisに値をSETしてからGETするだけのControllerのActionです。

@Singleton
class RedisController @Inject()(
  cc: ControllerComponents,
  redisConnection: StatefulRedisConnection[String, String] // RedisのコネクションをDI
)(implicit ec: ExecutionContext)
    extends AbstractController(cc) {

  def redis = Action.async {
    import scala.jdk.FutureConverters._ // JavaのCompletableFutureをScalaのFutureにするコンバータ
    val asyncCommands = redisConnection.async() // Redisのコマンドの非同期API
    for {
      _       <- asyncCommands.set("key", "Hello, Redis!").asScala // Redisに値をセット
      message <- asyncCommands.get("key").asScala // Redisから値をゲット
    } yield Ok(message)
  }
}

RedisController.scala - GitHub

アクセスすると "Hello, Redis!" と表示されます。

DI設定等は後述

Lettuceとは

公式ページ: https://lettuce.io/

特徴:

  • Java製 Redisクライアントライブラリ
  • ノンブロッキングIO (netty NIOベース)
  • 3種類のAPIを提供している
  • Redis ClusterやSentinelにも対応している。
  • Redis Pub/Subも使える

JavaのCompletableFutureはScalaのFutureに簡単に変換できます。 また、Reactive APIReactive Streamsの実装なのでAkka Streamなどとも相互変換可能です。

DI設定

Playframework標準のGoogle Guiceを使っている場合の設定例です。

まず、LettuceのRedisClientやStatefulRedisConnection (Redisへのコネクション)のプロバイダーを定義します。

// RedisClientのプロバイダー
@Singleton
class RedisClientProvider @Inject()(
  config: Configuration,
  lifecycle: ApplicationLifecycle
) extends Provider[RedisClient] {
  private val redisClient =
    RedisClient.create(config.get[String]("lettuce.redis-uri")) // RedisへのURIはapplication.confで設定する

  // アプリケーションの終了時のクリーンアップ
  lifecycle.addStopHook { () =>
    redisClient.shutdownAsync().asScala
  }

  override val get: RedisClient = redisClient
}


// StatefulRedisConnectionのプロバイダー
@Singleton
class StatefulRedisConnectionProvider {
  //  省略。RedisClientProviderと同様です
}

RedisClientProvider, StatefulRedisConnectionProvider - GitHub

application.conf - GitHub

次に、Google GuiceのModule設定をします。PlayframeworkはデフォルトではルートパッケージにModuleという名前のクラスを配置すると読み込まれます。

class Module extends AbstractModule {
  override def configure() = {
    bind(classOf[RedisClient])
      .toProvider(classOf[RedisClientProvider])
    bind(new TypeLiteral[StatefulRedisConnection[String, String]] {})
      .toProvider(classOf[StatefulRedisConnectionProvider])
  }
}

型パラメーター付きのクラスのDIを設定するときは、new TypeLiteral[< DIしたい型 >]とする必要があります。

Webページでマウスのシェイクイベント(ブルブルっと動かした)の検知 | TypeScript, RxJS

はじめに

ユーザーがマウスをブルブルっと動かした(シェイクした)時になんらかのタスクを実行したいと思ったことはありますか?ありませんよね。

こんなイメージです。マウスシェイク検知したらその旨をコンソールに出力します。

f:id:ponsea:20200601191158g:plain
マウスシェイク検知したらコンソールに出力

RxJSを使えばそこまで難しくはありません。

ちなみにRxJSとは非同期処理やストリーム処理のライブラリです。

実装コード

今回はマウスのシェイクイベントを、このように噛み砕きました。

"500ミリ秒以内に4回、マウスのX軸方向の動きが反転した"

それをRxJSで表現します。

// ↓最終的なマウスシェイクイベントのストリーム
const mouseShakeEvents$ = fromEvent<MouseEvent>(document, 'mousemove').pipe(
  // 上流はmousemoveイベントのストリーム
  // ↓マウスのx軸が動いたイベントだけにフィルタ
  filter(e => e.movementX !== 0),
  // マウスのx軸の動きが反転した時だけ流すように
  distinctUntilChanged((e1, e2) => Math.sign(e1.movementX) === Math.sign(e2.movementX)),
  // 直近の4件のイベントのまとまりで流すように
  bufferCount(4, 1),
  // 直近4件のイベントの、最初と最後の間の時間が500ms以下の場合に流すように
  filter(events => events.length === 4 && events[3].timeStamp - events[0].timeStamp < 500)
);

要点・補足:

  • RxJSのfromEventで、document (DOM)上のmousemoveイベントを、Observable<MouseEvent>に変換します。
    • Observableは端的に言うとストリームのようなオブジェクトです。
  • mousemoveイベントにはどの方向にどれだけ動いたかを示すmovementX, movementYプロパティがあります
  • RxJSのdistinctUntilChangedオペレーターは、流れてきた値と前回の値の等価性を検証する関数を取り、true(等価とみなした)場合はフィルタします。
    • Math.sign(e1.movementX) は、movementXの値がマイナスなら-1, プラスなら1を返します。
  • RxJSのbufferCountは値を指定個数分バッファして配列で流すオペレータです。
    • bufferCount(3, 2)とした時、1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 という値の流れは[1, 2, 3] -> [3, 4, 5] -> [5, 6, 7] のように変換されます。 (3つのまとまりで2つごとに流れる)

RxJSとっても便利です!

外部リンクを新規タブで開くようにするだけ (プレーンなJavaScript)

したいこと

ブログなどのWebサイトでリンクをクリックした時、特にそのリンクが外部のWebサイトのものだと新規タブで開いた方が見る側として楽だと思います(たぶん)。 また、コンテンツ提供者側としても、自分のサイトに長く滞在してくれることになり嬉しいです。とは言ったものの1つ1つの<a>タグにtarget="_blank"と書くのは面倒なので、共通のスクリプト書きました。jQueryなど無しのプレーンJavaScriptです(ECMA 2015 (ES6) を前提としていますが。)

スクリプト

<script>
window.addEventListener("DOMContentLoaded", () => {
  document
    .querySelectorAll(`a[href^="http"]:not([href*="${location.hostname}"])`)
    .forEach(link => link.setAttribute("target", "_blank"));
});
</script>

処理はシンプルです。DOMが読み込まれた時に

  1. querySelectorAllで外部リンクのaタグを取得。条件は
    • aタグのhrefがhttpで始まる絶対URL かつ
    • 現在のサイトのホスト名(このブログで言うとponsea.hatenablog.com)を含まない
  2. forEachでそれぞれのリンクにtarget="_blank"HTML属性をつける

スクリプトの影響範囲は大きいのですこし注意です。

はてなブログで設定したい場合

はてなブログの場合は、デザイン => カスタム => フッタ で上記のスクリプトコピペして「変更を保存する」で、ブログ内全体で適用されます。

はてなブログでの設定
はてなブログでの設定

Slick (Scala) のテーブル定義からSQLのCREATE TABLE文を標準出力するだけ (MySQLでのカラム型一覧あり)

目次:

なにがしたい

Scalaのデータベース処理のライブラリであるSlickで、Slickのテーブル定義(クラス定義)からSQLのCREATE TABLE文を標準出力したいです。 Slickのテーブル定義(クラス定義)を書いて、それからまた同じDBマイグレーションのためのSQLファイルを書くケースでは、何度も1から書くのは避けたいですよね。 あと、Slickがサポートしているカラムの型が結局DBでなにの型になるのかよくわからないので、それを一覧で確認したい次第です。

コード:

github.com

↓ データベースのテーブルスキーマからSlickのテーブル定義を生成する(今回と逆)のツールはありますが、設定とかややこしそうで手を出しづらいです😅

Schema Code Generation — Slick 3.2.0 documentation

コードを書く時点ですでにデータベース上にスキーマがある場合は便利だと思います。

テーブル定義書いて出力

テーブル定義その1 (カラム型列挙テーブル)

ColumnTypesTable.scala - GitHub

Slickのテーブル定義です。Slickのテーブル定義ではJdbcProfileが必要になりますが、DIで取ってくることが多いと思うのでそのようにいています。(それか直接MySQLProfileなどを参照するか。) 今回はせっかくなのでサンプルのテーブル定義はSlickのJDBCProfileでサポートされている型全てカラムに加えました。このテーブル定義はColumnTypesTableと名付けています。

テーブル定義その2 (テーブル制約列挙テーブル)

TableConstraintsTable.scala - GitHub

もう1つ、テーブル定義を書いています。Slickのテーブル定義では複合主キーや外部キー、インデックスの設定などRDBにある多くのテーブル制約もかけるので、それらを書いたTableConstraintsTableです。

標準出力するスクリプト

以下は、上記2つのテーブル定義のCREATE TABLE文とDROP TABLE文を出力するスクリプトです。ScalaTestで書いてsbt testOnly クラスパスで実行する想定です。今はデータベースはMySQLを指定しています。

PrintSlickTableSchemas.scala - GitHub

今回はScalaTestを使いましたが、こう言ったスクリプトをまとめたsbtのsub-moduleを作ってそこに置いてもいいかもしれません。

Slickのカラム型 => MySQLのカラム型対応

ColumnTypesTableのCREATE TABLEの結果を最後に載せます。Slickのバージョンは3.3.2です。

ColumnTypesTable.scala - GitHub

CREATE TABLE `column_types`(
    `int`              INTEGER        NOT NULL AUTO_INCREMENT PRIMARY KEY,
    `long`             BIGINT         NOT NULL,
    `float`            REAL           NOT NULL,
    `double`           DOUBLE         NOT NULL,
    `big_decimal`      DECIMAL(21, 2) NOT NULL,
    `string`           TEXT           NOT NULL,
    `char`             CHAR(1)        NOT NULL,
    `boolean`          BOOLEAN        NOT NULL,
    `byte`             TINYINT        NOT NULL,
    `byte_array`       BLOB           NOT NULL,
    `blob`             BLOB           NOT NULL,
    `clob`             CLOB           NOT NULL,
    `uuid`             BINARY(16)     NOT NULL,
    `date`             DATE           NOT NULL,
    `time`             TIME           NOT NULL,
    `timestamp`        TIMESTAMP      NOT NULL,
    `local_time`       TEXT           NOT NULL,
    `local_date`       DATE           NOT NULL,
    `local_date_time`  TEXT           NOT NULL,
    `offset_date_time` TEXT           NOT NULL,
    `zoned_date_time`  TEXT           NOT NULL,
    `instant`          TEXT           NOT NULL
)

一方TableConstraintsTable.scalaの出力結果はGitHubのリポジトリのREADMEで載せています。

クラスタリングされたリアルタイムなチャットサーバーをクリーンアーキテクチャ風に作るサンプル その2 実装 (全2篇) | Scala, Playframework, Akka, ZIO

※ この記事は以下の記事の続きです

クラスタリングされたリアルタイムなチャットサーバーをクリーンアーキテクチャ風に作るサンプル その1 設計 (全2篇) | Scala, Playframework, Akka, ZIO - Web開発のしおりRepository

その2(この記事)ではコード上の実装のポイントをいくつか紹介します。ZIOに関する内容が多めです。

目次:

サンプルコード:

github.com

ストリーミング実装

以下は、Conversation (会話)の更新を見張るユースケースで、ドメインイベントをサブスクライブ・フィルタして更新情報のStream変換する部分です。 ConversationUpdateSubscriber.scala

class ConversationUpdateSubscriber(domainEventBus: DomainEventBus) {
  def subscribeByConversationKey(
    targetConversationKey: Conversation.Key
  ): UIO[Stream[Nothing, ConversationUpdate]] = {
    // ↓ドメインイベントをサブスクライブしてQueue[DomainEvent]を取得
    domainEventBus.subscribe().map { domainEvents: Queue[DomainEvent] =>
      ZStream // ↓QueueをStreamへ変換。コンシューマーの終了時はQueueはシャットダウンされる
        .fromQueueWithShutdown(domainEvents)
        .collect {  // ↓関心のあるドメインイベントを拾ってクライアントへの通知用の更新情報に変換
          case CommentEvent.Sent(commentId, commentContent, authorId, conversationKey, occurredAt) =>
            ConversationUpdate.CommentAdded(commentId, commentContent, authorId, conversationKey, occurredAt)
          // ↑ 今回はこの変換しか実際使ってない。↓2つの変換(case句)は例えばのもの
          case CommentEvent.Modified(commentId, authorId, conversationKey, occurredAt) =>
            ConversationUpdate.CommentUpdated(commentId, authorId, conversationKey, occurredAt)
          case CommentEvent.Removed(commentId, authorId, conversationKey, occurredAt) =>
            ConversationUpdate.CommentDeleted(commentId, authorId, conversationKey, occurredAt)
        }
        .filter(_.conversationKey == targetConversationKey)
        // クライアントの処理が遅くバックプレッシャーがかかった場合は16件までバッファリングし、
        // 超過した場合は古いものから削除する
        .bufferSliding(16)
    }
  }
}

ここのStreamはZIOのクラスで、バッファーなどを設けてバックプレッシャーの制御もできます。最終的にこのZIOのStreamはコントローラー側でAkka Streamに変換されます。変換処理は

  1. ZIOのStream
  2. Reactive Streams1のPublisher
  3. Akka StreamのSource
  4. Akka StreamのFlow (PlayframeworkのWebSocketのレスポンスとして返す値)

という手順を踏みます。

DI (依存性注入) の戦略 ・コンパイルタイムDI

依存性の解決はPlayframework標準のGoogle GuiceではなくZIOのZLayerというものを試してみました。ZLayerは依存性の解決を表現する型です。ZLayer[RIn, E, ROut]というシグネチャを持ちます。RInは依存するモジュールです。もし、あるモジュールがAとBに依存するなら、RInはHas[A] with Has[B]と表現します。対してROutは依存を解決した結果となるモジュールです。Eは依存性解決の際に生じるエラーの型です。 今回は各クラスにこのZLayerを定義して、最後にこのZLayerを組み合わせて依存性を解決しました。以下は依存性を解決しているコードの抜粋で、Use CaseのInteractorたちに依存性注入しています。

  // ↓解決後はZLayer[Any, Nothing, Has[SendCommentUseCase.Interactor] with Has[WatchConversationUpdatesUseCase.Interactor]]
  def interactors = 
    // ↓この行のZLayerたちのROutを合算して、以下のInteractorのRInに注入する
    (repositories ++ utils ++ factories ++ services ++ domainEventBus) >>> { 
     // ↓このレイヤーはZLayer[Has[CommentRepository] with Has[...] with etc...., Nothing, Has[SendCommentUseCase.Interactor]]
      SendCommentUseCase.Interactor.layer ++
      WatchConversationUpdatesUseCase.Interactor.layer

      // その他のInteractorを追加していく...

    }

AppLayers.scala - GitHub

ここで出てくるrepositoriesは依存性を解決済みの全てのリポジトリのZLayerです。ZLayerは、++ でRInとROutを合算します。そして、>>>で左辺のZLayerのROutを右辺のZLayerのRInに依存注入します。

  • ZLayer[Has[A], _, Has[B]] ++ ZLayer[Has[C], _, Has[D]]の結果はZLayer[Has[A] with Has[C], _, Has[B] with Has[D]]です
  • ZLayer[Any, _, Has[A]] >>> ZLayer[Has[A], _, Has[B]]の結果はZLayer[Any, _, Has[B]]です

RInがAnyの場合、依存するものが無いということです。

Slickによるデータベース処理

DBIORunnerというものを独自で定義しました。これはSlickのDBタスクであるDBIOActionを実行し、ZIOの値で返します。DBIORunnerはDatabasePairというものに依存しています。

case class DatabasePair(
  master: BasicProfile#Backend#Database,
  readonly: BasicProfile#Backend#Database
)

BasicProfile#Backend#Databaseというのは、SlickのDBIOActionを実行できる型です。データベースを冗長化する場合、書き込み可能なマスターデータベースと、読み込み専用のリードレプリカでデータベースのホストが別れるケースを鑑み、その2つを切り分けています。DBIORunnerは、書き込み処理を含むDBIOActionを実行できるrunメソッドと、読み込み専用のrunReadonlyメソッドを持ちます。

エラー表現・ハンドリング

エラーの表現はZIO[R, E, A]の値が内部的に保持するCause[E]というモデルをベースに構築します。 Cause[E]にはフィールドとしてList[E]型のfailureとList[Throwable]型のdefectsがあります。Cause[E]のEはZIO[R, E, A]のEと対応しています。ZIOは並行で実行できる値なので、同時に複数のエラーが発生することがあるためList型で表現されます。以下はZIO[R, E, A]のEとして扱う予定のDomainErrorの定義です。

sealed trait DomainError {
  val errorCode: String
  val message: String
}

// システム上の復帰不可なエラー型は定義しない、例えば
// `case class SystemError(throwable: Throwable)`
// こういったエラーは、ZIO値の`Cause`内にある`defects`として保持する

case class ValidationError(resource: String, override val message: String) extends DomainError {
  override val errorCode = "validation-error"
}

case class ResourceNotFound(resource: String, condition: Any) extends DomainError {
  override val errorCode = "validation-error"
  override val message   = s"$resource ($condition) is not found"
}

DomainError.scala - GitHub

コメントで示す通り復帰不可能なエラー(DB接続エラーなどのThrowable値)はDomainErrorとして保持せず、ZIO[R, E, A]の型表現に登場させていません。こう言ったエラーはドメイン上の処理でリカバリーできないことが多いのでZIO値のCauseが持つdefectsで保持します。ZIO[_, Throwable, _]という値はorDieメソッドによってZIO[_, Nothing, _]に変換できます。エラーは型に現れることなくこのタスクは失敗し終了します。 以下はデータベースタスクの実行部分の抜粋です。発生しうるエラーをorDieによってZIOの型情報から消しています。

  def run[R](action: DBIOAction[R, NoStream, Effect.All]): UIO[R] = {
    // FutureをZIO[Any, Throwable, R]に変換。そしてorDieによってZIO[Any, Nothing, R]に変換。(UIOはZIO[Any, Nothing, _]のエイリアス)
    ZIO.fromFuture(_ => masterDb.run(action)).orDie
  }

DBIORunner.scala - GitHub

エラーハンドリング (ログ出力等) の例は以下です。

  def handleErrors(cause: Cause[DomainError]): UIO[Result] = {
    for {
      // 復帰不可能なdefectsをログ出力
      _ <- ZIO.foreach(cause.defects)(logger.throwable("Non-recoverable error occurred.", _))
      result = (cause.failures, cause.defects) match { // failuresはList[DomainError]型
         // failures(バリデーションエラーなど)がある場合は適切なHTTPエラーレスポンスを返す
        case (fail :: _, _) => Status(statusCodeOf(fail))(errorsToJson(cause.failures))
        // `defects`(復帰不可なシステムエラー)のみの場合はInternalServerエラー
        case _              => InternalServerError
      }
    } yield result
  }

ErrorHandler.scala - GitHub

ちなみに、ZIOのorDieでZIO[_, Throwable, _]をZIO[_, Nothing, _]にした後でも、absorbというメソッドでもう一度ZIO[_, Throwable, _]へ変換を試みることができます。 データベース等のエラーが発生しても、別のデータベースに接続して復帰したいといった用件が生まれたときに便利かもしれません。

コントローラー・ActionBuilder (Playframework)

Playframeworkのコントローラー周りについてです。ZIOのタスクを実行するためのActionBuilderとしてZioActionBuilderを独自で定義しました。以下はコントローラーで使用する例です。(PlayframeworkのAction定義)

  def invoke = {
    ZioAction andThen // ZioActionBuilderはandThenで繋げられる
    loggingAction andThen
    authAction // Requestの`attachments`に認証情報(UserCredential)を付与する
    // ↓ unsafeRunによってZIO[_, _, Result]をPlayのActionに (zio.Runtimeを暗黙の引数でとる)
  }.unsafeRun(parse.json[Input]) { request => // このrequestは`RequestWithAttachments`型
    // Option値ではなくそのまま型安全にUserCredentialを取得できる
    val credential = request.attachments.get[UserCredential] 
    val input      = request.body
    val result     = interactor.execute(input)(credential)

    result.foldCauseM(
      cause => errorHandler.handleErrors(cause),
      output => UIO(Created(Json.toJson(output.createdComment)))
    )
  }

今回はRequestWithAttachmentsという独自のRequest型を定義しています。これは、attachmentsというAT <: Has[_]型を保持します。Has[_]はZIOの型で、例えばHas[A] with Has[B]の値の場合はget[A]メソッドでA型の値を取得できます。ZioActionFunctionがこのattachmentsに追加情報を付与します。attachmentsの型はHas[XXX] with Has[YYY] ...と型情報が増え最終的にget[任意の型]で値を取得できます。Play備え付けのRequestにはattrという似たようなフィールドがありますが、こちらはget[任意の型]で取り出すとOptionが返るので少し不便です。

所感

ZIO独自のテストライブラリ、テストランナーが一長一短

ZIOには独自のテストライブラリ、テストランナーが存在します。ScalaTestではなくこちらを使うと、ZIOのテスト用のサポートが簡単に使えます。例えば、TestClockというものを使えば、システムのクロック時刻を簡単に操作でき、並行処理の過程を細かくテストできます。ZIO独自のモックライブラリもあり、モック用のコードを自動生成するマクロもあります。 ただマクロはZIOで推奨されているモジュールのコーディング方法に沿っていないと使えないようで、今回は使えませんでした。また、現時点で使った限りでは、「モックのメソッドを呼び出さなかった」という検証をしようとしても対応するものが無かったりしました。普段ScalaTest等になれている場合はまた新しいものを覚えないといけないので学習コストもかさみます。

ZLayerを使ったコンパイルタイムDI、やはり面倒さは残る。あと分かりにくいかもしれない

すこし大雑把にDI設定をかけますがやはり手間です。ZLayerについて知らないと奇抜なコードに見えると思います。型注釈がめちゃくちゃ長くなりがちで書くか迷います。あとコンパイルタイムかなり遅くなっているのではと思います。

依存性解決している部分: AppLayers.scala

実装上の懸念点: ユーザーのコネクションごとに全てのドメインイベントのフィルター処理が走る

スリーミング実装の項で紹介しているDomainEventBusのサブスクライブですが、現時点ではユーザーのサブスクライブごとに全てのドメインイベントのフィルター処理が走ります。共通化できるストリームをキャッシュするなどしないと処理コストがかさみそうです。 また、基本的にドメインイベントは全てのノードに毎度ブロードキャストされます。ネットワーク帯域が心配です。

特に参考にさせていただいた文献

他多数

これらにいいねいただけると幸いです。


  1. ノンブロッキングバックプレッシャーを処理する非同期ストリームを標準化する取り組み。