@aloxc
2017-12-05T04:11:03.000000Z
字数 18595
阅读 640
一起学
ignite
cachestore
源码分析
之前测试了ignite中的数据网格(缓存)功能的cachestore,今天整理下代码发出来。
CacheLoadOnlyStoreExampleUsingMysql.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<context:component-scan base-package="com.github.aloxc.ignite.datagrid.store"/>
<!-- Data source for a sample in-memory H2 database, reached over TCP on localhost as user "sa".
     NOTE(review): no other bean in this configuration refers to h2-example-db; confirm it is still needed. -->
<bean id="h2-example-db" class="org.h2.jdbcx.JdbcDataSource">
<property name="URL" value="jdbc:h2:tcp://localhost/mem:ExampleDb" />
<property name="user" value="sa" />
</bean>
<!-- MySQL data source (database b_user) that the cache store bean below is wired to.
     NOTE(review): host, user name and password are hard-coded in plain text; replace them with
     values for your own environment and keep real credentials out of published configuration. -->
<bean class="org.springframework.jdbc.datasource.DriverManagerDataSource" id="tybbs_user_datasource">
<property name="driverClassName" value="com.mysql.jdbc.Driver"></property>
<property name="url" value="jdbc:mysql://192.168.20.185:3306/b_user"></property>
<property name="username" value="xroot"></property>
<property name="password" value="l2lDm*xpass"></property>
</bean>
<!-- The CacheStore implementation; its dataSource property is set to the MySQL data source above.
     This bean is passed to the cache's cacheStoreFactory further down. -->
<bean id="cacheOnlyStoreFactoryExampleUsingMysql" class="com.github.aloxc.ignite.datagrid.store.CacheOnlyStoreFactoryExampleUsingMysql">
<property name="dataSource" ref="tybbs_user_datasource"></property>
</bean>
<!-- Ignite configuration used by the example program: the node starts in client mode
     (clientMode=true) with peer class loading enabled and the SHARED deployment mode. -->
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="peerClassLoadingEnabled" value="true"/>
<property name="clientMode" value="true"/>
<property name="deploymentMode" value="SHARED"/>
<property name="marshaller">
<bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/>
</property>
<property name="cacheConfiguration" >
<list>
<!-- The single cache of this example: PARTITIONED, one backup, read-through and write-through
     enabled, named cacheOnlyUsingMysql (the name the Java example looks up). -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="cacheMode" value="PARTITIONED"/>
<property name="writeThrough" value="true"/>
<property name="readThrough" value="true" />
<property name="backups" value="1"/>
<property name="name" value="cacheOnlyUsingMysql"/>
<!-- The store is supplied through a JCache SingletonFactory wrapping the store bean defined above. -->
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder.SingletonFactory">
<constructor-arg ref="cacheOnlyStoreFactoryExampleUsingMysql" />
</bean>
</property>
<!-- SQL metadata for Long -> Person entries.
     NOTE(review): the field keys below use the database column names (first_name, last_name) while the
     Person class declares firstName / lastName; confirm which names the query engine expects. -->
<property name="queryEntities">
<util:list>
<bean class="org.apache.ignite.cache.QueryEntity">
<property name="keyType" value="java.lang.Long"/>
<property name="valueType" value="com.github.aloxc.ignite.model.Person"/>
<property name="fields">
<map>
<entry key="id" value="java.lang.Long"/>
<entry key="first_name" value="java.lang.String"/>
<entry key="last_name" value="java.lang.String"/>
<entry key="resume" value="java.lang.String" />
</map>
</property>
<!-- Indexes on id and first_name, plus a FULLTEXT index on resume. -->
<property name="indexes">
<list>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg name="field" value="id" />
</bean>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg name="field" value="first_name" />
</bean>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg name="field" value="resume"/>
<property name="indexType" value="FULLTEXT"/>
</bean>
</list>
</property>
</bean>
</util:list>
</property>
</bean>
</list>
</property>
<!-- Static-IP discovery: three server addresses, each with the port range 47500..47509. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list> <value>192.168.16.68:47500..47509</value> <value>192.168.16.69:47500..47509</value>
<value>192.168.16.70:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
CacheLoadOnlyStoreExampleUsingMysql.java
package com.github.aloxc.ignite.datagrid.store;
import com.github.aloxc.ignite.model.Person;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cache.store.CacheLoadOnlyStoreAdapter;
import org.apache.ignite.lang.IgniteBiPredicate;
import javax.cache.Cache;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
/**
 * Client-side walkthrough of an Ignite cache that is backed by a MySQL cache store.
 *
 * <p>The program starts a node from the Spring XML configuration, then exercises the cache
 * named {@code cacheOnlyUsingMysql}: single-key reads, a scan query over binary objects,
 * iteration, an update, a series of inserts, and SQL / full-text queries.
 *
 * <p>All messages are logged at {@code error} level, exactly as in the original example.
 */
public class CacheLoadOnlyStoreExampleUsingMysql {
    private static final Log logger = LogFactory.getLog(CacheLoadOnlyStoreExampleUsingMysql.class);

    /** Name of the cache as declared in the Spring XML configuration. */
    private static final String CACHE_NAME = "cacheOnlyUsingMysql";

    /** Classpath location of the Spring XML configuration used to start the node. */
    private static final String CONFIG_PATH =
        "com/github/aloxc/ignite/datagrid/store/CacheLoadOnlyStoreExampleUsingMysql.xml";

    /**
     * Program entry point.
     *
     * @param args ignored
     * @throws IgniteException if the node cannot be started or stopped
     */
    public static void main(String[] args) throws IgniteException {
        new CacheLoadOnlyStoreExampleUsingMysql().doit();
    }

    /**
     * Runs the whole demonstration. Failures inside the cache section are logged with their
     * stack trace instead of being silently discarded; the node is always closed afterwards.
     */
    public void doit() {
        try (Ignite ignite = Ignition.start(CONFIG_PATH)) {
            logger.error(">>> CacheLoadOnlyStoreExample started.");
            try (IgniteCache<Long, Person> cache = ignite.getOrCreateCache(CACHE_NAME)) {
                logger.error("cache.getName : " + cache.getName());
                logger.error("读取编号1的person" + cache.get(1L));

                // Scan the cache in binary form, keeping only entries whose firstName is "Robert".
                IgniteCache<Long, BinaryObject> biCache = cache.withKeepBinary();
                ScanQuery<Long, BinaryObject> scan = new ScanQuery<>(
                    new IgniteBiPredicate<Long, BinaryObject>() {
                        @Override
                        public boolean apply(Long id, BinaryObject person) {
                            // Constant-first comparison: first_name is nullable in the table.
                            boolean isTrue = "Robert".equals(person.<String>field("firstName"));
                            if (isTrue) {
                                logger.error("找到结果 id = " + id + "\t" + person);
                            } else {
                                logger.error(">>错误结果 id = " + id);
                            }
                            return isTrue;
                        }
                    }
                );
                QueryCursor<Cache.Entry<Long, BinaryObject>> cursor = biCache.query(scan);
                logger.error("执行扫描查找结果" + cursor.getAll());

                for (Cache.Entry<Long, Person> entry : cache) {
                    logger.error(entry);
                }
                logger.error("缓存中所有数据条数 = " + cache.size(CachePeekMode.PRIMARY));

                // Update entry 5 (only if it exists) so the change reaches the store.
                Person person5 = cache.get(5L);
                logger.error("读取编号5的person" + person5);
                if (person5 != null) {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    person5.setLastName("lastName-" + sdf.format(new Date()));
                    cache.replace(person5.getId(), person5);
                    logger.error("待更新编号5的person" + person5);
                }
                Thread.sleep(2000);

                // Insert ten new entries, each keyed one past the current maximum id.
                for (int i = 0; i < 10; i++) {
                    Person maxPerson = getMaxPerson();
                    logger.error("最大一个person" + maxPerson);
                    advanceToNextId(maxPerson, "resume-");
                    boolean maxTrue = cache.putIfAbsent(maxPerson.getId(), maxPerson);
                    logger.error("新增记录结果" + maxTrue);
                }

                SqlQuery<Long, Person> sqlQuery =
                    new SqlQuery<>(Person.class, " id<10 order by id desc limit 2,2");
                logger.error("执行sqlQuery分页查找结果" + cache.query(sqlQuery).getAll());

                // One more entry whose resume contains Chinese text, used by the text queries below.
                Person maxPerson = advanceToNextId(getMaxPerson(), "中华 人民共和国-");
                boolean maxTrue = cache.putIfAbsent(maxPerson.getId(), maxPerson);
                logger.error("新增[中华人民共和国]记录结果" + maxTrue + "\t" + maxPerson);
                logger.error("执行文本查找[中华]结果"
                    + cache.query(new TextQuery<Long, Person>(Person.class, "中华")).getAll());
                logger.error("执行文本查找[共和]结果"
                    + cache.query(new TextQuery<Long, Person>(Person.class, "共和")).getAll());

                logFieldsQuery(cache, "执行sqlFieldsQuery分页查找结果",
                    "select * from person where id>? order by id desc limit 2,2 ", 10);
                logFieldsQuery(cache, "执行sqlFieldsQuery分页查找结果2",
                    "select * from person where resume like '%resume-%5%' order by id desc limit 2,2 ");
                logFieldsQuery(cache, "执行sqlFieldsQuery分页查找结果3",
                    "select * from person where resume like '%中华%' order by id desc limit 2,2 ");
                logFieldsQuery(cache, "执行sqlFieldsQuery分页查找结果4",
                    "select * from person where resume like '%民共%' order by id desc limit 2,2 ");
                logFieldsQuery(cache, "执行sqlFieldsQuery分页查找结果5",
                    "select * from person where resume like '%共和%' order by id desc limit 2,2 ");
                logFieldsQuery(cache, "执行sqlFieldsQuery分页查找结果6",
                    "select * from person where id>10 and resume like '%共和%' order by id desc limit 2,2 ");
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                logger.error("CacheLoadOnlyStoreExample interrupted", e);
            } catch (Exception e) {
                // Example boundary: report the failure instead of swallowing it.
                logger.error("CacheLoadOnlyStoreExample failed", e);
            }
        }
    }

    /**
     * Turns {@code person} into the "next" person: increments its id and derives the
     * first name, last name and resume from the new id.
     *
     * @param person       person to modify in place
     * @param resumePrefix text placed in front of the new id to form the resume
     * @return the same, modified instance
     */
    private static Person advanceToNextId(Person person, String resumePrefix) {
        person.setId(person.getId() + 1L);
        person.setFirstName("max-first-" + person.getId());
        person.setLastName("max-last-" + person.getId());
        person.setResume(resumePrefix + person.getId());
        return person;
    }

    /** Runs an SQL fields query with optional arguments and logs {@code label} followed by all rows. */
    private static void logFieldsQuery(IgniteCache<Long, Person> cache, String label, String sql, Object... args) {
        SqlFieldsQuery query = new SqlFieldsQuery(sql);
        if (args.length > 0) {
            query.setArgs(args);
        }
        logger.error(label + cache.query(query).getAll());
    }

    /**
     * Fetches the person with the highest id through an SQL fields query.
     *
     * @return the cached person with the largest id
     * @throws IllegalStateException if the query returns no rows
     */
    private Person getMaxPerson() {
        IgniteCache<Long, Person> cache = Ignition.ignite().cache(CACHE_NAME);
        SqlFieldsQuery query = new SqlFieldsQuery("SELECT * FROM person order by id desc limit 1");
        List<List<?>> rows = cache.query(query).getAll();
        logger.error(rows);
        if (rows.isEmpty()) {
            throw new IllegalStateException("Cache '" + CACHE_NAME + "' contains no person rows");
        }
        // NOTE(review): column 1 of "SELECT *" is assumed to hold the cached Person value;
        // this depends on the Ignite version in use - confirm before upgrading.
        return (Person) rows.get(0).get(1);
    }
}
CacheOnlyStoreFactoryExampleUsingMysql.java
package com.github.aloxc.ignite.datagrid.store;
import com.github.aloxc.ignite.model.Person;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.SpringResource;
import org.jetbrains.annotations.Nullable;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.stereotype.Component;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* Created by Administrator on 2016/12/1.
*/
@Component
public class CacheOnlyStoreFactoryExampleUsingMysql extends JdbcDaoSupport implements CacheStore<Long,Person>,Serializable{
private final static String TABLE_NAME = "ignite_person";
private static final long serialVersionUID = -309306329114471641L;
@SpringResource(resourceName = "tybbs_user_datasource")
private transient DriverManagerDataSource tybbs_user_datasource;
@Override
public void loadCache(final IgniteBiInClosure<Long, Person> igniteBiInClosure, @Nullable Object... objects) throws CacheLoaderException {
logger.error("spring配置文件中的数据ds " + (tybbs_user_datasource != null ? tybbs_user_datasource : null));
logger.error("spring配置文件中的数据ds " + this.getDataSource());
super.setDataSource(tybbs_user_datasource);
logger.error("从数据库加载所有数据");
long start = System.currentTimeMillis();
StringBuffer sql = new StringBuffer();
sql.append("SELECT `id`,");
sql.append("`first_name`, `last_name`,`resume`");
sql.append(" FROM `").append(TABLE_NAME).append("` WHERE 1 = 1");
sql.append(" ORDER BY `id` DESC");
logger.error("spring jdbc Template 是否为空 " + (this.getJdbcTemplate() == null));
List<Person> list = this.getJdbcTemplate().query(sql.toString(), new CacheLoadOnlyStoreRowMapper<Person>());
for(Person person : list){
igniteBiInClosure.apply(person.getId(),person);
}
logger.error("从数据库加载所有数据花费" + (System.currentTimeMillis() - start) + "毫秒");
}
@Override
public void sessionEnd(boolean b) throws CacheWriterException {
}
@Override
public Person load(Long id) throws CacheLoaderException {
logger.error("从数据库加载数据" + id);
long start = System.currentTimeMillis();
StringBuffer sql = new StringBuffer();
sql.append("SELECT `id`,");
sql.append("`first_name`, `last_name`,`resume`");
sql.append(" FROM `").append(TABLE_NAME).append("` WHERE id = ?");
Person person = this.getJdbcTemplate().queryForObject(sql.toString(),new CacheLoadOnlyStoreRowMapper<Person>(),id);
logger.error("从数据库加载数据花费" + (System.currentTimeMillis() - start) + "毫秒");
return person;
}
@Override
public Map<Long, Person> loadAll(Iterable<? extends Long> iterable) throws CacheLoaderException {
return null;
}
@Override
public void write(Cache.Entry<? extends Long, ? extends Person> entry) throws CacheWriterException {
logger.error("开始更新数据到数据库中" + entry.getKey()) ;
long start = System.currentTimeMillis();
long id = entry.getKey();
Person p = entry.getValue();
StringBuffer sql = new StringBuffer();
sql.append("update `").append(TABLE_NAME).append("` set first_name = ? ").append(" , last_name = ?,resume = ? where id = ? ");
int result = this.getJdbcTemplate().update(sql.toString(),p.getFirstName(),p.getLastName(),p.getResume(),p.getId());
logger.error("更新数据到数据库中,花费" + (System.currentTimeMillis() - start) + "毫秒,影响行数 = " + result);
if(result == 0){//是新增数据,需要使用insert语句
start = System.currentTimeMillis();
sql.delete(0,sql.length() - 1);
//在CacheStore的所有方法中需要自己捕获除CacheStore接口类中的方法定义上的异常,否则可能会出现一些预料外的情况。
sql.append("insert into `").append(TABLE_NAME).append("` (`id`,`first_name`,`last_name`,`resume`) values(?,?,?,?);");
result = this.getJdbcTemplate().update(sql.toString(),p.getId(),p.getFirstName(),p.getLastName(),p.getResume());
logger.error("插入数据到数据库中,花费" + (System.currentTimeMillis() - start) + "毫秒,影响行数 = " + result);
}
}
@Override
public void writeAll(Collection<Cache.Entry<? extends Long, ? extends Person>> collection) throws CacheWriterException {
logger.error("开始更新所有数据到db中,需要更新的数据量" + collection.size());
long start = System.currentTimeMillis();
logger.error("更新所有数据到db中花费" + (System.currentTimeMillis() - start) + "毫秒");
}
@Override
public void delete(Object o) throws CacheWriterException {
}
@Override
public void deleteAll(Collection<?> collection) throws CacheWriterException {
}
}
Person.java
package com.github.aloxc.ignite.model;
import org.apache.ignite.cache.affinity.AffinityKey;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QueryTextField;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Person value object stored in the cache and mapped to the {@code ignite_person} table.
 *
 * <p>{@code id}, {@code firstName} and {@code lastName} are exposed to SQL queries;
 * {@code resume} is annotated for text queries.
 */
public class Person implements Serializable {
    private static final long serialVersionUID = 182338337431676877L;

    /** Person ID (indexed). */
    @QuerySqlField(index = true)
    private Long id;

    /** First name (not indexed by annotation). */
    @QuerySqlField
    private String firstName;

    /** Last name (not indexed by annotation). */
    @QuerySqlField
    private String lastName;

    /** Resume text (annotated as a text-query field). */
    @QueryTextField
    private String resume;

    /** Creates an empty person; all fields start as {@code null}. */
    public Person() {
        // No-op.
    }

    /** @return the person id, possibly {@code null} */
    public Long getId() {
        return id;
    }

    /** @param id the person id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the first name, possibly {@code null} */
    public String getFirstName() {
        return firstName;
    }

    /** @param firstName the first name */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the last name, possibly {@code null} */
    public String getLastName() {
        return lastName;
    }

    /** @param lastName the last name */
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /** @return the resume text, possibly {@code null} */
    public String getResume() {
        return resume;
    }

    /** @param resume the resume text */
    public void setResume(String resume) {
        this.resume = resume;
    }

    /**
     * Returns a readable rendering of all fields, so that log statements which concatenate
     * a person show its content rather than an identity hash.
     */
    @Override
    public String toString() {
        return "Person [id=" + id
            + ", firstName=" + firstName
            + ", lastName=" + lastName
            + ", resume=" + resume + ']';
    }
}
CacheLoadOnlyStoreRowMapper.java
package com.github.aloxc.ignite.datagrid.store;
import com.github.aloxc.ignite.model.Person;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
 * Spring {@link RowMapper} that converts one result-set row with the columns
 * {@code id}, {@code first_name}, {@code last_name} and {@code resume} into a {@link Person}.
 */
public class CacheLoadOnlyStoreRowMapper<T extends Person> implements RowMapper<Person> {
    private static Log logger = LogFactory.getLog(CacheLoadOnlyStoreRowMapper.class);

    /**
     * Maps the current row to a new {@link Person} and logs the result.
     *
     * @param rs result set positioned on the row to read
     * @param i  row number supplied by Spring; not used
     * @return the populated person
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Person mapRow(ResultSet rs, int i) throws SQLException {
        // Read the columns first, in the same order as before, then populate the object.
        final long idValue = rs.getLong("id");
        final String first = rs.getString("first_name");
        final String last = rs.getString("last_name");
        final String resumeText = rs.getString("resume");

        final Person mapped = new Person();
        mapped.setId(idValue);
        mapped.setFirstName(first);
        mapped.setLastName(last);
        mapped.setResume(resumeText);

        logger.error("从数据库加载数据" + mapped);
        return mapped;
    }
}
-- Backing table for the cache store: one row per Person, keyed by id.
-- Only id is mandatory; the three text columns default to NULL.
CREATE TABLE `ignite_person` (
`id` bigint(20) NOT NULL,
`first_name` varchar(50) DEFAULT NULL,
`last_name` varchar(50) DEFAULT NULL,
`resume` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
测试说明:需要把编译后的代码打包放到各 ignite 服务器的 libs 目录中,另外还需要按下面的内容配置各 ignite 服务器的 default-config.xml。
default-config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<context:component-scan base-package="com.github.aloxc.ignite.datagrid.store"/>
<!-- Data source for a sample in-memory H2 database, reached over TCP on localhost as user "sa".
     NOTE(review): no other bean in this configuration refers to h2-example-db; confirm it is still needed. -->
<bean id="h2-example-db" class="org.h2.jdbcx.JdbcDataSource">
<property name="URL" value="jdbc:h2:tcp://localhost/mem:ExampleDb" />
<property name="user" value="sa" />
</bean>
<!-- MySQL data source (database b_user) that the cache store bean below is wired to.
     NOTE(review): host, user name and password are hard-coded in plain text; replace them with
     values for your own environment and keep real credentials out of published configuration. -->
<bean class="org.springframework.jdbc.datasource.DriverManagerDataSource" id="tybbs_user_datasource">
<property name="driverClassName" value="com.mysql.jdbc.Driver"></property>
<property name="url" value="jdbc:mysql://192.168.20.185:3306/b_user"></property>
<property name="username" value="xroot"></property>
<property name="password" value="l2lDm*xpass"></property>
</bean>
<!-- The CacheStore implementation; its dataSource property is set to the MySQL data source above.
     This bean is passed to the cache's cacheStoreFactory further down. -->
<bean id="cacheOnlyStoreFactoryExampleUsingMysql" class="com.github.aloxc.ignite.datagrid.store.CacheOnlyStoreFactoryExampleUsingMysql">
<property name="dataSource" ref="tybbs_user_datasource"></property>
</bean>
<!-- Ignite configuration for the server nodes (clientMode=false), with peer class loading
     enabled and the SHARED deployment mode.
     NOTE(review): no discoverySpi is configured here, unlike the client configuration shown
     earlier in this article; confirm how the servers are expected to find each other. -->
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="peerClassLoadingEnabled" value="true"/>
<property name="clientMode" value="false"/>
<property name="deploymentMode" value="SHARED"/>
<property name="marshaller">
<bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/>
</property>
<property name="cacheConfiguration" >
<list>
<!-- Cache cacheOnlyUsingMysql: PARTITIONED, one backup, read-through and write-through enabled. -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="cacheMode" value="PARTITIONED"/>
<property name="writeThrough" value="true"/>
<property name="readThrough" value="true" />
<property name="name" value="cacheOnlyUsingMysql"/>
<property name="backups" value="1"/>
<!-- Write-behind is switched on: flush size 3, flush frequency 0, batch size 10.
     NOTE(review): check the Ignite documentation for the exact meaning of a flush frequency of 0. -->
<property name="writeBehindEnabled" value="true" />
<property name="writeBehindFlushSize" value="3" />
<property name="writeBehindFlushFrequency" value="0" />
<property name="writeBehindBatchSize" value="10" />
<!-- The store is supplied through a JCache SingletonFactory wrapping the store bean defined above. -->
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder.SingletonFactory">
<constructor-arg ref="cacheOnlyStoreFactoryExampleUsingMysql" />
</bean>
</property>
<!-- SQL metadata for Long -> Person entries.
     NOTE(review): unlike the client configuration shown earlier in this article, no resume field and
     no FULLTEXT index are declared here; the keys also use column names (first_name, last_name)
     while the Person class declares firstName / lastName. Confirm both points. -->
<property name="queryEntities">
<util:list>
<bean class="org.apache.ignite.cache.QueryEntity">
<property name="keyType" value="java.lang.Long"/>
<property name="valueType" value="com.github.aloxc.ignite.model.Person"/>
<property name="fields">
<map>
<entry key="id" value="java.lang.Long"/>
<entry key="first_name" value="java.lang.String"/>
<entry key="last_name" value="java.lang.String"/>
</map>
</property>
<!-- Indexes on id and first_name. -->
<property name="indexes">
<list>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg name="field" value="id" />
</bean>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg name="field" value="first_name" />
</bean>
</list>
</property>
</bean>
</util:list>
</property>
</bean>
</list>
</property>
</bean>
</beans>
测试过程中发现一些问题。
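1. 当通过缓存读取一个不存在的对象的时候会发生异常。如果 CacheStore 的 load 方法用 JdbcTemplate.queryForObject 查询,查不到记录时会抛出 EmptyResultDataAccessException;load 应在记录不存在时返回 null。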
2. 既需要在配置文件中指定bean的字段、索引,也需要在bean中通过注解指定相关字段的查询及索引,缺一不可。
3. 尽量不要用 SQL 的 like 查询,可能会出现预想不到的情况:缓存中明明有相关数据,通过 like 却取不出我们期望的结果。这类场景应该使用 TextQuery 查询。
ps:本文中的代码及配置去掉了我测试代码中与公司测试环境相关的部分,因此可能存在少量错误;如发现错误可发邮件给我:leerohwa#gmail.com(# 替换为 @)。