[关闭]
@aloxc 2017-12-05T06:34:20.000000Z 字数 6979 阅读 585

我们一起学ignite之CacheInterceptor

一起学 ignite 拦截器 源码分析


如题,ignite的cache也有拦截功能,其接口定义为CacheInterceptor,设置缓存的拦截处理器后我们就可以对缓存数据操作前后加上一些特定的逻辑,这个功能就是缓存拦截器。

ignite的缓存默认是没有添加任何拦截功能,不在操作缓存的前后做任何处理逻辑,当我们需要添加一些功能的时候,可以定义自己的拦截器,ignite默认为我们提供了一个拦截适配器,CacheInterceptorAdapter,代码如下:

  1. public class CacheInterceptorAdapter<K, V> implements CacheInterceptor<K, V> {
  2. private static final long serialVersionUID = 0L;
  3. @Nullable @Override public V onGet(K key, V val) {
  4. return val;
  5. }
  6. @Nullable @Override public V onBeforePut(Cache.Entry<K, V> entry, V newVal) {
  7. return newVal;
  8. }
  9. @Override public void onAfterPut(Cache.Entry<K, V> entry) {
  10. // No-op.
  11. }
  12. @Nullable @Override public IgniteBiTuple<Boolean, V> onBeforeRemove(Cache.Entry<K, V> entry) {
  13. return new IgniteBiTuple<>(false, entry.getValue());
  14. }
  15. @Override public void onAfterRemove(Cache.Entry<K, V> entry) {
  16. // No-op.
  17. }
  18. }

可以看到默认的拦截适配器没有做任何处理。
我们可以自己定义个一拦截器,可以在里面添加下对缓存操作的日志。如下

  1. package com.github.aloxc.ignite.benchmark;
  2. import org.apache.commons.logging.Log;
  3. import org.apache.commons.logging.LogFactory;
  4. import org.apache.ignite.Ignite;
  5. import org.apache.ignite.IgniteCache;
  6. import org.apache.ignite.Ignition;
  7. import org.apache.ignite.cache.CacheInterceptor;
  8. import org.apache.ignite.cache.CacheMemoryMode;
  9. import org.apache.ignite.cache.CacheMode;
  10. import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
  11. import org.apache.ignite.configuration.CacheConfiguration;
  12. import org.apache.ignite.configuration.DeploymentMode;
  13. import org.apache.ignite.configuration.IgniteConfiguration;
  14. import org.apache.ignite.lang.IgniteBiTuple;
  15. import org.jetbrains.annotations.Nullable;
  16. import javax.cache.Cache;
  17. import java.sql.Timestamp;
  18. import java.util.Random;
  19. /**
  20. * Created by Administrator on 2016/12/27.
  21. */
  22. public class BenchmarkUserInterceptor implements CacheInterceptor<Integer,BenchmarkUser> {
  23. private static Log logger = LogFactory.getLog(BenchmarkUserInterceptor.class);
  24. @Nullable
  25. @Override
  26. public BenchmarkUser onGet(Integer key, @Nullable BenchmarkUser value) {
  27. logger.error(Thread.currentThread().getStackTrace()[1] + "被调用!key = " + key + ",value = " + value);
  28. return value;
  29. }
  30. @Nullable
  31. @Override
  32. public BenchmarkUser onBeforePut(Cache.Entry<Integer, BenchmarkUser> entry, BenchmarkUser newVal) {
  33. logger.error(Thread.currentThread().getStackTrace()[1] + "被调用!entry = " + entry + ",newVal = " + newVal);
  34. return newVal;
  35. }
  36. @Override
  37. public void onAfterPut(Cache.Entry<Integer, BenchmarkUser> entry) {
  38. logger.error(Thread.currentThread().getStackTrace()[1] + "被调用!entry = " + entry);
  39. }
  40. @Nullable
  41. @Override
  42. public IgniteBiTuple<Boolean, BenchmarkUser> onBeforeRemove(Cache.Entry<Integer, BenchmarkUser> entry) {
  43. logger.error(Thread.currentThread().getStackTrace()[1] + "被调用! entry = " + entry);
  44. return new IgniteBiTuple<>(false, entry.getValue());
  45. }
  46. @Override
  47. public void onAfterRemove(Cache.Entry<Integer, BenchmarkUser> entry) {
  48. logger.error(Thread.currentThread().getStackTrace()[1] + "被调用! entry = " + entry);
  49. }
  50. private static final String CACHE_NAME = "benchmark4";
  51. private static Ignite ignite;
  52. private static IgniteCache<Integer, BenchmarkUser> cache;
  53. public static void main(String[] args){
  54. logger.error("开始插入测试数据");
  55. try {
  56. IgniteConfiguration cfg = new IgniteConfiguration();
  57. cfg.setClientMode(false);
  58. CacheConfiguration cacheConfiguration = new CacheConfiguration<String, BenchmarkUser>();
  59. cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);//分区
  60. cacheConfiguration.setBackups(0);//关闭备份
  61. cacheConfiguration.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
  62. cacheConfiguration.setOffHeapMaxMemory(524 * 1000 * 1000);//限制堆外内存大小4G
  63. cacheConfiguration.setEvictionPolicy(new LruEvictionPolicy(100001));
  64. cacheConfiguration.setStartSize(3 * 1000 * 1000);//初始化缓存容量大小,类似于hashmap的size
  65. cacheConfiguration.setSwapEnabled(false);//禁用交换存储
  66. cacheConfiguration.setInterceptor(new BenchmarkUserInterceptor());
  67. cfg.setCacheConfiguration(cacheConfiguration);
  68. cfg.setDeploymentMode(DeploymentMode.SHARED);
  69. cfg.setPeerClassLoadingEnabled(true);
  70. ignite = Ignition.start(cfg);
  71. cache = ignite.getOrCreateCache(CACHE_NAME);
  72. logger.error("资源初始化完毕");
  73. Random r = new Random();
  74. for (int id = 0; id < 3; id++) {
  75. BenchmarkUser user = new BenchmarkUser();
  76. user.setId(id);
  77. user.setName("user-" + id);
  78. user.setCommentCount(id);
  79. user.setComposeCount(id);
  80. user.setReplyCount(id);
  81. user.setIp("ip" + id);
  82. user.setLoginCount(id);
  83. user.setPermission(r.nextBoolean());
  84. user.setLoginTime(new Timestamp(System.currentTimeMillis()));
  85. user.setRegisterTime(new Timestamp(System.currentTimeMillis()));
  86. cache.put(id,user);
  87. cache.remove(id);
  88. }
  89. } catch (Throwable e) {
  90. logger.error("发生异常",e);
  91. } finally {
  92. logger.error("程序运行完毕");
  93. // ignite.destroyCache(CACHE_NAME);
  94. ignite.close();
  95. System.exit(1);
  96. }
  97. }
  98. }

允许程序后通过日志中可以看到一些内容,

  1. onBeforePut(BenchmarkUserInterceptor.java:39)被调用!entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=null],newVal = {"id":0,"name":"user-0","ip":"ip0","loginCount":0,"composeCount":0,"replyCount":0,"commentCount":0,"permission":true,"loginTime":1482823782805,"registerTime":1482823782805}
  2. onAfterPut(BenchmarkUserInterceptor.java:45)被调用!entry = CacheLazyEntry [key=null, val={"id":0,"name":"user-0","ip":"ip0","loginCount":0,"composeCount":0,"replyCount":0,"commentCount":0,"permission":true,"loginTime":1482823782805,"registerTime":1482823782805}, keepBinary=false, updateCntr=1]
  3. onBeforeRemove(BenchmarkUserInterceptor.java:51)被调用! entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=null]
  4. onAfterRemove(BenchmarkUserInterceptor.java:57)被调用! entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=2]
  5. onBeforePut(BenchmarkUserInterceptor.java:39)被调用!entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=null],newVal = {"id":1,"name":"user-1","ip":"ip1","loginCount":1,"composeCount":1,"replyCount":1,"commentCount":1,"permission":true,"loginTime":1482823783157,"registerTime":1482823783157}
  6. onAfterPut(BenchmarkUserInterceptor.java:45)被调用!entry = CacheLazyEntry [key=null, val={"id":1,"name":"user-1","ip":"ip1","loginCount":1,"composeCount":1,"replyCount":1,"commentCount":1,"permission":true,"loginTime":1482823783157,"registerTime":1482823783157}, keepBinary=false, updateCntr=1]
  7. onBeforeRemove(BenchmarkUserInterceptor.java:51)被调用! entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=null]
  8. onAfterRemove(BenchmarkUserInterceptor.java:57)被调用! entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=2]
  9. onBeforePut(BenchmarkUserInterceptor.java:39)被调用!entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=null],newVal = {"id":2,"name":"user-2","ip":"ip2","loginCount":2,"composeCount":2,"replyCount":2,"commentCount":2,"permission":false,"loginTime":1482823783161,"registerTime":1482823783161}
  10. onAfterPut(BenchmarkUserInterceptor.java:45)被调用!entry = CacheLazyEntry [key=null, val={"id":2,"name":"user-2","ip":"ip2","loginCount":2,"composeCount":2,"replyCount":2,"commentCount":2,"permission":false,"loginTime":1482823783161,"registerTime":1482823783161}, keepBinary=false, updateCntr=1]
  11. onBeforeRemove(BenchmarkUserInterceptor.java:51)被调用! entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=null]
  12. onAfterRemove(BenchmarkUserInterceptor.java:57)被调用! entry = CacheLazyEntry [key=null, val=null, keepBinary=false, updateCntr=2]

从日志中看到,已经执行我们的拦截逻辑(打印操作日志)了,并且从日中还看到了CacheLazyEntry,通过查看源码我们可以得到一些方法,
getPartitionUpdateCounter():key的更新次数,
getValue():类型为我们cache中保存的数据类型,其值为当前操作的value,

x

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注