Web開発のしおりRepository

Webエンジニアリング関連の技術記事を掲載させていただいております。

クラスタリングされたリアルタイムなチャットサーバーをクリーンアーキテクチャ風に作るサンプル その2 実装 (全2篇) | Scala, Playframework, Akka, ZIO

※ この記事は以下の記事の続きです

クラスタリングされたリアルタイムなチャットサーバーをクリーンアーキテクチャ風に作るサンプル その1 設計 (全2篇) | Scala, Playframework, Akka, ZIO - Web開発のしおりRepository

その2(この記事)ではコード上の実装のポイントをいくつか紹介します。ZIOに関する内容が多めです。

目次:

サンプルコード:

github.com

ストリーミング実装

以下は、Conversation (会話)の更新を見張るユースケースで、ドメインイベントをサブスクライブ・フィルタして更新情報のStream変換する部分です。 ConversationUpdateSubscriber.scala

class ConversationUpdateSubscriber(domainEventBus: DomainEventBus) {
  def subscribeByConversationKey(
    targetConversationKey: Conversation.Key
  ): UIO[Stream[Nothing, ConversationUpdate]] = {
    // ↓ドメインイベントをサブスクライブしてQueue[DomainEvent]を取得
    domainEventBus.subscribe().map { domainEvents: Queue[DomainEvent] =>
      ZStream // ↓QueueをStreamへ変換。コンシューマーの終了時はQueueはシャットダウンされる
        .fromQueueWithShutdown(domainEvents)
        .collect {  // ↓関心のあるドメインイベントを拾ってクライアントへの通知用の更新情報に変換
          case CommentEvent.Sent(commentId, commentContent, authorId, conversationKey, occurredAt) =>
            ConversationUpdate.CommentAdded(commentId, commentContent, authorId, conversationKey, occurredAt)
          // ↑ 今回はこの変換しか実際使ってない。↓2つの変換(case句)は例えばのもの
          case CommentEvent.Modified(commentId, authorId, conversationKey, occurredAt) =>
            ConversationUpdate.CommentUpdated(commentId, authorId, conversationKey, occurredAt)
          case CommentEvent.Removed(commentId, authorId, conversationKey, occurredAt) =>
            ConversationUpdate.CommentDeleted(commentId, authorId, conversationKey, occurredAt)
        }
        .filter(_.conversationKey == targetConversationKey)
        // クライアントの処理が遅くバックプレッシャーがかかった場合は16件までバッファリングし、
        // 超過した場合は古いものから削除する
        .bufferSliding(16)
    }
  }
}

ここのStreamはZIOのクラスで、バッファーなどを設けてバックプレッシャーの制御もできます。最終的にこのZIOのStreamはコントローラー側でAkka Streamに変換されます。変換処理は

  1. ZIOのStream
  2. Reactive Streams1のPublisher
  3. Akka StreamのSource
  4. Akka StreamのFlow (PlayframeworkのWebSocketのレスポンスとして返す値)

という手順を踏みます。

DI (依存性注入) の戦略 ・コンパイルタイムDI

依存性の解決はPlayframework標準のGoogle GuiceではなくZIOのZLayerというものを試してみました。ZLayerは依存性の解決を表現する型です。ZLayer[RIn, E, ROut]というシグネチャを持ちます。RInは依存するモジュールです。もし、あるモジュールがAとBに依存するなら、RInはHas[A] with Has[B]と表現します。対してROutは依存を解決した結果となるモジュールです。Eは依存性解決の際に生じるエラーの型です。 今回は各クラスにこのZLayerを定義して、最後にこのZLayerを組み合わせて依存性を解決しました。以下は依存性を解決しているコードの抜粋で、Use CaseのInteractorたちに依存性注入しています。

  // ↓解決後はZLayer[Any, Nothing, Has[SendCommentUseCase.Interactor] with Has[WatchConversationUpdatesUseCase.Interactor]]
  def interactors = 
    // ↓この行のZLayerたちのROutを合算して、以下のInteractorのRInに注入する
    (repositories ++ utils ++ factories ++ services ++ domainEventBus) >>> { 
     // ↓このレイヤーはZLayer[Has[CommentRepository] with Has[...] with etc...., Nothing, Has[SendCommentUseCase.Interactor]]
      SendCommentUseCase.Interactor.layer ++
      WatchConversationUpdatesUseCase.Interactor.layer

      // その他のInteractorを追加していく...

    }

AppLayers.scala - GitHub

ここで出てくるrepositoriesは依存性を解決済みの全てのリポジトリのZLayerです。ZLayerは、++ でRInとROutを合算します。そして、>>>で左辺のZLayerのROutを右辺のZLayerのRInに依存注入します。

  • ZLayer[Has[A], _, Has[B]] ++ ZLayer[Has[C], _, Has[D]]の結果はZLayer[Has[A] with Has[C], _, Has[B] with Has[D]]です
  • ZLayer[Any, _, Has[A]] >>> ZLayer[Has[A], _, Has[B]]の結果はZLayer[Any, _, Has[B]]です

RInがAnyの場合、依存するものが無いということです。

Slickによるデータベース処理

DBIORunnerというものを独自で定義しました。これはSlickのDBタスクであるDBIOActionを実行し、ZIOの値で返します。DBIORunnerはDatabasePairというものに依存しています。

case class DatabasePair(
  master: BasicProfile#Backend#Database,
  readonly: BasicProfile#Backend#Database
)

BasicProfile#Backend#Databaseというのは、SlickのDBIOActionを実行できる型です。データベースを冗長化する場合、書き込み可能なマスターデータベースと、読み込み専用のリードレプリカでデータベースのホストが別れるケースを鑑み、その2つを切り分けています。DBIORunnerは、書き込み処理を含むDBIOActionを実行できるrunメソッドと、読み込み専用のrunReadonlyメソッドを持ちます。

エラー表現・ハンドリング

エラーの表現はZIO[R, E, A]の値が内部的に保持するCause[E]というモデルをベースに構築します。 Cause[E]にはフィールドとしてList[E]型のfailureとList[Throwable]型のdefectsがあります。Cause[E]のEはZIO[R, E, A]のEと対応しています。ZIOは並行で実行できる値なので、同時に複数のエラーが発生することがあるためList型で表現されます。以下はZIO[R, E, A]のEとして扱う予定のDomainErrorの定義です。

sealed trait DomainError {
  val errorCode: String
  val message: String
}

// システム上の復帰不可なエラー型は定義しない、例えば
// `case class SystemError(throwable: Throwable)`
// こういったエラーは、ZIO値の`Cause`内にある`defects`として保持する

case class ValidationError(resource: String, override val message: String) extends DomainError {
  override val errorCode = "validation-error"
}

case class ResourceNotFound(resource: String, condition: Any) extends DomainError {
  override val errorCode = "validation-error"
  override val message   = s"$resource ($condition) is not found"
}

DomainError.scala - GitHub

コメントで示す通り復帰不可能なエラー(DB接続エラーなどのThrowable値)はDomainErrorとして保持せず、ZIO[R, E, A]の型表現に登場させていません。こう言ったエラーはドメイン上の処理でリカバリーできないことが多いのでZIO値のCauseが持つdefectsで保持します。ZIO[_, Throwable, _]という値はorDieメソッドによってZIO[_, Nothing, _]に変換できます。エラーは型に現れることなくこのタスクは失敗し終了します。 以下はデータベースタスクの実行部分の抜粋です。発生しうるエラーをorDieによってZIOの型情報から消しています。

  def run[R](action: DBIOAction[R, NoStream, Effect.All]): UIO[R] = {
    // FutureをZIO[Any, Throwable, R]に変換。そしてorDieによってZIO[Any, Nothing, R]に変換。(UIOはZIO[Any, Nothing, _]のエイリアス)
    ZIO.fromFuture(_ => masterDb.run(action)).orDie
  }

DBIORunner.scala - GitHub

エラーハンドリング (ログ出力等) の例は以下です。

  def handleErrors(cause: Cause[DomainError]): UIO[Result] = {
    for {
      // 復帰不可能なdefectsをログ出力
      _ <- ZIO.foreach(cause.defects)(logger.throwable("Non-recoverable error occurred.", _))
      result = (cause.failures, cause.defects) match { // failuresはList[DomainError]型
         // failures(バリデーションエラーなど)がある場合は適切なHTTPエラーレスポンスを返す
        case (fail :: _, _) => Status(statusCodeOf(fail))(errorsToJson(cause.failures))
        // `defects`(復帰不可なシステムエラー)のみの場合はInternalServerエラー
        case _              => InternalServerError
      }
    } yield result
  }

ErrorHandler.scala - GitHub

ちなみに、ZIOのorDieでZIO[_, Throwable, _]をZIO[_, Nothing, _]にした後でも、absorbというメソッドでもう一度ZIO[_, Throwable, _]へ変換を試みることができます。 データベース等のエラーが発生しても、別のデータベースに接続して復帰したいといった用件が生まれたときに便利かもしれません。

コントローラー・ActionBuilder (Playframework)

Playframeworkのコントローラー周りについてです。ZIOのタスクを実行するためのActionBuilderとしてZioActionBuilderを独自で定義しました。以下はコントローラーで使用する例です。(PlayframeworkのAction定義)

  def invoke = {
    ZioAction andThen // ZioActionBuilderはandThenで繋げられる
    loggingAction andThen
    authAction // Requestの`attachments`に認証情報(UserCredential)を付与する
    // ↓ unsafeRunによってZIO[_, _, Result]をPlayのActionに (zio.Runtimeを暗黙の引数でとる)
  }.unsafeRun(parse.json[Input]) { request => // このrequestは`RequestWithAttachments`型
    // Option値ではなくそのまま型安全にUserCredentialを取得できる
    val credential = request.attachments.get[UserCredential] 
    val input      = request.body
    val result     = interactor.execute(input)(credential)

    result.foldCauseM(
      cause => errorHandler.handleErrors(cause),
      output => UIO(Created(Json.toJson(output.createdComment)))
    )
  }

今回はRequestWithAttachmentsという独自のRequest型を定義しています。これは、attachmentsというAT <: Has[_]型を保持します。Has[_]はZIOの型で、例えばHas[A] with Has[B]の値の場合はget[A]メソッドでA型の値を取得できます。ZioActionFunctionがこのattachmentsに追加情報を付与します。attachmentsの型はHas[XXX] with Has[YYY] ...と型情報が増え最終的にget[任意の型]で値を取得できます。Play備え付けのRequestにはattrという似たようなフィールドがありますが、こちらはget[任意の型]で取り出すとOptionが返るので少し不便です。

所感

ZIO独自のテストライブラリ、テストランナーが一長一短

ZIOには独自のテストライブラリ、テストランナーが存在します。ScalaTestではなくこちらを使うと、ZIOのテスト用のサポートが簡単に使えます。例えば、TestClockというものを使えば、システムのクロック時刻を簡単に操作でき、並行処理の過程を細かくテストできます。ZIO独自のモックライブラリもあり、モック用のコードを自動生成するマクロもあります。 ただマクロはZIOで推奨されているモジュールのコーディング方法に沿っていないと使えないようで、今回は使えませんでした。また、現時点で使った限りでは、「モックのメソッドを呼び出さなかった」という検証をしようとしても対応するものが無かったりしました。普段ScalaTest等になれている場合はまた新しいものを覚えないといけないので学習コストもかさみます。

ZLayerを使ったコンパイルタイムDI、やはり面倒さは残る。あと分かりにくいかもしれない

すこし大雑把にDI設定をかけますがやはり手間です。ZLayerについて知らないと奇抜なコードに見えると思います。型注釈がめちゃくちゃ長くなりがちで書くか迷います。あとコンパイルタイムかなり遅くなっているのではと思います。

依存性解決している部分: AppLayers.scala

実装上の懸念点: ユーザーのコネクションごとに全てのドメインイベントのフィルター処理が走る

スリーミング実装の項で紹介しているDomainEventBusのサブスクライブですが、現時点ではユーザーのサブスクライブごとに全てのドメインイベントのフィルター処理が走ります。共通化できるストリームをキャッシュするなどしないと処理コストがかさみそうです。 また、基本的にドメインイベントは全てのノードに毎度ブロードキャストされます。ネットワーク帯域が心配です。

特に参考にさせていただいた文献

他多数

これらにいいねいただけると幸いです。


  1. ノンブロッキングバックプレッシャーを処理する非同期ストリームを標準化する取り組み。