#chiroito ’s blog

Java を中心とした趣味の技術について

QuarkusのReactiveからBlockingする処理を呼び出す方法まとめ

前回(Quarkusはどのようにスレッドを使うのか - #chiroito ’s blog)の記事にある通り、QuarkusでReactiveのアプリケーション開発をしていると、Reactiveに対応したデータストアのAPIを使います。これによってReactiveのメリットを完全に得られます。ですが、全てのデータストアがReactiveに対応したAPIを提供しているわけではありません。

Reactiveに対応していないAPIを使う場合、Reactiveの中でBlockingするAPIを呼び出す必要があります。この時、Blockingする処理はReactiveが使うI/Oスレッドとは異なるスレッドで実行するのが良いです。

QuarkusではReactive用のスレッドからBlockingする処理を実行する方法がいくつかあるので、それぞれについて紹介します。

  • Blocking用のスレッドを使うようにメソッドシグネチャを指定する
  • @Blockingアノテーションを指定してBlocking用のスレッドを使うように明示する
  • ReactiveにCompletableFutureを渡す

Blockingするクライアント

今回の検証ではBlockingする処理はReactiveではないREST Clientを使ってREST APIへアクセスします。REST Clientのインターフェースは以下になります。

@Path("/delay")
@RegisterRestClient(configKey = "delay")
public interface BlockingRestClient {

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    Message invoke();
}

このREST ClientをエンドポイントでDIして使用します。

    @Inject
    @RestClient
    BlockingRestClient blockingRestClient;

検証

メソッドシグネチャ

Blocking用のスレッドを使うようにメソッドシグネチャを指定します。戻り値にUniなどReactive系のクラスを使わなければBlocking用のスレッドを使います。

    @GET
    @Path("/api")
    public Message endpoint1() {
        return blockingRestClient.invoke();
    }

JFRを見ると、Blocking処理がexecutor-thread-0で動いているのが分かります。

f:id:chiroito:20220304155759p:plain
Reactiveを使わない場合

@Blockingアノテーション

次はメソッドのシグネチャにはReactive系のクラスであるUniを使いますが、@Blockingアノテーションを指定してBlocking用のスレッドを使うように明示します。

    @GET
    @Path("/annotation/api")
    @Blocking
    public Uni<Message> endpoint2() {
        return Uni.createFrom().item(blockingRestClient.invoke());
    }

JFRを見ると、こちらも先ほどと同様にBlocking処理がexecutor-thread-0で動いているのが分かります。

f:id:chiroito:20220304160008p:plain
Reactiveを使うが@Blockingアノテーションを指定した場合

CompletableFuture

最後に、Javaの非同期APIであるCompletableFutureを引数としてUniへ渡す方法です。様々なAPIがCompletableFutureに対応しているので既存の知識やコードを流用して実装できるメリットがあります。

    @GET
    @Path("/completableFuture/api")
    public Uni<Message> endpoint3() {
        return Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> blockingRestClient.invoke()));
    }

この方法ではこれまでと異なり、Blocking処理はJavaの実行環境が提供するForkJoinPoolのスレッドで実行されます。スレッド名はForkJoinPool.commonPool-worker-nです。

f:id:chiroito:20220304160305p:plain
ReactiveとCompletableFutureを使う場合

なお、ForkJoinPoolのスレッド数はJVM起動引数-Djava.util.concurrent.ForkJoinPool.common.parallelismで指定します。

おまけ

当然ながら以下のようにReactive用のI/OスレッドでBlocking処理を実行できます。

    @GET
    @Path("/blocking/api")
    public Uni<Message> endpoint4() {
        return Uni.createFrom().item(blockingRestClient.invoke());
    }

この場合I/OスレッドがBlockingされてるのが分かります。

f:id:chiroito:20220304160420p:plain
Reactiveを使ってブロック処理を実行

この方法はスレッドの遷移がなくなるため、これ単体だと多少高速に動くのですが、I/Oスレッドが不足するとNon Blockingで動いている処理のスループットが頭打ちになりやすく、スレッド数のチューニングが必要となります。さまざまな用途で使用されるスレッドプールのチューニングは難しいため、チューニングに慣れてない人はBlockingを使いましょう。