@liyuj
2018-03-31T21:12:10.000000Z
字数 20751
阅读 8394
Apache-Ignite-2.3.0-中文开发手册
Ignite的原生持久化是一个分布式的ACID和兼容SQL的磁盘存储,它可以透明地与Ignite的固化内存进行集成。Ignite的持久化是可选的,可以打开也可以关闭,当关闭时Ignite就会变成一个纯内存存储。
Ignite的原生持久化会在磁盘上存储一个数据的超集,以及根据容量在内存中存储一个子集。比如,如果有100个条目,然后内存只能存储20条,那么磁盘上会存储所有的100条,然后为了提高性能在内存中缓存20条。
另外值得一提的是,和纯内存的使用场景一样,当打开持久化时,每个独立的节点只会持久化数据的一个子集,不管是主还是备节点,都是只包括节点所属的分区的数据,总的来说,整个集群包括了完整的数据集。
Ignite的原生持久化有如下的特性:
要开启Ignite的原生持久化,需要给集群的配置传递一个PersistentStoreConfiguration
的实例:
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Enabling Apache Ignite native persistence. -->
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="persistenceEnabled" value="true"/>
</bean>
</property>
</bean>
</property>
<!-- Additional setting. -->
</bean>
Java:
// Apache Ignite node configuration.
IgniteConfiguration cfg = new IgniteConfiguration();
// Ignite persistence configuration.
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
// Enabling the persistence.
storageCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
// Applying settings.
cfg.setDataStorageConfiguration(storageCfg);
持久化开启之后,所有的数据和索引都会存储在所有集群节点的内存和磁盘上,下图描述了在单独的集群节点的文件系统层看到的持久化结构:
首先,节点中的每个缓存都要有一个唯一的目录,从上图可知,可以看到至少两个缓存(Cache_A和Cache_B),由节点来维护他们的数据和索引。
其次,对于节点的每个分区,不管是主还是备,Ignite的原生持久化都会在文件系统中创建一个专用文件。比如,对上面的节点来说,它负责分区1,10到564,索引是每个缓存一个文件。
缓存组和分区文件
如果Cache_A和Cache_B属于同一个缓存组,那么这些缓存共享的分区文件会放在一个目录中。
最后,和预写日志活动有关的文件和目录,下面还会介绍。
集群激活
注意如果开启了Ignite持久化,集群默认是未激活的,无法进行任何的CRUD操作。用户需要手工激活集群,后面会介绍如何进行操作。
上述的文件层次默认是在一个名为${IGNITE_HOME}/work/db
的目录中进行维护的,要改变存储和WAL文件的默认位置,可以使用DataStorageConfiguration
中对应的setStoragePath(...)
、setWalPath(...)
和setWalArchivePath(...)
方法。
如果一台主机启动了若干个节点,那么每个节点进程都会在一个预定义的唯一子目录中,比如${IGNITE_HOME}/work/db/node{IDX}-{UUID}
,有自己的持久化文件,这里IDX
和UUID
参数都是Ignite在节点启动时自动计算的(这里有详细描述)。如果在持久化层次结构中已经有了若干node{IDX}-{UUID}
子目录,那么他们是按照节点先入先出的顺序进行赋值的。如果希望某节点即使重启也有专用目录和专用的数据分区,需要在集群范围配置唯一的IgniteConfiguration.setConsistentId
,这个唯一ID会在node{IDX}-{UUID}
字符串中映、射setStoragePath(...)到、setWalArchivePath(...)
ffUUID`。
一台主机隔离集群中的节点
Ignite可以在一台主机上隔离多个集群,每个集群都要在文件系统的不同目录下存储持久化文件,这时可以通过DataStorageConfiguration
的setStoragePath(...)
、setStoragePath(...)
、setWalArchivePath(...)
方法来重新定义每个集群的相应的路径。
Ignite的原生持久化是一个兼容ACID的分布式存储,每个事务性更新都会首先被添加到WAL。更新会被赋予一个唯一的ID,这意味着集群在故障或者重启时总是会恢复到最近的成功提交的事务或者原子性更新。
Ignite的原生持久化可以将Ignite作为一个分布式的SQL数据库。
在集群中执行SQL查询时是不需要在内存中保存所有的数据的,Ignite会在内存和磁盘上的所有数据中执行。另外,在集群重启后将所有的数据都预加载到内存中也是一个选择,这时当集群启动运行时,就可以执行SQL查询了。
本文档提供了Ignite持久化的一个高层视图,如果想了解更多的技术细节,可以看下面的文档:
在11.4.固化内存调优章节中有关于性能方面的建议。
要了解Ignite的原生持久化在实践中的应用,可以看Github中的这个示例。
Ignite的持久化会为节点的每个分区创建和维护一个专有文件,但是当内存中的页面更新时,更新是不会直接写入对应的分区文件的,因为会严重影响性能,而是将数据写入预写日志的尾部(WAL)。
WAL的目的是以最快的速度向磁盘传播更新,以及为单个节点或者整个集群故障的场景提供一种恢复机制。值得一提的是,集群可以根据WAL的内容在故障或者重启时随时恢复到最近成功提交的事务。
整个WAL会被拆分为若干个文件,叫做段,它是按顺序进行填充的。当第一个段满了之后,它的内容会被复制到WAL档案,然后在那里保存由DataStorageConfiguration.walHistorySize
配置的时间。复制完成之后,第二个段会被视为激活的WAL文件,然后接收由应用发送过来的更新请求。默认会创建和使用10个这样的段,这个数值可以通过DataStorageConfiguration.setWalSegmentSize
进行修改。
根据WAL模式的不同,Ignite提供了如下的一致性保证:
WAL模式 | 描述 | 一致性保证 |
---|---|---|
DEFAULT |
保证每个原子写或者事务性提交都会持久化到磁盘。 | 数据更新不会丢失,不管是任何的操作系统或者进程故障,甚至是电源故障。 |
LOG_ONLY |
保证每个原子写或者事务性提交都会刷新到操作系统的缓冲区缓存。 | 如果仅仅是进程崩溃数据更新会保留。 |
BACKGROUND |
变更会定期地刷新到节点的内部缓冲区,缓冲区刷新到磁盘的频率由DataStorageConfiguration.setWalFlushFrequency 参数定义。 |
如果进程故障或者其他的故障发生时,最近的数据更新可能丢失。 |
NONE |
WAL被禁用,只有在检查点进程执行过程中或者节点正常关闭时,变更才会正常持久化,使用Ignite#active(false) 可以优雅地停止节点。 |
没有一致性保证,如果一个节点异常终止,存储于磁盘上的数据很可能会损坏,并且在节点重启后持久化目录需要清理。 |
下面是如何配置WAL模式的代码示例:
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<!-- Enabling Apache Ignite Persistent Store. -->
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="persistenceEnabled" value="true"/>
</bean>
</property>
<!-- Changing WAL Mode. -->
<property name="walMode" value="LOG_ONLY"/>
</bean>
</property>
<!-- Additional setting. -->
</bean>
Java:
// Apache Ignite node configuration.
IgniteConfiguration cfg = new IgniteConfiguration();
// Native Persistence configuration.
DataStorageConfiguration psCfg = new DataStorageConfiguration();
// Enabling the persistence.
psCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
// Set WAL Mode.
psCfg.setWalMode(WALMode.LOG_ONLY);
// Enabling the Persistent Store.
cfg.setDataStorageConfiguration(psCfg);
//Additional parameters.
由于WAL文件会一直增长,并且通过WAL从头到尾地恢复集群会花费大量的时间。为了解决这个问题,Ignite引入了一个检查点过程。
检查点是一个将脏页面从内存复制到磁盘上的分区文件的过程,脏页面是指页面已经在内存中进行了更新但是还没有写入对应的分区文件(只是添加到了WAL中)。
这个过程有助于通过在磁盘上保持页面的最新状态而高效地利用磁盘空间,并且允许在WAL档案中删除过时的WAL段(文件)。
下图显示的是一个简单的更新操作的执行过程:
如果使用了Ignite的持久化,集群默认为非激活状态,无法进行任何的CRUD操作,用户需要手工激活集群。比如,如果集群未激活然后用户试图执行一个SQL或者键-值查询,会抛出异常,如下图所示:
这么做是为了避免在集群刚重启,数据还没有加载到集群中,应用就开始修改数据,这样会导致数据不一致。
通常的做法是等待所有的节点加入集群,然后使用下面的方法手工激活集群。
自动激活集群
自动激活集群在未来的某个版本中会实现。
想通过代码激活集群,可以在应用中调用Ignite.active()
方法。
Ignite ignite = ...;
// Activating the cluster once all the cluster nodes are up and running.
ignite.active(true);
通过Web控制台进行激活,可以使用Monitoring
面板右上角的Cluster inactive
开关。
下图显示集群已被激活:
在命令行中,使用$IGNITE_HOME/bin
文件夹中的control.sh|bat
脚本,比如
.sh:
control.sh|bat
keyi
./control.sh --activate
.bat:
./control.bat --activate
control.sh|bat
可以使用如下的参数:
参数 | 描述 |
---|---|
--activate | 使集群处于激活状态 |
--deactivate | 使集群处于非激活状态 |
--host {ip} | 集群IP地址 |
--port {port} | 连接端口 |
使用./control.sh --help
可以查看帮助。
在停止节点之前,建议首先通过上述的方法使集群处于非激活状态。
但是,即使集群异常终止,也不会导致数据的损坏或者不一致,因为所有未完成的操作/事务都会被忽略,而WAL会在重启之后进行重做处理。
JCache提供了javax.cache.integration.CacheLoader
和javax.cache.integration.CacheWriter
API,他们分别用于底层持久化存储的通读
和通写
(比如RDBMS中的Oracle或者MySQL,以及NoSQL数据库中的MongoDB或者CouchDB)。
虽然Ignite可以单独地配置CacheRLoader
和CacheWriter
,但是在两个单独的类中实现事务化存储是非常尴尬的,因为多个load
和put
操作需要在同一个事务中的同一个连接中共享状态。为了解决这个问题,Ignite提供了org.apacche.ignite.cache.store.CacheStore
接口,他同时扩展了CacheLoader
和CacheWriter
。
事务
CacheStore
是完整事务性的,他会自动地融入当前的缓存事务。
CacheJdbcPojoStore
Ignite附带了他自己的CacheJdbcPojoStore
,他会自动地建立Java POJO和数据库模式之间的映射。
当希望通读和通写行为时,提供一个正确的缓存存储的实现是很重要的。通读意味着当缓存无效时会从底层的持久化存储中读取,通写意味着当缓存更新时会自动地进行持久化。所有的通读和通写都会参与整体的缓存事务以及作为一个整体提交或者回滚。
要配置通读和通写,需要实现CacheStore
接口以及设置CacheConfiguration
中cacheStoreFactory
的readThrough
和writeThrough
属性,下面的示例会有说明。
在一个简单的通写模式中每个缓存的put和remove操作都会涉及一个持久化存储的请求,因此整个缓存更新的持续时间可能是相对比较长的。另外,密集的缓存更新频率也会导致非常高的存储负载。
对于这种情况,Ignite提供了一个选项来执行异步化的持久化存储更新,也叫做后写,这个方式的主要概念是累加更新操作然后作为一个批量操作异步化地刷入持久化存储中。真实的数据持久化可以被基于时间的事件触发(数据输入的最大时间受到队列的限制),也可以被队列的大小触发(当队列大小达到一个限值),或者通过两者的组合触发,这时任何事件都会触发刷新。
更新顺序
对于后写的方式只有数据的最后一次更新会被写入底层存储。如果键为key1的缓存数据分别依次地更新为值value1、value2和value3,那么只有(key1,value3)对这一个存储请求会被传播到持久化存储。
更新性能
批量的存储操作通常比按顺序的单一存储操作更有效率,因此可以通过开启后写模式的批量操作来利用这个特性。简单类型(put和remove)的简单顺序更新操作可以被组合成一个批量操作。比如,连续地往缓存中加入(key1,value1),(key2,value2),(key3,value3)可以通过一个单一的CacheStore.putAll(...)
操作批量处理。
后写缓存可以通过CacheConfiguration.setWriteBehindEnabled(boolean)
配置项来开启,下面的16.5.7.配置
章节显示了一个完整的配置属性列表来进行后写缓存行为的定制。
Ignite中的CacheStore
接口用于向底层的数据存储写入或者加载数据。除了标准的JCache加载和存储方法,他还引入了最终事务划界以及从底层数据存储批量载入数据的能力。
loadCache()
CacheStore.loadCache()
方法可以加载缓存,即使没有传入要加载的所有键,它通常用于启动时缓存的热加载,但是也可以在缓存加载完之后的任何时间点调用。
在每一个相关的集群节点,IgniteCache.loadCache()
方法会分配给CacheStore.loadCache()
方法,如果只想在本地节点上进行加载,可以用IgniteCache.localLoadCache()
方法。
对于分区缓存,不管是主节点还是备份节点,如果键没有被映射到该节点,会被缓存自动丢弃。
load(), write(), delete()
当IgniteCache
接口的get
,put
,remove
方法被调用时,相对应的CacheStore
的load()
,write()
和delete()
方法会被调用,当与单个缓存数据工作时,这些方法会用于启用通读和通写行为。
loadAll(), writeAll(), deleteAll()
当IgniteCache
接口的getAll
,putAll
,removeAll
方法被调用时,相对应的CacheStore
的loadAll()
,writeAll()
和deleteAll()
方法会被调用,当与多个缓存数据工作时,这些方法会用于启用通读和通写行为,他们通常用批量操作的方式实现以提供更好的性能。
CacheStoreAdapter
提供了loadAll()
,writeAll()
和deleteAll()
方法的默认实现,他只是简单地对键进行一个一个地迭代。
sessionEnd()
Ignite有一个存储会话的概念,他可以跨越不止一个的缓存存储操作,会话对于事务非常有用。
对于原子化
的缓存,sessionEnd()
方法会在每个CacheStore
方法完成之后被调用,对于事务化
的缓存,不管是在底层持久化存储进行提交或者回滚多个操作,sessionEnd()
方法都会在每个事务结束后被调用。
CacheStoreAdapater
提供了sessionEnd()
方法的默认的空实现。
Cassandra Cache Store
Ignite提供了将Apache Cassandra作为内存网格级CacheStore
的开箱即用的集成,要了解更多的信息,可以查看相关的文档。
缓存存储会话的主要目的是当CacheStore
用于事务中时在多个存储操作中持有一个上下文。比如,如果使用JDBC,可以通过CacheStoreSession.attach()
方法保存数据库的连接,然后可以在CacheStore.sessionEnd(boolean)
方法中提交这个连接。
CacheStoreSession
可以通过@GridCacheStoreSessionResource
注解注入自定义的缓存存储实现中。
下面是几个不同场景的缓存存储的实现,注意事务化的实现用还是没用事务。
JDBC非事务:
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
// This mehtod is called whenever "get(...)" methods are called on IgniteCache.
@Override public Person load(Long key) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
st.setLong(1, key);
ResultSet rs = st.executeQuery();
return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load: " + key, e);
}
}
// This mehtod is called whenever "put(...)" methods are called on IgniteCache.
@Override public void write(Cache.Entry<Long, Person> entry) {
try (Connection conn = connection()) {
// Syntax of MERGE statement is database specific and should be adopted for your database.
// If your database does not support MERGE statement then use sequentially update, insert statements.
try (PreparedStatement st = conn.prepareStatement(
"merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
for (Cache.Entry<Long, Person> entry : entries) {
Person val = entry.getValue();
st.setLong(1, entry.getKey());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());
st.executeUpdate();
}
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
}
}
// This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
@Override public void delete(Object key) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
st.setLong(1, (Long)key);
st.executeUpdate();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to delete: " + key, e);
}
}
// This mehtod is called whenever "loadCache()" and "localLoadCache()"
// methods are called on IgniteCache. It is used for bulk-loading the cache.
// If you don't need to bulk-load the cache, skip this method.
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null)
throw new CacheLoaderException("Expected entry count parameter is not provided.");
final int entryCnt = (Integer)args[0];
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
try (ResultSet rs = st.executeQuery()) {
int cnt = 0;
while (cnt < entryCnt && rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
cnt++;
}
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
// Open JDBC connection.
private Connection connection() throws SQLException {
// Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
// In this example we use H2 Database for simplification.
Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
conn.setAutoCommit(true);
return conn;
}
}
JDBC事务:
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
// Complete transaction or simply close connection if there is no transaction.
@Override public void sessionEnd(boolean commit) {
try (Connection conn = ses.getAttached()) {
if (conn != null && ses.isWithinTransaction()) {
if (commit)
conn.commit();
else
conn.rollback();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to end store session.", e);
}
}
// This mehtod is called whenever "get(...)" methods are called on IgniteCache.
@Override public Person load(Long key) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
st.setLong(1, key);
ResultSet rs = st.executeQuery();
return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load: " + key, e);
}
}
// This mehtod is called whenever "put(...)" methods are called on IgniteCache.
@Override public void write(Cache.Entry<Long, Person> entry) {
try (Connection conn = connection()) {
// Syntax of MERGE statement is database specific and should be adopted for your database.
// If your database does not support MERGE statement then use sequentially update, insert statements.
try (PreparedStatement st = conn.prepareStatement(
"merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
for (Cache.Entry<Long, Person> entry : entries) {
Person val = entry.getValue();
st.setLong(1, entry.getKey());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());
st.executeUpdate();
}
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
}
}
// This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
@Override public void delete(Object key) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
st.setLong(1, (Long)key);
st.executeUpdate();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to delete: " + key, e);
}
}
// This mehtod is called whenever "loadCache()" and "localLoadCache()"
// methods are called on IgniteCache. It is used for bulk-loading the cache.
// If you don't need to bulk-load the cache, skip this method.
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null)
throw new CacheLoaderException("Expected entry count parameter is not provided.");
final int entryCnt = (Integer)args[0];
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
try (ResultSet rs = st.executeQuery()) {
int cnt = 0;
while (cnt < entryCnt && rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
cnt++;
}
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
// Opens JDBC connection and attaches it to the ongoing
// session if within a transaction.
private Connection connection() throws SQLException {
if (ses.isWithinTransaction()) {
Connection conn = ses.getAttached();
if (conn == null) {
conn = openConnection(false);
// Store connection in the session, so it can be accessed
// for other operations within the same transaction.
ses.attach(conn);
}
return conn;
}
// Transaction can be null in case of simple load or put operation.
else
return openConnection(true);
}
// Opens JDBC connection.
private Connection openConnection(boolean autocommit) throws SQLException {
// Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
// In this example we use H2 Database for simplification.
Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
conn.setAutoCommit(autocommit);
return conn;
}
}
JDBC批量操作:
public class CacheJdbcPersonStore extends CacheStore<Long, Person> {
// Skip single operations and open connection methods.
// You can copy them from jdbc non-transactional or jdbc transactional examples.
...
// This mehtod is called whenever "getAll(...)" methods are called on IgniteCache.
@Override public Map<K, V> loadAll(Iterable<Long> keys) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement(
"select firstName, lastName from PERSONS where id=?")) {
Map<K, V> loaded = new HashMap<>();
for (Long key : keys) {
st.setLong(1, key);
try(ResultSet rs = st.executeQuery()) {
if (rs.next())
loaded.put(key, new Person(key, rs.getString(1), rs.getString(2));
}
}
return loaded;
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to loadAll: " + keys, e);
}
}
// This mehtod is called whenever "putAll(...)" methods are called on IgniteCache.
@Override public void writeAll(Collection<Cache.Entry<Long, Person>> entries) {
try (Connection conn = connection()) {
// Syntax of MERGE statement is database specific and should be adopted for your database.
// If your database does not support MERGE statement then use sequentially update, insert statements.
try (PreparedStatement st = conn.prepareStatement(
"merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
for (Cache.Entry<Long, Person> entry : entries) {
Person val = entry.getValue();
st.setLong(1, entry.getKey());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());
st.addBatch();
}
st.executeBatch();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to writeAll: " + entries, e);
}
}
// This mehtod is called whenever "removeAll(...)" methods are called on IgniteCache.
@Override public void deleteAll(Collection<Long> keys) {
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
for (Long key : keys) {
st.setLong(1, key);
st.addBatch();
}
st.executeBatch();
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to deleteAll: " + keys, e);
}
}
}
下面的配置参数可以通过CacheConfiguration
用于启用以及配置后写缓存:
setter方法 | 描述 | 默认值 |
---|---|---|
setWriteBehindEnabled(boolean) |
设置后写是否启用的标志 | false |
setWriteBehindFlushSize(int) |
后写缓存的最大值,如果超过了这个限值,所有的缓存数据都会被刷入缓存存储然后写缓存被清空。如果值为0,刷新操作将会依据刷新频率间隔,注意不能将写缓存大小和刷新频率都设置为0 | 10240 |
setWriteBehindFlushFrequency(long) |
后写缓存的刷新频率,单位为毫秒,该值定义了从对缓存对象进行插入/删除和当相应的操作被施加到缓存存储的时刻之间的最大时间间隔。如果值为0,刷新会依据写缓存大小,注意不能将写缓存大小和刷新频率都设置为0 | 5000 |
setWriteBehindFlushThreadCount(int) |
执行缓存刷新的线程数 | 1 |
setWriteBehindBatchSize(int) |
后写缓存存储操作的操作数最大值 | 512 |
CacheStore
接口可以在IgniteConfiguration
上通过一个工厂进行设置,就和CacheLoader
和CacheWriter
同样的方式。
对于分布式缓存的配置,
Factory
应该是可序列化的。
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="foo.bar.MyPersonStore"/>
</bean>
</property>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
</bean>
</list>
</property>
...
</bean>
Java:
IgniteConfiguration cfg = new IgniteConfiguration();
CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();
cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyPersonStore.class));
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);
cfg.setCacheConfiguration(cacheCfg);
// Start Ignite node.
Ignition.start(cfg);
CacheJdbcBlobStore
实现基于JDBC,这个实现将对象以BLOB
的格式存储在底层数据库中。存储会在数据库中创建名为ENTRIES
的表来存储数据,表具有key和val两个字段。
如果提供了定制的DDL和DML语句,表和字段的名字要和所有的语句一致以及参数的顺序也要保留。
使用CacheJdbcBlobStoreFactory
工厂来向CacheConfiguration
传入CacheJdbcBlobStore
:
Spring:
<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource">
<property name="url" value="jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1" />
</bean>
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<property name="cacheStoreFactory">
<bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
<property name="user" value = "user" />
<property name="dataSourceBean" value = "simpleDataSource" />
</bean>
</property>
</bean>
</list>
</property>
...
</bean>
CacheJdbcPojoStore
实现基于JDBC和基于反射的POJO,这个实现将对象用基于反射的Java Bean映射描述的形式存储在底层数据库中。
使用CacheJdbcPojoStoreFactory
工厂来向CacheConfiguration
传入CacheJdbcPojoStore
:
Spring:
<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
...
<property name="cacheStoreFactory">
<bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
<property name="dataSourceBean" value = "simpleDataSource" />
</bean>
</property>
</bean>
</list>
</property>
</bean>
CacheHibernateBlobStore
实现基于Hibernate,这个实现将对象以BLOB
的格式存储在底层数据库中。
使用CacheHibernateBlobStoreFactory
工厂来向CacheConfiguration
传入CacheHibernateBlobStore
:
Spring:
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<bean class="org.apache.ignite.cache.store.hibernate.CacheHibernateBlobStoreFactory">
<property name="hibernateProperties">
<props>
<prop key="connection.url">jdbc:h2:mem:</prop>
<prop key="hbm2ddl.auto">update</prop>
<prop key="show_sql">true</prop>
</props>
</property>
</bean>
</list>
</property>
...
</bean>
可以查看Cassandra集成
相关章节的内容,了解更详细的信息。