#chiroito ’s blog

Java を中心とした趣味の技術について

Infinispan Hot Rod の Distributed-cache で Cache Store/Loader を使う

キャッシュからデータを取得する時に、キャッシュにデータが乗っていないためキャッシュミスが発生し、RDBMSやオブジェクトストレージなどのデータストアからデータを取得し、次に備えてキャッシュに載せると言うことがあります。また、キャッシュの更新や他のデータストアの更新をした際には両者の整合性を保たなければなりません。

この様なケースで、キャッシュミスした場合に透過的に他のデータソースからデータを取り、データをキャッシュに載せて、クライアントにも返してくれたり、キャッシュに格納したデータとデータストアの整合性を保つため、キャッシュへの更新を透過的にデータストアへ適用してくれると非常に助かるのでは無いでしょうか。

Infinispanではこの様なよくある状況に備えて Cache Loader と Cache Storeという機能が使えます。Cache Loader と Cache Storeは独自に実装することもできますが、Infinispan では JPA、REST、ファイルを使った永続ストアが事前に実装されており、そちらを使うことが多いです。

今回は以下の流れで独自の実装を作る方法を紹介します。

  1. コーディング
  2. サーバの設定
  3. 実行

コーディング

Infinispan で永続ストアを実装するには以下のインターフェースを使用します。

  • CacheLoader
  • CacheWriter
  • AdvancedCacheLoader
  • AdvancedCacheWriter

マニュアルには以下の様に記載されています。

CacheLoader と CacheWriter は、ストアに対して読み書きを行う基本的なメソッドを提供します。CacheLoader は、必要なデータがキャッシュにない場合にデータストアからデータを取得します。 AdvancedCacheLoader と AdvancedCacheWriter は、基礎となるストレージを一括で処理する並列反復、失効したエントリーの削除、クリア、およびサイズ指定などの操作を提供します。 org.infinispan.persistence.file.SingleFileStore を使用すると、独自のストア実装を簡単に作成できます。

機能の必要さを考慮すると実際に使うのはAdvanced~になります。今回の実装ではこれらを全て継承しているAdvancedLoadWriteStoreを実装していきます。

今回は実際に使うことを想定して、永続化ストアで開発者が自作したエンティティを使います。自作したエンティティの使い方は以下を参照してください。

Infinispan Hot RodのDistributed-cacheで自作のエンティティを使う - #chiroito ’s blog

今回は、永続ストアを初期化する処理、データストアから読み書きする以下のメソッドだけを実装します。

  • void init(InitializationContext ctx)
  • void write(MarshallableEntry<? extends K, ? extends V> marshalledEntry)
  • MarshallableEntry loadEntry(Object bKey)

初期化処理では、与えられるコンテキストから各処理で必要なインスタンスを取得します。書込処理と読み込み処理は以下の流れで処理をしていくのが一般的です。

書き込み処理

  1. シリアライズされたエントリが渡されます
  2. そのエントリからシリアライズされたキーとバリューを取得
  3. それぞれをデシリアライズしてオブジェクトとして使えるようにする
  4. データストアへ書き込み

読み込み処理

  1. シリアライズされたキーが渡されます
  2. デシリアライズしてキーを取得
  3. データストアから読み込む
  4. 読み込んだデータからオブジェクトを作成
  5. オブジェクトをシリアライズ
  6. シリアライズされたエントリを作成

今回の実装では標準出力にデータを出力することでデータストアへの読み書きをしたとします。サンプルの実装は以下のとおりです。

CacheStoreSample.java

package chiroito.sample;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.persistence.Store;
import org.infinispan.persistence.spi.*;

import java.io.IOException;
import java.util.concurrent.Executor;

@Store
public class CacheStoreSample<K,V> implements AdvancedLoadWriteStore<K, V> {
    private MarshallableEntryFactory<K, V> entryFactory;
    private PersistenceMarshaller marshaller;

    @Override
    public void init(InitializationContext ctx) {
        this.entryFactory = ctx.getMarshallableEntryFactory();
        this.marshaller = ctx.getPersistenceMarshaller();
    }

    @Override
    public void write(MarshallableEntry<? extends K, ? extends V> marshalledEntry) {
        // シリアライズされたデータを取得
        WrappedByteArray bKey = (WrappedByteArray)marshalledEntry.getKey();
        WrappedByteArray bValue = (WrappedByteArray)marshalledEntry.getValue();

        try {
            // デシリアライズ
            String key = (String) this.marshaller.objectFromByteBuffer(bKey.getBytes());
            CustomEntity value = (CustomEntity) this.marshaller.objectFromByteBuffer(bValue.getBytes());

            // 永続化処理
            System.out.println("Stored");
            System.out.println(key);
            System.out.println(value);

        } catch (IOException|ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public MarshallableEntry loadEntry(Object bKey) {
        try {
            // デシリアライズ
            String key = (String) this.marshaller.objectFromByteBuffer(((WrappedByteArray)bKey).getBytes());

            // 読込処理
            System.out.println("Load");
            System.out.println(key);
            
            // オブジェクトを作成
            CustomEntity value = new CustomEntity("りんご", 3);

            // シリアライズ
            byte[] bValue = this.marshaller.objectToBuffer(value).getBuf();

            // エントリを作成
            return this.entryFactory.create(bKey, new WrappedByteArray(bValue));
        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public boolean delete(Object o) {
        return false;
    }

    @Override
    public int size() {
        return 0;
    }

    @Override
    public void clear() {
    }

    @Override
    public void purge(Executor executor, PurgeListener<? super K> purgeListener) {
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }
}

サーバの設定

このソースコードをビルドして、その成果物をサーバへコピーします。そして、サーバ側で設定ファイルをキャッシュストアを使うように設定してから起動すれば完了です。今回は自作のエンティティクラスを使うため追加でシリアライズの設定も必要になります。

ソースコードのビルドはmvn packageなどで行います。

作成されたjarファイルを確認し、Infinispanのserver/lib にそのjarファイルを置きます

設定ファイルにキャッシュストアとシリアライズの設定をします。server/conf/infinispanに以下を追記します。

     <cache-container>
        <!-- シリアライズの設定 -->
        <serialization>
            <context-initializer class="chiroito.sample.CustomInitializerImpl" />
        </serialization>
        <distributed-cache mode="SYNC" name="mycache">
            <transaction mode="NONE"/>
            <!-- キャッシュストアの設定 -->
            <persistence>
                <store class="chiroito.sample.CacheStoreSample"/>
            </persistence>
        </distributed-cache>
    </cache-container>

ここまでできたらサーバを起動しましょう。

実行

それでは実行してみましょう。次のコードは、key1みかんの情報を入れてから取得し、続いてkey2の情報も取得します。

    public static void main(String[] args) {
        RemoteCacheManager manager = new RemoteCacheManager();
        RemoteCache<String, CustomEntity> c = manager.getCache("mycache");

        c.put("key1", new CustomEntity("みかん", 3));
        System.out.println(c.get("key1"));
        System.out.println(c.get("key2"));

        manager.close();
    }

key1のデータはみかんをputしてからgetしているため、これを実行したコンソールにはみかんが出力されます。サーバ側ではput処理によってCacheStoreが動くため、サーバ側のコンソールにみかんが出力されます。みかんをgetする時には既にキャッシュに乗っているため、CacheLoaderは動作しません。

次にkey2のデータをgetするとデータがキャッシュ上にありません。そのため、CacheLoaderが該当するデータを読み込みます。これによってクライアントにりんごが返されて、これを実行したコンソールには出力されます。サーバ側では、データがキャッシュに乗っていないものに対するgetが行われたのでCacheLoaderが実行され、サーバ側のコンソールにりんごが出力されます。

クライアント側のコンソールは以下の様になります。

3 24, 2020 5:56:27 午後 org.infinispan.client.hotrod.RemoteCacheManager actualStart
INFO: ISPN004021: Infinispan version: Infinispan 'Turia' 10.1.3.Final
3 24, 2020 5:56:27 午後 org.infinispan.client.hotrod.impl.protocol.Codec20 readNewTopologyAndHash
INFO: ISPN004006: Server sent new topology view (id=1, age=0) containing 1 addresses: [192.168.1.1:11222]
CustomEntity{name='みかん', num=3}
CustomEntity{name='りんご', num=3}

サーバ側にはみかんのデータが格納されたこと、key2のデータが読み込まれたことを表す以下のログが出力されます。

サーバ側のコンソールは以下の様になります。

Stored
key1
CustomEntity{name='みかん', num=3}
Load
key2

このようにCacheLoaderCacheStoreを使うことでキャッシュを使うだけで透過的にデータストアへもアクセスできるようになり、非常に便利になります。