[关闭]
@aloxc 2017-12-05T06:25:48.000000Z 字数 5715 阅读 508

我们一起学ignite之memcache协议支持(可兼容任意第三方协议)

一起学 ignite 源码分析


转载请注明本文作者:aloxc,邮箱leerohwa#gmail.com[#更换为@]

如题,ignite还支持memcache协议,具体就是ignite启动的时候启动了一个绑定到11211--11310这100个端口中的一个的nio服务。

这100个端口是这样选择的,初始是从11211端口起一个nio服务,如果起的这个nio服务没启动成功起来就往之前的端口加1后再次起nio服务,至到成功的起了一个nio服务。这个服务可以兼容任何第三方协议,只需要实现一个接口类org.apache.ignite.internal.processors.rest.client.message.GridClientMessage即可。比如本文讲解的memcache协议,就是实现该接口的类
org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage
分析了这些,那我们可不可以实现一套自己的协议后使用ignite呢,答案是肯定的,我们只需要修改ignite里面解析rest请求的相关代码即可,这个留待以后再写一篇文章。

我们也可以去掉相关rest接口相关nio服务的启动代码,直接不启动rest服务
相关代码(org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestProtocol中)如下

  1. public void start(final GridRestProtocolHandler hnd) throws IgniteCheckedException {
  2. assert hnd != null;
  3. ConnectorConfiguration cfg = ctx.config().getConnectorConfiguration();
  4. assert cfg != null;
  5. lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);
  6. GridNioParser parser = new GridTcpRestParser(false);
  7. try {
  8. host = resolveRestTcpHost(ctx.config());
  9. SSLContext sslCtx = null;
  10. if (cfg.isSslEnabled()) {
  11. Factory<SSLContext> igniteFactory = ctx.config().getSslContextFactory();
  12. Factory<SSLContext> factory = cfg.getSslFactory();
  13. // This factory deprecated and will be removed.
  14. GridSslContextFactory depFactory = cfg.getSslContextFactory();
  15. if (factory == null && depFactory == null && igniteFactory == null)
  16. // Thrown SSL exception instead of IgniteCheckedException for writing correct warning message into log.
  17. throw new SSLException("SSL is enabled, but SSL context factory is not specified.");
  18. if (factory != null)
  19. sslCtx = factory.create();
  20. else if (depFactory != null)
  21. sslCtx = depFactory.createSslContext();
  22. else
  23. sslCtx = igniteFactory.create();
  24. }
  25. int startPort = cfg.getPort();
  26. int portRange = cfg.getPortRange();
  27. int lastPort = portRange == 0 ? startPort : startPort + portRange - 1;
  28. for (int port0 = startPort; port0 <= lastPort; port0++) {
  29. if (startTcpServer(host, port0, lsnr, parser, sslCtx, cfg)) {
  30. port = port0;
  31. if (log.isInfoEnabled())
  32. log.info(startInfo());
  33. return;
  34. }
  35. }
  36. IgniteUtils.warn(log, "Failed to start TCP binary REST server (possibly all ports in range are in use) " +
  37. "[firstPort=" + cfg.getPort() + ", lastPort=" + lastPort + ", host=" + host + ']');
  38. }
  39. catch (SSLException e) {
  40. IgniteUtils.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(),
  41. "Failed to start " + name() + " protocol on port " + port + ". Check if SSL context factory is " +
  42. "properly configured.");
  43. }
  44. catch (IOException e) {
  45. IgniteUtils.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(),
  46. "Failed to start " + name() + " protocol on port " + port + ". " +
  47. "Check restTcpHost configuration property.");
  48. }
  49. }

如果说我们线上服务器,不希望ignite启动后还兼容memcache命令,可以把这个方法中行lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);后面的代码注释掉。

也可以处理下我们代码中关于引用到org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage的地方。

下面看下解析memcache相关协议的方法,见类org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestParse.java

  1. //相应解码
  2. private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state)
  3. throws IOException, IgniteCheckedException {
  4. assert state.packetType() == GridClientPacketType.MEMCACHE;
  5. assert state.packet() != null;
  6. assert state.packet() instanceof GridMemcachedMessage;
  7. GridMemcachedMessage req = (GridMemcachedMessage)state.packet();
  8. ByteArrayOutputStream tmp = state.buffer();
  9. int i = state.index();
  10. while (buf.remaining() > 0) {
  11. byte b = buf.get();
  12. if (i == 0)
  13. req.requestFlag(b);
  14. else if (i == 1)
  15. req.operationCode(b);
  16. else if (i == 2 || i == 3) {
  17. tmp.write(b);
  18. if (i == 3) {
  19. req.keyLength(IgniteUtils.bytesToShort(tmp.toByteArray(), 0));
  20. tmp.reset();
  21. }
  22. }
  23. else if (i == 4)
  24. req.extrasLength(b);
  25. else if (i >= 8 && i <= 11) {
  26. tmp.write(b);
  27. if (i == 11) {
  28. req.totalLength(IgniteUtils.bytesToInt(tmp.toByteArray(), 0));
  29. tmp.reset();
  30. }
  31. }
  32. else if (i >= 12 && i <= 15) {
  33. tmp.write(b);
  34. if (i == 15) {
  35. req.opaque(tmp.toByteArray());
  36. tmp.reset();
  37. }
  38. }
  39. else if (i >= HDR_LEN && i < HDR_LEN + req.extrasLength()) {
  40. tmp.write(b);
  41. if (i == HDR_LEN + req.extrasLength() - 1) {
  42. req.extras(tmp.toByteArray());
  43. tmp.reset();
  44. }
  45. }
  46. else if (i >= HDR_LEN + req.extrasLength() &&
  47. i < HDR_LEN + req.extrasLength() + req.keyLength()) {
  48. tmp.write(b);
  49. if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) {
  50. req.key(tmp.toByteArray());
  51. tmp.reset();
  52. }
  53. }
  54. else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() &&
  55. i < HDR_LEN + req.totalLength()) {
  56. tmp.write(b);
  57. if (i == HDR_LEN + req.totalLength() - 1) {
  58. req.value(tmp.toByteArray());
  59. tmp.reset();
  60. }
  61. }
  62. if (i == HDR_LEN + req.totalLength() - 1)
  63. // Assembled the packet.
  64. return assemble(ses, req);
  65. i++;
  66. }
  67. state.index(i);
  68. return null;
  69. }
  70. //响应编码
  71. private ByteBuffer encodeMemcache(GridMemcachedMessage msg) throws IgniteCheckedException {
  72. GridByteArrayList res = new GridByteArrayList(HDR_LEN);
  73. int keyLen = 0;
  74. int keyFlags = 0;
  75. if (msg.key() != null) {
  76. ByteArrayOutputStream rawKey = new ByteArrayOutputStream();
  77. keyFlags = encodeObj(msg.key(), rawKey);
  78. msg.key(rawKey.toByteArray());
  79. keyLen = rawKey.size();
  80. }
  81. int dataLen = 0;
  82. int valFlags = 0;
  83. if (msg.value() != null) {
  84. ByteArrayOutputStream rawVal = new ByteArrayOutputStream();
  85. valFlags = encodeObj(msg.value(), rawVal);
  86. msg.value(rawVal.toByteArray());
  87. dataLen = rawVal.size();
  88. }
  89. int flagsLen = 0;
  90. if (msg.addFlags())// || keyFlags > 0 || valFlags > 0)
  91. flagsLen = FLAGS_LENGTH;
  92. res.add(MEMCACHE_RES_FLAG);
  93. res.add(msg.operationCode());
  94. // Cast is required due to packet layout.
  95. res.add((short)keyLen);
  96. // Cast is required due to packet layout.
  97. res.add((byte)flagsLen);
  98. // Data type is always 0x00.
  99. res.add((byte)0x00);
  100. res.add((short)msg.status());
  101. res.add(keyLen + flagsLen + dataLen);
  102. res.add(msg.opaque(), 0, msg.opaque().length);
  103. // CAS, unused.
  104. res.add(0L);
  105. assert res.size() == HDR_LEN;
  106. if (flagsLen > 0) {
  107. res.add((short) keyFlags);
  108. res.add((short) valFlags);
  109. }
  110. assert msg.key() == null || msg.key() instanceof byte[];
  111. assert msg.value() == null || msg.value() instanceof byte[];
  112. if (keyLen > 0)
  113. res.add((byte[])msg.key(), 0, ((byte[])msg.key()).length);
  114. if (dataLen > 0)
  115. res.add((byte[])msg.value(), 0, ((byte[])msg.value()).length);
  116. return ByteBuffer.wrap(res.entireArray());
  117. }

其实本人认为ignite这么做兼容别的命令有点画蛇添足,虽然有很多公司在使用memcache,并且也愿意使用ignite的情况下也不会简单的更换到ignite并且还用memcache命令连接ignite,虽然直接替换下memcache地址,接着使用memcache命令连接ignite这样成本较小。我个人不希望ignite这么做,.

x

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注