@liyuj
2017-01-03T20:24:01.000000Z
字数 91072
阅读 11069
Apache-Ignite-1.7.0-中文开发手册
Ignite针对越来越火的水平扩展概念而构建,具有实时按需增加节点的能力。他可以支持线性扩展到几百个节点,通过数据位置的强语义以及数据关系路由来降低冗余数据噪声。
Ignite数据网格是一个基于内存的分布式键值存储
,他可以视为一个分布式的分区化哈希,每个集群节点都持有所有数据的一部分,这意味着随着集群节点的增加,就可以缓存更多的数据。
与其他键值存储系统不同,Ignite通过可插拔的哈希算法来决定数据的位置,每个客户端都可以通过一个加入一个哈希函数决定一个键属于哪个节点,而不需要任何特定的映射服务器或者name节点。
Ignite数据网格支持本地、复制的、分区化的数据集,允许使用标准SQL语法方便地进行跨数据集查询,同时还支持在内存数据中进行分布式SQL关联。
Ignite数据网格轻量快速,是目前在集群中支持数据的事务性和原子性的最快的实现之一。
数据一致性
只要集群仍然处于活动状态,即使节点崩溃或者网络拓扑发生变化,Ignite都会保证不同集群节点中的数据的一致性。
JCache (JSR107)
Ignite实现了JCache
(JSR107)规范。
Ignite是JCache(JSR107)规范的一个实现,JCache为数据访问提供了简单易用且功能强大的API。然而规范忽略了任何有关数据分布以及一致性的细节来允许开发商在自己的实现中有足够的自由度。
可以通过JCache实现:
在JCache之外,Ignite还提供了ACID事务,数据查询的能力(包括SQL),各种内存模型等。
IgniteCache
接口是Ignite缓存实现的一个入口,提供了保存和获取数据,执行查询,包括SQL,迭代和扫描等等的方法。
IgniteCache
是基于JCache(JSR107)的,所以在非常基本的API上可以减少到javax.cache.Cache
接口,然而IgniteCache
还提供了JCache规范之外的、有用的功能,比如数据加载,查询,异步模型等。
可以从Ignite
中直接获得IgniteCache
的实例:
Ignite ignite = Ignition.ignite();
// Obtain instance of cache named "myCache".
// Note that different caches may have different generics.
IgniteCache<Integer, String> cache = ignite.cache("myCache");
动态缓存
也可以动态地创建缓存的一个实例,这时,Ignite会在所有的集群成员中创建和部署该缓存。
Ignite ignite = Ignition.ignite();
CacheConfiguration cfg = new CacheConfiguration();
cfg.setName("myCache");
cfg.setAtomicityMode(TRANSACTIONAL);
// Create cache with given name, if it does not exist.
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cfg);
XML配置
在任意的缓存节点上定义的基于Spring的XML配置的所有缓存同时会自动地在所有的集群节点上创建和部署(不需要在每个集群节点上指定同样的配置)。
下面是一些JCache基本原子操作的例子:
try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) {
IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);
// Store keys in cache (values will end up on different cache nodes).
for (int i = 0; i < 10; i++)
cache.put(i, Integer.toString(i));
for (int i = 0; i < 10; i++)
System.out.println("Got [key=" + i + ", val=" + cache.get(i) + ']');
}
当在缓存中执行puts
和updates
操作时,通常需要在网络中发送完整的状态数据,而EntryProcessor
可以直接在主节点上处理数据,只需要传输增量数据而不是全量数据。
此外,可以在EntryProcessor
中嵌入自定义逻辑,比如,获取之前缓存的数据然后加1.
Java8:
IgniteCache<String, Integer> cache = ignite.cache("mycache");
// Increment cache value 10 times.
for (int i = 0; i < 10; i++)
cache.invoke("mykey", (entry, args) -> {
Integer val = entry.getValue();
entry.setValue(val == null ? 1 : val + 1);
return null;
});
Java7:
IgniteCache<String, Integer> cache = ignite.jcache("mycache");
// Increment cache value 10 times.
for (int i = 0; i < 10; i++)
cache.invoke("mykey", new EntryProcessor<String, Integer, Void>() {
@Override
public Object process(MutableEntry<Integer, String> entry, Object... args) {
Integer val = entry.getValue();
entry.setValue(val == null ? 1 : val + 1);
return null;
}
});
原子性
:
EntryProcessor
通过给键加锁以原子性方式执行。
和Ignite中的所有API一样,IgniteCache
实现了IgniteAsynchronousSupport
接口,因此可以以异步的方式使用。
// Enable asynchronous mode.
IgniteCache<String, Integer> asyncCache = ignite.cache("mycache").withAsync();
// Asynhronously store value in cache.
asyncCache.getAndPut("1", 1);
// Get future for the above invocation.
IgniteFuture<Integer> fut = asyncCache.future();
// Asynchronously listen for the operation to complete.
fut.listenAsync(f -> System.out.println("Previous cache value: " + f.get()));
Ignite提供了三种不同的缓存操作模式,分区、复制和本地。缓存模型可以为每个缓存单独配置,缓存模型是通过CacheMode
枚举定义的。
分区
模式是扩展性最好的分布式缓存模式,这种模式下,所有数据被均等地分布在分区中,所有的分区也被均等地拆分在相关的节点中,实际上就是为缓存的数据创建了一个巨大的内存内分布式存储。这个方式可以在所有节点上只要匹配总可用内存就可以存储尽可能多的数据,因此,可以在集群的所有节点的内存中可以存储TB级的数据,也就是说,只要有足够多的节点,就可以存储足够多的数据。
与复制
模式不同,它更新是很昂贵的,因为集群内的每个节点都需要更新,而分区
模式更新就很廉价,因为对于每个键只需要更新一个主节点(可选择一个或者多个备份节点),然而,读取变得较为昂贵,因为只有特定节点才持有缓存的数据。
为了避免额外的数据移动,总是访问恰好缓存有要访问的数据的节点是很重要的,这个方法叫做关系并置,当工作在分区化缓存时强烈建议使用。
分区化缓存适合于数据量很大而更新频繁的场合。
下图简单描述了一下一个分区缓存,实际上,键A赋予了运行在JVM1上的节点,B赋予了运行在JVM3上的节点,等等。
下面的配置章节显示了如何配置缓存模式的例子。
复制
模式中,所有数据都被复制到集群内的每个节点,因为每个节点都有效所以这个缓存模式提供了最大的数据可用性。然而,这个模式每个数据更新都要传播到其他所有节点,因而会对性能和可扩展性产生影响。
Ignite中,复制缓存是通过分区缓存实现的,每个键都有一个主拷贝而且在集群内的其他节点也会有备份。
因为相同的数据被存储在所有的集群节点中,复制缓存的大小受到RAM最小的节点的有效内存限制。这个模式适用于读缓存比写缓存频繁的多而且数据集较小的场景,如果应用超过80%的时间用于查找缓存,那么就要考虑使用复制
缓存模式了。
复制缓存适用于数据集不大而且更新不频繁的场合。
本地
模式是最轻量的模式,因为没有数据被分布化到其他节点。他适用于或者数据是只读的,或者需要定期刷新的场景中。当缓存数据失效需要从持久化存储中加载数据时,他也可以工作与通读
模式。除了分布化以外,本地缓存包括了分布式缓存的所有功能,比如自动数据回收,过期,磁盘交换,数据查询以及事务。
缓存可以每个缓存分别配置,通过设置CacheConfiguration
的cacheMode
属性实现:
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="cacheName"/>
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
...
</bean>
</property>
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration("myCache");
cacheCfg.setCacheMode(CacheMode.PARTITIONED);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
当分区缓存使用CacheAtomicityMode.ATOMIC
模式时,可以配置成原子有序写模式,原子有序写决定哪个节点会赋予写版本(发送者或者主节点),它由CacheAtomicWriteOrderMode
枚举定义,它有两种模式:CLOCK
和PRIMARY
。
CLOCK
有序写模式中,写版本被赋予在一个发送者节点上,当使用CacheWriteSynchronizationMode.FULL_SYNC
时CLOCK
模式会被自动开启,因为它性能更好,因为到主节点和备份节点的写请求是被同时发送的。
PRIMARY
有序写模式中,写版本只被赋予到主节点上,这种模式下发送者只会将写请求发送到主节点上然后分配写版本再转发到备份节点上。
原子有序写模式可以通过CacheConfiguration
的atomicWriteOrderMode
属性进行配置。
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="cacheName"/>
<!-- Atomic write order mode. -->
<property name="atomicWriteOrderMode" value="PRIMARY"/>
...
</bean>
</property>
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
cacheCfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.CLOCK);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
要了解有关
原子
模式的更多信息,请参照:3.9.事务
章节。
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="cacheName"/>
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<!-- Number of backup nodes. -->
<property name="backups" value="1"/>
...
</bean>
</property>
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
cacheCfg.setCacheMode(CacheMode.PARTITIONED);
cacheCfg.setBackups(1);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
在分区
模式下,赋予键的节点叫做这些键的主节点,对于缓存的数据,也可以有选择地配置任意多个备份节点。如果副本数量大于0,那么Ignite会自动地为每个独立的键赋予备份节点,比如,如果副本数量为1,那么数据网格内缓存的每个键都会有2个备份,一主一备。
因为性能原因备份默认是被关闭的。
备份可以通过CacheConfiguration
的backups
属性进行配置,像下面这样:
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="cacheName"/>
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<!-- Number of backup nodes. -->
<property name="backups" value="1"/>
...
</bean>
</property>
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
cacheCfg.setCacheMode(CacheMode.PARTITIONED);
cacheCfg.setBackups(1);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
CacheWriteSynchronizationMode
枚举可以用来配置主节点和备份部分的同步和异步更新。同步写模式告诉Ignite在完成写或者提交之前客户端节点是否要等待来自远程节点的响应。
同步写模式可以设置为下面的三种之一:
同步写模式 | 描述 |
---|---|
FULL_SYNC |
客户端节点要等待所有相关远程节点的写入或者提交完成(主和备)。 |
FULL_ASYNC |
这种情况下,客户端不需要等待来自相关节点的响应。这时远程节点会在获得他们的状态在任意的缓存写操作完成或者Transaction.commit() 方法调用完成之后进行小幅更新。 |
PRIMARY_SYNC |
这是默认模式,客户端节点会等待主节点的写或者提交完成,但不会等待备份节点的更新完成。 |
缓存数据一致性
注意不管哪种写同步模式,缓存数据都会保持在所有相关节点上的完整一致性。
写同步模式可以通过CacheConfiguration
的writeSynchronizationMode
属性进行配置,像下面这样:
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="cacheName"/>
<!-- Set write synchronization mode. -->
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
...
</bean>
</property>
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
分区化的缓存也可以通过近
缓存前移,他是一个较小的本地缓存,可以用来存储最近或者最频繁访问的数据。和分区缓存一样,可以控制近缓存的大小以及回收策略。
近缓存可以通过在Ignite.createNearCache(NearConfiguration)
中传入NearConfiguration
或者通过调用Ignite.getOrCreateNearCache(NearConfiguration)
方法在客户端节点直接创建。
Java:
// Create distributed cache on the server nodes, called "myCache".
ignite.getOrCreateCache(new CacheConfiguration<MyKey, MyValue>("myCache"));
// Create near-cache configuration for "myCache".
NearCacheConfiguration<MyKey, MyValue> nearCfg = new NearCacheConfiguration<>();
// Use LRU eviction policy to automatically evict entries
// from near-cache, whenever it reaches 100_000 in size.
nearCfg.setEvictionPolicy(new LruEvictionPolicy<>(100_000));
// Create near-cache for "myCache".
IgniteCache<MyKey, MyValue> cache = ignite.getOrCreateNearCache("myCache", nearCfg);
在大多数情况下,只要用了Ignite的关系并置,近缓存就不应该用了。如果计算与相应的分区化缓存节点是并置的,那么近缓存根本就不需要了,因为所有数据只在分区化缓存的本地才有效。
然而,有时没有必要将计算任务发送给远端节点,比如近缓存可以显著提升可扩展性或者提升应用的整体性能。
事务
近缓存是完全事务性的,当服务端的数据发生改变时会自动地获得更新或者失效。
服务端节点的近缓存
每当以一个非托管的方式从服务器端的分区
缓存中访问数据时,都需要通过CacheConfiguration.setNearConfiguration(...)
方法在服务端节点上配置近缓存。
CacheConfiguration
中与近缓存有关的大部分参数都会继承于服务端的配置,比如,如果服务端缓存有一个ExpiryPolicy
,近缓存中的条目也会基于同样的策略。
下表中列出的参数是不会从服务端配置中继承的,是通过NearCacheConfiguration
对象单独提供的:
setter方法 | 描述 | 默认值 |
---|---|---|
setNearEvictionPolicy(CacheEvictionPolicy) | 近缓存回收策略 | 无 |
setNearStartSize(int) | 缓存初始大小 | 375,000 |
Ignite提供了非常优雅的查询API,支持基于谓词的扫描查询、SQL查询(ANSI-99兼容)、文本查询。对于SQL查询,Ignite提供了内存内的索引,因此所有的数据检索都是非常快的,如果是在堆外内存
中缓存数据的,那么查询索引也会缓存在堆外内存中。
Ignite也通过IndexingSpi
和SpiQuery
类提供对自定义索引的支持。
IgniteCache
有若干个查询方法,这些方法可以获得一些Query
的子类以及返回QueryCursor
。
查询
Query
抽象类表示一个在分布式缓存上执行的抽象分页查询。可以通过Query.setPageSize(...)
方法设置返回游标的每页大小(默认值是1024
)。
查询游标
QueryCursor
表示查询的结果集,可以透明地进行一页一页地迭代。每当迭代到每页的最后时,会自动地在后台请求下一页的数据,当不需要分页时,可以使用QueryCursor.getAll()
方法,他会获得整个查询结果集然后存储在集合里。
关闭游标
如果调用了QueryCursor.getAll()
方法,游标会自动关闭。如果通过for循环迭代一个游标或者显式地获得Iterator
,必须显式地关闭或者使用AutoCloseable
语法。
扫描查询可以通过用户定义的谓词以分布式的形式进行缓存的查询。
Java8:
IgniteCache<Long, Person> cache = ignite.cache("mycache");
// Find only persons earning more than 1,000.
try (QueryCursor cursor = cache.query(new ScanQuery((k, p) -> p.getSalary() > 1000)) {
for (Person p : cursor)
System.out.println(p.toString());
}
Java7:
IgniteCache<Long, Person> cache = ignite.cache("mycache");
// Find only persons earning more than 1,000.
IgniteBiPredicate<Long, Person> filter = new IgniteBiPredicate<>() {
@Override public boolean apply(Long key, Perons p) {
return p.getSalary() > 1000;
}
};
try (QueryCursor cursor = cache.query(new ScanQuery(filter)) {
for (Person p : cursor)
System.out.println(p.toString());
}
Ignite的SQL查询请参照SQL查询。
Ignite也支持通过Lucene索引实现的基于文本的查询。
文本查询:
IgniteCache<Long, Person> cache = ignite.cache("mycache");
// Query for all people with "Master Degree" in their resumes.
TextQuery txt = new TextQuery(Person.class, "Master Degree");
try (QueryCursor<Entry<Long, Person>> masters = cache.query(txt)) {
for (Entry<Long, Person> e : cursor)
System.out.println(e.getValue().toString());
}
索引可以在代码中通过@QuerySqlField
注解进行配置,来告诉Ignite那个类型要被索引,键值对可以传入CacheConfiguration.setIndexedTypes(MyKey.class, MyValue.class)
方法。注意这个方法只会接受成对的类型,一个是键类型,一个是值类型。
Java:
public class Person implements Serializable {
/** Person ID (indexed). */
@QuerySqlField(index = true)
private long id;
/** Organization ID (indexed). */
@QuerySqlField(index = true)
private long orgId;
/** First name (not-indexed). */
@QuerySqlField
private String firstName;
/** Last name (not indexed). */
@QuerySqlField
private String lastName;
/** Resume text (create LUCENE-based TEXT index for this field). */
@QueryTextField
private String resume;
/** Salary (indexed). */
@QuerySqlField(index = true)
private double salary;
...
}
索引和字段也可以通过org.apache.ignite.cache.QueryEntity
进行配置,他便于通过Spring使用XML进行配置,详细信息可以参照JavaDoc。他与@QuerySqlField
注解是等价的,因为在内部类注解会被转换成查询实体。
XML:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="mycache"/>
<!-- Configure query entities -->
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
<property name="keyType" value="java.lang.Long"/>
<property name="valueType" value="org.apache.ignite.examples.Person"/>
<property name="fields">
<map>
<entry key="id" value="java.lang.Long"/>
<entry key="orgId" value="java.lang.Long"/>
<entry key="firstName" value="java.lang.String"/>
<entry key="lastName" value="java.lang.String"/>
<entry key="resume" value="java.lang.String"/>
<entry key="salary" value="java.lang.Double"/>
</map>
</property>
<property name="indexes">
<list>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg value="id"/>
</bean>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg value="orgId"/>
</bean>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg value="salary"/>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
Java:
CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();
...
cacheCfg.setName("mycache");
// Setting up query entity.
QueryEntity queryEntity = new QueryEntity();
queryEntity.setKeyType(Long.class.getName());
queryEntity.setValueType(Person.class.getName());
// Listing query fields.
LinkedHashMap<String, String> fields = new LinkedHashMap();
fields.put("id", Long.class.getName());
fields.put("orgId", Long.class.getName());
fields.put("firstName", String.class.getName());
fields.put("lastName", String.class.getName());
fields.put("resume", String.class.getName());
fields.put("salary", Double.class.getName());
queryEntity.setFields(fields);
// Listing indexes.
Collection<QueryIndex> indexes = new ArrayList<>(3);
indexes.add(new QueryIndex("id"));
indexes.add(new QueryIndex("orgId"));
indexes.add(new QueryIndex("salary"));
queryEntity.setIndexes(indexes);
...
cacheCfg.setQueryEntities(Arrays.asList(queryEntity));
...
Ignite对SQL查询的支持几乎没有任何限制,语法兼容于ANSI-99,可以使用任何SQL函数,聚合以及分组,Ignite会找出从哪里获得查询结果。
可以看下面的SQLQuery示例。
Ignite支持并置和非并置的分布式SQL关联,而且,如果数据位于不同的缓存,Ignite也允许跨缓存的关联。
在分区
和复制
缓存之间进行关联也没有什么限制。然而,如果在两个分区
缓存间关联,一定要确保要么要关联的键是并置的,要么为查询开启了非并置的关联参数。
可以看下面的SQLQuery关联示例。
不用查询整个对象,可以只选择查询特定的字段来减少网络和序列化的开销,为此,Ignite有一个字段查询
的概念,当想执行一些聚合查询时这个也很有用。
可以看下面的SqlFieldsQuery示例。
可以从多个缓存中查询数据,这时缓存名称就充当了常规SQL的模式名称。这意味着所有的缓存都可以用带引号的缓存名称来称呼。查询创建的缓存充当了默认模式并且不需要显式地指定。
可以看下面的跨缓存SqlFieldsQuery示例。
SQLQuery:
IgniteCache<Long, Person> cache = ignite.cache("personCache");
SqlQuery sql = new SqlQuery(Person.class, "salary > ?");
// Find only persons earning more than 1,000.
try (QueryCursor<Entry<Long, Person>> cursor = cache.query(sql.setArgs(1000))) {
for (Entry<Long, Person> e : cursor)
System.out.println(e.getValue().toString());
}
SQLQuery关联:
IgniteCache<Long, Person> cache = ignite.cache("personCache");
// SQL join on Person and Organization.
SqlQuery sql = new SqlQuery(Person.class,
"from Person as p, \"orgCache\".Organization as org"
+ "where p.orgId = org.id "
+ "and lower(org.name) = lower(?)");
// Find all persons working for Ignite organization.
try (QueryCursor<Entry<Long, Person>> cursor = cache.query(sql.setArgs("Ignite"))) {
for (Entry<Long, Person> e : cursor)
System.out.println(e.getValue().toString());
}
SqlFieldsQuery:
IgniteCache<Long, Person> cache = ignite.cache("personCache");
// Execute query to get names of all employees.
SqlFieldsQuery sql = new SqlFieldsQuery(
"select concat(firstName, ' ', lastName) from Person");
// Iterate over the result set.
try (QueryCursor<List<?>> cursor = cache.query(sql) {
for (List<?> row : cursor)
System.out.println("personName=" + row.get(0));
}
跨缓存SqlFieldsQuery:
// In this example, suppose Person objects are stored in a
// cache named 'personCache' and Organization objects
// are stored in a cache named 'orgCache'.
IgniteCache<Long, Person> personCache = ignite.cache("personCache");
// Select with join between Person and Organization to
// get the names of all the employees of a specific organization.
SqlFieldsQuery sql = new SqlFieldsQuery(
"select Person.name "
+ "from Person as p, \"orgCache\".Organization as org where "
+ "p.orgId = org.id "
+ "and org.name = ?");
// Execute the query and obtain the query result cursor.
try (QueryCursor<List<?>> cursor = personCache.query(sql.setArgs("Ignite"))) {
for (List<?> row : cursor)
System.out.println("Person name=" + row.get(0));
}
默认情况下,如果一个SQL关联需要跨越多个Ignite缓存才能完成,那么所有的缓存都应该是并置的,否则查询最终执行之后会获得一个不完整的结果,因为在关联阶段,一个节点只会使用本地的数据。正如下面的图1所示,首先,一个SQL查询会发给查询相关数据所在的所有节点(Q),然后查询就在每个节点(E(Q))的本地数据集上执行,最后,所有的执行结果会在客户端侧(R)进行汇总。
除此之外,关系并置还是一个功能强大的概念,即一旦建立了一个应用的业务实体(缓存),就可以以一种最优的方式执行跨缓存的关联,然后返回一个完整的、一致的结果集。但是有时总是有可能无法并置所有的数据,这时,就可能无法执行满足需求的所有SQL查询了。
从1.7版本开始,Ignite开始支持非并置的分布式关联,解决的就是当无法或者非常困难并置所有的数据时,还想在非并置的数据上执行一部分SQL查询的问题。
在实践中不要过度使用基于非并置的分布式关联的方式,因为这种关联方式的性能差于基于关系并置的关联,因为要完成这个查询,要有更多的网络开销和节点间的数据移动。
当通过SqlQuery.setDistributedJoins(boolean)
参数为一个SQL查询启用了非并置的分布式关联之后,查询映射的节点就会从远程节点通过发送广播或者单播请求的方式获取缺失的数据(本地不存在的数据),正如图2所示,有一个潜在的数据移动步骤(D(Q))。潜在的单播请求只会在关联在主键(缓存键)或者关系键上完成之后才会发送,因为执行关联的节点知道缺失数据的位置,其他所有的情况都会发送广播请求。
不管是广播还是单播请求,都是由一个节点发送到另一个节点来获取缺失的数据,是按照顺序执行的。SQL引擎会将所有的请求组成若干批量,这个批量的大小是由
lQuery.setPageSize(int)
参数管理的。
下面的代码片段是从Ignite的发行版的CacheQueryExample中提取的:
IgniteCache<AffinityKey<Long>, Person> cache = ignite.cache("personCache");
// SQL clause query with join over non-collocated data.
String joinSql =
"from Person, \"orgCache\".Organization as org " +
"where Person.orgId = org.id " +
"and lower(org.name) = lower(?)";
SqlQuery qry = new SqlQuery<AffinityKey<Long>, Person>(Person.class, joinSql).
setArgs("ApacheIgnite");
// Enable distributed joins for the query.
qry.setDistributedJoins(true);
// Execute the query to find out employees for specified organization.
System.out.println("Following people are 'ApacheIgnite' employees (distributed join): ", cache.query(qry).getAll());
要了解详细信息,可以参照非并置的分布式关联。
索引可以在代码中通过@QuerySqlField
注解进行配置。要告诉Ignite哪个类型应该被索引,要像下面的例子那样给CacheConfiguration.setIndexedTypes
方法传入键值对。注意这个方法只接受类型对,一个是键类型一个是值类型,基本类型要以包装类型的形式传入。
Java:
CacheConfiguration<Object,Object> ccfg = new CacheConfiguration<>();
// Here we are setting 3 key-value type pairs to be indexed.
ccfg.setIndexedTypes(
MyKey.class, MyValue.class,
Long.class, MyOtherValue.class,
UUID.class, String.class
);
使字段对于SQL查询可见
要使字段对于SQL查询可见,需要给字段加上@QuerySqlField
注解。下面的例子中,字段age
对于SQL不可见,要注意的是这些字段都没有索引。
Java:
public class Person implements Serializable {
/** Will be visible in SQL. */
@QuerySqlField
private long id;
/** Will be visible in SQL. */
@QuerySqlField
private String name;
/** Will NOT be visible in SQL. */
private int age;
}
预定义字段
除了用@QuerySqlField
注解标注的所有字段,每个表都有两个特别的预定义字段:_key
和_val
,它表示到整个键对象和值对象的链接。这很有用,比如当他们中的一个是基本类型并且希望用它的值进行过滤时。要做到这一点,执行一个SELECT * FROM Person WHERE _key = 100
查询即可。Scala注解
在Scala类中,@QuerySqlField
注解必须和@Field
注解一起使用,这样的话这个字段对于Ignite才是可见的,就像这样的:@(QuerySqlField @field)
。
作为替代,也可以使用ignite-scalar
模块的@ScalarCacheQuerySqlField
注解,他不过是@Field
注解的别名。
单列索引
要使字段对于SQL不仅可见还希望加快查询的速度,需要索引该字段的值,要创建一个单列索引,只需要给字段加上@QuerySqlField(index = true)
注解即可。
Java:
public class Person implements Serializable {
/** Will be indexed in ascending order. */
@QuerySqlField(index = true)
private long id;
/** Will be visible in SQL, but not indexed. */
@QuerySqlField
private String name;
/** Will be indexed in descending order. */
@QuerySqlField(index = true, descending = true)
private int age;
}
Scala:
case class Person (
/** Will be indexed in ascending order. */
@(QuerySqlField @field)(index = true) id: Long,
/** Will be visible in SQL, but not indexed. */
@(QuerySqlField @field) name: String,
/** Will be indexed in descending order. */
@(QuerySqlField @field)(index = true, descending = true) age: Int
) extends Serializable {
...
}
Scala注解
在Scala类中,@QuerySqlField
注解应该是这样的:@(QuerySqlField @field)(index = true)
。
分组索引
当查询条件复杂时可以使用多字段索引来加快查询的速度,这时可以用@QuerySqlField.Group
注解。如果希望一个字段参与多个分组索引时也可以将多个@QuerySqlField.Group
注解加入orderedGroups
中。
在下面的分组索引的例子中,age
字段加入了名为age_salary_idx
的分组索引,他的分组序号是0并且降序排列,同一个分组索引中还有一个字段salary
,他的分组序号是3并且升序排列。最重要的是salary
字段还是一个单列索引(除了orderedGroups
声明之外,还加上了index = true
)。声明中的order
不需要是什么特别的数值,他只是用于分组内的字段排序。
Java:
public class Person implements Serializable {
/** Indexed in a group index with "salary". */
@QuerySqlField(orderedGroups={@QuerySqlField.Group(
name = "age_salary_idx", order = 0, descending = true)})
private int age;
/** Indexed separately and in a group index with "age". */
@QuerySqlField(index = true, orderedGroups={@QuerySqlField.Group(
name = "age_salary_idx", order = 3)})
private double salary;
}
注意,将
@QuerySqlField.Group
放在@QuerySqlField(orderedGroups={...})
外面是无效的。
索引和字段也可以通过org.apache.ignite.cache.QueryEntity
进行配置,它便于利用Spring进行XML配置。基本上来说它和@QuerySqlField
是等价的,因为类注解会在内部转换为查询实体。
XML:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="mycache"/>
<!-- Configure query entities -->
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
<property name="keyType" value="java.lang.Long"/>
<property name="valueType" value="org.apache.ignite.examples.Person"/>
<property name="fields">
<map>
<entry key="id" value="java.lang.Long"/>
<entry key="name" value="java.lang.String"/>
<entry key="age" value="java.lang.Integer"/>
<entry key="salary" value="java.lang.Long "/>
</map>
</property>
<property name="indexes">
<list>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg value="id"/>
</bean>
<!-- Group index. -->
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg>
<list>
<value>age</value>
<value>salary</value>
</list>
</constructor-arg>
<constructor-arg value="SORTED"/>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
Ignite中处理查询主要有两种方式:
复制
缓存中执行查询,那么Ignite会假定所有数据都是本地有效的然后只是在H2数据库引擎中执行一个简单的查询,本地
缓存中也是一样的。分区
缓存中执行查询,工作方式如下:查询会被解析并且拆分成多个Map查询和一个Reduce查询,然后所有的Map查询在分区缓存的所有数据节点上执行,再把结果提供给Reduce节点,他会在这些中间结果上依次执行Reduce查询。Ignite在SQL中支持EXPLAIN ...
语法,Ignite中读取执行计划是分析查询性能的一个主要方式,注意计划游标会包含很多行:最后一个是Reduce节点查询的,其他的是Map节点的。
Java:
SqlFieldsQuery sql = new SqlFieldsQuery(
"explain select name from Person where age = ?").setArgs(26);
System.out.println(cache.query(sql).getAll());
执行计划本身是通过H2生成的,详细请参照:http://www.h2database.com/html/performance.html#explain_plan。
当用Ignite进行开发时,有时对于检查表和索引是否正确或者运行在嵌入节点内部的H2数据库中的本地查询是非常有用的,为此Ignite提供了启动H2控制台的功能。要启用该功能,在启动节点时要将IGNITE_H2_DEBUG_CONSOLE
系统属性或者环境变量设置为true
。然后就可以在浏览器中打开控制台,可能需要点击控制台中的刷新
按钮,因为有可能控制台在数据库对象初始化之前打开。
Ignite支持将索引数据放在堆外内存,这个设计对于避免在堆上保存特别大的数据集导致频繁的垃圾回收以及不可预知的响应时间是很有用的。
当CacheConfiguration.setOffHeapMaxMemory()
设置为-1
时SQL索引会保存在堆上,否则会一直使用堆外索引。要注意这是打开和关闭堆外索引的唯一属性,例如CacheConfiguration.setMemoryMode()
方法对于索引不起作用。
当打开堆外功能时要提高SQL查询的性能,可以试着增加CacheConfiguration.setSqlOnheapRowCacheSize()
属性的值,他的默认值是10000.
Java:
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
// Set unlimited off-heap memory for cache and enable off-heap indexes.
ccfg.setOffHeapMaxMemory(0);
// Cache entries will be placed on heap and can be evited to off-heap.
ccfg.setMemoryMode(ONHEAP_TIERED);
ccfg.setEvictionPolicy(new RandomEvictionPolicy(100_000));
// Increase size of SQL on-heap row cache for off-heap indexes.
ccfg.setSqlOnheapRowCacheSize(100_000);
在Ignite应用中使用索引时,有很多事情需要考虑:
索引每个字段是错误的!
有序索引示例
| A | B | C |
| 1 | 2 | 3 |
| 1 | 4 | 2 |
| 1 | 4 | 4 |
| 2 | 3 | 5 |
| 2 | 4 | 4 |
| 2 | 4 | 5 |
任意条件,比如a = 1 and b > 3
,都会被视为有界范围,在log(N)
时间内两个边界在索引中可以被快速检索到,然后结果就是两者之间的任何数据。
下面的条件会使用索引:
a = ?
a = ? and b = ?
a = ? and b = ? and c = ?
从索引的角度,条件a = ?
和c = ?
不会好于a = ?
明显地,半界范围a > ?
可以工作得很好。
当执行SQL查询时有一些常见的陷阱需要注意:
select name from Person where sex='M' and (age = 20 or age = 30)
,age
上的索引并不会生效,虽然它明显比sex
上的索引选择性更强。要解决这个问题需要用UNION ALL重写这个查询(注意没有ALL的UNION会返回去重的行,这会改变查询的语意而且引入了额外的性能开销),就是这样的:select name from Person where sex='M' and age = 20 UNION ALL select name from Person where sex='M' and age = 30
,这个方式索引就被正确使用了。where id in (?, ?, ?)
,但是不能写where id in ?
然后传入一个数组或者集合)并且查询无法使用索引,要解决这两个问题需要像这样重写查询:select p.name from Person p join table(id bigint = ?) i on p.id = i.id
,这里可以提供一个任意长度的对象数组(Object[])作为参数,然后会在字段id
上使用索引。注意基本类型数组(比如int[],long[]等)无法使用这个语法,但是可以使用基本类型的包装器。持续查询对于当执行一个查询之后希望持续地获得该查询结果数据更新的通知时,是很有用的。
持续查询是通过ContinuousQuery
类实现的,他支持如下特性:
初始化查询
当执行持续查询时,有一个选项在开始监听更新之前执行初始化查询。初始化查询可以通过ContinuousQuery.setInitialQuery(Query)
方法进行设置,并且可以是任意查询类型,包括扫描查询,SQL查询和文本查询。这个参数是可选的,并且如果不设置,这个功能是不会启用的。
远程过滤器
过滤器执行在给定键对应的主节点上,然后评估事件是否会被传播给监听器。如果监听器返回true
,那么监听器就会收到通知,否则事件会被忽略。发生监听事件的节点会最大限度地减少监听通知的网络流量。远程过滤器可以通过ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>)
方法进行设置。
本地监听器
每当事件通过远程过滤器,他们会被发送给客户端来通知本地的监听器,本地监听器是通过ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>)
方法设置的。
Java8:
IgniteCache<Integer, String> cache = ignite.cache("mycache");
// Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
// Optional initial query to select all keys greater than 10.
qry.setInitialQuery(new ScanQuery<Integer, String>((k, v) -> k > 10)):
// Callback that is called locally when update notifications are received.
qry.setLocalListener((evts) ->
evts.stream().forEach(e -> System.out.println("key=" + e.getKey() + ", val=" + e.getValue())));
// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
qry.setRemoteFilter(e -> e.getKey() > 10);
// Execute query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
// Iterate through existing data stored in cache.
for (Cache.Entry<Integer, String> e : cur)
System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
// Add a few more keys and watch a few more query notifications.
for (int i = 5; i < 15; i++)
cache.put(i, Integer.toString(i));
}
Java7:
IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);
// Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
qry.setInitialQuery(new ScanQuery<Integer, String>(new IgniteBiPredicate<Integer, String>() {
@Override public boolean apply(Integer key, String val) {
return key > 10;
}
}));
// Callback that is called locally when update notifications are received.
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
for (CacheEntryEvent<Integer, String> e : evts)
System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
}
});
// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() {
@Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
return e.getKey() > 10;
}
});
// Execute query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
// Iterate through existing data.
for (Cache.Entry<Integer, String> e : cur)
System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
// Add a few more keys and watch more query notifications.
for (int i = keyCnt; i < keyCnt + 10; i++)
cache.put(i, Integer.toString(i));
}
事件传递保证
持续查询的实现提供明确的一次保证,将一个事件传递给客户端的本地监听器。因为除了主节点,在每个备份节点维护一个更新队列是可能的。如果主节点故障或者由于某些其他原因网络发生了改变,之后每个备份节点会刷新他的内部队列的内容给客户端来确保事件都会被传递给客户端的本地监听器。
为了避免重复通知,当所有的备份节点都刷新他们的队列给客户端时,Ignite会为每个分区维护一个更新计数器。一旦某个分区的一个条目已经更新,这个分区的计数器在主节点和备份节点都会增加。这个计数器的值会和事件通知一起发给客户端,该客户端还维护该映射的副本。如果客户端收到了一个更新,对应的计数小于它的本地映射,这个更新会被视为重复的然后被忽略。
一旦客户端确认一个事件已经收到,主节点和备份节点会从它们的备份队列中删除该事件的记录。
Ignite支持两种类型的缓存操作,事务性和原子性,在事务性
模式中可以在一个事务中组合多个缓存操作,而原子性
模式支持多个原子性操作,一次一个。
这些原子化模式是在CacheAtomicityMode
枚举中定义的:
TRANSACTIONAL
ATOMIC
TRANSACTIONAL
模式开启了完全遵守ACID的事务,然而,如果只需要原子语义,因为性能原因还是建议使用ATOMIC
模式。
ATOMIC
模式因为避免了事务锁,所以性能更好,但是仍然提供了数据的原子性和一致性。ATOMIC
模式的另一个不同是批量写,比如putAll(...)
和removeAll(...)
方法不再可以在一个事务中执行并且可能部分失败,在部分失败时,会抛出CachePartialUpdateException
,它里面包含了更新失败的键列表。
原子化模式是在CacheAtomicityMode
枚举中定义的,可以在CacheConfiguration的atomicityMode
属性进行配置。
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="myCache"/>
<!-- Set atomicity mode, can be ATOMIC or TRANSACTIONAL.
ATOMIC is default. -->
<property name="atomicityMode" value="TRANSACTIONAL"/>
...
</bean>
</property>
<!-- Optional transaction configuration. -->
<property name="transactionConfiguration">
<bean class="org.apache.ignite.configuration.TransactionConfiguration">
<!-- Configure TM lookup here. -->
</bean>
</property>
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Optional transaction configuration. Configure TM lookup here.
TransactionConfiguration txCfg = new TransactionConfiguration();
cfg.setTransactionConfiguration(txCfg);
// Start Ignite node.
Ignition.start(cfg);
性能
注意当使用ATOMIC
模式时,事务是被禁用的,因为不需要事务,因此可以获得更高的性能和吞吐量。
IgniteTransactions
接口包括了启动和结束事务的功能,以及订阅监听器或者获得指标数据。
跨缓存事务
可以在一个事务中组合来自不同节点的多个操作。注意他可以在一个事务中更新不同类型的缓存,比如复制
和分区
缓存。
近缓存事务
近缓存是完全事务化的,当数据在服务端改变时,会自动地获得更新或者失效。
可以像下面这样获得IgniteTransactions
的一个实例:
Java:
Ignite ignite = Ignition.ignite();
IgniteTransactions transactions = ignite.transactions();
下面是一个事务如何在Ignite中执行的例子:
try (Transaction tx = transactions.txStart()) {
Integer hello = cache.get("Hello");
if (hello == 1)
cache.put("Hello", 11);
cache.put("World", 22);
tx.commit();
}
Ignite在事务中使用了2阶段提交的协议,但是只要适用也带有很多一阶段提交的优化。在一个事务中当数据更新时,Ignite会在本地事务映射中保持事务状态直到调用了commit()
方法,在这一点,只要需要,数据都会被传输到远程节点。
对于Ignite2阶段提交是怎么工作的更多信息,可以参照如下博客:
ACID完整性
Ignite提供了完整的ACID(原子性,一致性,隔离性和持久性)兼容事务来确保一致性。
当原子化模式配置为事务
时,Ignite对事务支持乐观
和悲观
的并发模型。并发模型决定了何时获得一个条目级的事务锁-在访问数据时或者在prepare
阶段。锁定可以防止对一个对象的并发访问。比如,当试图用悲观锁更新一个ToDo列表项时,服务端会在该对象上置一个锁以使其他的事务或者操作无法更新同一个条目,直到提交或者回滚该事务。不管在一个事务中使用那种并发模型,在提交之前都存在事务中的所有条目被锁定的时刻。
隔离级别定义了并发事务如何"看"以及处理针对同一个键的操作。Ignite支持读提交
、可重复读
、可序列化
隔离级别。
并发模型和隔离级别的所有组合都是可以同时使用的。下面是针对Ignite提供的每一个并发-隔离组合的行为和保证的描述。
在悲观
事务中,锁是在第一次读或者写访问期间获得(取决于隔离级别)然后被事务持有直到其被提交或者回滚。该模式中,锁首先在主节点获得然后在准备阶段提升至备份节点。下面的隔离级别可以配置为悲观
并发模型。
读提交
:数据被无锁地读取并且不会被事务本身缓存。如果缓存配置允许的话数据是可能从一个备份节点中读取的。在这个隔离级别中,可以有所谓的非可重复读,因为当在自己的事务中读取数据两次时,一个并发事务可以改变该数据。锁只有在第一次写访问时才会获得(包括EntryProcessor
调用)。这意味着事务中已经读取的一个条目在该事务提交时可能有一个不同的值,这种情况是不会抛出异常的。可重复读
:获得条目锁以及第一次读或者写访问时从主节点获得数据,然后就存储在本地事务映射中。之后对同一数据的所有连续访问都是本地化的,并且返回最后一次读或者被更新的事务值。这意味着没有其他的并发事务可以改变锁定的数据,这样就获得了事务的可重复读。可序列化
:在悲观
模式中,这个隔离级别与可重复读
是一样的工作方式。注意,在悲观
模式中,锁的顺序是很重要的。此外,Ignite可以按照用户提供的顺序依次并且准确地获得锁。
性能考量
设想网络中有三个节点(A、B、C),并且在事务中针对键[1, 2, 3, 4, 5, 6]执行一个putAll
。假定这些键以如下形式映射到节点:{A: 1, 4}, {B: 2, 5}, {C: 3, 6},因为Ignite在悲观
模式中无法改变获得锁的顺序,他会产生6次连续地网络往返:[A, B, C, A, B, C]。在键的锁定顺序对于一个事务的语义不重要的情况下,将键按照分区进行分组然后将在一个分区的键一起锁定是明智的。这在一个大的事务中可以显著地降低网络消息的数量。在这个示例中,如果对于一个putAll
键按照如下的方式排序:[1, 4, 2, 5, 3, 6],之后只需要3次的连续网络访问。
拓扑变化约束
注意,如果至少获得一个悲观事务锁,都不可能改变缓存的拓扑,直到事务被提交或者回滚,因此,不建议长时间地持有事务锁。
当处理分布式事务时必须要遵守的主要规则是参与一个事务的键的锁,必须按照同样的顺序获得,违反这个规则就可能导致分布式死锁。
Ignite无法避免分布式死锁,而是有一个内建的功能来使调试和解决这个问题更容易。
目前死锁检测过程只支持悲观事务,对乐观事务的支持会在Ignite的下一个版本中提供。
就像下面的代码片段所示,一个带有超时时间的事务启动。如果过了超时时间,死锁检测过程就会试图查找一个触发这个超时的可能的死锁。当超过超时时间时,会抛出TransactionTimeoutException
并且像触发CacheException
那样传播到应用层而不会管死锁。然而,如果检测到了一个死锁,返回的TransactionTimeoutException
的cause会是TransactionDeadlockException
(至少涉及死锁的一个事务)。
try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
TransactionIsolation.READ_COMMITTED, 300, 0)) {
cache.put(1, 1);
cache.put(2, 1);
tx.commit();
}
catch (CacheException e) {
if (e.getCause() instanceof TransactionTimeoutException &&
e.getCause().getCause() instanceof TransactionDeadlockException)
System.out.println(e.getCause().getCause().getMessage());
}
TransactionDeadlockException
里面包含了有用的信息,有助于找到导致死锁的原因。
Deadlock detected:
K1: TX1 holds lock, TX2 waits lock.
K2: TX2 holds lock, TX1 waits lock.
Transactions:
TX1 [txId=GridCacheVersion [topVer=74949328, time=1463469328421, order=1463469326211, nodeOrder=1], nodeId=ad68354d-07b8-4be5-85bb-f5f2362fbb88, threadId=73]
TX2 [txId=GridCacheVersion [topVer=74949328, time=1463469328421, order=1463469326210, nodeOrder=1], nodeId=ad68354d-07b8-4be5-85bb-f5f2362fbb88, threadId=74]
Keys:
K1 [key=1, cache=default]
K2 [key=2, cache=default]
死锁检测是一个多步过程,依赖于集群中节点的数量、键以及可能导致死锁涉及的事务数,可能需要做很多次迭代。一个死锁检测的发起者是发起事务并且出现TransactionTimeoutException
错误的那个节点,这个节点会检查是否发生了死锁,通过与其他远程节点交换请求/响应,并且准备一个与死锁有关的、由TransactionDeadlockException
提供的报告,每个这样的消息(请求/响应)都会被称为一个迭代器。
因为死锁检测过程不结束,事务就不会回滚,有时,如果希望对于事务回滚有一个可预测的时间,调整一下参数还是有意义的(下面会描述)。
IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS
:指定死锁检测过程迭代器的最大数,如果这个属性的值小于等于0,死锁检测会被禁用(默认为1000);IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT
:指定死锁检测机制的超时时间(默认为1分钟)。注意如果迭代器太少的话,可能获得一个不完整的死锁检测报告。
如果想彻底避免死锁,可以看下面的无死锁事务章节。
在乐观事务中,条目锁是在准备
阶段从主节点获得的,然后提升至备份节点,该锁在事务提交时被释放。如果用户回滚事务没有试图做提交,是不会获得锁的。下面的隔离级别可以与乐观
并发模型配置在一起。
读提交
:应该作用于缓存的改变是在源节点上收集的,然后事务提交后生效。事务数据无锁地读取并且不会在事务中缓存。如果缓存配置允许的话该数据是可能从备份节点中读取的。在这个隔离级别中,可以有一个所谓的非可重复读,因为在自己的事务中读取数据两次时另一个事务可以修改数据。这个模式组合在第一次读或者写操作后如果条目值被修改是不会做校验的,并且不会抛出异常。可重复读
:这个隔离级别的事务的工作方式类似于乐观
读提交
的事务,只有一个不同-读取值缓存于源节点并且所有的后续读保证都是本地化的。这个模式组合在第一次读或者写操作后如果条目值被修改是不会做校验的,并且不会抛出异常。可序列化
:在第一次读访问之后会存储一个条目的版本,如果Ignite引擎检测到发起事务中的条目只要有一个被修改,Ignite就会在提交阶段放弃该事务,这是在提交阶段对网格内的事务中记载的条目的版本进行内部检查实现的。简而言之,这意味着Ignite如果在一个事务的提交阶段检测到一个冲突,就会放弃这个事务并且抛出TransactionOptimisticException
异常以及回滚已经做出的任何改变,开发者应该处理这个异常并且重试该事务。
IgniteTransactions txs = ignite.transactions();
// Start transaction in optimistic mode with serializable isolation level.
while (true) {
try (Transaction tx =
ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC,
TransactionIsolation.SERIALIZABLE)) {
// Modify cache entires as part of this transacation.
....
// commit transaction.
tx.commit();
// Transaction succeeded. Leave the while loop.
break;
}
catch (TransactionOptimisticException e) {
// Transaction has failed. Retry.
}
}
这里另外一个需要注意的重要的点是,即使一个条目只是简单地读取(没有改变,cache.put(...)),一个事务仍然可能失败,因为该条目的值对于发起事务中的逻辑很重要。
注意,对于读提交
和可重复读
事务,键的顺序是很重要的,因为这些模式中锁也是按顺序获得的。
对于乐观
的可序列化
事务,锁不是按顺序获得的。该模式中键可以按照任何顺序访问,因为事务锁是通过一个额外的检查以并行的方式获得的,这使得Ignite可以避免死锁。
这里需要引入几个概念来描述可序列化
的事务锁是如何工作的。Ignite中的每个事务都会被赋予一个叫做XidVersion
的可比较的版本号,事务提交时该事务中修改的每个条目都会被赋予一个叫做EntryVersion
的新的版本号,在如下情况下,版本号为XidVersionA
的乐观
可序列化
事务会失败:
悲观
的或者非可序列化乐观
事务在可序列化
事务中的一个条目上持有了一个锁;XidVersionB
的乐观
可序列化
事务,在XidVersionB > XidVersionA
时以及这个事务在可序列化
事务中的一个条目上持有了一个锁;乐观
可序列化
事务获得所有必要的锁时,存在在提交之前的版本与当前版本不同的条目;在一个高并发环境中,乐观锁可能导致一个很高的事务失败率。但是悲观锁如果锁被事务以一个不同的顺序获得可能导致死锁。然而,在一个无竞争的环境中,乐观可序列化锁对于大的事务可能提供更好的性能,因为网络交互的数量只取决于事务相关的节点的数量,而不取决于事务中的键的数量。
Ignite可以通过TransactionConfiguration#setTxManagerFactory
方法配置一个JTA事务管理器搜索类,事务管理器工厂是一个工厂,他给Ignite提供了一个JTA事务管理器的实例。
Ignite提供了一个CacheJndiTmFactory
工厂,他是一个通过JNDI名字查找事务管理器的开箱即用的事务管理器工厂实现。
设置了之后,在事务中的每一次缓存操作,Ignite都会检查是否存在一个进行中的JTA事务。如果JTA事务开启了,Ignite也会开启一个事务然后通过他自己的一个XAResource
的内部实现来将其加入JTA事务,Ignite事务会准备,提交或者干脆回滚相应的JTA事务。
下面是一个在Ignite中使用JTA事务管理器的示例:
Java:
// Get an instance of JTA transaction manager.
TMService tms = appCtx.getComponent(TMService.class);
// Get an instance of Ignite cache.
IgniteCache<String, Integer> cache = cache();
UserTransaction jtaTx = tms.getUserTransaction();
// Start JTA transaction.
jtaTx.begin();
try {
// Do some cache operations.
cache.put("key1", 1);
cache.put("key2", 2);
// Commit the transaction.
jtaTx.commit();
}
finally {
// Rollback in a case of exception.
if (jtaTx.getStatus() == Status.STATUS_ACTIVE)
jtaTx.rollback();
}
当堆上的数据集特别大时,堆外内存通过将缓存数据保存在Java主堆空间之外来避免因为JVM长时间的垃圾回收导致的暂停,但是数据仍然在内存中。
堆外索引
注意当配置了堆外存储时,Ignite也会把查询索引保存在堆外,这意味着索引也不会占用任何的堆内空间。
堆外内存与多进程
也可以通过在同一台物理机上以小型堆开启多个进程来管理垃圾回收导致的停顿,然而,当使用复制缓存时这个做法是浪费的。因为对于开启的每个JVM进程,最终都会缓存完全一致的复制数据。
Ignite提供了分层存储模型,使数据可以在堆内、堆外和交换空间之间保存和移动。越往上的层提供了更大的存储能力,逐渐地延迟也会增加。
Ignite提供了三种存储模型,是在CacheMemoryMode
中定义的。
存储模型 | 描述 |
---|---|
ONHEAP_TIERED |
保存在堆内,退出到堆外以及可选的存储在交换空间 |
OFFHEAP_TIERED |
保存在堆外,避开堆内以及可选的退出到交换空间 |
OFFHEAP_VALUES |
将键存储在堆内,将值存储在堆外 |
Ignite中,ONHEAP_TIERED
是默认的存储模型,所有的缓存数据都会存储在堆内,数据可以从堆内移动到堆外存储,如果配置了还可以移动到交换空间。
要配置ONHEAP_TIERED
存储模型,需要:
CacheConfiguration
的memoryMode
属性设置为ONHEAP_TIERED
.XML:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<!-- Store cache entries on-heap. -->
<property name="memoryMode" value="ONHEAP_TIERED"/>
<!-- Enable Off-Heap memory with max size of 10 Gigabytes (0 for unlimited). -->
<property name="offHeapMaxMemory" value="#{10 * 1024L * 1024L * 1024L}"/>
<!-- Configure eviction policy. -->
<property name="evictionPolicy">
<bean class="org.apache.ignite.cache.eviction.fifo.CacheFifoEvictionPolicy">
<!-- Evict to off-heap after cache size reaches maxSize. -->
<property name="maxSize" value="100000"/>
</bean>
</property>
...
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
// Set off-heap memory to 10GB (0 for unlimited)
cacheCfg.setOffHeapMaxMemory(10 * 1024L * 1024L * 1024L);
CacheFifoEvictionPolicy evctPolicy = new CacheFifoEvictionPolicy();
// Store only 100,000 entries on-heap.
evctPolicy.setMaxSize(100000);
cacheCfg.setEvictionPolicy(evctPolicy);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
退出策略
注意如果在ONHEAP_TIERED模式中没有启用退出策略,数据是不会从堆内移动到堆外的。
这个存储模型可以配置将缓存数据直接存储在堆外内存空间,避开堆内内存。如果所有数据都存储在堆外,那么就没有必要显式地配置退出策略。如果存储超过了限制(0为没有限制),那么LRU退出策略就会被用于将数据从堆外存储可选地移动到交换空间,如果配置了的话。
要配置OFFHEAP_TIERED
存储模型,需要:
CacheConfiguration
的memoryMode
属性设置为OFFHEAP_TIERED
.XML:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<!-- Always store cache entries in off-heap memory. -->
<property name="memoryMode" value="OFFHEAP_TIERED"/>
<!-- Enable Off-Heap memory with max size of 10 Gigabytes (0 for unlimited). -->
<property name="offHeapMaxMemory" value="#{10 * 1024L * 1024L * 1024L}"/>
...
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
// Set off-heap memory to 10GB (0 for unlimited)
cacheCfg.setOffHeapMaxMemory(10 * 1024L * 1024L * 1024L);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
这个存储模型可以将键存储在堆内,将值存储在堆外,这个存储模型适用于键较小和值较大的场景。
要配置OFFHEAP_VALUES
存储模型,需要:
CacheConfiguration
的memoryMode
属性设置为OFFHEAP_VALUES
.XML:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<!-- Always store cache entries in off-heap memory. -->
<property name="memoryMode" value="OFFHEAP_VALUES"/>
<!-- Enable Off-Heap memory with max size of 10 Gigabytes (0 for unlimited). -->
<property name="offHeapMaxMemory" value="#{10 * 1024L * 1024L * 1024L}"/>
...
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_VALUES);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
当数据集超过了堆内和堆外存储的限值,需要配置交换空间以使Ignite可以将数据退出到磁盘而不是抛弃他们。
交换空间性能
因为交换空间是磁盘上的,所以性能会显著慢于堆内和堆外内存。
XML:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<!-- Enable swap. -->
<property name="swapEnabled" value="true"/>
...
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setSwapEnabled(true);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
数据和计算以及数据和数据的并置可以显著地提升应用的性能和可扩展性。
在许多情况下,如果不同的缓存键被同时访问的话那么将他们并置在一起是很有利的。通常来说业务逻辑需要访问不止一个的缓存键,通过将他们并置在一起可以确保具有同一个affinityKey
的所有键都会缓存在同一个处理节点上,从而避免从远程节点获取数据的昂贵网络开销。
例如,有一个Person
和Company
对象,然后希望将Person
对象和其工作的Company
对象并置在一起。要做到这一点,用于缓存Person
对象的缓存键应该有一个属性或者方法加注了@AffinityKeyMapped
注解,他会提供用于并置的Company
键的值,方便起见,也可以可选地使用AffinityKey
类。
Scala中的注解
注意,如果Scala的case class用于键类并且它的构造函数参数之一加注了@AffinityKeyMapped
注解,默认这个注解并不会正确地用于生成的字段,因此也就不会被Ignite识别。要覆盖这个行为,可以使用@field
元注解而不是@AffinityKeyMapped
(看下面的示例)。
使用PersonKey:
public class PersonKey {
// Person ID used to identify a person.
private String personId;
// Company ID which will be used for affinity.
@AffinityKeyMapped
private String companyId;
...
}
// Instantiate person keys with the same company ID which is used as affinity key.
Object personKey1 = new PersonKey("myPersonId1", "myCompanyId");
Object personKey2 = new PersonKey("myPersonId2", "myCompanyId");
Person p1 = new Person(personKey1, ...);
Person p2 = new Person(personKey2, ...);
// Both, the company and the person objects will be cached on the same node.
cache.put("myCompanyId", new Company(...));
cache.put(personKey1, p1);
cache.put(personKey2, p2);
使用PersonKey(Scala)
case class PersonKey (
// Person ID used to identify a person.
personId: String,
// Company ID which will be used for affinity.
@(AffinityKeyMapped @field)
companyId: String
)
// Instantiate person keys with the same company ID which is used as affinity key.
val personKey1 = PersonKey("myPersonId1", "myCompanyId");
val personKey2 = PersonKey("myPersonId2", "myCompanyId");
val p1 = new Person(personKey1, ...);
val p2 = new Person(personKey2, ...);
// Both, the company and the person objects will be cached on the same node.
compCache.put("myCompanyId", Company(...));
perCache.put(personKey1, p1);
perCache.put(personKey2, p2);
使用AffinityKey:
Object personKey1 = new AffinityKey("myPersonId1", "myCompanyId");
Object personKey2 = new AffinityKey("myPersonId2", "myCompanyId");
Person p1 = new Person(personKey1, ...);
Person p2 = new Person(personKey2, ...);
// Both, the company and the person objects will be cached on the same node.
cache.put("myCompanyId", new Company(..));
cache.put(personKey1, p1);
cache.put(personKey2, p2);
SQL关联
当在分区缓存上的数据执行SQL分布式关联时,一定要确保关联的键是并置的。
也有可能向缓存数据的节点发送计算,这是一个被称为数据和计算的并置的概念,他可以向特定的节点发送整个的工作单元。
要将数据和计算并置在一起,需要使用IgniteCompute.affinityRun(...)
和IgniteCompute.affinityCall(...)
方法。
下面的例子显示了如何和上面提到的缓存Person
和Company
对象的同一个集群节点进行并置计算:
Java8:
String companyId = "myCompanyId";
// Execute Runnable on the node where the key is cached.
ignite.compute().affinityRun("myCache", companyId, () -> {
Company company = cache.get(companyId);
// Since we collocated persons with the company in the above example,
// access to the persons objects is local.
Person person1 = cache.get(personKey1);
Person person2 = cache.get(personKey2);
...
});
Java7:
final String companyId = "myCompanyId";
// Execute Runnable on the node where the key is cached.
ignite.compute().affinityRun("myCache", companyId, new IgniteRunnable() {
@Override public void run() {
Company company = cache.get(companyId);
Person person1 = cache.get(personKey1);
Person person2 = cache.get(personKey2);
...
}
};
IgniteCompute.affinityRun(...)
和IgniteCache.invoke(...)
方法都提供了数据和计算的并置。主要的不同在于invoke(...)
方法是原子的并且执行时在键上加了锁,无法从EntryProcessor
逻辑内部访问其他的键,因为它会触发一个死锁。
另一方面,affinityRun(...)
和affinityCall(...)
不持有任何锁。比如,在这些方法内开启多个事务或者执行缓存查询是绝对合法的,不用担心死锁。这时Ignite会自动检测处理是并置的然后对事务采用优化过的一阶段提交而不是二阶段提交。
关于
IgniteCache.invoke(...)
方法的更多信息,请参照3.2.超越JCache
文档。
分区的关系控制一个分区缓存在哪个网格节点或者哪些节点上。AffinityFunction
是一个可插拔的API用于确定网格中分区到节点的一个理想映射。当集群网络发生变化时,分区到节点的映射可能不同于关系函数提供的理想分布,直到再平衡结束。
Ignite附带了两个预定义的关系函数实现:
RendezvousAffinityFunction
:这个函数允许分区到节点的映射有点区别(即一些节点可能比其他节点负责稍微多一点的分区数量)。然而,它保证当网络发生变化时,分区只会迁移到一个新加入的节点或者只来自一个离开的节点,集群内已有的节点间不会发生数据的交换。FairAffinityFunction
:这个函数试图确保分区在集群节点间均衡分布,这可能导致集群内的已有节点间产生分区迁移的开销。注意,缓存关系函数不会直接映射键和节点,它映射的是键和分区。分区只是来自一个有限集合的简单的数字(默认0-1024)。在键映射到他们的分区之后(即获得了他们的分区号),已有的分区到节点的映射会用于当前的网络版本,键到分区的映射在时间上并不会改变。
关系的故障安全
主备副本不位于同一台物理机上,以这样的方式调整集群内的分区是很有用的,要确保这个属性,可以在RendezvousAffinityFunction
和FairAffinityFunction
上设置excludeNeighbors
标志。
有时将一个分区的主备副本放在不同的机架上也是很有用的。这时,可以为每个节点赋予一个特别的属性然后在RendezvousAffinityFunction
和FairAffinityFunction
上使用backupFilter
属性来排除同一个机架中分配用于备份副本的若干节点。
AffinityFunction
是一个可插拔的API,也可以提供这个函数的自定义实现,AffinityFunction
API的三个主要方法是:
partitions()
:获取一个缓存的分区总数量,集群启动之后无法改变。partition(...)
:给定一个键,这个方法确定一个键属于哪个分区,这个映射在时间上不会改变。assignPartitions(...)
:这个方法在集群网络发生变化时每次都会被调用,这个方法对于给定的集群网络返回一个分区到节点的映射。CacheAffinityKeyMapper
是一个可插拔的API,负责为一个缓存键获取关系键。通常缓存键本身就用于关系键,然而为了与其他的缓存键并置,有时改变一个缓存键的关系是很重要的。
CacheAffinityKeyMapper
的主要方法是affinityKey(key)
,他会为一个缓存键返回一个affinityKey
。Ignite默认会查找加注@CacheAffinityKeyMapped
注解的所有属性和方法。如果没有找到这样的属性或者方法,那么缓存键本身就会用做关系键。如果找到了这样的属性或者方法,那么这个属性或者方法的值就会从CacheAffinityKeyMapper.affinityKey(key)
方法返回,这样只要需要,就可以指定一个替代的关系键,而不是缓存键本身。
JCache提供了javax.cache.integration.CacheLoader
和javax.cache.integration.CacheWriter
API,他们分别用于底层持久化存储的通读
和通写
(比如RDBMS中的Oracle或者MySQL,以及NoSQL数据库中的MongoDB或者CouchDB)。
虽然Ignite可以单独地配置CacheRLoader
和CacheWriter
,但是在两个单独的类中实现事务化存储是非常尴尬的,因为多个load
和put
操作需要在同一个事务中的同一个连接中共享状态。为了缓解这个问题,Ignite提供了·org.apacche.ignite.cache.store.CacheStore·接口,他同时扩展了CacheLoader
和CacheWriter
。
事务
CacheStore
是完整事务性的,他会自动地融入当前的缓存事务。
CacheJdbcPojoStore
Ignite附带了他自己的CacheJdbcPojoStore
,他会自动地建立Java POJO和数据库模式之间的映射,可以参照3.13.自动持久化章节
。
当希望通读和通写时,提供一个正常的缓存存储的实现是很重要的。通读意味着当缓存无效时会从持久化存储中读取,通写意味着当缓存更新时会自动地进行持久化。所有的通读和通写都会参与整体的缓存事务以及作为一个整体提交或者回滚。
要配置通读和通写,需要实现CacheStore
接口以及设置CacheConfiguration
中cacheStoreFactory
的readThrough
和writeThrough
属性,下面的例子会有说明。
在一个简单的通写模式中每个缓存的put和remove操作都会涉及一个持久化存储的请求,因此整个缓存更新的持续时间可能是相对比较长的。另外,密集的缓存更新频率也会导致非常高的存储负载。
对于这种情况,Ignite提供了一个选项来执行异步化的持久化存储更新,也叫做后写,这个方法的主要概念是累加更新操作然后作为一个批量操作异步化地刷新入持久化存储中。真实的数据持久化可以被基于时间的事件触发(数据输入的最大时间受到队列的限制),也可以被队列的大小触发(当队列大小达到一个限值后队列就被刷新了),或者通过两者的组合触发,这时任何事件都会触发刷新。
更新顺序
对于后写的方式只有数据的最后一次更新会被写入底层存储。如果键为key1的缓存数据分别依次地更新为值value1、value2和value3,那么只有(key1,value3)对这一个存储请求会被传播到持久化存储。
更新性能
批量的存储操作通常比按顺序的单一存储操作更有效率,因此可以通过开启后写模式的批量操作来利用这个特性。简单类型(put和remove)的简单顺序更新操作可以被组合成一个批量操作。比如,连续地往缓存中加入(key1,value1),(key2,value2),(key3,value3)可以通过一个单一的CacheStore.putAll(...)
操作批量处理。
后写缓存可以通过CacheConfiguration.setWriteBehindEnabled(boolean)
配置项来开启,下面的3.12.6.配置
章节显示了一个完整的配置属性列表来进行后写缓存行为的定制。
Ignite中的CacheStore
接口用于向底层的数据存储写入或者加载数据。除了标准的JCache加载和存储方法,他还引入了最终事务划界以及从底层数据存储批量载入数据的能力。
loadCache()
CacheStore.loadCache()
方法可以加载缓存,即使没有传入要加载的所有键,它通常用于启动时缓存的热加载,但是也可以在缓存加载完之后的任何时间点调用。
在每一个相关的集群节点,IgniteCache.loadCache()
方法会分配给CacheStore.loadCache()
方法,如果只想在本地节点上进行加载,可以用IgniteCache.localLoadCache()
方法。
对于分区缓存,不管是主节点还是备份节点,如果键没有被映射到该节点,会被缓存自动丢弃。
load(), write(), delete()
当IgniteCache
接口的get
,put
,remove
方法被调用时,相对应的CacheStore
的load()
,write()
和delete()
方法会被调用,当与单个缓存数据工作时,这些方法会用于启用通读和通写行为。
loadAll(), writeAll(), deleteAll()
当IgniteCache
接口的getAll
,putAll
,removeAll
方法被调用时,相对应的CacheStore
的loadAll()
,writeAll()
和deleteAll()
方法会被调用,当与多个缓存数据工作时,这些方法会用于启用通读和通写行为,他们通常用批量操作的方式实现以提供更好的性能。
CacheStoreAdapter
提供了loadAll()
,writeAll()
和deleteAll()
方法的默认实现,他只是简单地对键进行一个一个地迭代。
sessionEnd()
Ignite有一个存储会话的概念,他可以跨越不止一个的缓存存储操作,会话对于事务非常有用。
对于原子化
的缓存,sessionEnd()
方法会在每个CacheStore
方法完成之后被调用,对于事务化
的缓存,不管是在底层持久化存储进行提交或者回滚多个操作,sessionEnd()
方法都会在每个事务结束后被调用。
CacheStoreAdapater
提供了sessionEnd()
方法的默认的空实现。
缓存存储会话的主要目的是当CacheStore
用于事务中时在多个存储操作中持有一个上下文。比如,如果使用JDBC,可以通过CacheStoreSession.attach()
方法保存数据库的连接,然后可以在CacheStore.sessionEnd(boolean)
方法中提交这个连接。
CacheStoreSession
可以通过@GridCacheStoreSessionResource
注解注入自定义的缓存存储实现中。
下面是几个不同场景的缓存存储的实现,注意事务化的实现用还是没用事务。
JDBC非事务:
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
// This mehtod is called whenever "get(...)" methods are called on IgniteCache.
@Override public Person load(Long key) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
st.setLong(1, key);
ResultSet rs = st.executeQuery();
return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load: " + key, e);
}
}
// This mehtod is called whenever "put(...)" methods are called on IgniteCache.
@Override public void write(Cache.Entry<Long, Person> entry) {
try (Connection conn = connection()) {
// Syntax of MERGE statement is database specific and should be adopted for your database.
// If your database does not support MERGE statement then use sequentially update, insert statements.
try (PreparedStatement st = conn.prepareStatement(
"merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
for (Cache.Entry<Long, Person> entry : entries) {
Person val = entry.getValue();
st.setLong(1, entry.getKey());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());
st.executeUpdate();
}
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
}
}
// This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
@Override public void delete(Object key) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
st.setLong(1, (Long)key);
st.executeUpdate();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to delete: " + key, e);
}
}
// This mehtod is called whenever "loadCache()" and "localLoadCache()"
// methods are called on IgniteCache. It is used for bulk-loading the cache.
// If you don't need to bulk-load the cache, skip this method.
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null)
throw new CacheLoaderException("Expected entry count parameter is not provided.");
final int entryCnt = (Integer)args[0];
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
try (ResultSet rs = st.executeQuery()) {
int cnt = 0;
while (cnt < entryCnt && rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
cnt++;
}
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
// Open JDBC connection.
private Connection connection() throws SQLException {
// Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
// In this example we use H2 Database for simplification.
Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
conn.setAutoCommit(true);
return conn;
}
}
JDBC事务:
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
// Complete transaction or simply close connection if there is no transaction.
@Override public void sessionEnd(boolean commit) {
try (Connection conn = ses.getAttached()) {
if (conn != null && ses.isWithinTransaction()) {
if (commit)
conn.commit();
else
conn.rollback();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to end store session.", e);
}
}
// This mehtod is called whenever "get(...)" methods are called on IgniteCache.
@Override public Person load(Long key) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
st.setLong(1, key);
ResultSet rs = st.executeQuery();
return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load: " + key, e);
}
}
// This mehtod is called whenever "put(...)" methods are called on IgniteCache.
@Override public void write(Cache.Entry<Long, Person> entry) {
try (Connection conn = connection()) {
// Syntax of MERGE statement is database specific and should be adopted for your database.
// If your database does not support MERGE statement then use sequentially update, insert statements.
try (PreparedStatement st = conn.prepareStatement(
"merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
for (Cache.Entry<Long, Person> entry : entries) {
Person val = entry.getValue();
st.setLong(1, entry.getKey());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());
st.executeUpdate();
}
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
}
}
// This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
@Override public void delete(Object key) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
st.setLong(1, (Long)key);
st.executeUpdate();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to delete: " + key, e);
}
}
// This mehtod is called whenever "loadCache()" and "localLoadCache()"
// methods are called on IgniteCache. It is used for bulk-loading the cache.
// If you don't need to bulk-load the cache, skip this method.
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null)
throw new CacheLoaderException("Expected entry count parameter is not provided.");
final int entryCnt = (Integer)args[0];
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
try (ResultSet rs = st.executeQuery()) {
int cnt = 0;
while (cnt < entryCnt && rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
cnt++;
}
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
// Opens JDBC connection and attaches it to the ongoing
// session if within a transaction.
private Connection connection() throws SQLException {
if (ses.isWithinTransaction()) {
Connection conn = ses.getAttached();
if (conn == null) {
conn = openConnection(false);
// Store connection in the session, so it can be accessed
// for other operations within the same transaction.
ses.attach(conn);
}
return conn;
}
// Transaction can be null in case of simple load or put operation.
else
return openConnection(true);
}
// Opens JDBC connection.
private Connection openConnection(boolean autocommit) throws SQLException {
// Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
// In this example we use H2 Database for simplification.
Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
conn.setAutoCommit(autocommit);
return conn;
}
}
JDBC批量操作
public class CacheJdbcPersonStore extends CacheStore<Long, Person> {
// Skip single operations and open connection methods.
// You can copy them from jdbc non-transactional or jdbc transactional examples.
...
// This mehtod is called whenever "getAll(...)" methods are called on IgniteCache.
@Override public Map<K, V> loadAll(Iterable<Long> keys) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement(
"select firstName, lastName from PERSONS where id=?")) {
Map<K, V> loaded = new HashMap<>();
for (Long key : keys) {
st.setLong(1, key);
try(ResultSet rs = st.executeQuery()) {
if (rs.next())
loaded.put(key, new Person(key, rs.getString(1), rs.getString(2));
}
}
return loaded;
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to loadAll: " + keys, e);
}
}
// This mehtod is called whenever "putAll(...)" methods are called on IgniteCache.
@Override public void writeAll(Collection<Cache.Entry<Long, Person>> entries) {
try (Connection conn = connection()) {
// Syntax of MERGE statement is database specific and should be adopted for your database.
// If your database does not support MERGE statement then use sequentially update, insert statements.
try (PreparedStatement st = conn.prepareStatement(
"merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
for (Cache.Entry<Long, Person> entry : entries) {
Person val = entry.getValue();
st.setLong(1, entry.getKey());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());
st.addBatch();
}
st.executeBatch();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to writeAll: " + entries, e);
}
}
// This mehtod is called whenever "removeAll(...)" methods are called on IgniteCache.
@Override public void deleteAll(Collection<Long> keys) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
for (Long key : keys) {
st.setLong(1, key);
st.addBatch();
}
st.executeBatch();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to deleteAll: " + keys, e);
}
}
}
下面的配置参数可以通过CacheConfiguration
用于启用以及配置通写缓存:
setter方法 | 描述 | 默认值 |
---|---|---|
setWriteBehindEnabled(boolean) | 设置后写是否启用的标志 | false |
setWriteBehindFlushSize(int) | 后写缓存的最大值,如果超过了这个限值,所有的缓存数据都会被刷入缓存存储然后写缓存被清空。如果值为0,刷新操作将会依据刷新频率间隔,注意不能将写缓存大小和刷新频率都设置为0 | 10240 |
setWriteBehindFlushFrequency(long) | 后写缓存的刷新频率,单位为毫秒,该值定义了从对缓存对象进行插入/删除和当相应的操作被施加到缓存存储的时刻之间的最大时间间隔。如果值为0,刷新会依据写缓存大小,注意不能将写缓存大小和刷新频率都设置为0 | 5000 |
setWriteBehindFlushThreadCount(int) | 执行缓存刷新的线程数 | 1 |
setWriteBehindBatchSize(int) | 后写缓存存储操作的操作数最大值 | 512 |
CacheStore
接口可以在IgniteConfiguration
上通过一个工厂进行设置,就和CacheLoader
和CacheWriter
同样的方式。
对于分布式缓存配置的
factory
应该是序列化的。
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="foo.bar.MyPersonStore"/>
</bean>
</property>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
</bean>
</list>
</property>
...
</bean>
Java:
IgniteConfiguration cfg = new IgniteConfiguration();
CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();
cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyPersonStore.class));
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
CacheJdbcBlobStore
实现基于JDBC,这个实现将对象以BLOB
的格式存储在底层数据库中。存储会在数据库中创建名为ENTRIES
的表来存储数据,表具有key和val两个字段。
如果提供了定制的DDL和DML语句,表和字段的名字要和所有的语句一致以及参数的顺序也要保留。
使用CacheJdbcBlobStoreFactory
工厂来向CacheConfiguration
传入CacheJdbcBlobStore
:
Spring:
<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<property name="cacheStoreFactory">
<bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
<property name="user" value = "user" />
<property name="dataSourceBean" value = "simpleDataSource" />
</bean>
</property>
</bean>
</list>
</property>
...
</bean>
CacheJdbcPojoStore
实现基于JDBC和基于反射的POJO,这个实现将对象用基于反射的Java Bean映射描述的形式存储在底层数据库中。
使用CacheJdbcPojoStoreFactory
工厂来向CacheConfiguration
传入CacheJdbcPojoStore
:
Spring:
<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<property name="cacheStoreFactory">
<bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
<property name="dataSourceBean" value = "simpleDataSource" />
</bean>
</property>
</bean>
</list>
</property>
</bean>
CacheHibernateBlobStore
实现基于Hibernate,这个实现将对象以BLOB
的格式存储在底层数据库中。
使用CacheHibernateBlobStoreFactory
工厂来向CacheConfiguration
传入CacheHibernateBlobStore
:
Spring:
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<bean class="org.apache.ignite.cache.store.hibernate.CacheHibernateBlobStoreFactory">
<property name="hibernateProperties">
<props>
<prop key="connection.url">jdbc:h2:mem:</prop>
<prop key="hbm2ddl.auto">update</prop>
<prop key="show_sql">true</prop>
</props>
</property>
</bean>
</list>
</property>
...
</bean>
Ignite附带了自己的数据库模式映射向导,他提供了与持久化存储集成的自动支持。这个工具自动连接底层数据库然后生成所有必要的XML映射文件还有Java领域模型对象POJO。
Ignite还附带了org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore
,这是即开即用的gniteCacheStore
接口的JDBC实现,然后自动地处理通读和通写逻辑。
要启动生成数据库模式映射向导,可以执行bin/ignite-schema-import.sh
脚本:
$ bin/ignite-schema-import.sh
这个命令会弹出一个界面向导,他有两个界面。
接入数据库
第一个界面需要填入数据连接配置信息,注意工具没有提供JDBC驱动需要单独提供。
生成XML配置和POJO
第二个界面可以将数据库表映射为领域模型类,然后自动生成XML配置和POJO。
生成组件
工具会生成如下的组件:
CacheTypeMetadata
的XML配置;ConfigurationSnippet.java
(与XML二选一)从向导退出后,需要做:
CacheJdbcPojoStoreFactory
的XML声明拷贝进Ignite XML配置文件中对应的CacheConfiguration
下面;ConfigurationSnippet.java
。POJO Key类
/**
* PersonKey definition.
*
* Code generated by Apache Ignite Schema Import utility: 03/03/2015.
*/
public class PersonKey implements Serializable {
/** */
private static final long serialVersionUID = 0L;
/** Value for id. */
private int id;
/**
* Gets id.
*
* @return Value for id.
*/
public int getId() {
return id;
}
/**
* Sets id.
*
* @param id New value for id.
*/
public void setId(int id) {
this.id = id;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof PersonKey))
return false;
PersonKey that = (PersonKey)o;
if (id != that.id)
return false;
return true;
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = id;
return res;
}
/** {@inheritDoc} */
@Override public String toString() {
return "PersonKey [id=" + id +
"]";
}
}
POJO类:
/**
* Person definition.
*
* Code generated by Apache Ignite Schema Import utility: 03/03/2015.
*/
public class Person implements Serializable {
/** */
private static final long serialVersionUID = 0L;
/** Value for id. */
private int id;
/** Value for orgId. */
private Integer orgId;
/** Value for name. */
private String name;
/**
* Gets id.
*
* @return Value for id.
*/
public int getId() {
return id;
}
/**
* Sets id.
*
* @param id New value for id.
*/
public void setId(int id) {
this.id = id;
}
/**
* Gets orgId.
*
* @return Value for orgId.
*/
public Integer getOrgId() {
return orgId;
}
/**
* Sets orgId.
*
* @param orgId New value for orgId.
*/
public void setOrgId(Integer orgId) {
this.orgId = orgId;
}
/**
* Gets name.
*
* @return Value for name.
*/
public String getName() {
return name;
}
/**
* Sets name.
*
* @param name New value for name.
*/
public void setName(String name) {
this.name = name;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof Person))
return false;
Person that = (Person)o;
if (id != that.id)
return false;
if (orgId != null ? !orgId.equals(that.orgId) : that.orgId != null)
return false;
if (name != null ? !name.equals(that.name) : that.name != null)
return false;
return true;
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = id;
res = 31 * res + (orgId != null ? orgId.hashCode() : 0);
res = 31 * res + (name != null ? name.hashCode() : 0);
return res;
}
/** {@inheritDoc} */
@Override public String toString() {
return "Person [id=" + id +
", orgId=" + orgId +
", name=" + name +
"]";
}
}
XML配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean class="org.apache.ignite.cache.CacheTypeMetadata">
<property name="databaseSchema" value="PUBLIC"/>
<property name="databaseTable" value="PERSON"/>
<property name="keyType" value="org.apache.ignite.examples.datagrid.store.model.PersonKey"/>
<property name="valueType" value="org.apache.ignite.examples.datagrid.store.model.Person"/>
<property name="keyFields">
<list>
<bean class="org.apache.ignite.cache.CacheTypeFieldMetadata">
<property name="databaseName" value="ID"/>
<property name="databaseType">
<util:constant static-field="java.sql.Types.INTEGER"/>
</property>
<property name="javaName" value="id"/>
<property name="javaType" value="int"/>
</bean>
</list>
</property>
<property name="valueFields">
<list>
<bean class="org.apache.ignite.cache.CacheTypeFieldMetadata">
<property name="databaseName" value="ID"/>
<property name="databaseType">
<util:constant static-field="java.sql.Types.INTEGER"/>
</property>
<property name="javaName" value="id"/>
<property name="javaType" value="int"/>
</bean>
<bean class="org.apache.ignite.cache.CacheTypeFieldMetadata">
<property name="databaseName" value="ORG_ID"/>
<property name="databaseType">
<util:constant static-field="java.sql.Types.INTEGER"/>
</property>
<property name="javaName" value="orgId"/>
<property name="javaType" value="java.lang.Integer"/>
</bean>
<bean class="org.apache.ignite.cache.CacheTypeFieldMetadata">
<property name="databaseName" value="NAME"/>
<property name="databaseType">
<util:constant static-field="java.sql.Types.VARCHAR"/>
</property>
<property name="javaName" value="name"/>
<property name="javaType" value="java.lang.String"/>
</bean>
</list>
</property>
</bean>
</beans>
Java配置:
/**
* ConfigurationSnippet definition.
*
* Code generated by Apache Ignite Schema Import utility: 03/22/2015.
*/
public class ConfigurationSnippet {
/** Configure cache store. */
public static CacheStore store() {
DataSource dataSource = null; // TODO create data source.
CacheJdbcPojoStore store = new CacheJdbcPojoStore();
store.setDataSource(dataSource);
return store;
}
/** Configure cache types metadata. */
public static Collection<CacheTypeMetadata> typeMetadata() {
// Configure cache types.
Collection<CacheTypeMetadata> meta = new ArrayList<>();
// PERSON.
CacheTypeMetadata type = new CacheTypeMetadata();
type.setDatabaseSchema("PUBLIC");
type.setDatabaseTable("PERSON");
type.setKeyType("org.apache.ignite.PersonKey");
type.setValueType("org.apache.ignite.Person");
meta.add(type);
// Key fields for PERSON.
Collection<CacheTypeFieldMetadata> keys = new ArrayList<>();
keys.add(new CacheTypeFieldMetadata("ID", Types.INTEGER, "id", int.class));
type.setKeyFields(keys);
// Value fields for PERSON.
Collection<CacheTypeFieldMetadata> vals = new ArrayList<>();
vals.add(new CacheTypeFieldMetadata("ID", Types.INTEGER, "id", int.class));
vals.add(new CacheTypeFieldMetadata("FIRST_NAME", Types.VARCHAR, "firstName", String.class));
vals.add(new CacheTypeFieldMetadata("LAST_NAME", Types.VARCHAR, "lastName", String.class));
type.setValueFields(vals);
return meta;
}
}
在通过向导生成XML和POJO代码以及将所有必要的代码片段拷贝进项目之后,就可以用CacheJdbcPojoStore
来从数据库往缓存内加载数据以及从缓存将数据写入数据库。
首先需要在配置中正确地声明Store:
XML:
<!-- Sample data source -->
<bean id= "sampleDataSource" class="org.h2.jdbcx.JdbcDataSource">
<property name="url" value="jdbc:h2:tcp://localhost/mem:ExampleDb"/>
<property name="user" value="sa"/>
</bean>
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<!-- Cache configuration. -->
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<!-- Cache store. -->
<property name="cacheStoreFactory">
<bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
<property name="dataSourceBean" value = "sampleDataSource"/>
<property name="types">
<list>
... Copy here types generated by wizard...
</list>
</property>
</bean>
</property>
<!-- Enable store usage. -->
<!-- Sets flag indicating whether read from database is enabled. -->
<property name="readThrough" value="true"/>
<!-- Sets flag indicating whether write to database is enabled. -->
<property name="writeThrough" value="true"/>
<!-- Enable database batching. -->
<!-- Sets flag indicating whether write-behind is enabled. -->
<property name="writeBehindEnabled" value="true"/>
</list>
</property>
...
</bean>
Java:
IgniteConfiguration cfg = new IgniteConfiguration();
...
CacheConfiguration ccfg = new CacheConfiguration<>();
// Create data source for your database.
// For example: in-memory H2 database.
DataSource dataSource = org.h2.jdbcx.JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:ExampleDb", "sa", "");
// Create store.
CacheJdbcPojoStore store = new CacheJdbcPojoStore();
store.setDataSource(dataSource);
// Create store factory.
ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<>(store));
// Configure cache to use store.
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
// Enable database batching.
ccfg.setWriteBehindEnabled(true);
cfg.setCacheConfiguration(ccfg);
// Configure cache types.
Collection<CacheTypeMetadata> meta = new ArrayList<>();
... Paste here code generated by wizard ...
...
// Start Ignite node.
Ignition.start(cfg);
3.12.持久化存储
中定义的所有操作在CacheJdbcPojoStore
中都有效。
CacheJdbcPojoStore
可以有效地加载数据,要从数据库中加载缓存配置中注册的所有类型的所有数据,只需要调用IgniteCache.loadCache(null)
即可。要加载特定条件的数据,需要向IgniteCache.loadCache()
中传入键类型以及SQL查询,比如:
Ignite ignite = Ignition.ignite();
IgniteCache<Long, Person> c = ignite.getOrCreateCache("myCache");
c.loadCache(null, "java.lang.Integer", "select * from Person where id > 100");
示例org.apache.ignite.examples.datagrid.store.auto.CacheAutoStoreExample
演示了缓存存储的使用。
为了运行自动持久化的示例,需要设置CacheNodeWithStoreStartup.STORE = AUTO
然后运行CacheAutoStoreExample
。
下面是一步一步地演示,可以从examples/schema-import
文件夹中找到演示的源代码。
演示要使用H2数据库。
$ examples/schema-import/bin/h2-server.sh
H2的控制台会在默认浏览器中打开。
examples/schema-import/bin/db-init.sql
的内容粘贴进H2控制台然后执行;
bin/ignite-schema-import.sh examples/schema-import/bin/schema-import.properties
POJO文件以及java片段会在examples/schema-import/src/main/java/org/apache/ignite/schema
文件夹中生成。
Demo.java
。数据加载通常用于启动时初始化缓存数据,用标准的缓存put(...)
和putAll(...)
操作加载大量的数据通常是比较低效的。Ignite提供了IgniteDataStreamer
API和CacheStore
API,他们有助于以一个更高效的方式将大量数据注入Ignite缓存。
数据流处理器是通过IgniteDataStreamer
API定义的,他可以将大量的连续数据注入Ignite缓存。数据流处理器以可扩展和容错的方式在数据被发送到集群节点之前通过把定量数据放在一起以获得高性能。
数据流处理器可以用于任何时候将大量数据载入缓存,包括启动时的预加载。
想了解更多信息请参照4.2.数据流处理器
。
将大量数据载入缓存的另一个方法是通过CacheStore.loadCache()
方法,他可以在不传入要加载的所有键的情况下进行缓存的数据加载。
在所有保存该缓存的每一个集群节点上IgniteCache.loadCache()
方法会委托给CacheStore.loadCache()
方法,如果只想在本地节点上加载,可以用IgniteCache.localLoadCache()
方法。
对于分区缓存,如果键没有映射到某个节点,不管是主节点还是备份节点,都会被自动忽略。
下面是一个CacheStore.loadCache()
实现的例子,对于CacheStore
的完整例子,可以参照3.12.持久化存储
章节。
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
...
// This method is called whenever "IgniteCache.loadCache()" or
// "IgniteCache.localLoadCache()" methods are called.
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null)
throw new CacheLoaderException("Expected entry count parameter is not provided.");
final int entryCnt = (Integer)args[0];
Connection conn = null;
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
try (ResultSet rs = st.executeQuery()) {
int cnt = 0;
while (cnt < entryCnt && rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
cnt++;
}
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
...
}
分区感知的数据加载
在上面描述的场景中同样的查询会在所有节点上执行,每个节点会迭代所有的结果集,忽略掉不属于该节点的所有键,效率不是很高。
如果数据库中的每条记录都保存分区ID的话这个情况会有所改善。可以通过org.apache.ignite.cache.affinity.Affinity
接口来获得要存储在缓存中的任何键的分区ID。
下面的代码片段可以获得每个要存储在缓存中的Person
对象的分区ID。
IgniteCache cache = ignite.cache(cacheName);
Affinity aff = ignite.affinity(cacheName);
for (int personId = 0; personId < PERSONS_CNT; personId++) {
// Get partition ID for the key under which person is stored in cache.
int partId = aff.partition(personId);
Person person = new Person(personId);
person.setPartitionId(partId);
// Fill other fields.
cache.put(personId, person);
}
当Person对象知道自己的分区ID,每个节点就可以只查询属于自己所属分区的数据。要做到这一点,可以将一个Ignite实例注入到自己的CacheStore,然后用它来确定本地节点所属的分区。
下面的代码片段演示了用Affinity
来只加载本地分区的数据,注意例子代码是单线程的,然而它可以通过分区ID高效地并行化。
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
// Will be automatically injected.
@IgniteInstanceResource
private Ignite ignite;
...
// This mehtod is called whenever "IgniteCache.loadCache()" or
// "IgniteCache.localLoadCache()" methods are called.
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
Affinity aff = ignite.affinity(cacheName);
ClusterNode locNode = ignite.cluster().localNode();
try (Connection conn = connection()) {
for (int part : aff.primaryPartitions(locNode))
loadPartition(conn, part, clo);
for (int part : aff.backupPartitions(locNode))
loadPartition(conn, part, clo);
}
}
private void loadPartition(Connection conn, int part, IgniteBiInClosure<Long, Person> clo) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where partId=?")) {
st.setInt(1, part);
try (ResultSet rs = st.executeQuery()) {
while (rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
...
}
注意键和分区的映射依赖于affinity函数中配置的分区数量(参照org.apache.ignite.cache.affinity.AffinityFunctio)。如果affinity函数配置改变,数据库中存储的分区ID必须响应地更新。
退出策略控制着堆内内存中缓存可以存储的元素的最大值,如果可能的话当达到堆内缓存的最大值时,数据会被退出到堆外空间。
部分退出策略支持批量退出以及受到内存大小限制而退出。如果批量退出启用,那么当缓存大小加上batchSize
数量大于最大缓存大小时退出就开始了,这时batchSize
数量的元素会被退出。如果因为内存大小限制而退出启用,那么当以字节数计算的缓存数据大小超过内存大小最大值时退出就会发生。
批量退出只有当最大内存限制没有赋值时才会被支持。
Ignite中退出策略是可插拔的,通过EvictionPolicy
接口来控制。一个退出策略的实现会在每个缓存数据发生变化时收到通知,他定义了从缓存中选择要退出数据的算法。
如果数据集可以装入内存,那么退出策略不会带来任何好处,因此可以关闭,这也是默认值。
LRU退出策略基于最近最少使用算法,他会确保最近最少使用的数据(即最久没有被访问的数据)会被首先退出。
支持批量退出以及受到内存大小限制的退出。
LRU退出策略适用于缓存的大多数使用场景。
这个策略通过LruEvictionPolicy
实现,通过CacheConfiguration
进行配置。
XML:
<bean class="org.apache.ignite.cache.CacheConfiguration">
<property name="name" value="myCache"/>
...
<property name="evictionPolicy">
<!-- LRU eviction policy. -->
<bean class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
<!-- Set the maximum cache size to 1 million (default is 100,000). -->
<property name="maxSize" value="1000000"/>
</bean>
</property>
...
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
// Set the maximum cache size to 1 million (default is 100,000).
cacheCfg.setEvictionPolicy(new LruEvictionPolicy(1000000));
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
FIFO退出策略基于先进先出算法,他确保缓存中保存时间最久的数据会被首先退出,他与ruEvictionPolicy
不同,因为他忽略了数据的访问顺序。
支持批量退出以及受到内存大小限制的退出。
这个策略通过FifoEvictionPolicy
实现,通过CacheConfiguration
进行配置。
XML:
<bean class="org.apache.ignite.cache.CacheConfiguration">
<property name="name" value="myCache"/>
...
<property name="evictionPolicy">
<!-- FIFO eviction policy. -->
<bean class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
<!-- Set the maximum cache size to 1 million (default is 100,000). -->
<property name="maxSize" value="1000000"/>
</bean>
</property>
...
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
// Set the maximum cache size to 1 million (default is 100,000).
cacheCfg.setEvictionPolicy(new FifoEvictionPolicy(1000000));
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
有序退出策略和FIFO退出策略很像,不同点在于通过默认或者用户定义的比较器定义了数据的顺序,然后确保最小的数据(即排序数值最小的数据)会被退出。
默认的比较器用缓存条目的键作为比较器,他要求键必须实现Comparable
接口。也可以提供自定义的比较器实现,可以通过键,值或者两者都用来进行条目的比较。
支持批量退出以及受到内存大小限制的退出。
这个策略通过SortedEvictionPolicy
实现,通过CacheConfiguration
进行配置。
XML:
<bean class="org.apache.ignite.cache.CacheConfiguration">
<property name="name" value="myCache"/>
...
<property name="evictionPolicy">
<!-- Sorted eviction policy. -->
<bean class="org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicy">
<!-- Set the maximum cache size to 1 million (default is 100,000) and use default comparator. -->
<property name="maxSize" value="1000000"/>
</bean>
</property>
...
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
// Set the maximum cache size to 1 million (default is 100,000).
cacheCfg.setEvictionPolicy(new SortedEvictionPolicy(1000000));
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
随机退出策略会随机地选择条目退出,这个退出策略主要用于调试或者基准测试的目的。
这个策略通过RandomEvictionPolicy
实现,通过CacheConfiguration
进行配置。
XML:
<bean class="org.apache.ignite.cache.CacheConfiguration">
<property name="name" value="myCache"/>
...
<property name="evictionPolicy">
<!-- Random eviction policy. -->
<bean class="org.apache.ignite.cache.eviction.random.RandomEvictionPolicy"> <!-- Set the maximum cache size to 1 million (default is 100,000). -->
<property name="maxSize" value="1000000"/>
</bean>
</property>
...
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
// Set the maximum cache size to 1 million (default is 100,000).
cacheCfg.setEvictionPolicy(new RandomEvictionPolicy(1000000));
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
过期策略指定了在缓存条目过期之前必须经过的时间量,时间可以从创建,最后访问或者修改时间开始计算。
过期策略可以通过任何预定义的ExpiryPolicy
实现进行设置。
类名 | 创建时间 | 最后访问时间 | 最后更新时间 |
---|---|---|---|
CreatedExpiryPolicy |
可用 | ||
AccessedExpiryPolicy |
可用 | 可用 | |
ModifiedExpiryPolicy |
可用 | 可用 | |
TouchedExpiryPolicy |
可用 | 可用 | 可用 |
EternalExpiryPolicy |
也可以自定义ExpiryPolicy
实现。
过期策略可以在CacheConfiguration
中进行设置,这个策略可以用于缓存内的所有条目。
cfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ZERO));
也可以在对缓存进行单独操作时对过期策略进行设置或者修改。
IgniteCache<Object, Object> cache = cache.withExpiryPolicy(
new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 5)));
此策略将用于在返回的缓存实例上调用的每个操作。
当一个新节点加入集群时,已有节点会放弃一部分缓存条目的所有权转交给新的节点,以使整个网格在任何时候都保持键的均等平衡。
如果新的节点成为一些分区的主节点或者备份节点,他会从该分区之前的主节点获取数据,或者从该分区的备份节点之一获取数据。一旦分区全部载入新的节点,旧节点就会被标记为过时然后该节点在所有当前的事务完成之后最终会被退出。因此,在一些很短的时间段,在网络发生变化之后,有一种情况是在缓存中对于一个键备份的数量可能比事先配置的多。然而,一旦再平衡完成,额外的备份会被删除。
下面的再平衡模式是在CacheRebalanceMode
枚举中定义的:
缓存再平衡模式 | 描述 |
---|---|
SYNC |
同步再平衡模式,直到所有必要的数据全部从其他有效节点加载完毕分布式缓存才会启动,这意味着所有对缓存的开放API的调用都会阻塞直到再平衡结束 |
ASYNC |
异步平衡模式,分布式缓存会马上启动,然后在后台会从其他节点加载所有必要的数据 |
NONE |
该模式下不会发生再平衡,这意味着要么在访问数据时从持久化存储载入,要么数据被显式地填充。 |
默认启用ASYNC
再平衡模式,要使用其他的再平衡模式,可以像下面这样设置CacheConfiguration
的rebalanceMode
属性:
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set synchronous rebalancing. -->
<property name="rebalanceMode" value="SYNC"/>
...
</bean
</property>
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
IgniteConfiguration
提供了一个setRebalanceThreadPoolSize
方法,他可以为了再平衡的需要,从Ignite的系统线程池中获取一定数量的线程,每当一个节点需要向远程节点发送一批数据时,或者需要处理来自相反方向的一批数据时,都会从池中获取一个系统线程,这个远程节点既可能是一个分区的主节点,也可能是备份节点。这个线程在批处理发送或者接收完以及处理完之后,就会被释放。
默认只会有一个线程用于再平衡。这基本上意味着在一个特定的时间点只有一个线程用于从一个节点到另一节点传输批量数据,或者处理来自远端的批量数据。举例来说,如果集群有两个节点和一个缓存,那么所有缓存的分区都会一个一个地按照顺序进行再平衡。如果集群有两个节点和两个不同的缓存,那么这些缓存会以并行的方式进行再平衡,但是在一个特定的时间点,就像上面解释的那样,只会处理属于某一个特定缓存的批量数据。
每个缓存的分区数量不会影响再平衡的性能,有影响的是数据的总量,再平衡线程池大小以及下面章节列出的其他参数。
根据系统中缓存的数量以及缓存中存储的数据量,如果再平衡线程池的大小为1,要将所有的数据再平衡至一个节点上,会花费很长的时间。要加快预加载的进程,可以根据需要增加IgniteConfiguration.setRebalanceThreadPoolSize
的值。
假定将IgniteConfiguration.setRebalanceThreadPoolSize
的值设为4然后考虑上述的示例,再平衡的过程会如下所示:
在内部,系统线程池广泛用于和缓存有关的所有操作(put,get等),SQL引擎和其他模块,因此将
IgniteConfiguration.setRebalanceThreadPoolSize
设置为一个很大的值会显著提高再平衡的性能,但是会影响应用的性能。
当再平衡器将数据从一个节点传输到另一个节点时,他会将整个数据集拆分为多个批次然后将每一个批次作为一个单独的消息进行发送。如果数据集很大的话那么就会有很多的消息要发送,CPU和网络就会过度的消耗,这时在再平衡消息之间进行等待是合理的,以使由于再平衡过程导致的性能下降冲击最小化。这个时间间隔可以通过CacheConfiguration
的rebalanceThrottle
属性进行控制,他的默认值是0,意味着在消息之间没有暂停,注意单个消息的大小也可以通过rebalanceBatchSize
属性进行设置(默认值是512K)。
比如,如果希望再平衡器间隔100ms每个消息发送2MB数据,需要提供如下的配置:
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set batch size. -->
<property name="rebalanceBatchSize" value="#{2 * 1024 * 1024}"/>
<!-- Set throttle interval. -->
<property name="rebalanceThrottle" value="100"/>
...
</bean
</property>
</bean>
Java:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setRebalanceBatchSize(2 * 1024 * 1024);
cacheCfg.setRebalanceThrottle(100);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
缓存的再平衡行为可以有选择地通过下面的配置属性进行定制:
CacheConfiguration:
setter方法 | 描述 | 默认值 |
---|---|---|
setRebalanceMode |
分布式缓存的再平衡模式,细节可以参照再平衡模式章节 | ASYNC |
setRebalancePartitionedDelay |
再平衡延迟时间(毫秒) | 0,无延迟 |
setRebalanceBatchSize |
单个再平衡消息的大小(byte),再平衡算法会在发送数据之前将每个节点的整个数据集拆分成多个批次。 | 512K |
setRebalanceThrottle |
不建议,再平衡消息之间的等待时间(毫秒),用来避免CPU和网络的过载,注意应用在再平衡的过程中仍然可以正确执行 | 0,无间隔 |
setRebalanceOrder |
要完成的再平衡的顺序,只有同步和异步再平衡模式的缓存才可以将再平衡顺序设置为非0值,具有更小值的缓存再平衡会被首先完成,再平衡默认是无序的 | 0 |
setRebalanceBatchesPrefetchCount |
为了达到更好的性能,数据提供者节点会在再平衡开始时提供不止一个批次然后在下一个请求时提供一个新的批次。这个方法会设置再平衡开始时数据提供者节点产生的批次的数量 | 2 |
IgniteConfiguration:
setter方法 | 描述 | 默认值 |
---|---|---|
setRebalanceThreadPoolSize |
用于再平衡的线程的最大值 | 1,对整个集群的操作影响最小 |
Ignite具有缓存所有兼容Java Servlet3.0规范的Java Servlet容器的Web Session的能力。包括Apache Tomcat,Eclipse Jetty,Oracle WebLogic以及其他的。
缓存Web会话对于运行一个应用服务器集群时是有用的。当在一个Servlet容器中运行一个Web应用时,可能面临性能和可扩展性的问题,一个单独的应用服务器通常可能无法自己处理很大的流量,一个常规的解决方案就是跨越多个集群实例扩展Web应用。
在上面的架构中,高可用代理(负载平衡器)在多个应用服务器实例之间分发请求(应用服务器1,应用服务器2……),来降低每个实例的负载以及提供在任意实例故障时的服务可用性,这里的问题就是Web会话的可用性。Web会话通过Cookie保持请求之间的中间逻辑状态,并且通常绑定到一个特定的应用实例。通常这是由粘性连接来处理,来确保来自同一个用户的请求被同一个应用服务器实例处理。然而,如果该实例故障,会话就丢失了,所有当前未保存的状态也丢失了,然后用户会重新创建它。
这里的一个解决方案就是用Ignite来缓存Web会话-维护每个创建的会话的拷贝的分布式缓存,在所有的实例中共享。如果任何一个应用实例故障,Ignite会马上从分布式缓存中恢复故障实例所属的会话,而不管下一个请求会被转发到哪个应用服务器。这样的话,随着Web会话被缓存粘性连接就变得不那么重要,因为会话可以用于请求被路由到的任何应用服务器。
这个章节给出了一个Ignite的Web会话缓存功能的主要架构概况以及介绍了如何配置Web应用来启用Web会话缓存。
要用Ignite配置一个分布式Web会话缓存,通常需要将应用启动为一个Ignite节点(嵌入式模式),当多个应用服务器实例启动后,所有的Ignite节点会连姐在一起形成一个分布式缓存。
注意并不是所有的Ignite缓存节点都需要运行在应用服务器内部,也可以启动额外的,独立的Ignite节点,然后将他们加入集群。
当将会话存储在Ignite中时有几个复制策略可供选择,复制策略是在缓存的备份设定中定义的,本章节将主要覆盖最常用的配置。
全复制缓存
这个策略保存每个Ignite节点上的所有会话的拷贝,提供了最大的可用性。然而这个方法缓存的会话的数量必须匹配单个服务器的内存大小,另外,性能也会变差因为现在Web会话状态的每一次改变都必须复制到集群中所有的其他节点。
要启用全复制策略,设置缓存的cacheMode为REPLICATED
:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Cache mode. -->
<property name="cacheMode" value="REPLICATED"/>
...
</bean>
有备份的分区缓存
在分区模式,Web会话会被拆分为区,每个节点只负责缓存分配给该节点的分区的数据,这个方法中如果有更多的节点,就可以缓存更多的数据,新的节点也可以动态地加入以增加更多的内存。
分区
模式中,冗余是通过为每个缓存的Web会话配置一定数量的备份实现的。
要开启分区策略,设置缓存的cacheMode为PARTITIONED
以及通过CacheConfiguration
的backups
属性来配置备份的数量。
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<property name="backups" value="1"/>
</bean>
可以参照
3.3.缓存模型
来了解关于Ignite不同复制策略的更多信息。
当会话过期后会被缓存自动清理。然而,如果创建了大量的长期存活的会话,当缓存达到一个特定的限值时,为了节省内存可能需要将不必要的缓存退出。这个可以通过设置缓存的退出策略以及指定缓存中可以存储的会话的最大值来实现。比如,要启用基于LRU算法的自动退出以及10000个会话的限制,可以通过如下的缓存配置来实现:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Cache name. -->
<property name="name" value="session-cache"/>
<!-- Set up LRU eviction policy with 10000 sessions limit. -->
<property name="evictionPolicy">
<bean class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
<property name="maxSize" value="10000"/>
</bean>
</property>
...
</bean>
要了解各个退出策略的更多信息,可以参照
3.15.退出策略
章节。
要在应用中通过Ignite开启Web会话缓存,需要:
添加Ignite的jar包,下载Ignite然后将如下的jar包加入应用的类路径(WEB-INF/lib目录);
或者,如果是一个基于Maven的工程,可以将下面的片段加入应用的pom.xml:
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version> ${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-web</artifactId>
<version> ${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>${ignite.version}</version>
</dependency>
确保将${ignite.version}替换为实际的Ignite版本。
2. 配置缓存模式,配置Ignite的缓存,要么是分区
模式,要么是复制
模式(可以看上面的例子);
3. 更新web.xml,在web.xml中声明一个ContextListener和一个WebSessionsFilter:
...
<listener>
<listener-class>org.apache.ignite.startup.servlet.ServletContextListenerStartup</listener-class>
</listener>
<filter>
<filter-name>IgniteWebSessionsFilter</filter-name>
<filter-class>org.apache.ignite.cache.websession.WebSessionFilter</filter-class>
</filter>
<!-- You can also specify a custom URL pattern. -->
<filter-mapping>
<filter-name>IgniteWebSessionsFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- Specify Ignite configuration (relative to META-INF folder or Ignite_HOME). -->
<context-param>
<param-name>IgniteConfigurationFilePath</param-name>
<param-value>config/default-config.xml </param-value>
</context-param>
<!-- Specify the name of Ignite cache for web sessions. -->
<context-param>
<param-name>IgniteWebSessionsCacheName</param-name>
<param-value>partitioned</param-value>
</context-param>
...
在应用启动时,监听器会在应用中启动一个Ignite节点,他会连接网络中的其他节点以形成一个分布式缓存。
4. 设置退出策略(可选),为缓存中的旧数据设置退出策略(可以看上面的例子)。
配置参数
ServletContextListenerStartup
有如下的配置参数:
参数名 | 描述 | 默认值 |
---|---|---|
IgniteConfigurationFilePath |
Ignite配置文件的路径(相对于META-INF 文件夹或者IGNITE_HOME ) |
/config/default-config.xml |
WebSessionFilter有如下的配置参数:
参数名 | 描述 | 默认值 |
---|---|---|
IgniteWebSessionsGridName |
启动Ignite节点的网格名,可以参照配置文件的grid部分(如果配置文件中指定了网格名) | 无 |
IgniteWebSessionsCacheName |
Web会话缓存的缓存名 | 无 |
IgniteWebSessionsMaximumRetriesOnFail |
只对ATOMIC 缓存有效,指定了当主节点故障时的重试次数 |
3 |
Ignite官方测试了如下的应用服务器:
Ignite提供了JDBC驱动,他可以通过标准SQL和JDBC API来从缓存中获得分布式数据。
Ignite中,JDBC连接URL的规则如下:
jdbc:ignite:cfg://[<params>@]<config_url>
<config_url>
是必需的,表示对于通过JDBC Driver建立连接期间待启动的Ignite客户端节点来说指向Ignite配置文件的任意合法URL,要了解更多细节可以参照1.6.客户端和服务器
章节。<params>
是可选的,格式如下:
param1=value1:param2=value2:...:paramN=valueN
他支持如下的参数:
属性 | 描述 | 默认值 |
---|---|---|
cache |
缓存名,如果未定义会使用默认的缓存,区分大小写 | |
nodeId |
要执行的查询所在节点的Id,对于在本地查询是有用的 | |
local |
查询只在本地节点执行,这个参数和nodeId 参数都是通过指定节点来限制数据集 |
false |
collocated |
优化标志,当Ignite执行一个分布式查询时,他会向单个的集群节点发送子查询,如果提前知道要查询的数据已经被并置到同一个节点,Ignite会有显著的性能提升和网络优化 | false |
distributedJoins |
可以在非并置的数据上使用分布式关联。 | false |
跨缓存查询
驱动连接到的缓存会被视为默认的模式,要跨越多个缓存进行查询,可以参照3.6.缓存查询
章节。
关联和并置
就像3.6.缓存查询
章节描述的那样,通过IgniteCache
API,如果关联对象是以并置模式存储的话,在分区缓存上的关联是可以正常执行的。细节可以参照3.11.关系并置
章节。
复制和分区缓存
在复制
缓存上的查询会直接在一个节点上执行,而在分区
缓存上的查询是分布在所有缓存节点上的。
Ignite JDBC驱动会自动地只获取缓存中存储的对象中实际需要的那些字段,比如,有一个像下面这样的Person
对象。
public class Person {
@QuerySqlField
private String name;
@QuerySqlField
private int age;
// Getters and setters.
...
}
如果在缓存中有这些类的实例,可以通过标准JDBC API
查询单独的字段(name,age或者两个),比如:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");
// Query names of all people.
ResultSet rs = conn.createStatement().executeQuery("select name from Person");
while (rs.next()) {
String name = rs.getString(1);
...
}
// Query people with specific age using prepared statement.
PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
stmt.setInt(1, 30);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String name = rs.getString("name");
int age = rs.getInt("age");
...
}
ignite-jdbc.xml
的最小版本大概像下面这样:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="clientMode" value="true"/>
<property name="peerClassLoadingEnabled" value="true"/>
<!-- Configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"/>
</property>
</bean>
</property>
</bean>
</beans>
对于之前版本的Ignite(1.4之前),JDBC连接的URL有如下的格式:
jdbc:ignite://<hostname>:<port>/<cache_name>
细节可以参照文档。
Ignite提供了一个SpringCacheManager
-一个Spring缓存抽象的实现。他提供了基于注解的方式来启用Java方法的缓存,这样方法的执行结果就会存储在Ignite缓存中。如果之后同一个方法通过同样的参数集被调用,结果会直接从缓存中获得而不是实际执行这个方法。
Spring缓存抽象文档
关于如何使用Spring缓存抽象的更多信息,包括可用的注解,可以参照这个文档页面:http://docs.spring.io/spring/docs/current/spring-framework-reference/html/cache.html.
只需要两个简单的步骤就可以将Ignite缓存嵌入基于Spring的应用:
SpringCacheManager
作为缓存管理器。嵌入式节点可以通过SpringCacheManager
自己启动,这种情况下需要分别通过configurationPath
或者configuration
属性提供一个Ignite配置文件的路径或者IgniteConfiguration
Bean(看下面的例子)。注意同时设置两个属性是非法的以及抛出IllegalArgumentException
。
配置路径:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache.xsd">
<!-- Provide configuration file path. -->
<bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
<property name="configurationPath" value="examples/config/spring-cache.xml"/>
</bean>
<!-- Enable annotation-driven caching. -->
<cache:annotation-driven/>
</beans>
配置Bean:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache.xsd">
<-- Provide configuration bean. -->
<bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
<property name="configuration">
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
</bean>
</property>
</bean>
<-- Enable annotation-driven caching. -->
<cache:annotation-driven/>
</beans>
当缓存管理器初始化时也可能已经有一个Ignite节点正在运行(比如已经通过ServletContextListenerStartup
启动了)。这时只需要简单地通过gridName
属性提供网格名字就可以了。注意如果不设置网格名字,缓存管理器会试图使用默认的Ignite实例(名字为空的),下面是一个示例:
使用已启动的Ignite实例:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache.xsd">
<!-- Provide grid name. -->
<bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
<property name="gridName" value="myGrid"/>
</bean>
<!-- Enable annotation-driven caching. -->
<cache:annotation-driven/>
</beans>
远程节点
注意应用内部启动的节点只是希望连接的网络的一个入口,可以根据需要通过Ignite发行版提供的bin/ignite.{sh|bat}
脚本启动尽可能多的远程独立节点,所有这些节点都会参与缓存数据。
虽然通过Ignite配置文件可以获得所有必要的缓存,但是这不是必要的。如果Spring要使用一个不存在的缓存时,SpringCacheManager
会自动创建它。
如果不指定,会使用默认值创建一个新的缓存。要定制的话,可以通过dynamicCacheConfiguration
属性提供一个配置模板,比如,如果希望使用复制
缓存而不是分区
缓存,可以像下面这样配置SpringCacheManager
:
<bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
...
<property name="dynamicCacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="cacheMode" value="REPLICATED"/>
</bean>
</property>
</bean>
也可以在客户端侧使用近缓存,要做到这一点只需要简单地通过dynamicNearCacheConfiguration
属性提供一个近缓存配置即可。默认的话近缓存是不启用的,下面是一个例子:
<bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
...
<property name="dynamicNearCacheConfiguration">
<bean class="org.apache.ignite.configuration.NearCacheConfiguration">
<property name="nearStartSize" value="1000"/>
</bean>
</property>
</bean>
一旦在Spring应用上下文中加入了SpringCacheManager
,就可以通过简单地加上注解为任意的java方法启用缓存。
通常为很重的操作使用缓存,比如数据库访问。比如,假设有个Dao类有一个averageSalary(...)
方法,他计算一个组织内的所有雇员的平均工资,那么可以通过@Cacheable
注解来开启这个方法的缓存。
private JdbcTemplate jdbc;
@Cacheable("averageSalary")
public long averageSalary(int organizationId) {
String sql =
"SELECT AVG(e.salary) " +
"FROM Employee e " +
"WHERE e.organizationId = ?";
return jdbc.queryForObject(sql, Long.class, organizationId);
}
当这个方法第一次被调用时,SpringCacheManager
会自动创建一个averageSalary
缓存,他也会在缓存中查找事先计算好的平均值然后如果存在的话就会直接返回,如果这个组织的平均值还没有被计算过,那么这个方法就会被调用然后将结果保存在缓存中,因此下一次请求这个组织的平均值,就不需要访问数据库了。
缓存键
因为organizationId
是唯一的方法参数,所以他会自动作为缓存键。
如果一个雇员的工资发生变化,可能希望从缓存中删除这个雇员所属组织的平均值,否则averageSalary(...)
方法会返回过时的缓存结果。这个可以通过将@CacheEvict
注解加到一个方法上来更新雇员的工资:
private JdbcTemplate jdbc;
@CacheEvict(value = "averageSalary", key = "#e.organizationId")
public void updateSalary(Employee e) {
String sql =
"UPDATE Employee " +
"SET salary = ? " +
"WHERE id = ?";
jdbc.update(sql, e.getSalary(), e.getId());
}
在这个方法被调用之后,这个雇员所属组织的平均值就会被从averageSalary
缓存中踢出,这会强迫averageSalary(...)
方法在下一次调用时重新计算。
Spring表达式语言(SpEL)
注意这个方法是以雇员为参数的,而平均值是通过组织的Id将平均值存储在缓存中的。为了明确地指定什么作为缓存键,可以使用注解的key
参数和Spring表达式语言。
#e.organizationId
表达式的意思是从e变量中获取organizationId
属性的值。本质上会在提供的雇员对象上调用getOrganizationId()
方法,以及将返回的值作为缓存键。
拓扑验证器用于验证集群网络拓扑对于未来的缓存操作是否有效。
拓扑验证器在每次集群拓扑发生变化时都会被调用(或者新节点加入或者已有节点故障或者其他的)。如果没有配置拓扑验证器,那么集群拓扑会被认为一直有效。
当TopologyValidator.validate(Collection)
方法返回true时,那么对于特定的缓存以及在这个缓存上的所有有效操作拓扑都会被认为是有效的,否则,该缓存上的所有更新操作都会抛出如下异常:
CacheException
:所有试图更新的操作都会抛出(put,remove等)IgniteException
:试图进行事务提交的操作会抛出返回false以及声明拓扑无效后,当下一次拓扑发生变化时拓扑验证器可以返回正常状态。
示例:
...
for (CacheConfiguration cCfg : iCfg.getCacheConfiguration()) {
if (cCfg.getName() != null) {
if (cCfg.getName().equals(CACHE_NAME_1))
cCfg.setTopologyValidator(new TopologyValidator() {
@Override public boolean validate(Collection<ClusterNode> nodes) {
return nodes.size() == 2;
}
});
else if (cCfg.getName().equals(CACHE_NAME_2))
cCfg.setTopologyValidator(new TopologyValidator() {
@Override public boolean validate(Collection<ClusterNode> nodes) {
return nodes.size() >= 2;
}
});
}
}
...
在这个例子中,对缓存允许更新操作情况如下:
CACHE_NAME_1
:集群具有两个节点时CACHE_NAME_2
:集群至少有两个节点时配置
拓扑验证器通过CacheConfiguration.setTopologyValidator(TopologyValidator)
方法既可以用代码也可以用XML进行配置。
缓存事务会隐式地获得锁,然而,有些情况下显式锁是很有用的。IgniteCache
API的lock()
方法会返回一个java.util.concurrent.locks.Lock
的实例,他可以在任意给定的键上定义显式的分布式锁,也可以通过IgniteCache.lockAll()
方法给集合对象加锁。
IgniteCache<String, Integer> cache = ignite.cache("myCache");
// Create a lock for the given key.
Lock lock = cache.lock("keyLock");
try {
// Aquire the lock.
lock.lock();
cache.put("Hello", 11);
cache.put("World", 22);
}
finally {
// Release the lock.
lock.unlock();
}
原子化模式
Ignite中,只有在TRANSACTIONAL
原子化模式中才支持锁,他可以通过CacheConfiguration
的atomicityMode
属性进行配置。
锁和事务
显式锁是非事务性的,不能在事务中使用(会抛出异常)。如果确实需要在事务中使用显式锁,那么需要使用事务的TransactionConcurrency.PESSIMISTIC
并发控制,他会为相关的缓存操作获得显式锁。