#chiroito ’s blog

Java を中心とした趣味の技術について

Managed Cloud Service で Kafka アプリケーションを作成する

Spring-Cloud-Stream と Spring-Binder-Kafka を使用して Kafka のアプリケーションを作成してみます。

今回作成するアプリケーションは、Producer と Consumer を1つずつ作成します。Producer は一定時間ごとにメッセージを Kafka の Topic へ送り、Consumer は Kafka の Topic からメッセージを読み取って標準出力へ出力します。作成するアプリケーションの構成は以下のとおりです。

f:id:chiroito:20180323092630p:plain

アプリケーションは Java プロセスとして動き、Kafka の Topic はクラスタ上で動きます。アプリケーションを開発/運用するときにはJavaプロセスやKafkaのクラスタを管理しなければなりません。今回はこれらの実行環境として Oracle EventHub Cloud Service と Oracle Application Container Cloud Service (ACCS) を使います。今回はこのアプリケーションを以下の構成になります。

f:id:chiroito:20180320201557p:plain

Producer と Consumer はJavaアプリケーションの実行環境としてACCS 上で動き、Kafka のクラスタはEventHub Dedicated 上で動きます。

作業全体の流れは以下のとおりです。

  1. Kafka クラスタ/Topicの構築
  2. Producer/Consumerアプリケーションの作成
  3. Producer/Consumerの実行環境の構築

Kafka クラスタ/Topicの構築

Kafka クラスタと Topic を作成します。

f:id:chiroito:20180323101839p:plain

Kafkaクラスタの構築

Kafkaクラスタの構築は EventHub Cloud Service - Dedicated を使います。このクラウドサービスでは以下のプロセスを構成できます。

  • Broker
  • Zookeeper
  • REST Proxy
  • Kafka Connect

※REST API を使って Kafka とやりとりする場合には、専用のクラスタを作らずにOracle社が提供している共有のクラスタを使えます。

今回はChiroitoEventHubと言う名前のクラスタを以下の構成で作成します。

f:id:chiroito:20180320162523p:plain

次の画面では Kafka の構成を決定します。後から構成を変更できます。

f:id:chiroito:20180320162533p:plain

次のページで内容を確認して作成します。作成が完了すると一覧画面でクラスタインスタンスを確認できます。

f:id:chiroito:20180320162542p:plain

Topic の作成

Kafka Topicの構築は EventHub Cloud Service を使います。このクラウドサービスではTopicの以下の要素を構成できます。

  • Topicの名前
  • Topic が動く Kafkaクラスタ
  • パーティション数
  • メッセージの保持期間

今回は以下の構成でTopicを作成します。

f:id:chiroito:20180320162559p:plain

次のページで内容を確認して作成します。作成が完了すると一覧画面でTopicを確認できます。

f:id:chiroito:20180320162608p:plain

Producer/Consumerアプリケーションの作成

次に、アプリケーションを作成します。今回は既に作成したアプリケーションを使用します。

f:id:chiroito:20180323101904p:plain

Producerのアプリケーション

Producerアプリケーションはこちら使用します。

sandbox/kafka-schedule at master · chiroito/sandbox · GitHub

このアプリケーションは 5 秒ごとに Topic へメッセージを送信します。

ACCS へデプロイできる形でパッケージングしたProducerのアプリケーションはこちらです。

sandbox/schedule-0.0.1-SNAPSHOT.zip at master · chiroito/sandbox · GitHub

Consumer アプリケーション

Consumer アプリケーションはこちらを使用します。

sandbox/kafka-stdout at master · chiroito/sandbox · GitHub

このアプリケーションは受け取ったメッセージを標準出力へ出力します。

ACCS へデプロイできる形でパッケージングしたConsumerのアプリケーションはこちらです。

sandbox/stdout-0.0.1-SNAPSHOT.zip at master · chiroito/sandbox · GitHub

Producer/Consumerの実行環境の構築

Producer と Consumer のアプリケーションを動かす環境となる ACCS を作成します。ACCS は EventHub との連携が必要です。Oracle Cloud ではサービス同士の連携にサービス・バインディングという仕組みを使います。

サービス・バインディングについて

サービス・バインディングは、インスタンスの作成時に設定ファイルで指定することも、あとで GUI でも設定できます。この仕組みを使うことでアプリケーション開発者はインフラを意識せずに各種サービスを連携できます。

今回は ACCS が EventHub の ChiroitoTopic をサービス・バインディングします。 f:id:chiroito:20180323155050p:plain

次の画面は、ACCS の設定画面です。この設定画面ではサービス・バインディングを設定します。以下の例では、EventHub の ChiroitoTopic がバインドされています。

f:id:chiroito:20180320203204p:plain

今回は以下の設定ファイルを使って、ACCS の作成時にサービス・バインディングします。

Deployment Configurationファイル

sandbox/deployment.json at master · chiroito/sandbox · GitHub

ACCS の作成

Producer と Consumer のアプリケーションを動かす ACCS インスタンスを作成します。

f:id:chiroito:20180323101921p:plain

ACCS は Java 意外にも様々な言語で書かれたアプリケーションを動かせます。今回は Java SE を選択します。

f:id:chiroito:20180320203105p:plain

まずは Consumer 側から作成します。必須項目は以下のとおりです。

  • ACCS 名
  • アプリケーション(先ほどダウンロードしたzip)

今回はサービス・バインディングのためDeployment Configurationファイルも指定します。

f:id:chiroito:20180320203120p:plain

Producer も Consumer 同様に作成します。

作成が完了すると以下のように一覧で表示されます。

f:id:chiroito:20180320203129p:plain

ここまでの作業で、環境の構築、アプリケーションのデプロイは完了です。

確認

Producer と Consumer のアプリケーションが動いていることを確認しましょう。Consumer のログファイルを確認して、メッセージを受け取っているかを確認します。

f:id:chiroito:20180323101936p:plain

ACCS のログはAdministrationLogsタブで確認できます。リンクをクリックするとログファイルがダウンロードできます。

f:id:chiroito:20180320203218p:plain

ログを開くと Producer から送られてきたメッセージが出力されていることが確認できます。

Received message : GenericMessage [payload=Hello, headers={kafka_offset=0, id=317e43cb-9671-7eda-2c79-c92317663572, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTopic=idcs-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-ChiroitoTopic, timestamp=1521173896075}]