Spring-Cloud-Stream と Spring-Binder-Kafka を使用して Kafka のアプリケーションを作成してみます。
今回作成するアプリケーションは、Producer と Consumer を1つずつ作成します。Producer は一定時間ごとにメッセージを Kafka の Topic へ送り、Consumer は Kafka の Topic からメッセージを読み取って標準出力へ出力します。作成するアプリケーションの構成は以下のとおりです。
アプリケーションは Java プロセスとして動き、Kafka の Topic はクラスタ上で動きます。アプリケーションを開発/運用するときにはJavaプロセスやKafkaのクラスタを管理しなければなりません。今回はこれらの実行環境として Oracle EventHub Cloud Service と Oracle Application Container Cloud Service (ACCS) を使います。今回はこのアプリケーションを以下の構成になります。
Producer と Consumer はJavaアプリケーションの実行環境としてACCS 上で動き、Kafka のクラスタはEventHub Dedicated 上で動きます。
作業全体の流れは以下のとおりです。
- Kafka クラスタ/Topicの構築
- Producer/Consumerアプリケーションの作成
- Producer/Consumerの実行環境の構築
Kafka クラスタ/Topicの構築
Kafka クラスタと Topic を作成します。
Kafkaクラスタの構築
Kafkaクラスタの構築は EventHub Cloud Service - Dedicated を使います。このクラウドサービスでは以下のプロセスを構成できます。
- Broker
- Zookeeper
- REST Proxy
- Kafka Connect
※REST API を使って Kafka とやりとりする場合には、専用のクラスタを作らずにOracle社が提供している共有のクラスタを使えます。
今回はChiroitoEventHub
と言う名前のクラスタを以下の構成で作成します。
次の画面では Kafka の構成を決定します。後から構成を変更できます。
次のページで内容を確認して作成します。作成が完了すると一覧画面でクラスタインスタンスを確認できます。
Topic の作成
Kafka Topicの構築は EventHub Cloud Service を使います。このクラウドサービスではTopicの以下の要素を構成できます。
- Topicの名前
- Topic が動く Kafkaクラスタ
- パーティション数
- メッセージの保持期間
今回は以下の構成でTopicを作成します。
次のページで内容を確認して作成します。作成が完了すると一覧画面でTopicを確認できます。
Producer/Consumerアプリケーションの作成
次に、アプリケーションを作成します。今回は既に作成したアプリケーションを使用します。
Producerのアプリケーション
Producerアプリケーションはこちら使用します。
sandbox/kafka-schedule at master · chiroito/sandbox · GitHub
このアプリケーションは 5 秒ごとに Topic へメッセージを送信します。
ACCS へデプロイできる形でパッケージングしたProducerのアプリケーションはこちらです。
sandbox/schedule-0.0.1-SNAPSHOT.zip at master · chiroito/sandbox · GitHub
Consumer アプリケーション
Consumer アプリケーションはこちらを使用します。
sandbox/kafka-stdout at master · chiroito/sandbox · GitHub
このアプリケーションは受け取ったメッセージを標準出力へ出力します。
ACCS へデプロイできる形でパッケージングしたConsumerのアプリケーションはこちらです。
sandbox/stdout-0.0.1-SNAPSHOT.zip at master · chiroito/sandbox · GitHub
Producer/Consumerの実行環境の構築
Producer と Consumer のアプリケーションを動かす環境となる ACCS を作成します。ACCS は EventHub との連携が必要です。Oracle Cloud ではサービス同士の連携にサービス・バインディングという仕組みを使います。
サービス・バインディングについて
サービス・バインディングは、インスタンスの作成時に設定ファイルで指定することも、あとで GUI でも設定できます。この仕組みを使うことでアプリケーション開発者はインフラを意識せずに各種サービスを連携できます。
今回は ACCS が EventHub の ChiroitoTopic をサービス・バインディングします。
次の画面は、ACCS の設定画面です。この設定画面ではサービス・バインディングを設定します。以下の例では、EventHub の ChiroitoTopic がバインドされています。
今回は以下の設定ファイルを使って、ACCS の作成時にサービス・バインディングします。
Deployment Configurationファイル
sandbox/deployment.json at master · chiroito/sandbox · GitHub
ACCS の作成
Producer と Consumer のアプリケーションを動かす ACCS インスタンスを作成します。
ACCS は Java 意外にも様々な言語で書かれたアプリケーションを動かせます。今回は Java SE を選択します。
まずは Consumer 側から作成します。必須項目は以下のとおりです。
- ACCS 名
- アプリケーション(先ほどダウンロードしたzip)
今回はサービス・バインディングのためDeployment Configurationファイルも指定します。
Producer も Consumer 同様に作成します。
作成が完了すると以下のように一覧で表示されます。
ここまでの作業で、環境の構築、アプリケーションのデプロイは完了です。
確認
Producer と Consumer のアプリケーションが動いていることを確認しましょう。Consumer のログファイルを確認して、メッセージを受け取っているかを確認します。
ACCS のログはAdministration
のLogs
タブで確認できます。リンクをクリックするとログファイルがダウンロードできます。
ログを開くと Producer から送られてきたメッセージが出力されていることが確認できます。
Received message : GenericMessage [payload=Hello, headers={kafka_offset=0, id=317e43cb-9671-7eda-2c79-c92317663572, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTopic=idcs-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-ChiroitoTopic, timestamp=1521173896075}]