[关闭]
@liyuj 2016-12-25T09:04:55.000000Z 字数 61491 阅读 6568

Apache-Ignite-1.8.0-中文开发手册

3.数据网格

3.1.数据网格

Ignite针对越来越火的水平扩展概念而构建,具有实时按需增加节点的能力。他可以支持线性扩展到几百个节点,通过数据位置的强语义以及数据关系路由来降低冗余数据噪声。
Ignite数据网格是一个基于内存的分布式键值存储,他可以视为一个分布式的分区化哈希,每个集群节点都持有所有数据的一部分,这意味着随着集群节点的增加,就可以缓存更多的数据。
与其他键值存储系统不同,Ignite通过可插拔的哈希算法来决定数据的位置,每个客户端都可以通过一个加入一个哈希函数决定一个键属于哪个节点,而不需要任何特定的映射服务器或者name节点。
Ignite数据网格支持本地、复制的、分区化的数据集,允许使用标准SQL语法方便地进行跨数据集查询,同时还支持在内存数据中进行分布式SQL关联。
Ignite数据网格轻量快速,是目前在集群中支持数据的事务性和原子性的最快的实现之一。

数据一致性
只要集群仍然处于活动状态,即使节点崩溃或者网络拓扑发生变化,Ignite都会保证不同集群节点中的数据的一致性。

JCache (JSR107)
Ignite实现了JCache(JSR107)规范。

3.2.超越JCache

3.2.1.摘要

Ignite是JCache(JSR107)规范的一个实现,JCache为数据访问提供了简单易用且功能强大的API。然而规范忽略了任何有关数据分布以及一致性的细节来允许开发商在自己的实现中有足够的自由度。
可以通过JCache实现:

在JCache之外,Ignite还提供了ACID事务,数据查询的能力(包括SQL),各种内存模型等。

3.2.2.IgniteCache

IgniteCache接口是Ignite缓存实现的一个入口,提供了保存和获取数据,执行查询,包括SQL,迭代和扫描等等的方法。
IgniteCache是基于JCache(JSR107)的,所以在非常基本的API上可以减少到javax.cache.Cache接口,然而IgniteCache还提供了JCache规范之外的、有用的功能,比如数据加载,查询,异步模型等。
可以从Ignite中直接获得IgniteCache的实例:

  1. Ignite ignite = Ignition.ignite();
  2. // Obtain instance of cache named "myCache".
  3. // Note that different caches may have different generics.
  4. IgniteCache<Integer, String> cache = ignite.cache("myCache");

动态缓存
也可以动态地创建缓存的一个实例,这时,Ignite会在所有的集群成员中创建和部署该缓存。

  1. Ignite ignite = Ignition.ignite();
  2. CacheConfiguration cfg = new CacheConfiguration();
  3. cfg.setName("myCache");
  4. cfg.setAtomicityMode(TRANSACTIONAL);
  5. // Create cache with given name, if it does not exist.
  6. IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cfg);

XML配置
在任意的缓存节点上定义的基于Spring的XML配置的所有缓存同时会自动地在所有的集群节点上创建和部署(不需要在每个集群节点上指定同样的配置)。

3.2.3.基本操作

下面是一些JCache基本原子操作的例子:

  1. try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) {
  2. IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);
  3. // Store keys in cache (values will end up on different cache nodes).
  4. for (int i = 0; i < 10; i++)
  5. cache.put(i, Integer.toString(i));
  6. for (int i = 0; i < 10; i++)
  7. System.out.println("Got [key=" + i + ", val=" + cache.get(i) + ']');
  8. }

3.2.4.EntryProcessor

当在缓存中执行putsupdates操作时,通常需要在网络中发送完整的状态数据,而EntryProcessor可以直接在主节点上处理数据,只需要传输增量数据而不是全量数据。
此外,可以在EntryProcessor中嵌入自定义逻辑,比如,获取之前缓存的数据然后加1.
Java8:

  1. IgniteCache<String, Integer> cache = ignite.cache("mycache");
  2. // Increment cache value 10 times.
  3. for (int i = 0; i < 10; i++)
  4. cache.invoke("mykey", (entry, args) -> {
  5. Integer val = entry.getValue();
  6. entry.setValue(val == null ? 1 : val + 1);
  7. return null;
  8. });

Java7:

  1. IgniteCache<String, Integer> cache = ignite.jcache("mycache");
  2. // Increment cache value 10 times.
  3. for (int i = 0; i < 10; i++)
  4. cache.invoke("mykey", new EntryProcessor<String, Integer, Void>() {
  5. @Override
  6. public Object process(MutableEntry<Integer, String> entry, Object... args) {
  7. Integer val = entry.getValue();
  8. entry.setValue(val == null ? 1 : val + 1);
  9. return null;
  10. }
  11. });

原子性
EntryProcessor通过给键加锁以原子性方式执行。

3.2.5.异步支持

和Ignite中的所有API一样,IgniteCache实现了IgniteAsynchronousSupport接口,因此可以以异步的方式使用。

  1. // Enable asynchronous mode.
  2. IgniteCache<String, Integer> asyncCache = ignite.cache("mycache").withAsync();
  3. // Asynhronously store value in cache.
  4. asyncCache.getAndPut("1", 1);
  5. // Get future for the above invocation.
  6. IgniteFuture<Integer> fut = asyncCache.future();
  7. // Asynchronously listen for the operation to complete.
  8. fut.listenAsync(f -> System.out.println("Previous cache value: " + f.get()));

3.3.缓存模式

3.3.1.摘要

Ignite提供了三种不同的缓存操作模式,分区、复制和本地。缓存模型可以为每个缓存单独配置,缓存模型是通过CacheMode枚举定义的。

3.3.2.分区模式

分区模式是扩展性最好的分布式缓存模式,这种模式下,所有数据被均等地分布在分区中,所有的分区也被均等地拆分在相关的节点中,实际上就是为缓存的数据创建了一个巨大的内存内分布式存储。这个方式可以在所有节点上只要匹配总可用内存就可以存储尽可能多的数据,因此,可以在集群的所有节点的内存中可以存储TB级的数据,也就是说,只要有足够多的节点,就可以存储足够多的数据。
复制模式不同,它更新是很昂贵的,因为集群内的每个节点都需要更新,而分区模式更新就很廉价,因为对于每个键只需要更新一个主节点(可选择一个或者多个备份节点),然而,读取变得较为昂贵,因为只有特定节点才持有缓存的数据。
为了避免额外的数据移动,总是访问恰好缓存有要访问的数据的节点是很重要的,这个方法叫做关系并置,当工作在分区化缓存时强烈建议使用。

分区化缓存适合于数据量很大而更新频繁的场合。

下图简单描述了一下一个分区缓存,实际上,键A赋予了运行在JVM1上的节点,B赋予了运行在JVM3上的节点,等等。

下面的配置章节显示了如何配置缓存模式的例子。

3.3.3.复制模式

复制模式中,所有数据都被复制到集群内的每个节点,因为每个节点都有效所以这个缓存模式提供了最大的数据可用性。然而,这个模式每个数据更新都要传播到其他所有节点,因而会对性能和可扩展性产生影响。
Ignite中,复制缓存是通过分区缓存实现的,每个键都有一个主拷贝而且在集群内的其他节点也会有备份。

因为相同的数据被存储在所有的集群节点中,复制缓存的大小受到RAM最小的节点的有效内存限制。这个模式适用于读缓存比写缓存频繁的多而且数据集较小的场景,如果应用超过80%的时间用于查找缓存,那么就要考虑使用复制缓存模式了。

复制缓存适用于数据集不大而且更新不频繁的场合。

3.3.4.本地模式

本地模式是最轻量的模式,因为没有数据被分布化到其他节点。他适用于或者数据是只读的,或者需要定期刷新的场景中。当缓存数据失效需要从持久化存储中加载数据时,他也可以工作与通读模式。除了分布化以外,本地缓存包括了分布式缓存的所有功能,比如自动数据回收,过期,磁盘交换,数据查询以及事务。

3.3.5.配置

缓存可以每个缓存分别配置,通过设置CacheConfigurationcacheMode属性实现:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="cacheConfiguration">
  4. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  5. <!-- Set a cache name. -->
  6. <property name="name" value="cacheName"/>
  7. <!-- Set cache mode. -->
  8. <property name="cacheMode" value="PARTITIONED"/>
  9. ...
  10. </bean>
  11. </property>
  12. </bean>

Java:

  1. CacheConfiguration cacheCfg = new CacheConfiguration("myCache");
  2. cacheCfg.setCacheMode(CacheMode.PARTITIONED);
  3. IgniteConfiguration cfg = new IgniteConfiguration();
  4. cfg.setCacheConfiguration(cacheCfg);
  5. // Start Ignite node.
  6. Ignition.start(cfg);

3.3.6.原子有序写模式

当分区缓存使用CacheAtomicityMode.ATOMIC模式时,可以配置成原子有序写模式,原子有序写决定哪个节点会赋予写版本(发送者或者主节点),它由CacheAtomicWriteOrderMode枚举定义,它有两种模式:CLOCKPRIMARY
CLOCK有序写模式中,写版本被赋予在一个发送者节点上,当使用CacheWriteSynchronizationMode.FULL_SYNCCLOCK模式会被自动开启,因为它性能更好,因为到主节点和备份节点的写请求是被同时发送的。
PRIMARY有序写模式中,写版本只被赋予到主节点上,这种模式下发送者只会将写请求发送到主节点上然后分配写版本再转发到备份节点上。
原子有序写模式可以通过CacheConfigurationatomicWriteOrderMode属性进行配置。
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="cacheConfiguration">
  4. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  5. <!-- Set a cache name. -->
  6. <property name="name" value="cacheName"/>
  7. <!-- Atomic write order mode. -->
  8. <property name="atomicWriteOrderMode" value="PRIMARY"/>
  9. ...
  10. </bean>
  11. </property>
  12. </bean>

Java:

  1. CacheConfiguration cacheCfg = new CacheConfiguration();
  2. cacheCfg.setName("cacheName");
  3. cacheCfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.CLOCK);
  4. IgniteConfiguration cfg = new IgniteConfiguration();
  5. cfg.setCacheConfiguration(cacheCfg);
  6. // Start Ignite node.
  7. Ignition.start(cfg);

要了解有关原子模式的更多信息,请参照:3.9.事务章节。

XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="cacheConfiguration">
  4. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  5. <!-- Set a cache name. -->
  6. <property name="name" value="cacheName"/>
  7. <!-- Set cache mode. -->
  8. <property name="cacheMode" value="PARTITIONED"/>
  9. <!-- Number of backup nodes. -->
  10. <property name="backups" value="1"/>
  11. ...
  12. </bean>
  13. </property>
  14. </bean>

Java:

  1. CacheConfiguration cacheCfg = new CacheConfiguration();
  2. cacheCfg.setName("cacheName");
  3. cacheCfg.setCacheMode(CacheMode.PARTITIONED);
  4. cacheCfg.setBackups(1);
  5. IgniteConfiguration cfg = new IgniteConfiguration();
  6. cfg.setCacheConfiguration(cacheCfg);
  7. // Start Ignite node.
  8. Ignition.start(cfg);

3.4.主节点和备份副本

3.4.1.摘要

分区模式下,赋予键的节点叫做这些键的主节点,对于缓存的数据,也可以有选择地配置任意多个备份节点。如果副本数量大于0,那么Ignite会自动地为每个独立的键赋予备份节点,比如,如果副本数量为1,那么数据网格内缓存的每个键都会有2个备份,一主一备。

因为性能原因备份默认是被关闭的。

3.4.2.配置备份

备份可以通过CacheConfigurationbackups属性进行配置,像下面这样:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="cacheConfiguration">
  4. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  5. <!-- Set a cache name. -->
  6. <property name="name" value="cacheName"/>
  7. <!-- Set cache mode. -->
  8. <property name="cacheMode" value="PARTITIONED"/>
  9. <!-- Number of backup nodes. -->
  10. <property name="backups" value="1"/>
  11. ...
  12. </bean>
  13. </property>
  14. </bean>

Java:

  1. CacheConfiguration cacheCfg = new CacheConfiguration();
  2. cacheCfg.setName("cacheName");
  3. cacheCfg.setCacheMode(CacheMode.PARTITIONED);
  4. cacheCfg.setBackups(1);
  5. IgniteConfiguration cfg = new IgniteConfiguration();
  6. cfg.setCacheConfiguration(cacheCfg);
  7. // Start Ignite node.
  8. Ignition.start(cfg);

3.4.3.同步和异步备份

CacheWriteSynchronizationMode枚举可以用来配置主节点和备份部分的同步和异步更新。同步写模式告诉Ignite在完成写或者提交之前客户端节点是否要等待来自远程节点的响应。
同步写模式可以设置为下面的三种之一:

同步写模式 描述
FULL_SYNC 客户端节点要等待所有相关远程节点的写入或者提交完成(主和备)。
FULL_ASYNC 这种情况下,客户端不需要等待来自相关节点的响应。这时远程节点会在获得他们的状态在任意的缓存写操作完成或者Transaction.commit()方法调用完成之后进行小幅更新。
PRIMARY_SYNC 这是默认模式,客户端节点会等待主节点的写或者提交完成,但不会等待备份节点的更新完成。

缓存数据一致性
注意不管那种写同步模式,缓存数据都会保持在所有相关节点上的完整一致性。

写同步模式可以通过CacheConfigurationwriteSynchronizationMode属性进行配置,像下面这样:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="cacheConfiguration">
  4. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  5. <!-- Set a cache name. -->
  6. <property name="name" value="cacheName"/>
  7. <!-- Set write synchronization mode. -->
  8. <property name="writeSynchronizationMode" value="FULL_SYNC"/>
  9. ...
  10. </bean>
  11. </property>
  12. </bean>

Java:

  1. CacheConfiguration cacheCfg = new CacheConfiguration();
  2. cacheCfg.setName("cacheName");
  3. cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
  4. IgniteConfiguration cfg = new IgniteConfiguration();
  5. cfg.setCacheConfiguration(cacheCfg);
  6. // Start Ignite node.
  7. Ignition.start(cfg);

3.5.近缓存

分区化的缓存也可以通过缓存前移,他是一个较小的本地缓存,可以用来存储最近或者最频繁访问的数据。和分区缓存一样,可以控制近缓存的大小以及回收策略。
近缓存可以通过在Ignite.createNearCache(NearConfiguration)中传入NearConfiguration或者通过调用Ignite.getOrCreateNearCache(NearConfiguration)方法在客户端节点直接创建。使用Ignite.getOrCreateCache(CacheConfiguration, NearCacheConfiguration),可以在动态启动一个分布式缓存的同时为其创建一个近缓存。
Java:

  1. // Create near-cache configuration for "myCache".
  2. NearCacheConfiguration<Integer, Integer> nearCfg =
  3. new NearCacheConfiguration<>();
  4. // Use LRU eviction policy to automatically evict entries
  5. // from near-cache, whenever it reaches 100_000 in size.
  6. nearCfg.setNearEvictionPolicy(new LruEvictionPolicy<>(100_000));
  7. // Create a distributed cache on server nodes and
  8. // a near cache on the local node, named "myCache".
  9. IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(
  10. new CacheConfiguration<Integer, Integer>("myCache"), nearCfg);

在大多数情况下,只要用了Ignite的关系并置,近缓存就不应该用了。如果计算与相应的分区化缓存节点是并置的,那么近缓存根本就不需要了,因为所有数据只在分区化缓存的本地才有效。
然而,有时没有必要将计算任务发送给远端节点,比如近缓存可以显著提升可扩展性或者提升应用的整体性能。

事务
近缓存是完全事务性的,当服务端的数据发生改变时会自动地获得更新或者失效。
服务端节点的近缓存
每当以一个非托管的方式从服务器端的分区缓存中访问数据时,都需要通过CacheConfiguration.setNearConfiguration(...)方法在服务端节点上配置近缓存。

3.5.1.配置

CacheConfiguration中与近缓存有关的大部分参数都会继承于服务端的配置,比如,如果服务端缓存有一个ExpiryPolicy,近缓存中的条目也会基于同样的策略。
下表中列出的参数是不会从服务端配置中继承的,是通过NearCacheConfiguration对象单独提供的:

setter方法 描述 默认值
setNearEvictionPolicy(CacheEvictionPolicy) 近缓存回收策略
setNearStartSize(int) 缓存初始大小 375,000

3.6.缓存查询

3.6.1.摘要

Ignite提供了非常优雅的查询API,支持基于谓词的扫描查询、SQL查询(ANSI-99兼容)、文本查询。对于SQL查询,Ignite提供了内存内的索引,因此所有的数据检索都是非常快的,如果是在堆外内存中缓存数据的,那么查询索引也会缓存在堆外内存中。
Ignite也通过IndexingSpiSpiQuery类提供对自定义索引的支持。

3.6.2.主要的抽象

IgniteCache有若干个查询方法,这些方法可以获得一些Query的子类以及返回QueryCursor

查询
Query抽象类表示一个在分布式缓存上执行的抽象分页查询。可以通过Query.setPageSize(...)方法设置返回游标的每页大小(默认值是1024)。

查询游标
QueryCursor表示查询的结果集,可以透明地进行一页一页地迭代。每当迭代到每页的最后时,会自动地在后台请求下一页的数据,当不需要分页时,可以使用QueryCursor.getAll()方法,他会获得整个查询结果集然后存储在集合里。

关闭游标
如果调用了QueryCursor.getAll()方法,游标会自动关闭。如果通过for循环迭代一个游标或者显式地获得Iterator,必须显式地关闭或者使用AutoCloseable语法。

3.6.3.扫描查询

扫描查询可以通过用户定义的谓词以分布式的形式进行缓存的查询。
Java8:

  1. IgniteCache<Long, Person> cache = ignite.cache("mycache");
  2. // Find only persons earning more than 1,000.
  3. try (QueryCursor cursor = cache.query(new ScanQuery((k, p) -> p.getSalary() > 1000)) {
  4. for (Person p : cursor)
  5. System.out.println(p.toString());
  6. }

Java7:

  1. IgniteCache<Long, Person> cache = ignite.cache("mycache");
  2. // Find only persons earning more than 1,000.
  3. IgniteBiPredicate<Long, Person> filter = new IgniteBiPredicate<>() {
  4. @Override public boolean apply(Long key, Perons p) {
  5. return p.getSalary() > 1000;
  6. }
  7. };
  8. try (QueryCursor cursor = cache.query(new ScanQuery(filter)) {
  9. for (Person p : cursor)
  10. System.out.println(p.toString());
  11. }

扫描查询还支持可选的转换器闭包,它可以在服务端节点在将数据发送到客户端之前对其进行转换。这个很有用,比如,当只是希望从一个大的对象获取少量字段时,这样可以最小化网络的数据传输量,下面的示例显示了如何只获取对象的键,而不发送对象的值。
Java8:

  1. IgniteCache<Long, Person> cache = ignite.cache("mycache");
  2. // Get only keys for persons earning more than 1,000.
  3. List<Long> keys = cache.query(new ScanQuery<Long, Person>(
  4. (k, p) -> p.getSalary() > 1000), // Remote filter.
  5. Cache.Entry::getKey // Transformer.
  6. ).getAll();

Java7:

  1. IgniteCache<Long, Person> cache = ignite.cache("mycache");
  2. // Get only keys for persons earning more than 1,000.
  3. List<Long> keys = cache.query(new ScanQuery<>(
  4. // Remote filter.
  5. new IgniteBiPredicate<Long, Person>() {
  6. @Override public boolean apply(Long k, Person p) {
  7. return p.getSalary() > 1000;
  8. }
  9. }),
  10. // Transformer.
  11. new IgniteClosure<Cache.Entry<Long, Person>, Long>() {
  12. @Override public Long apply(Cache.Entry<Long, Person> e) {
  13. return e.getKey();
  14. }
  15. }
  16. ).getAll();

3.6.4.SQL查询

Ignite的SQL查询请参照SQL网格的相关章节。

3.6.5.文本查询

Ignite也支持通过Lucene索引实现的基于文本的查询。
文本查询:

  1. IgniteCache<Long, Person> cache = ignite.cache("mycache");
  2. // Query for all people with "Master Degree" in their resumes.
  3. TextQuery txt = new TextQuery(Person.class, "Master Degree");
  4. try (QueryCursor<Entry<Long, Person>> masters = cache.query(txt)) {
  5. for (Entry<Long, Person> e : cursor)
  6. System.out.println(e.getValue().toString());
  7. }

3.6.6.通过注解进行查询的配置

索引可以在代码中通过@QuerySqlField注解进行配置,来告诉Ignite那个类型要被索引,键值对可以传入CacheConfiguration.setIndexedTypes(MyKey.class, MyValue.class)方法。注意这个方法只会接受成对的类型,一个是键类型,一个是值类型。

Java:

  1. public class Person implements Serializable {
  2. /** Person ID (indexed). */
  3. @QuerySqlField(index = true)
  4. private long id;
  5. /** Organization ID (indexed). */
  6. @QuerySqlField(index = true)
  7. private long orgId;
  8. /** First name (not-indexed). */
  9. @QuerySqlField
  10. private String firstName;
  11. /** Last name (not indexed). */
  12. @QuerySqlField
  13. private String lastName;
  14. /** Resume text (create LUCENE-based TEXT index for this field). */
  15. @QueryTextField
  16. private String resume;
  17. /** Salary (indexed). */
  18. @QuerySqlField(index = true)
  19. private double salary;
  20. ...
  21. }

3.6.7.使用QueryEntity进行查询配置

索引和字段也可以通过org.apache.ignite.cache.QueryEntity进行配置,他便于通过Spring使用XML进行配置,详细信息可以参照JavaDoc。他与@QuerySqlField注解是等价的,因为在内部类注解会被转换成查询实体。
XML:

  1. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  2. <property name="name" value="mycache"/>
  3. <!-- Configure query entities -->
  4. <property name="queryEntities">
  5. <list>
  6. <bean class="org.apache.ignite.cache.QueryEntity">
  7. <property name="keyType" value="java.lang.Long"/>
  8. <property name="valueType" value="org.apache.ignite.examples.Person"/>
  9. <property name="fields">
  10. <map>
  11. <entry key="id" value="java.lang.Long"/>
  12. <entry key="orgId" value="java.lang.Long"/>
  13. <entry key="firstName" value="java.lang.String"/>
  14. <entry key="lastName" value="java.lang.String"/>
  15. <entry key="resume" value="java.lang.String"/>
  16. <entry key="salary" value="java.lang.Double"/>
  17. </map>
  18. </property>
  19. <property name="indexes">
  20. <list>
  21. <bean class="org.apache.ignite.cache.QueryIndex">
  22. <constructor-arg value="id"/>
  23. </bean>