之前的文章我们其实已经用到了两种不同的方式访问 Ignite 中的数据. 一种方式是第一篇文章中提到通过 JDBC 客户端用 SQL 访问数据, 在这篇文章中我们也会看到不使用 JDBC, 如何通过 Ignite API 用 SQL 访问数据. 还有用一种方式我称之为 cache API, 即用 get/put 来访问数据. Ignite 实现了 JCache(JSR 107)标准, 所以除了基本的 cache 操作外, 我们也会介绍一些 cache 的原子操作和 EntryProcessor 的使用.
Cache API
Ignite 提供了类似 Map 的 API 用来操作缓存上的数据, 只不过 Ignite 的实现把这个 Map 上的数据分布在多个节点上, 并且保证了这些操作是多线程 / 进程安全的. 我们可以简单的在多个节点上使用 get/put 往 Ignite 缓存里读写数据, 而把数据同步, 并发控制等复杂问题留给 Ignite 来解决. 除了 get/put 操作外, Ignite 还提供了其他的原子操作以及异步操作, 比如 getAndPutIfAbsent, getAndPutAsync, putIfAbsent, putIfAbsentAsync, getAndReplace, getAndReplaceAsync 等, 完整的 API 列表可以看这里.
Ignite 也支持在 JCache 标准中定义的 entry processor. 我没仔细读过 JCache 中对 entry processor 的定义, 但根据 Ignite 的文档和使用经验, 相比于基本的缓存 get/put 操作, entry processor 有下面几个特性 / 优点:
相比于 get/put 等基本操作, 在 entry processor 中我们可以实现更为复杂的 cache 更新逻辑, 比如我们可以读出缓存中的某个值, 然后做一些自定义计算后, 再更新缓存中的值.
和 get/put/putIfAbsent 等操作一样, 在 entry processor 中所有的操作是原子性的, 即保证了 entry processor 中定义的操作要么都成功, 要么都失败. 如果不用 entry processor, 为了达到相同目的, 我们需要对需要要更新的缓存数据加锁, 更新缓存数据, 最后释放锁. 而有了 entry proce, 我们可以更专注于缓存更新的逻辑, 而不用考虑如何加解锁.
Entry processor 允许在数据节点上直接进行操作. 分布式缓存中, 如果更新的缓存数据需要根据已经在缓存中的数据计算得到, 往往需要在多个节点之间传送的缓存数据. 而 entry processor 是把操作序列化后发送到缓存数据所在的节点, 比起序列化缓存数据, 要更高效.
Entry Processor 代码示例
下面我们改造一下之前的例子, 看看在 Ignite 中如何实现并调用一个 entry processor. 在这个例子中, cache 中 key 的值依旧是城市的名字, 但是 value 的值不再是简单的城市所在省份的名字, 而是一个 City 类的实例. 下面是 City 类的定义:
- public class City {
- private String cityName;
- private String provinceName;
- private long population;
- public City(String cityName, String provinceName, long population) {
- this.cityName = cityName;
- this.provinceName = provinceName;
- this.population = population;
- }
- ...
- }
在 City 类中, 我们放了一个 population 的成员变量, 用来表示该城市的人口数量. 在主程序中, 我们创建多个线程, 通过 entry processor 不断修改不同城市的人口数量. 每个 entry processor 做的事情也很简单: 读取当前人口数量加 1, 再把新值更新到 cache 中. 下面是主程序的代码
- public class IgniteEntryProcessorExample {
- public static void main(String[] args) {
- // start an ignite cluster
- Ignite ignite = startCluster(args);
- CacheConfiguration<String, City> cacheCfg = new CacheConfiguration<>();
- cacheCfg.setName("CITY");
- cacheCfg.setCacheMode(CacheMode.PARTITIONED);
- cacheCfg.setBackups(1);
- IgniteCache<String, City> cityProvinceCache = ignite.getOrCreateCache(cacheCfg);
- // let's create a city and put it in the cache
- City markham = new City("Markham", "Ontario", 0);
- cityProvinceCache.put(markham.getCityName(), markham);
- System.out.println("Insert" + markham.toString());
- // submit two tasks to increase population
- ExecutorService service = Executors.newFixedThreadPool(2);
- IncreaseCityPopulationTask task1 = new IncreaseCityPopulationTask(cityProvinceCache, markham.getCityName(), 10000);
- IncreaseCityPopulationTask task2 = new IncreaseCityPopulationTask(cityProvinceCache, markham.getCityName(), 20000);
- Future<?> result1 = service.submit(task1);
- Future<?> result2 = service.submit(task2);
- System.out.println("Submit two tasks to increase the population");
- service.shutdown();
- try {
- service.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // get the population and check whether it is 30000
- City city = cityProvinceCache.get(markham.getCityName());
- if (city.getPopulation() != 30000) {
- System.out.println("Oops, the population is" + city.getPopulation() + "instead of 30000");
- } else {
- System.out.println("Yeah, the population is" + city.getPopulation());
- }
- }
- public static class IncreaseCityPopulationTask implements Runnable {
- private IgniteCache<String, City> cityProvinceCache;
- private String cityName;
- private long population;
- public IncreaseCityPopulationTask(IgniteCache<String, City> cityProvinceCache,
- String cityName, long population) {
- this.cityProvinceCache = cityProvinceCache;
- this.cityName = cityName;
- this.population = population;
- }
- @Override
- public void run() {
- long p = 0;
- while(p++ <population) {
- cityProvinceCache.invoke(cityName, new EntryProcessor<String, City, Object>() {
- @Override
- public Object process(MutableEntry<String, City> mutableEntry, Object... objects)
- throws EntryProcessorException {
- City city = mutableEntry.getValue();
- if (city != null) {
- city.setPopulation(city.getPopulation() + 1);
- mutableEntry.setValue(city);
- }
- return null;
- }
- });
- }
- }
- }
- private static Ignite startCluster(String[] args) {
- ...
- }
- }
4~10 行, 和之前的例子一样, 我们启动一个 Ignite 节点, 并且创建一个名为 "CITY" 的 cache,cache 的 key 是城市的名字(String),cache 的 value 是一个 City 的对象实例.
13~15 行, 我们创建了一个名字为 "Markham" 的 City 实例, 它的初始 population 值是 0.
18~30 行, 我们创建了 2 个线程, 每个线程启动后都会调用 IncreaseCityPopulationTask 的 Run()函数, 不同的是在线程创建时我们指定了不同的 population 增加次数, 一个增加 10000 次, 一个增加 20000 次.
在 33~38 行, 我们从 cache 中取回名为 "Markham" 的实例, 并检查它最终的人口数量是不是 30000. 如果两个线程之间的操作 (读 cache, 增加人口, 写 cache) 是原子操作的话, 那么最终结果应该是 30000.
57~68 是 Entry Processor 的具体用法, 通过 cityProvinceCache.invoke()函数就可以调用 entry processor,invoke()函数的第一参数是 entry processor 要作用的数据的 key. 第二个参数是 entry processor 的一个实例, 该实例必须要实现接口类 EntryProcessor 的 process()函数. 在第二个参数之后, 还可以传入多个参数, 调用时这些参数会传给 process()函数.
在 process()函数的中, 第一个参数 mutableEntry 包含了 process()函数作用的数据的 key 和 value, 可以通过 MutableEntry.getKey()和 MutableEntry.getValue()得到 (如果该 key 的 value 不存在 cache 中, getValue() 会返回 null). 第二个之后的 objects 参数, 是调用 invoke()函数时除了 key 和 EntryProcessor 之外, 传入的参数.
在 entry processor 中可以实现一些复杂的逻辑, 然后调用 MutableEntry.setValue()对 value 值进行修改. 如果需要删除 value, 调用 MutableEntry.remove().
EntryProcessor()被调用时, cache 中对应的 key 值会被加锁, 所以对同一个键值的不同 entry processor 之间是互斥的, 保证了一个 entry processor 中的所有操作是原子操作.
另外, 有一点需要注意的是, 在 entry processor 中的操作需要时无状态的, 因为同一个 entry processor 有可能会在 primary 和 backup 节点上执行多次, 所以要保证 entry processor 中的操作只和 cache 中的当前值相关, 如果还和当前节点的一些参数和状态相关, 会导致在不同节点上运行 entry processor 后写入 cache 的值不一致. 详情见 invoke()函数的文档.
总结
这篇文章我们介绍了 Ignite Cache 基本的 put/get()操作外的其他操作, 比如异步的操作和 entry processor** 这篇文章里用到的例子的完整代码和 maven 工程可以在这里找到.
下一篇文章, 我们会继续看看如何使用 Ignite 的 SQL API 对 cache 进行查询和修改.
来源: https://www.cnblogs.com/peppapigdaddy/p/11269351.html