Scalaには、非同期実行をサポートするために Future というクラスがあります。
正確にはJavaにも同名の仕組みがあるのですが、それをちょっと便利にした感じのものです。
Futureの仕組みを簡単に言うと、関数を渡すと別のThreadでその関数を実行してくれるものです。
それだけだと、Threadクラスとあまり変わらないのですが、ExecutionContext という管理クラスを渡してあげることによって、Threadの挙動をよりインテリジェントに制御してくれるため、Thread周りの煩わしさが大きく軽減します。
そのThread制御を担当するのが、ExecutionContextExecutorServiceというクラスで、こいつは ThreadPoolExecutor など、ExecutorService インターフェイスの実装オブジェクトをプールとして利用して生成されます。今回図解するのは、ExecutionContext のデフォルト実装になっている ThreadPoolExecutor を対象としますが、ExecutionContext の別の実装である ForkJionPool に関しても追記にて簡単に説明します。
ただし、ForkJoinPool はパフォーマンス特性に癖があるようですので、Scala(およびJava)のFutureの挙動を制御するためには、まずは、ThreadPoolExecutor を第一候補に考えておけば良いと思います。
ThreadPoolExecutor の挙動
だいぶ端折ったのですが、それでも前置きが長くなりました。
ThreadPoolExecutor の挙動については、こんな記事を見つけて、これが詳しいので参照すれば良いだけなのですが、一点だけ腑に落ちない部分があります。
それは、ThreadPoolExecutor のコンストラクタに渡す、maximumPoolSize というパラメータの挙動です(と連動してkeepAliveTimeの挙動も)。
この挙動を知るためには、ThreadPoolExecutorのjavadocを紐解く必要があります。
ThreadPoolExecutor (Java Platform SE 8)
この説明の「キューイング」の項目に詳しく説明があるのですが、ゴチャゴチャ書いてあってわかりにくいので図解します。
まず、ThreadPoolExecutor は重要なパラメータとして、1.Theadプールの初期値(corePoolSize), 2.最大プールサイズ(maximumPoolSize), 3.Queueの容量。の3つがあります。
ThreadプールのサイズとQueueの容量は、このように動作します。ここまでは直感的でわかりやすい挙動だと思います。
面白いのはこの後で、Queueの容量を越えたタスクが投入されたときです。
このように、後から投入されたタスクのためにThreadプールが拡張され、そのTheadを使って実行されます。そのため、実行順としては先に投入したタスクよりも先に実行されるという挙動をします。
そして、この時拡張されるTheadプールの最大値を決めるのが、maximumPoolSize となります。なお、拡張されたThreadプールは、実行するタスクが無くなって keepAliveTime が経過すると破棄されます。
次に、Theadプールが maximumPoolSize に達した状態で更にタスクが投入された場合の挙動です。
システムエラーになります。
正確に言うと、この挙動は RejectedExecutionHandler を渡すことによって制御できるのですが、Futureの場合投げっぱなし(ノンブロッキング)が基本となりますので、この挙動をカスタマイズするのは注意が必要です(DiscardPolicyにしたらタイムアウトまで返ってこなくなった、CallerRunsPolicyは実行が遅くても処理継続されるがFutureがシステム資源を食い尽くす可能性がある)。
ということで、現実的な路線で取り得る選択肢は3つに絞られると思います。
この内パターン3は例外発生の危険性があります。とはいえ、Queueを無制限にしてもThreadを無制限にしても、メモリーやらThreadやらのリソースを消費するわけで、挙動の読めないエラーが発生するぐらいなら、いっそ「意図的にウイークポイントを作っておく」という戦略もありかもしれません。
結論: Executors が提供する ThreadPoolExecutor を使おう
ごちゃごちゃわかりにくいので、java.util.concurrent.Executors が提供するできあいの ThreadPoolExecutor を使っていれば良いという結論に達しました。
使うべきは2つです。
newFixedThreadPool(int nThreads)
「Threadプールが固定長=Queueが無制限」なプールを生成します。Threadプールのサイズを決められるので一番使い勝手が良いと思われます。Queueに積まれるタスクは、所詮はタダのオブジェクトなので、メモリー以外のリソースを消費しません。Thread数をシステム資源を圧迫しない程度の常識的な値に設定してメモリーをたんと積めば良いだけなので、リソース管理が楽ですね。
newCachedThreadPool()
Threadプールを無制限に拡張するプールを生成します。Queueサイズはなんと「ゼロ」です。そのため、投入されたタスクは即座にThreadプールに投入され、プールが足りなければ無限に拡張されます。スレッドプールの初期サイズも割り切りの「ゼロ」です。Queueに投入されてしまったタスクは「何もしない」ので実行効率が悪く、Threadに投入されていれば、OSレベルでタスク制御されるので、効率が上がります。ただし、Threadというクリティカルなリソースを消費するため、システムが不安定になるリスクを抱えることになります。
という特徴を持つ2つのプールを必要に応じて選択すれば、99割のケースには対応できるのでは無いかと思います。
なお、Scalaがデフォルトで提供している scala.concurrent.ExecutionContext.Implicits.global は、newFixedThreadPool()にCPUコア数を渡して生成するようなので、ちょっとThread数がプア過ぎます(設定で変えられるらしい)。
システムの用途に合わせて ExecutionContext を切り替えたほうが良いですね。
ソースコード
と、ここまでコードレベルでの説明を一切してこなかったのですが、今回の話を検証したサンプルコードをgithubに乗せておきましたので、必要に応じて確認してください(と手抜き)。
GitHub - qtamaki/scala-executioncontext-example
追記: ForkJoinPool
ExecutionContext の実行アルゴリズムとなりうる ExecutorService を実装したクラスは ThreadPoolExecutor 以外にも、(標準ライブラリ上に)幾つかあります。
そのうちの一つが ForkJoinPool です。
ForkJoinPool は、ThreadPoolExecutor と似たThreadプールを提供しますが、そのアルゴリズムは似て非なるものです。
簡単に言うと、ThreadPoolExecutor は1本のタスクキューとThreadプールを持ち、タスクを終えたThreadに順次キューからスレッドを割り当てていきますが、 ForkJoinPool では、ThreadプールのThread毎にキューを持っており、個別にキューを消費していきます。個別のキューの消費時には、コンテクストスイッチングが発生しないため、高速に動作する可能性があります。
Fork/Join Frameworkの性能について(PDF)
この辺の資料を読む限りだと、タスクの発生とプールのThreadの数によってパフォーマンスにバラツキがあり、最適解を求めるのであればチューニングが必須で、考えなしに使うとシングルスレッドの場合より処理効率がだいぶ落ちる事があるようです。
なお、使用の際のパラメータは、Threadプールの数を指定するのみなので簡単です。
処理に応じて最適な数を渡してあげると良いと思います(言うは易し)。