セカイノカタチ

世界のカタチを探求するブログ。関数型言語に興味があり、HaskellやScalaを勉強中。最近はカメラの話題も多め

マーブルワーズ

Playframework の Future の挙動について調べたら Action と Action.async の差異は無かった

表題の通り。調べたら色々興味深いのでメモとして残そうと思う。文章がぶっきらぼうなのは許して。

Playframework では、 Action と Action.async を切り替えることにより、 Future を利用したノンブロッキング処理を Controller 内部にで行うことができる。

Future は ExecutionContext を必要とし、通常は implicit な変数を定義して Controller 内部で暗黙的に引き渡す。

Playframework のドキュメントでは、 controller の implicit パラメーターでデフォルトの ExecutionContext を受け取る方法が紹介されている。

class Samples @Inject()(components: ControllerComponents)(implicit ec: ExecutionContext)
  extends AbstractController(components) {

https://www.playframework.com/documentation/2.7.x/ThreadPools

デフォルトで渡される、 ExecutionContext は、 Akka のディスパッチャとなっており、 application.conf によって設定することができる。デフォルトでは、 ForkJoinPool 方式のディスパッチャが選択されるようだ。

akka {
  actor {
    default-dispatcher {
      fork-join-executor {
        # Settings this to 1 instead of 3 seems to improve performance.
        parallelism-factor = 1.0

        # @richdougherty: Not sure why this is set below the Akka
        # default.
        parallelism-max = 24

        # Setting this to LIFO changes the fork-join-executor
        # to use a stack discipline for task scheduling. This usually
        # improves throughput at the cost of possibly increasing
        # latency and risking task starvation (which should be rare).
        task-peeking-mode = LIFO
      }
    }
  }
}

余談ではあるが、 ForkJoinPool は、処理が fork しながら細分化し、終了(join)しながら統合していくような処理に適しており、タスクはスタックに積み上げられていく(らしい)。 async タスクで使用される Future は、ネストして処理を重ねていくことが多いため、マッチングが良いのかもしれない。

しかし、 Playframework(Akka) のデフォルト ExecutionContext は、内部で BatchingExecutor という仕組みを用いて実装されており、単一のリクエストにおいては、同一の Thread にてバッチ的に実行される。そのため、 ThreadPool の実装や Future の実装戦略は、あまり意味がなく、デフォルトの環境において、 Action と Action.async の差異は無い

独自のECを定義することにより、 BatchingExecutor の動きもなくなり、普通に Future 毎に Thread が割り当てられるようになり、レスポンスタイムは向上する(手元の環境では20%程)。

Playframework のデフォルト戦略では、リクエストの内部を細かく Thread で分解しても、 Thread 切り替えのオーバーヘッドによりシステム全体の利用効率は上がらないという判断なのだろうか?しかし、その場合、 ForkJoinPool を使うメリットは恐らく無いので、この判断の根拠が知りたい。

それと、 BatchingExecutor 環境であっても、 scala.concurrent.blocking { ... } で処理を囲むことによって、 Thread が消費されるようになる。それでも、 Await っぽい動きではなく、タスクが別 Thread に分配されるようになるだけなので、これを使ってタスクの分散を図ることができる(けど、回りくどいので独自で ExecutionContext を定義すればいいじゃんと思う)。

Futureのネストについて

さらに余談となるが、

Future { Future { Future { ... }}}

のように、 Future がネストされる処理では、外側の Future が保持している Thread が無駄になるのでは?という懸念があり、確かめてみたところ、 Future がネストしていても Thread の消費には影響が無いことが分かった。 Future はコードブロックを内包したクラスのインスタンスに過ぎないため、内部に Thread (になりうるコードブロック)を持ったオブジェクトが再帰的に重なって構築されるだけ。 Thread のスケジューリングは EC によって Future 構造とは独立して行われる。

Thread が枯渇してしまうのは下記のような場合のみである。

Future { Await.result(Future { Await.result(Future { ... })})}

Future の中で、中身を Await で取り出そうとすると、 Thread がブロックされ消費されていくので、 ThreadPool に設定された Thread 数を使い切るとフリーズする。

Await を使わない限り、 Future のネストや大量使用についての心配はあまりしなくても良さそう。

サンプルコード。

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.Success
import scala.concurrent.duration._
object Main {
  def main(args: Array[String]): Unit ={
    implicit val fixedEc: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(3))

    def createFuture(n: Int): Future[_] = {
      if(n > 0) {
        Future({
          println(s"future: ${n}, th: ${Thread.currentThread().getName}")
          createFuture(n-1)
        })
      } else {
        Future("fin")
      }
    }

    def evalF(fu: Future[_], n:Int): String = {
      println(s"evalF ${n}, th: ${Thread.currentThread().getName}")
      val r = Await.result(fu, Duration.Inf)
      r match {
          case _fu: Future[_] =>
            evalF(_fu, n+1)
          case s: String => s
      }
    }

    val fu = createFuture(100)
    evalF(fu, 0)
    fixedEc.shutdown()
  }
}

waterspout_noaa