[关闭]
@vonzhou 2016-09-25T09:04:08.000000Z 字数 44860 阅读 523

RocketMQ - 20160919

MessageQueue RocketMQ


RocketMQ 简介

2016-09-24_095423.png-36.9kB

整体架构

2016-09-24_091833.png-17.2kB

NameServer - 服务发现

轻量名称服务,无状态。

  1. public boolean initialize() {
  2. /** 从本地文件加载配置信息到内存中 */
  3. this.kvConfigManager.load();
  4. /** 启动TCP Server, 使用Netty */
  5. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  6. this.remotingExecutor =
  7. Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  8. /** 注册TCP消息处理器 */
  9. this.registerProcessor();
  10. /** 定期移除非活跃的Broker, broker channel 默认的失效时间是2m */
  11. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  12. @Override
  13. public void run() {
  14. NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  15. }
  16. }, 5, 10, TimeUnit.SECONDS);
  17. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  18. @Override
  19. public void run() {
  20. NamesrvController.this.kvConfigManager.printAllPeriodically();
  21. }
  22. }, 1, 10, TimeUnit.MINUTES);
  23. return true;
  24. }

Producer - 发送消息

rocketmq-send.JPG-63kB

 Where?

路由信息来自NameServer。客户端会缓存路由信息TopicPublishInfo, 同时定期从NameServer取Topic路由信息,每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer。如果本地查询不到,则需要同步询问NameServer, 如果该Topic不存在,则使用default。

How ?

Producer 在得到了具体的通信地址后,发送过程就显而易见了。通过代码可以看到在选择消息队列进行发送时采用随机方式,同时和上一次发送的broker保持不同,防止热点。

  1. /**
  2. * 发送消息实现
  3. */
  4. private SendResult sendDefaultImpl(//
  5. Message msg,//
  6. final CommunicationMode communicationMode,//
  7. final SendCallback sendCallback, final long timeout//
  8. ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  9. this.makeSureStateOK();
  10. Validators.checkMessage(msg, this.defaultMQProducer);
  11. final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000;
  12. final long beginTimestamp = System.currentTimeMillis();
  13. long endTimestamp = beginTimestamp;
  14. /** 获取路由信息 */
  15. TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  16. if (topicPublishInfo != null && topicPublishInfo.ok()) {
  17. MessageQueue mq = null;
  18. Exception exception = null;
  19. SendResult sendResult = null;
  20. /** 发送失败时的重试时间 */
  21. int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed();
  22. int times = 0;
  23. String[] brokersSent = new String[timesTotal];
  24. for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) {
  25. /** 上次发送选择的broker */
  26. String lastBrokerName = null == mq ? null : mq.getBrokerName();
  27. /** 选择一个queue */
  28. MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
  29. if (tmpmq != null) {
  30. mq = tmpmq;
  31. brokersSent[times] = mq.getBrokerName();
  32. try {
  33. sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
  34. endTimestamp = System.currentTimeMillis();
  35. switch (communicationMode) {
  36. case ASYNC:
  37. return null;
  38. case ONEWAY:
  39. return null;
  40. case SYNC:
  41. if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
  42. if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
  43. continue;
  44. }
  45. }
  46. return sendResult;
  47. default:
  48. break;
  49. }
  50. }
  51. catch (RemotingException e) {
  52. log.warn("sendKernelImpl exception", e);
  53. log.warn(msg.toString());
  54. exception = e;
  55. endTimestamp = System.currentTimeMillis();
  56. continue;
  57. }
  58. catch (MQClientException e) {
  59. log.warn("sendKernelImpl exception", e);
  60. log.warn(msg.toString());
  61. exception = e;
  62. endTimestamp = System.currentTimeMillis();
  63. continue;
  64. }
  65. catch (MQBrokerException e) {
  66. log.warn("sendKernelImpl exception", e);
  67. log.warn(msg.toString());
  68. exception = e;
  69. endTimestamp = System.currentTimeMillis();
  70. switch (e.getResponseCode()) {
  71. case ResponseCode.TOPIC_NOT_EXIST:
  72. case ResponseCode.SERVICE_NOT_AVAILABLE:
  73. case ResponseCode.SYSTEM_ERROR:
  74. case ResponseCode.NO_PERMISSION:
  75. case ResponseCode.NO_BUYER_ID:
  76. case ResponseCode.NOT_IN_CURRENT_UNIT:
  77. continue;
  78. default:
  79. if (sendResult != null) {
  80. return sendResult;
  81. }
  82. throw e;
  83. }
  84. }
  85. catch (InterruptedException e) {
  86. log.warn("sendKernelImpl exception", e);
  87. log.warn(msg.toString());
  88. throw e;
  89. }
  90. }
  91. else {
  92. break;
  93. }
  94. } // end of for
  95. if (sendResult != null) {
  96. return sendResult;
  97. }
  98. String info =
  99. String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", //
  100. times, //
  101. (System.currentTimeMillis() - beginTimestamp), //
  102. msg.getTopic(),//
  103. Arrays.toString(brokersSent));
  104. info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
  105. throw new MQClientException(info, exception);
  106. }
  107. List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
  108. if (null == nsList || nsList.isEmpty()) {
  109. throw new MQClientException("No name server address, please set it."
  110. + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null);
  111. }
  112. throw new MQClientException("No route info of this topic, " + msg.getTopic()
  113. + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null);
  114. }
  115. /**
  116. * 采用 round robin 的方式选择队列,且和上次选择的broker不同
  117. */
  118. public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
  119. if (lastBrokerName != null) {
  120. int index = this.sendWhichQueue.getAndIncrement();
  121. for (int i = 0; i < this.messageQueueList.size(); i++) {
  122. int pos = Math.abs(index++) % this.messageQueueList.size();
  123. MessageQueue mq = this.messageQueueList.get(pos);
  124. if (!mq.getBrokerName().equals(lastBrokerName)) {
  125. return mq;
  126. }
  127. }
  128. return null;
  129. }
  130. else {
  131. int index = this.sendWhichQueue.getAndIncrement();
  132. int pos = Math.abs(index) % this.messageQueueList.size();
  133. return this.messageQueueList.get(pos);
  134. }
  135. }
  136. /**
  137. * 构造协议消息
  138. */
  139. private SendResult sendKernelImpl(final Message msg,//
  140. final MessageQueue mq,//
  141. final CommunicationMode communicationMode,//
  142. final SendCallback sendCallback,//
  143. final long timeout) throws MQClientException, RemotingException, MQBrokerException,
  144. InterruptedException {
  145. String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  146. if (null == brokerAddr) {
  147. tryToFindTopicPublishInfo(mq.getTopic());
  148. brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  149. }
  150. SendMessageContext context = null;
  151. if (brokerAddr != null) {
  152. if(this.defaultMQProducer.isSendMessageWithVIPChannel()) {
  153. brokerAddr = MixAll.brokerVIPChannel(brokerAddr);
  154. }
  155. byte[] prevBody = msg.getBody();
  156. try {
  157. int sysFlag = 0;
  158. if (this.tryToCompressMessage(msg)) {
  159. sysFlag |= MessageSysFlag.CompressedFlag;
  160. }
  161. final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  162. if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
  163. sysFlag |= MessageSysFlag.TransactionPreparedType;
  164. }
  165. /** 在发送消息前后,预留灵活处理的空间 */
  166. if (this.hasSendMessageHook()) {
  167. context = new SendMessageContext();
  168. context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  169. context.setCommunicationMode(communicationMode);
  170. context.setBornHost(this.defaultMQProducer.getClientIP());
  171. context.setBrokerAddr(brokerAddr);
  172. context.setMessage(msg);
  173. context.setMq(mq);
  174. this.executeSendMessageHookBefore(context);
  175. }
  176. /** 构造 request header */
  177. SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
  178. requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  179. requestHeader.setTopic(msg.getTopic());
  180. requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
  181. requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
  182. requestHeader.setQueueId(mq.getQueueId());
  183. requestHeader.setSysFlag(sysFlag);
  184. requestHeader.setBornTimestamp(System.currentTimeMillis());
  185. requestHeader.setFlag(msg.getFlag());
  186. requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
  187. requestHeader.setReconsumeTimes(0);
  188. requestHeader.setUnitMode(this.isUnitMode());
  189. if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  190. String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
  191. if (reconsumeTimes != null) {
  192. requestHeader.setReconsumeTimes(new Integer(reconsumeTimes));
  193. MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
  194. }
  195. }
  196. SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
  197. brokerAddr,// 1
  198. mq.getBrokerName(),// 2
  199. msg,// 3
  200. requestHeader,// 4
  201. timeout,// 5
  202. communicationMode,// 6
  203. sendCallback// 7
  204. );
  205. if (this.hasSendMessageHook()) {
  206. context.setSendResult(sendResult);
  207. this.executeSendMessageHookAfter(context);
  208. }
  209. return sendResult;
  210. }
  211. catch (RemotingException e) {
  212. if (this.hasSendMessageHook()) {
  213. context.setException(e);
  214. this.executeSendMessageHookAfter(context);
  215. }
  216. throw e;
  217. }
  218. catch (MQBrokerException e) {
  219. if (this.hasSendMessageHook()) {
  220. context.setException(e);
  221. this.executeSendMessageHookAfter(context);
  222. }
  223. throw e;
  224. }
  225. catch (InterruptedException e) {
  226. if (this.hasSendMessageHook()) {
  227. context.setException(e);
  228. this.executeSendMessageHookAfter(context);
  229. }
  230. throw e;
  231. }
  232. finally {
  233. msg.setBody(prevBody);
  234. }
  235. }
  236. throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
  237. }
  238. /**
  239. * 向特定 broker 发送消息
  240. * 异步采用 Netty 的 ChannelFutureListener 实现
  241. */
  242. public SendResult sendMessage(//
  243. final String addr,// 1
  244. final String brokerName,// 2
  245. final Message msg,// 3
  246. final SendMessageRequestHeader requestHeader,// 4
  247. final long timeoutMillis,// 5
  248. final CommunicationMode communicationMode,// 6
  249. final SendCallback sendCallback// 7
  250. ) throws RemotingException, MQBrokerException, InterruptedException {
  251. RemotingCommand request = null;
  252. if (sendSmartMsg) {
  253. SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
  254. request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
  255. }
  256. else {
  257. request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
  258. }
  259. request.setBody(msg.getBody());
  260. switch (communicationMode) {
  261. case ONEWAY:
  262. this.remotingClient.invokeOneway(addr, request, timeoutMillis);
  263. return null;
  264. case ASYNC:
  265. this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback);
  266. return null;
  267. case SYNC:
  268. return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
  269. default:
  270. assert false;
  271. break;
  272. }
  273. return null;
  274. }

Broker - 处理来自Producer的消息

每个producer在发送消息的时候都和对应的Broker建立了长连接,此时broker已经准备好接收Message,Broker的SendMessageProcessor.sendMessage处理消息的存储。接收到消息后,会先写入Commit Log文件(顺序写,写满了会新建一个新的文件),然后更新Consume queue文件(存储如何由topic定位到具体的消息)。

捕获2.JPG-72.1kB

  1. /**
  2. * Broker 处理 Producer发送过来的消息 - SendMessageProcessor
  3. *
  4. */
  5. private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
  6. final RemotingCommand request,//
  7. final SendMessageContext mqtraceContext,//
  8. final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
  9. final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
  10. final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
  11. response.setOpaque(request.getOpaque());
  12. if (log.isDebugEnabled()) {
  13. log.debug("receive SendMessage request command, " + request);
  14. }
  15. response.setCode(-1);
  16. super.msgCheck(ctx, requestHeader, response);
  17. if (response.getCode() != -1) {
  18. return response;
  19. }
  20. final byte[] body = request.getBody();
  21. /** 消息已经来到了该 broker,只需要知道具体是哪个 queue */
  22. int queueIdInt = requestHeader.getQueueId();
  23. TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
  24. if (queueIdInt < 0) {
  25. queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
  26. }
  27. /** 包装 Message */
  28. MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
  29. msgInner.setTopic(requestHeader.getTopic());
  30. msgInner.setBody(body);
  31. msgInner.setFlag(requestHeader.getFlag());
  32. MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
  33. msgInner.setPropertiesString(requestHeader.getProperties());
  34. msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
  35. msgInner.setQueueId(queueIdInt);
  36. msgInner.setSysFlag(sysFlag);
  37. msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
  38. msgInner.setBornHost(ctx.channel().remoteAddress());
  39. msgInner.setStoreHost(this.getStoreHost());
  40. msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
  41. if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
  42. String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  43. if (traFlag != null) {
  44. response.setCode(ResponseCode.NO_PERMISSION);
  45. response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
  46. + "] sending transaction message is forbidden");
  47. return response;
  48. }
  49. }
  50. /** 消息存储 */
  51. PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
  52. if (putMessageResult != null) {
  53. boolean sendOK = false;
  54. switch (putMessageResult.getPutMessageStatus()) {
  55. // Success
  56. case PUT_OK:
  57. sendOK = true;
  58. response.setCode(ResponseCode.SUCCESS);
  59. break;
  60. case FLUSH_DISK_TIMEOUT:
  61. response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
  62. sendOK = true;
  63. break;
  64. case FLUSH_SLAVE_TIMEOUT:
  65. response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
  66. sendOK = true;
  67. break;
  68. case SLAVE_NOT_AVAILABLE:
  69. response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
  70. sendOK = true;
  71. break;
  72. /** Failed 创建映射文件失败*/
  73. case CREATE_MAPEDFILE_FAILED:
  74. response.setCode(ResponseCode.SYSTEM_ERROR);
  75. response.setRemark("create maped file failed, please make sure OS and JDK both 64bit.");
  76. break;
  77. /** .. 其他错误 */
  78. }
  79. if (sendOK) {
  80. this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
  81. this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
  82. putMessageResult.getAppendMessageResult().getWroteBytes());
  83. this.brokerController.getBrokerStatsManager().incBrokerPutNums();
  84. if (!this.brokerController.getBrokerConfig().isHighSpeedMode()) {
  85. // For commercial
  86. int incValue =
  87. (int) Math.ceil(putMessageResult.getAppendMessageResult().getWroteBytes() / BrokerStatsManager.SIZE_PER_COUNT);
  88. this.brokerController.getBrokerStatsManager().incCommercialTopicSendTimes(requestHeader.getProducerGroup(),
  89. msgInner.getTopic(), BrokerStatsManager.StatsType.SEND_SUCCESS.toString(), incValue);
  90. this.brokerController.getBrokerStatsManager().incCommercialTopicSendSize(requestHeader.getProducerGroup(),
  91. msgInner.getTopic(), BrokerStatsManager.StatsType.SEND_SUCCESS.toString(),
  92. putMessageResult.getAppendMessageResult().getWroteBytes());
  93. }
  94. response.setRemark(null);
  95. responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
  96. responseHeader.setQueueId(queueIdInt);
  97. responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
  98. /** ACK */
  99. doResponse(ctx, request, response);
  100. if (hasSendMessageHook()) {
  101. mqtraceContext.setMsgId(responseHeader.getMsgId());
  102. mqtraceContext.setQueueId(responseHeader.getQueueId());
  103. mqtraceContext.setQueueOffset(responseHeader.getQueueOffset());
  104. }
  105. return null;
  106. }
  107. else {
  108. // For commercial
  109. this.brokerController.getBrokerStatsManager().incCommercialTopicSendTimes(requestHeader.getProducerGroup(),
  110. msgInner.getTopic(), BrokerStatsManager.StatsType.SEND_FAILURE.toString(), 1);
  111. }
  112. }
  113. else {
  114. response.setCode(ResponseCode.SYSTEM_ERROR);
  115. response.setRemark("store putMessage return null");
  116. }
  117. return response;
  118. }
  119. /**
  120. * DefaultMessageStore
  121. */
  122. public PutMessageResult putMessage(MessageExtBrokerInner msg) {
  123. if (this.shutdown) {
  124. log.warn("message store has shutdown, so putMessage is forbidden");
  125. return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
  126. }
  127. /** broker slave 不能到这里 */
  128. if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
  129. long value = this.printTimes.getAndIncrement();
  130. if ((value % 50000) == 0) { /** 控制warn log的频率*/
  131. log.warn("message store is slave mode, so putMessage is forbidden ");
  132. }
  133. return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
  134. }
  135. /** 不可写 */
  136. if (!this.runningFlags.isWriteable()) {
  137. long value = this.printTimes.getAndIncrement();
  138. if ((value % 50000) == 0) {
  139. log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
  140. }
  141. return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
  142. }
  143. else {
  144. this.printTimes.set(0);
  145. }
  146. if (msg.getTopic().length() > Byte.MAX_VALUE) {
  147. log.warn("putMessage message topic length too long " + msg.getTopic().length());
  148. return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
  149. }
  150. if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
  151. log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
  152. return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
  153. }
  154. long beginTime = this.getSystemClock().now();
  155. /**
  156. * 消息写入 commit log
  157. */
  158. PutMessageResult result = this.commitLog.putMessage(msg);
  159. long eclipseTime = this.getSystemClock().now() - beginTime;
  160. if (eclipseTime > 1000) {
  161. log.warn("putMessage not in lock eclipse time(ms) " + eclipseTime);
  162. }
  163. this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
  164. if (null == result || !result.isOk()) {
  165. this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
  166. }
  167. return result;
  168. }
  169. /**
  170. * CommitLog 写消息
  171. */
  172. public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
  173. // Set the storage time
  174. msg.setStoreTimestamp(System.currentTimeMillis());
  175. // Set the message body BODY CRC (consider the most appropriate setting
  176. // on the client)
  177. msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
  178. // Back to Results
  179. AppendMessageResult result = null;
  180. StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
  181. String topic = msg.getTopic();
  182. int queueId = msg.getQueueId();
  183. final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
  184. if (tranType == MessageSysFlag.TransactionNotType//
  185. || tranType == MessageSysFlag.TransactionCommitType) {
  186. // Delay Delivery
  187. if (msg.getDelayTimeLevel() > 0) {
  188. if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
  189. msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
  190. }
  191. topic = ScheduleMessageService.SCHEDULE_TOPIC;
  192. queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
  193. // Backup real topic, queueId
  194. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
  195. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
  196. msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
  197. msg.setTopic(topic);
  198. msg.setQueueId(queueId);
  199. }
  200. }
  201. long eclipseTimeInLock = 0;
  202. MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
  203. synchronized (this) {
  204. long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
  205. // Here settings are stored timestamp, in order to ensure an orderly global
  206. msg.setStoreTimestamp(beginLockTimestamp);
  207. if (null == mapedFile || mapedFile.isFull()) {
  208. mapedFile = this.mapedFileQueue.getLastMapedFile();
  209. }
  210. if (null == mapedFile) {
  211. log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
  212. return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
  213. }
  214. /** 写消息到内存映射区 */
  215. result = mapedFile.appendMessage(msg, this.appendMessageCallback);
  216. switch (result.getStatus()) {
  217. case PUT_OK:
  218. break;
  219. case END_OF_FILE:
  220. // Create a new file, re-write the message
  221. mapedFile = this.mapedFileQueue.getLastMapedFile();
  222. if (null == mapedFile) {
  223. // XXX: warn and notify me
  224. log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
  225. return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
  226. }
  227. result = mapedFile.appendMessage(msg, this.appendMessageCallback);
  228. break;
  229. case MESSAGE_SIZE_EXCEEDED:
  230. case PROPERTIES_SIZE_EXCEEDED:
  231. return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
  232. case UNKNOWN_ERROR:
  233. return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
  234. default:
  235. return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
  236. }
  237. eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
  238. } // end of synchronized
  239. if (eclipseTimeInLock > 500) {
  240. log.warn("[NOTIFYME]putMessage in lock eclipse time(ms) " + eclipseTimeInLock);
  241. }
  242. PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
  243. // Statistics
  244. storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
  245. storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
  246. GroupCommitRequest request = null;
  247. /** Synchronization flush 如果是同步刷盘策略 */
  248. if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
  249. GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
  250. if (msg.isWaitStoreMsgOK()) {
  251. request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
  252. service.putRequest(request);
  253. boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
  254. if (!flushOK) {
  255. log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
  256. + " client address: " + msg.getBornHostString());
  257. putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
  258. }
  259. } else {
  260. service.wakeup();
  261. }
  262. } else { /** Asynchronous flush 异步刷盘只需要唤醒对应的线程*/
  263. this.flushCommitLogService.wakeup();
  264. }
  265. /** Synchronous write double 如果当前broker是master,则同步双写*/
  266. if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
  267. HAService service = this.defaultMessageStore.getHaService();
  268. if (msg.isWaitStoreMsgOK()) {
  269. // Determine whether to wait
  270. if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
  271. if (null == request) {
  272. request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
  273. }
  274. service.putRequest(request);
  275. service.getWaitNotifyObject().wakeupAll();
  276. boolean flushOK =
  277. // TODO
  278. request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
  279. if (!flushOK) {
  280. log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
  281. + msg.getTags() + " client address: " + msg.getBornHostString());
  282. putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
  283. }
  284. } else {
  285. // Slave problem, Tell the producer, slave not available
  286. putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
  287. }
  288. }
  289. }
  290. return putMessageResult;
  291. }
  292. /**
  293. * 获得一个映射区,如果不存在则创建
  294. */
  295. public MapedFile getLastMapedFile(final long startOffset, boolean needCreate) {
  296. long createOffset = -1;
  297. MapedFile mapedFileLast = null;
  298. {
  299. this.readWriteLock.readLock().lock();
  300. if (this.mapedFiles.isEmpty()) {
  301. createOffset = startOffset - (startOffset % this.mapedFileSize);
  302. }
  303. else { /** 获得最近正在使用的 mapped file */
  304. mapedFileLast = this.mapedFiles.get(this.mapedFiles.size() - 1);
  305. }
  306. this.readWriteLock.readLock().unlock();
  307. }
  308. if (mapedFileLast != null && mapedFileLast.isFull()) { /** 正在使用的 mapped file 满了 */
  309. createOffset = mapedFileLast.getFileFromOffset() + this.mapedFileSize;
  310. }
  311. if (createOffset != -1 && needCreate) {
  312. /** 可以看到这里文件命名的规则,以 起始offset作为文件名,便于定位索引 */
  313. String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
  314. String nextNextFilePath =
  315. this.storePath + File.separator + UtilAll.offset2FileName(createOffset + this.mapedFileSize);
  316. MapedFile mapedFile = null;
  317. if (this.allocateMapedFileService != null) {
  318. mapedFile =
  319. this.allocateMapedFileService.putRequestAndReturnMapedFile(nextFilePath,
  320. nextNextFilePath, this.mapedFileSize);
  321. }
  322. else {
  323. try {
  324. mapedFile = new MapedFile(nextFilePath, this.mapedFileSize);
  325. }
  326. catch (IOException e) {
  327. log.error("create mapedfile exception", e);
  328. }
  329. }
  330. if (mapedFile != null) {
  331. this.readWriteLock.writeLock().lock();
  332. if (this.mapedFiles.isEmpty()) { /** 标识为该message queue的第一个创建的mapped file */
  333. mapedFile.setFirstCreateInQueue(true);
  334. }
  335. this.mapedFiles.add(mapedFile); /** 加入链表 */
  336. this.readWriteLock.writeLock().unlock();
  337. }
  338. return mapedFile;
  339. }
  340. return mapedFileLast;
  341. }
  342. /**
  343. * 把消息追加到 mapped file 中,采用回调的方式
  344. */
  345. public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
  346. assert msg != null;
  347. assert cb != null;
  348. int currentPos = this.wrotePostion.get();
  349. if (currentPos < this.fileSize) {
  350. ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
  351. byteBuffer.position(currentPos);
  352. AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
  353. this.wrotePostion.addAndGet(result.getWroteBytes());
  354. this.storeTimestamp = result.getStoreTimestamp();
  355. return result;
  356. }
  357. log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: "
  358. + this.fileSize);
  359. return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
  360. }
  361. /**
  362. * 消息追加实现,同时维护映射关系 (topic-queueId, offset)供消费者使用
  363. */
  364. public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {
  365. // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
  366. MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;
  367. // PHY OFFSET
  368. long wroteOffset = fileFromOffset + byteBuffer.position();
  369. String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(), wroteOffset);
  370. // Record ConsumeQueue information
  371. String key = msgInner.getTopic() + "-" + msgInner.getQueueId();
  372. Long queueOffset = CommitLog.this.topicQueueTable.get(key);
  373. if (null == queueOffset) {
  374. queueOffset = 0L; /** 表示第一条 */
  375. CommitLog.this.topicQueueTable.put(key, queueOffset);
  376. }
  377. /** Transaction messages that require special handling 事务类型的消息 */
  378. final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
  379. switch (tranType) {
  380. // Prepared and Rollback message is not consumed, will not enter the consumer queue
  381. case MessageSysFlag.TransactionPreparedType:
  382. case MessageSysFlag.TransactionRollbackType:
  383. queueOffset = 0L;
  384. break;
  385. case MessageSysFlag.TransactionNotType:
  386. case MessageSysFlag.TransactionCommitType:
  387. default:
  388. break;
  389. }
  390. /** Serialize message */
  391. final byte[] propertiesData =
  392. msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
  393. if (propertiesData.length > Short.MAX_VALUE) {
  394. log.warn("putMessage message properties length too long. length={}", propertiesData.length);
  395. return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
  396. }
  397. final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length;
  398. final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
  399. final int topicLength = topicData == null ? 0 : topicData.length;
  400. final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
  401. final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
  402. // Exceeds the maximum message
  403. if (msgLen > this.maxMessageSize) {
  404. CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
  405. + ", maxMessageSize: " + this.maxMessageSize);
  406. return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
  407. }
  408. // Determines whether there is sufficient free space
  409. if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
  410. this.resetMsgStoreItemMemory(maxBlank);
  411. // 1 TOTALSIZE
  412. this.msgStoreItemMemory.putInt(maxBlank);
  413. // 2 MAGICCODE
  414. this.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);
  415. // 3 The remaining space may be any value
  416. // Here the length of the specially set maxBlank
  417. byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
  418. return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
  419. queueOffset);
  420. }
  421. /** Initialization of storage space 根据协议格式写入 */
  422. this.resetMsgStoreItemMemory(msgLen);
  423. // 1 TOTALSIZE
  424. this.msgStoreItemMemory.putInt(msgLen);
  425. // 2 MAGICCODE
  426. this.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode);
  427. // 3 BODYCRC
  428. this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
  429. // 4 QUEUEID
  430. this.msgStoreItemMemory.putInt(msgInner.getQueueId());
  431. // 5 FLAG
  432. this.msgStoreItemMemory.putInt(msgInner.getFlag());
  433. // 6 QUEUEOFFSET
  434. this.msgStoreItemMemory.putLong(queueOffset);
  435. // 7 PHYSICALOFFSET
  436. this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
  437. // 8 SYSFLAG
  438. this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
  439. // 9 BORNTIMESTAMP
  440. this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
  441. // 10 BORNHOST
  442. this.msgStoreItemMemory.put(msgInner.getBornHostBytes());
  443. // 11 STORETIMESTAMP
  444. this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
  445. // 12 STOREHOSTADDRESS
  446. this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
  447. // 13 RECONSUMETIMES
  448. this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
  449. // 14 Prepared Transaction Offset
  450. this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
  451. // 15 BODY
  452. this.msgStoreItemMemory.putInt(bodyLength);
  453. if (bodyLength > 0)
  454. this.msgStoreItemMemory.put(msgInner.getBody());
  455. // 16 TOPIC
  456. this.msgStoreItemMemory.put((byte) topicLength);
  457. this.msgStoreItemMemory.put(topicData);
  458. // 17 PROPERTIES
  459. this.msgStoreItemMemory.putShort(propertiesLength);
  460. if (propertiesLength > 0)
  461. this.msgStoreItemMemory.put(propertiesData);
  462. // Write messages to the queue buffer
  463. byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
  464. AppendMessageResult result =
  465. new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(),queueOffset);
  466. switch (tranType) {
  467. case MessageSysFlag.TransactionPreparedType:
  468. case MessageSysFlag.TransactionRollbackType:
  469. break;
  470. case MessageSysFlag.TransactionNotType: /** 非事务类型 或 事务提交 就更新queue offset,下一个消息存储的位置 */
  471. case MessageSysFlag.TransactionCommitType:
  472. // The next update ConsumeQueue information
  473. CommitLog.this.topicQueueTable.put(key, ++queueOffset);
  474. break;
  475. default:
  476. break;
  477. }
  478. return result;
  479. }

Broker - 处理来自Consumer的pull request

2016-09-25_154155.png-50.3kB

  1. /**
  2. * broker处理消息pull请求
  3. */
  4. private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
  5. boolean brokerAllowSuspend) throws RemotingCommandException {
  6. RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
  7. final PullMessageResponseHeader responseHeader =
  8. (PullMessageResponseHeader) response.readCustomHeader();
  9. final PullMessageRequestHeader requestHeader =
  10. (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
  11. response.setOpaque(request.getOpaque());
  12. SubscriptionGroupConfig subscriptionGroupConfig =
  13. this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
  14. requestHeader.getConsumerGroup());
  15. if (null == subscriptionGroupConfig) {
  16. response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
  17. response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
  18. + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
  19. return response;
  20. }
  21. if (!subscriptionGroupConfig.isConsumeEnable()) {
  22. response.setCode(ResponseCode.NO_PERMISSION);
  23. response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
  24. return response;
  25. }
  26. final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
  27. final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
  28. final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
  29. final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
  30. /** */
  31. TopicConfig topicConfig =
  32. this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
  33. if (null == topicConfig) {
  34. log.error("the topic " + requestHeader.getTopic() + " not exist, consumer: "
  35. + RemotingHelper.parseChannelRemoteAddr(channel));
  36. response.setCode(ResponseCode.TOPIC_NOT_EXIST);
  37. response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
  38. + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
  39. return response;
  40. }
  41. if (!PermName.isReadable(topicConfig.getPerm())) {
  42. response.setCode(ResponseCode.NO_PERMISSION);
  43. response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
  44. return response;
  45. }
  46. if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
  47. String errorInfo =
  48. "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :"
  49. + requestHeader.getTopic() + " topicConfig.readQueueNums: "
  50. + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
  51. log.warn(errorInfo);
  52. response.setCode(ResponseCode.SYSTEM_ERROR);
  53. response.setRemark(errorInfo);
  54. return response;
  55. }
  56. SubscriptionData subscriptionData = null;
  57. if (hasSubscriptionFlag) {
  58. try {
  59. subscriptionData =
  60. FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(),
  61. requestHeader.getTopic(), requestHeader.getSubscription());
  62. }
  63. catch (Exception e) {
  64. log.warn("parse the consumer's subscription[{}] failed, group: {}",
  65. requestHeader.getSubscription(),//
  66. requestHeader.getConsumerGroup());
  67. response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
  68. response.setRemark("parse the consumer's subscription failed");
  69. return response;
  70. }
  71. }
  72. else {
  73. ConsumerGroupInfo consumerGroupInfo =
  74. this.brokerController.getConsumerManager().getConsumerGroupInfo(
  75. requestHeader.getConsumerGroup());
  76. if (null == consumerGroupInfo) {
  77. log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
  78. response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
  79. response.setRemark("the consumer's group info not exist"
  80. + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
  81. return response;
  82. }
  83. if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
  84. && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
  85. response.setCode(ResponseCode.NO_PERMISSION);
  86. response.setRemark("the consumer group[" + requestHeader.getConsumerGroup()
  87. + "] can not consume by broadcast way");
  88. return response;
  89. }
  90. subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
  91. if (null == subscriptionData) {
  92. log.warn("the consumer's subscription not exist, group: {}", requestHeader.getConsumerGroup());
  93. response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
  94. response.setRemark("the consumer's subscription not exist"
  95. + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
  96. return response;
  97. }
  98. if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
  99. log.warn("the broker's subscription is not latest, group: {} {}",
  100. requestHeader.getConsumerGroup(), subscriptionData.getSubString());
  101. response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
  102. response.setRemark("the consumer's subscription not latest");
  103. return response;
  104. }
  105. }
  106. final GetMessageResult getMessageResult =
  107. this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
  108. requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(),
  109. requestHeader.getMaxMsgNums(), subscriptionData);
  110. if (getMessageResult != null) {
  111. response.setRemark(getMessageResult.getStatus().name());
  112. responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
  113. responseHeader.setMinOffset(getMessageResult.getMinOffset());
  114. responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
  115. if (getMessageResult.isSuggestPullingFromSlave()) {
  116. responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig
  117. .getWhichBrokerWhenConsumeSlowly());
  118. log.warn(
  119. "consume message too slow, suggest pulling from slave. group={}, topic={}, subString={}, queueId={}, offset={}",
  120. requestHeader.getConsumerGroup(), requestHeader.getTopic(),
  121. subscriptionData.getSubString(), requestHeader.getQueueId(),
  122. requestHeader.getQueueOffset());
  123. }
  124. else {
  125. responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
  126. }
  127. switch (getMessageResult.getStatus()) {
  128. case FOUND:
  129. response.setCode(ResponseCode.SUCCESS);
  130. if (this.hasConsumeMessageHook()) {
  131. ConsumeMessageContext context = new ConsumeMessageContext();
  132. context.setConsumerGroup(requestHeader.getConsumerGroup());
  133. context.setTopic(requestHeader.getTopic());
  134. context.setClientHost(RemotingHelper.parseChannelRemoteAddr(channel));
  135. context.setStoreHost(this.brokerController.getBrokerAddr());
  136. context.setQueueId(requestHeader.getQueueId());
  137. final SocketAddress storeHost =
  138. new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(),
  139. brokerController.getNettyServerConfig().getListenPort());
  140. Map<String, Long> messageIds =
  141. this.brokerController.getMessageStore().getMessageIds(requestHeader.getTopic(),
  142. requestHeader.getQueueId(), requestHeader.getQueueOffset(),
  143. requestHeader.getQueueOffset() + getMessageResult.getMessageCount(),
  144. storeHost);
  145. context.setMessageIds(messageIds);
  146. context.setBodyLength(getMessageResult.getBufferTotalSize()
  147. / getMessageResult.getMessageCount());
  148. this.executeConsumeMessageHookBefore(context);
  149. }
  150. break;
  151. case MESSAGE_WAS_REMOVING:
  152. response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
  153. break;
  154. case NO_MATCHED_LOGIC_QUEUE:
  155. case NO_MESSAGE_IN_QUEUE:
  156. if (0 != requestHeader.getQueueOffset()) {
  157. response.setCode(ResponseCode.PULL_OFFSET_MOVED);
  158. log.info(
  159. "the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",//
  160. requestHeader.getQueueOffset(), //
  161. getMessageResult.getNextBeginOffset(), //
  162. requestHeader.getTopic(),//
  163. requestHeader.getQueueId(),//
  164. requestHeader.getConsumerGroup()//
  165. );
  166. }
  167. else {
  168. response.setCode(ResponseCode.PULL_NOT_FOUND);
  169. }
  170. break;
  171. case NO_MATCHED_MESSAGE:
  172. response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
  173. break;
  174. case OFFSET_FOUND_NULL:
  175. response.setCode(ResponseCode.PULL_NOT_FOUND);
  176. break;
  177. case OFFSET_OVERFLOW_BADLY:
  178. response.setCode(ResponseCode.PULL_OFFSET_MOVED);
  179. log.info("the request offset: " + requestHeader.getQueueOffset()
  180. + " over flow badly, broker max offset: " + getMessageResult.getMaxOffset()
  181. + ", consumer: " + channel.remoteAddress());
  182. break;
  183. case OFFSET_OVERFLOW_ONE:
  184. response.setCode(ResponseCode.PULL_NOT_FOUND);
  185. break;
  186. case OFFSET_TOO_SMALL:
  187. response.setCode(ResponseCode.PULL_OFFSET_MOVED);
  188. log.info(
  189. "the request offset too small. group={}, topic={}, requestOffset{}, brokerMinOffset={}, clientIp={}",
  190. requestHeader.getConsumerGroup(), requestHeader.getTopic(),
  191. requestHeader.getQueueOffset(), getMessageResult.getMinOffset(), channel.remoteAddress());
  192. break;
  193. default:
  194. assert false;
  195. break;
  196. }
  197. switch (response.getCode()) {
  198. case ResponseCode.SUCCESS:
  199. this.brokerController.getBrokerStatsManager().incGroupGetNums(
  200. requestHeader.getConsumerGroup(), requestHeader.getTopic(),
  201. getMessageResult.getMessageCount());
  202. this.brokerController.getBrokerStatsManager().incGroupGetSize(
  203. requestHeader.getConsumerGroup(), requestHeader.getTopic(),
  204. getMessageResult.getBufferTotalSize());
  205. this.brokerController.getBrokerStatsManager().incBrokerGetNums(
  206. getMessageResult.getMessageCount());
  207. if(this.brokerController.getBrokerConfig().isTransferMsgByHeap()){
  208. final byte[] r = this.readGetMessageResult(getMessageResult);
  209. response.setBody(r);
  210. }
  211. else {
  212. try {
  213. FileRegion fileRegion =
  214. new ManyMessageTransfer(response.encodeHeader(getMessageResult
  215. .getBufferTotalSize()), getMessageResult);
  216. channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
  217. @Override
  218. public void operationComplete(ChannelFuture future) throws Exception {
  219. getMessageResult.release();
  220. if (!future.isSuccess()) {
  221. log.error(
  222. "transfer many message by pagecache failed, " + channel.remoteAddress(),
  223. future.cause());
  224. }
  225. }
  226. });
  227. } catch (Throwable e) {
  228. log.error("transfer many message by pagecache exception", e);
  229. getMessageResult.release();
  230. }
  231. response = null;
  232. }
  233. break;
  234. case ResponseCode.PULL_NOT_FOUND:
  235. if (brokerAllowSuspend && hasSuspendFlag) {
  236. long pollingTimeMills = suspendTimeoutMillisLong;
  237. if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
  238. pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
  239. }
  240. PullRequest pullRequest =
  241. new PullRequest(request, channel, pollingTimeMills, this.brokerController
  242. .getMessageStore().now(), requestHeader.getQueueOffset());
  243. this.brokerController.getPullRequestHoldService().suspendPullRequest(
  244. requestHeader.getTopic(), requestHeader.getQueueId(), pullRequest);
  245. response = null;
  246. break;
  247. }
  248. case ResponseCode.PULL_RETRY_IMMEDIATELY:
  249. break;
  250. case ResponseCode.PULL_OFFSET_MOVED:
  251. if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
  252. || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
  253. MessageQueue mq = new MessageQueue();
  254. mq.setTopic(requestHeader.getTopic());
  255. mq.setQueueId(requestHeader.getQueueId());
  256. mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
  257. OffsetMovedEvent event = new OffsetMovedEvent();
  258. event.setConsumerGroup(requestHeader.getConsumerGroup());
  259. event.setMessageQueue(mq);
  260. event.setOffsetRequest(requestHeader.getQueueOffset());
  261. event.setOffsetNew(getMessageResult.getNextBeginOffset());
  262. this.generateOffsetMovedEvent(event);
  263. log.warn(
  264. "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
  265. requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(),
  266. event.getOffsetNew(), responseHeader.getSuggestWhichBrokerId());
  267. }
  268. else {
  269. responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
  270. response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
  271. log.warn(
  272. "PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
  273. requestHeader.getTopic(), requestHeader.getConsumerGroup(),
  274. requestHeader.getQueueOffset(), responseHeader.getSuggestWhichBrokerId());
  275. }
  276. break;
  277. default:
  278. assert false;
  279. }
  280. }
  281. else {
  282. response.setCode(ResponseCode.SYSTEM_ERROR);
  283. response.setRemark("store getMessage return null");
  284. }
  285. boolean storeOffsetEnable = brokerAllowSuspend;
  286. storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
  287. storeOffsetEnable = storeOffsetEnable
  288. && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
  289. if (storeOffsetEnable) {
  290. this.brokerController.getConsumerOffsetManager().commitOffset(requestHeader.getConsumerGroup(),
  291. requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
  292. }
  293. return response;
  294. }
  295. public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
  296. final SubscriptionData subscriptionData) {
  297. if (this.shutdown) {
  298. log.warn("message store has shutdown, so getMessage is forbidden");
  299. return null;
  300. }
  301. if (!this.runningFlags.isReadable()) {
  302. log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
  303. return null;
  304. }
  305. long beginTime = this.getSystemClock().now();
  306. GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
  307. long nextBeginOffset = offset;
  308. long minOffset = 0;
  309. long maxOffset = 0;
  310. GetMessageResult getResult = new GetMessageResult();
  311. final long maxOffsetPy = this.commitLog.getMaxOffset();
  312. /** consume queue 逻辑消费队列, 也就是消息的索引信息 */
  313. ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
  314. if (consumeQueue != null) {
  315. minOffset = consumeQueue.getMinOffsetInQuque();
  316. maxOffset = consumeQueue.getMaxOffsetInQuque();
  317. {
  318. // 各种临界情况处理
  319. }
  320. else { /** 根据拉取消息的offset,得到索引对应的 byte buffer */
  321. SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
  322. if (bufferConsumeQueue != null) {
  323. try {
  324. status = GetMessageStatus.NO_MATCHED_MESSAGE;
  325. long nextPhyFileStartOffset = Long.MIN_VALUE;
  326. long maxPhyOffsetPulling = 0;
  327. int i = 0;
  328. final int MaxFilterMessageCount = 16000;
  329. final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
  330. for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {
  331. /** 根据 consume queue 的 index 协议格式,读取字段: CommitLogOffset-Size-TagsCode */
  332. long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // 实际便宜
  333. int sizePy = bufferConsumeQueue.getByteBuffer().getInt();// 实际大小
  334. long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
  335. maxPhyOffsetPulling = offsetPy;
  336. if (nextPhyFileStartOffset != Long.MIN_VALUE) {
  337. if (offsetPy < nextPhyFileStartOffset)
  338. continue;
  339. }
  340. boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
  341. if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
  342. isInDisk)) {
  343. break;
  344. }
  345. /** 如果需要在 broker 进行消息过滤 */
  346. if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
  347. /** 具体从 commit log 中读取消息 */
  348. SelectMapedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
  349. if (selectResult != null) {
  350. this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
  351. getResult.addMessage(selectResult); /** 构造返回的结果 */
  352. status = GetMessageStatus.FOUND;
  353. nextPhyFileStartOffset = Long.MIN_VALUE;
  354. }
  355. else {
  356. if (getResult.getBufferTotalSize() == 0) {
  357. status = GetMessageStatus.MESSAGE_WAS_REMOVING;
  358. }
  359. nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
  360. }
  361. }
  362. else {
  363. if (getResult.getBufferTotalSize() == 0) {
  364. status = GetMessageStatus.NO_MATCHED_MESSAGE;
  365. }
  366. }
  367. }
  368. if (diskFallRecorded) {
  369. long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
  370. brokerStatsManager.recordDiskFallBehind(group, topic, queueId, fallBehind);
  371. }
  372. nextBeginOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);
  373. long diff = maxOffsetPy - maxPhyOffsetPulling;
  374. long memory =
  375. (long) (StoreUtil.TotalPhysicalMemorySize * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
  376. getResult.setSuggestPullingFromSlave(diff > memory);
  377. }
  378. finally {
  379. bufferConsumeQueue.release();
  380. }
  381. }
  382. else {
  383. status = GetMessageStatus.OFFSET_FOUND_NULL;
  384. nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
  385. log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
  386. + maxOffset + ", but access logic queue failed.");
  387. }
  388. }
  389. }
  390. else {
  391. status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
  392. nextBeginOffset = nextOffsetCorrection(offset, 0);
  393. }
  394. if (GetMessageStatus.FOUND == status) {
  395. this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
  396. }
  397. else {
  398. this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
  399. }
  400. long eclipseTime = this.getSystemClock().now() - beginTime;
  401. this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
  402. getResult.setStatus(status);
  403. getResult.setNextBeginOffset(nextBeginOffset);
  404. getResult.setMaxOffset(maxOffset);
  405. getResult.setMinOffset(minOffset);
  406. return getResult;
  407. }
  408. /**
  409. * (topic + queueId) -> ConsumeQueue
  410. * 在 broker 启动的时候会load所有topic的consume queue信息,运行时不存在则创建
  411. */
  412. public ConsumeQueue findConsumeQueue(String topic, int queueId) {
  413. ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
  414. if (null == map) {
  415. ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
  416. ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
  417. if (oldMap != null) { /** 再次确认 */
  418. map = oldMap;
  419. }
  420. else {
  421. map = newMap;
  422. }
  423. }
  424. ConsumeQueue logic = map.get(queueId); // 一个逻辑队列
  425. if (null == logic) {
  426. ConsumeQueue newLogic = new ConsumeQueue(//
  427. topic,//
  428. queueId,//
  429. StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//
  430. this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),//
  431. this);
  432. ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
  433. if (oldLogic != null) {
  434. logic = oldLogic;
  435. }
  436. else {
  437. logic = newLogic;
  438. }
  439. }
  440. return logic;
  441. }
  442. /**
  443. * 根据起始索引得到消息索引对应的内存映射区
  444. */
  445. public SelectMapedBufferResult getIndexBuffer(final long startIndex) {
  446. int mapedFileSize = this.mapedFileSize;
  447. long offset = startIndex * CQStoreUnitSize; // 每个索引项的大小是20B
  448. if (offset >= this.getMinLogicOffset()) {
  449. MapedFile mapedFile = this.mapedFileQueue.findMapedFileByOffset(offset);
  450. if (mapedFile != null) {
  451. SelectMapedBufferResult result = mapedFile.selectMapedBuffer((int) (offset % mapedFileSize));
  452. return result;
  453. }
  454. }
  455. return null;
  456. }

Consumer - 拉取消息

  1. 根据topic获得路由信息(本地缓存/from nameserver)
  2. 根据路由信息得到 MessageQueue set(相应broker上的所有 read queues)
  3. 构造pull message消息依次轮询各个MessageQueue
  4. 消费消息

Broker - 存储特点

2016-09-25_155207.png-17.8kB

RocketMQ的消息采用顺序写到commitlog文件,然后利用consume queue文件作为索引。RocketMQ采用零拷贝mmap+write的方式来回应Consumer的请求,RocketMQ宣称大部分请求都会在Page Cache层得到满足,所以消息过多不会因为磁盘读使得性能下降,这里自己的理解是,在64bit机器下,虚存地址空间(vm_area_struct)不是问题,所以相关的文件都会被映射到内存中(有定期删除文件的操作),即使此刻不在内存,操作系统也会因为缺页异常进行换入,虽然地址空间不是问题,但是一个进程映射文件的个数(/proc/sys/vm/max_map_count)是有限的,所以可能在这里发生OOM。

rocketmq-broker-index.jpg-25.1kB

Broker存储目录(默认路径是 $HOME/store):

捕获.JPG-53.2kB

Broker - 刷盘实现

Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。异步而言,只是唤醒对应的线程,不保证执行的时机。

rocketmq-flush.JPG-37kB

顺序消息是如何保证的?

需要业务层自己决定哪些消息应该顺序到达,然后发送的时候通过规则(hash)映射到同一个队列,因为没有谁比业务自己更加知道关于消息顺序的特点。这样的顺序是相对顺序,局部顺序,因为发送方只保证把这些消息顺序的发送到broker上的同一队列,但是不保证其他Producer也会发送消息到那个队列,所以需要Consumer在拉到消息后做一些过滤。

消息过滤

类似于重复数据删除技术(Data Deduplication),可以在源端做,也可以在目的端实现,就是网络和存储的权衡,如果在Broker端做消息过滤就需要逐一比对consume queue 的 tagsCode 字段(hashcode),如果符合则传输给消费者,因为是 hashcode,所以存在误判,需要在 Consumer 接收到消息后进行字符串级别的过滤,确保准确性。

小结

参考

RocketMQ Github

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注