前回(Quarkusはどのようにスレッドを使うのか - #chiroito ’s blog)の記事にある通り、QuarkusでReactiveのアプリケーション開発をしていると、Reactiveに対応したデータストアのAPIを使います。これによってReactiveのメリットを完全に得られます。ですが、全てのデータストアがReactiveに対応したAPIを提供しているわけではありません。
Reactiveに対応していないAPIを使う場合、Reactiveの中でBlockingするAPIを呼び出す必要があります。この時、Blockingする処理はReactiveが使うI/Oスレッドとは異なるスレッドで実行するのが良いです。
QuarkusではReactive用のスレッドからBlockingする処理を実行する方法がいくつかあるので、それぞれについて紹介します。
- Blocking用のスレッドを使うようにメソッドシグネチャを指定する
- @Blockingアノテーションを指定してBlocking用のスレッドを使うように明示する
- ReactiveにCompletableFutureを渡す
Blockingするクライアント
今回の検証ではBlockingする処理はReactiveではないREST Clientを使ってREST APIへアクセスします。REST Clientのインターフェースは以下になります。
@Path("/delay") @RegisterRestClient(configKey = "delay") public interface BlockingRestClient { @GET @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) Message invoke(); }
このREST ClientをエンドポイントでDIして使用します。
@Inject @RestClient BlockingRestClient blockingRestClient;
検証
メソッドシグネチャ
Blocking用のスレッドを使うようにメソッドシグネチャを指定します。戻り値にUniなどReactive系のクラスを使わなければBlocking用のスレッドを使います。
@GET @Path("/api") public Message endpoint1() { return blockingRestClient.invoke(); }
JFRを見ると、Blocking処理がexecutor-thread-0
で動いているのが分かります。
@Blockingアノテーション
次はメソッドのシグネチャにはReactive系のクラスであるUni
を使いますが、@Blockingアノテーションを指定してBlocking用のスレッドを使うように明示します。
@GET @Path("/annotation/api") @Blocking public Uni<Message> endpoint2() { return Uni.createFrom().item(blockingRestClient.invoke()); }
JFRを見ると、こちらも先ほどと同様にBlocking処理がexecutor-thread-0
で動いているのが分かります。
CompletableFuture
最後に、Javaの非同期APIであるCompletableFuture
を引数としてUni
へ渡す方法です。様々なAPIがCompletableFuture
に対応しているので既存の知識やコードを流用して実装できるメリットがあります。
@GET @Path("/completableFuture/api") public Uni<Message> endpoint3() { return Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> blockingRestClient.invoke())); }
この方法ではこれまでと異なり、Blocking処理はJavaの実行環境が提供するForkJoinPool
のスレッドで実行されます。スレッド名はForkJoinPool.commonPool-worker-n
です。
なお、ForkJoinPoolのスレッド数はJVM起動引数-Djava.util.concurrent.ForkJoinPool.common.parallelism
で指定します。
おまけ
当然ながら以下のようにReactive用のI/OスレッドでBlocking処理を実行できます。
@GET @Path("/blocking/api") public Uni<Message> endpoint4() { return Uni.createFrom().item(blockingRestClient.invoke()); }
この場合I/OスレッドがBlockingされてるのが分かります。
この方法はスレッドの遷移がなくなるため、これ単体だと多少高速に動くのですが、I/Oスレッドが不足するとNon Blockingで動いている処理のスループットが頭打ちになりやすく、スレッド数のチューニングが必要となります。さまざまな用途で使用されるスレッドプールのチューニングは難しいため、チューニングに慣れてない人はBlockingを使いましょう。