@aloxc
2017-12-05T06:25:48.000000Z
字数 5715
阅读 508
一起学
ignite
源码分析
转载请注明本文作者:aloxc,邮箱leerohwa#gmail.com[#更换为@]
如题,ignite还支持memcache协议,具体就是ignite启动的时候启动了一个绑定到11211--11310这100个端口中的一个的nio服务。
这100个端口是这样选择的,初始是从11211端口起一个nio服务,如果起的这个nio服务没启动成功起来就往之前的端口加1后再次起nio服务,至到成功的起了一个nio服务。这个服务可以兼容任何第三方协议,只需要实现一个接口类org.apache.ignite.internal.processors.rest.client.message.GridClientMessage
即可。比如本文讲解的memcache协议,就是实现该接口的类
org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage
。
分析了这些,那我们可不可以实现一套自己的协议后使用ignite呢,答案是肯定的,我们只需要修改ignite里面解析rest请求的相关代码即可,这个留待以后再写一篇文章。
我们也可以去掉相关rest接口相关nio服务的启动代码,直接不启动rest服务
相关代码(org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestProtocol
中)如下
public void start(final GridRestProtocolHandler hnd) throws IgniteCheckedException {
assert hnd != null;
ConnectorConfiguration cfg = ctx.config().getConnectorConfiguration();
assert cfg != null;
lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);
GridNioParser parser = new GridTcpRestParser(false);
try {
host = resolveRestTcpHost(ctx.config());
SSLContext sslCtx = null;
if (cfg.isSslEnabled()) {
Factory<SSLContext> igniteFactory = ctx.config().getSslContextFactory();
Factory<SSLContext> factory = cfg.getSslFactory();
// This factory deprecated and will be removed.
GridSslContextFactory depFactory = cfg.getSslContextFactory();
if (factory == null && depFactory == null && igniteFactory == null)
// Thrown SSL exception instead of IgniteCheckedException for writing correct warning message into log.
throw new SSLException("SSL is enabled, but SSL context factory is not specified.");
if (factory != null)
sslCtx = factory.create();
else if (depFactory != null)
sslCtx = depFactory.createSslContext();
else
sslCtx = igniteFactory.create();
}
int startPort = cfg.getPort();
int portRange = cfg.getPortRange();
int lastPort = portRange == 0 ? startPort : startPort + portRange - 1;
for (int port0 = startPort; port0 <= lastPort; port0++) {
if (startTcpServer(host, port0, lsnr, parser, sslCtx, cfg)) {
port = port0;
if (log.isInfoEnabled())
log.info(startInfo());
return;
}
}
IgniteUtils.warn(log, "Failed to start TCP binary REST server (possibly all ports in range are in use) " +
"[firstPort=" + cfg.getPort() + ", lastPort=" + lastPort + ", host=" + host + ']');
}
catch (SSLException e) {
IgniteUtils.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(),
"Failed to start " + name() + " protocol on port " + port + ". Check if SSL context factory is " +
"properly configured.");
}
catch (IOException e) {
IgniteUtils.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(),
"Failed to start " + name() + " protocol on port " + port + ". " +
"Check restTcpHost configuration property.");
}
}
如果说我们线上服务器,不希望ignite启动后还兼容memcache命令,可以把这个方法中行lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);
后面的代码注释掉。
也可以处理下我们代码中关于引用到org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage
的地方。
下面看下解析memcache相关协议的方法,见类org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestParse.java
//相应解码
private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state)
throws IOException, IgniteCheckedException {
assert state.packetType() == GridClientPacketType.MEMCACHE;
assert state.packet() != null;
assert state.packet() instanceof GridMemcachedMessage;
GridMemcachedMessage req = (GridMemcachedMessage)state.packet();
ByteArrayOutputStream tmp = state.buffer();
int i = state.index();
while (buf.remaining() > 0) {
byte b = buf.get();
if (i == 0)
req.requestFlag(b);
else if (i == 1)
req.operationCode(b);
else if (i == 2 || i == 3) {
tmp.write(b);
if (i == 3) {
req.keyLength(IgniteUtils.bytesToShort(tmp.toByteArray(), 0));
tmp.reset();
}
}
else if (i == 4)
req.extrasLength(b);
else if (i >= 8 && i <= 11) {
tmp.write(b);
if (i == 11) {
req.totalLength(IgniteUtils.bytesToInt(tmp.toByteArray(), 0));
tmp.reset();
}
}
else if (i >= 12 && i <= 15) {
tmp.write(b);
if (i == 15) {
req.opaque(tmp.toByteArray());
tmp.reset();
}
}
else if (i >= HDR_LEN && i < HDR_LEN + req.extrasLength()) {
tmp.write(b);
if (i == HDR_LEN + req.extrasLength() - 1) {
req.extras(tmp.toByteArray());
tmp.reset();
}
}
else if (i >= HDR_LEN + req.extrasLength() &&
i < HDR_LEN + req.extrasLength() + req.keyLength()) {
tmp.write(b);
if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) {
req.key(tmp.toByteArray());
tmp.reset();
}
}
else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() &&
i < HDR_LEN + req.totalLength()) {
tmp.write(b);
if (i == HDR_LEN + req.totalLength() - 1) {
req.value(tmp.toByteArray());
tmp.reset();
}
}
if (i == HDR_LEN + req.totalLength() - 1)
// Assembled the packet.
return assemble(ses, req);
i++;
}
state.index(i);
return null;
}
//响应编码
private ByteBuffer encodeMemcache(GridMemcachedMessage msg) throws IgniteCheckedException {
GridByteArrayList res = new GridByteArrayList(HDR_LEN);
int keyLen = 0;
int keyFlags = 0;
if (msg.key() != null) {
ByteArrayOutputStream rawKey = new ByteArrayOutputStream();
keyFlags = encodeObj(msg.key(), rawKey);
msg.key(rawKey.toByteArray());
keyLen = rawKey.size();
}
int dataLen = 0;
int valFlags = 0;
if (msg.value() != null) {
ByteArrayOutputStream rawVal = new ByteArrayOutputStream();
valFlags = encodeObj(msg.value(), rawVal);
msg.value(rawVal.toByteArray());
dataLen = rawVal.size();
}
int flagsLen = 0;
if (msg.addFlags())// || keyFlags > 0 || valFlags > 0)
flagsLen = FLAGS_LENGTH;
res.add(MEMCACHE_RES_FLAG);
res.add(msg.operationCode());
// Cast is required due to packet layout.
res.add((short)keyLen);
// Cast is required due to packet layout.
res.add((byte)flagsLen);
// Data type is always 0x00.
res.add((byte)0x00);
res.add((short)msg.status());
res.add(keyLen + flagsLen + dataLen);
res.add(msg.opaque(), 0, msg.opaque().length);
// CAS, unused.
res.add(0L);
assert res.size() == HDR_LEN;
if (flagsLen > 0) {
res.add((short) keyFlags);
res.add((short) valFlags);
}
assert msg.key() == null || msg.key() instanceof byte[];
assert msg.value() == null || msg.value() instanceof byte[];
if (keyLen > 0)
res.add((byte[])msg.key(), 0, ((byte[])msg.key()).length);
if (dataLen > 0)
res.add((byte[])msg.value(), 0, ((byte[])msg.value()).length);
return ByteBuffer.wrap(res.entireArray());
}
其实本人认为ignite这么做兼容别的命令有点画蛇添足,虽然有很多公司在使用memcache,并且也愿意使用ignite的情况下也不会简单的更换到ignite并且还用memcache命令连接ignite,虽然直接替换下memcache地址,接着使用memcache命令连接ignite这样成本较小。我个人不希望ignite这么做,.