@aloxc
2017-12-05T06:25:48.000000Z
字数 5715
阅读 563
一起学 ignite 源码分析
转载请注明本文作者:aloxc,邮箱leerohwa#gmail.com[#更换为@]
如题,ignite还支持memcache协议,具体就是ignite启动的时候启动了一个绑定到11211--11310这100个端口中的一个的nio服务。
这100个端口是这样选择的:先从11211端口开始尝试启动一个nio服务,如果启动失败,就把端口号加1后再次尝试,直到成功启动一个nio服务为止(如果范围内的端口全部尝试失败,则只打印一条警告日志)。这个服务可以兼容任何第三方协议,只需要实现一个接口类org.apache.ignite.internal.processors.rest.client.message.GridClientMessage即可。比如本文讲解的memcache协议,就是实现该接口的类
org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage。
分析了这些,那我们可不可以实现一套自己的协议后使用ignite呢,答案是肯定的,我们只需要修改ignite里面解析rest请求的相关代码即可,这个留待以后再写一篇文章。
我们也可以去掉相关rest接口相关nio服务的启动代码,直接不启动rest服务
相关代码(org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestProtocol中)如下
public void start(final GridRestProtocolHandler hnd) throws IgniteCheckedException {assert hnd != null;ConnectorConfiguration cfg = ctx.config().getConnectorConfiguration();assert cfg != null;lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);GridNioParser parser = new GridTcpRestParser(false);try {host = resolveRestTcpHost(ctx.config());SSLContext sslCtx = null;if (cfg.isSslEnabled()) {Factory<SSLContext> igniteFactory = ctx.config().getSslContextFactory();Factory<SSLContext> factory = cfg.getSslFactory();// This factory deprecated and will be removed.GridSslContextFactory depFactory = cfg.getSslContextFactory();if (factory == null && depFactory == null && igniteFactory == null)// Thrown SSL exception instead of IgniteCheckedException for writing correct warning message into log.throw new SSLException("SSL is enabled, but SSL context factory is not specified.");if (factory != null)sslCtx = factory.create();else if (depFactory != null)sslCtx = depFactory.createSslContext();elsesslCtx = igniteFactory.create();}int startPort = cfg.getPort();int portRange = cfg.getPortRange();int lastPort = portRange == 0 ? startPort : startPort + portRange - 1;for (int port0 = startPort; port0 <= lastPort; port0++) {if (startTcpServer(host, port0, lsnr, parser, sslCtx, cfg)) {port = port0;if (log.isInfoEnabled())log.info(startInfo());return;}}IgniteUtils.warn(log, "Failed to start TCP binary REST server (possibly all ports in range are in use) " +"[firstPort=" + cfg.getPort() + ", lastPort=" + lastPort + ", host=" + host + ']');}catch (SSLException e) {IgniteUtils.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(),"Failed to start " + name() + " protocol on port " + port + ". Check if SSL context factory is " +"properly configured.");}catch (IOException e) {IgniteUtils.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(),"Failed to start " + name() + " protocol on port " + port + ". 
" +"Check restTcpHost configuration property.");}}
如果说我们线上服务器,不希望ignite启动后还兼容memcache命令,可以把这个方法中行lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);后面的代码注释掉。
也可以处理下我们代码中关于引用到org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage的地方。
下面看下解析memcache相关协议的方法,见类org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestParser
//相应解码private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state)throws IOException, IgniteCheckedException {assert state.packetType() == GridClientPacketType.MEMCACHE;assert state.packet() != null;assert state.packet() instanceof GridMemcachedMessage;GridMemcachedMessage req = (GridMemcachedMessage)state.packet();ByteArrayOutputStream tmp = state.buffer();int i = state.index();while (buf.remaining() > 0) {byte b = buf.get();if (i == 0)req.requestFlag(b);else if (i == 1)req.operationCode(b);else if (i == 2 || i == 3) {tmp.write(b);if (i == 3) {req.keyLength(IgniteUtils.bytesToShort(tmp.toByteArray(), 0));tmp.reset();}}else if (i == 4)req.extrasLength(b);else if (i >= 8 && i <= 11) {tmp.write(b);if (i == 11) {req.totalLength(IgniteUtils.bytesToInt(tmp.toByteArray(), 0));tmp.reset();}}else if (i >= 12 && i <= 15) {tmp.write(b);if (i == 15) {req.opaque(tmp.toByteArray());tmp.reset();}}else if (i >= HDR_LEN && i < HDR_LEN + req.extrasLength()) {tmp.write(b);if (i == HDR_LEN + req.extrasLength() - 1) {req.extras(tmp.toByteArray());tmp.reset();}}else if (i >= HDR_LEN + req.extrasLength() &&i < HDR_LEN + req.extrasLength() + req.keyLength()) {tmp.write(b);if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) {req.key(tmp.toByteArray());tmp.reset();}}else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() &&i < HDR_LEN + req.totalLength()) {tmp.write(b);if (i == HDR_LEN + req.totalLength() - 1) {req.value(tmp.toByteArray());tmp.reset();}}if (i == HDR_LEN + req.totalLength() - 1)// Assembled the packet.return assemble(ses, req);i++;}state.index(i);return null;}//响应编码private ByteBuffer encodeMemcache(GridMemcachedMessage msg) throws IgniteCheckedException {GridByteArrayList res = new GridByteArrayList(HDR_LEN);int keyLen = 0;int keyFlags = 0;if (msg.key() != null) {ByteArrayOutputStream rawKey = new ByteArrayOutputStream();keyFlags = encodeObj(msg.key(), rawKey);msg.key(rawKey.toByteArray());keyLen = rawKey.size();}int 
dataLen = 0;int valFlags = 0;if (msg.value() != null) {ByteArrayOutputStream rawVal = new ByteArrayOutputStream();valFlags = encodeObj(msg.value(), rawVal);msg.value(rawVal.toByteArray());dataLen = rawVal.size();}int flagsLen = 0;if (msg.addFlags())// || keyFlags > 0 || valFlags > 0)flagsLen = FLAGS_LENGTH;res.add(MEMCACHE_RES_FLAG);res.add(msg.operationCode());// Cast is required due to packet layout.res.add((short)keyLen);// Cast is required due to packet layout.res.add((byte)flagsLen);// Data type is always 0x00.res.add((byte)0x00);res.add((short)msg.status());res.add(keyLen + flagsLen + dataLen);res.add(msg.opaque(), 0, msg.opaque().length);// CAS, unused.res.add(0L);assert res.size() == HDR_LEN;if (flagsLen > 0) {res.add((short) keyFlags);res.add((short) valFlags);}assert msg.key() == null || msg.key() instanceof byte[];assert msg.value() == null || msg.value() instanceof byte[];if (keyLen > 0)res.add((byte[])msg.key(), 0, ((byte[])msg.key()).length);if (dataLen > 0)res.add((byte[])msg.value(), 0, ((byte[])msg.value()).length);return ByteBuffer.wrap(res.entireArray());}
其实本人认为ignite这样兼容别的协议命令有点画蛇添足:虽然有很多公司在使用memcache,但即使他们愿意使用ignite,也不会简单地切换到ignite之后还继续用memcache命令去连接它——尽管只替换memcache地址、继续使用memcache命令连接ignite的迁移成本较小。我个人不希望ignite这么做。
