@vonzhou
2016-09-25T09:04:08.000000Z
字数 44860
阅读 523
MessageQueue RocketMQ


轻量名称服务,无状态。
/**
 * Initializes the name server controller: loads the KV configuration,
 * creates the Netty remoting server and its worker pool, registers the
 * request processor and schedules the two periodic housekeeping tasks.
 *
 * @return always {@code true}
 */
public boolean initialize() {
    // Load the configuration from the local file into memory.
    this.kvConfigManager.load();

    // Start the TCP server (Netty based).
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    final int workerThreads = nettyServerConfig.getServerWorkerThreads();
    final ThreadFactoryImpl workerThreadFactory = new ThreadFactoryImpl("RemotingExecutorThread_");
    this.remotingExecutor = Executors.newFixedThreadPool(workerThreads, workerThreadFactory);

    // Register the TCP request processor.
    this.registerProcessor();

    // Periodically evict inactive brokers (the original note gives the
    // default broker channel expiry as 2 minutes).
    final Runnable inactiveBrokerScan = new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(inactiveBrokerScan, 5, 10, TimeUnit.SECONDS);

    // Periodically print the whole KV configuration.
    final Runnable kvConfigDump = new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(kvConfigDump, 1, 10, TimeUnit.MINUTES);

    return true;
}
路由信息来自NameServer。客户端会缓存路由信息TopicPublishInfo, 同时定期从NameServer取Topic路由信息,每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer。如果本地查询不到,则需要同步询问NameServer, 如果该Topic不存在,则使用default。
Producer 在得到了具体的通信地址后,发送过程就显而易见了。通过代码可以看到在选择消息队列进行发送时采用轮询(round robin)方式,同时和上一次发送的broker保持不同,防止热点。
/*** 发送消息实现*/private SendResult sendDefaultImpl(//Message msg,//final CommunicationMode communicationMode,//final SendCallback sendCallback, final long timeout//) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000;final long beginTimestamp = System.currentTimeMillis();long endTimestamp = beginTimestamp;/** 获取路由信息 */TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;/** 发送失败时的重试时间 */int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed();int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) {/** 上次发送选择的broker */String lastBrokerName = null == mq ? 
null : mq.getBrokerName();/** 选择一个queue */MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);if (tmpmq != null) {mq = tmpmq;brokersSent[times] = mq.getBrokerName();try {sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);endTimestamp = System.currentTimeMillis();switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}}catch (RemotingException e) {log.warn("sendKernelImpl exception", e);log.warn(msg.toString());exception = e;endTimestamp = System.currentTimeMillis();continue;}catch (MQClientException e) {log.warn("sendKernelImpl exception", e);log.warn(msg.toString());exception = e;endTimestamp = System.currentTimeMillis();continue;}catch (MQBrokerException e) {log.warn("sendKernelImpl exception", e);log.warn(msg.toString());exception = e;endTimestamp = System.currentTimeMillis();switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult != null) {return sendResult;}throw e;}}catch (InterruptedException e) {log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}}else {break;}} // end of forif (sendResult != null) {return sendResult;}String info =String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", //times, //(System.currentTimeMillis() - beginTimestamp), //msg.getTopic(),//Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);throw new MQClientException(info, exception);}List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null == nsList || nsList.isEmpty()) {throw new MQClientException("No 
name server address, please set it."+ FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null);}throw new MQClientException("No route info of this topic, " + msg.getTopic()+ FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null);}/*** 采用 round robin 的方式选择队列,且和上次选择的broker不同*/public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName != null) {int index = this.sendWhichQueue.getAndIncrement();for (int i = 0; i < this.messageQueueList.size(); i++) {int pos = Math.abs(index++) % this.messageQueueList.size();MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return null;}else {int index = this.sendWhichQueue.getAndIncrement();int pos = Math.abs(index) % this.messageQueueList.size();return this.messageQueueList.get(pos);}}/*** 构造协议消息*/private SendResult sendKernelImpl(final Message msg,//final MessageQueue mq,//final CommunicationMode communicationMode,//final SendCallback sendCallback,//final long timeout) throws MQClientException, RemotingException, MQBrokerException,InterruptedException {String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {if(this.defaultMQProducer.isSendMessageWithVIPChannel()) {brokerAddr = MixAll.brokerVIPChannel(brokerAddr);}byte[] prevBody = msg.getBody();try {int sysFlag = 0;if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.CompressedFlag;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TransactionPreparedType;}/** 在发送消息前后,预留灵活处理的空间 */if (this.hasSendMessageHook()) {context = new 
SendMessageContext();context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);this.executeSendMessageHookBefore(context);}/** 构造 request header */SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(new Integer(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}}SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//brokerAddr,// 1mq.getBrokerName(),// 2msg,// 3requestHeader,// 4timeout,// 5communicationMode,// 6sendCallback// 7);if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;}catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;}catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;}catch (InterruptedException e) {if (this.hasSendMessageHook()) 
{context.setException(e);this.executeSendMessageHookAfter(context);}throw e;}finally {msg.setBody(prevBody);}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}/*** 向特定 broker 发送消息* 异步采用 Netty 的 ChannelFutureListener 实现*/public SendResult sendMessage(//final String addr,// 1final String brokerName,// 2final Message msg,// 3final SendMessageRequestHeader requestHeader,// 4final long timeoutMillis,// 5final CommunicationMode communicationMode,// 6final SendCallback sendCallback// 7) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = null;if (sendSmartMsg) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);}else {request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);}request.setBody(msg.getBody());switch (communicationMode) {case ONEWAY:this.remotingClient.invokeOneway(addr, request, timeoutMillis);return null;case ASYNC:this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback);return null;case SYNC:return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);default:assert false;break;}return null;}
每个producer在发送消息的时候都和对应的Broker建立了长连接,此时broker已经准备好接收Message,Broker的SendMessageProcessor.sendMessage处理消息的存储。接收到消息后,会先写入Commit Log文件(顺序写,写满了会新建一个新的文件),然后更新Consume queue文件(存储如何由topic定位到具体的消息)。
/*** Broker 处理 Producer发送过来的消息 - SendMessageProcessor**/private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //final RemotingCommand request,//final SendMessageContext mqtraceContext,//final SendMessageRequestHeader requestHeader) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();response.setOpaque(request.getOpaque());if (log.isDebugEnabled()) {log.debug("receive SendMessage request command, " + request);}response.setCode(-1);super.msgCheck(ctx, requestHeader, response);if (response.getCode() != -1) {return response;}final byte[] body = request.getBody();/** 消息已经来到了该 broker,只需要知道具体是哪个 queue */int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();}/** 包装 Message */MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));msgInner.setPropertiesString(requestHeader.getProperties());msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(sysFlag);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 
0 : requestHeader.getReconsumeTimes());if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (traFlag != null) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return response;}}/** 消息存储 */PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);if (putMessageResult != null) {boolean sendOK = false;switch (putMessageResult.getPutMessageStatus()) {// Successcase PUT_OK:sendOK = true;response.setCode(ResponseCode.SUCCESS);break;case FLUSH_DISK_TIMEOUT:response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);sendOK = true;break;case FLUSH_SLAVE_TIMEOUT:response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);sendOK = true;break;case SLAVE_NOT_AVAILABLE:response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);sendOK = true;break;/** Failed 创建映射文件失败*/case CREATE_MAPEDFILE_FAILED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("create maped file failed, please make sure OS and JDK both 64bit.");break;/** .. 
其他错误 */}if (sendOK) {this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());this.brokerController.getBrokerStatsManager().incBrokerPutNums();if (!this.brokerController.getBrokerConfig().isHighSpeedMode()) {// For commercialint incValue =(int) Math.ceil(putMessageResult.getAppendMessageResult().getWroteBytes() / BrokerStatsManager.SIZE_PER_COUNT);this.brokerController.getBrokerStatsManager().incCommercialTopicSendTimes(requestHeader.getProducerGroup(),msgInner.getTopic(), BrokerStatsManager.StatsType.SEND_SUCCESS.toString(), incValue);this.brokerController.getBrokerStatsManager().incCommercialTopicSendSize(requestHeader.getProducerGroup(),msgInner.getTopic(), BrokerStatsManager.StatsType.SEND_SUCCESS.toString(),putMessageResult.getAppendMessageResult().getWroteBytes());}response.setRemark(null);responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());responseHeader.setQueueId(queueIdInt);responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());/** ACK */doResponse(ctx, request, response);if (hasSendMessageHook()) {mqtraceContext.setMsgId(responseHeader.getMsgId());mqtraceContext.setQueueId(responseHeader.getQueueId());mqtraceContext.setQueueOffset(responseHeader.getQueueOffset());}return null;}else {// For commercialthis.brokerController.getBrokerStatsManager().incCommercialTopicSendTimes(requestHeader.getProducerGroup(),msgInner.getTopic(), BrokerStatsManager.StatsType.SEND_FAILURE.toString(), 1);}}else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store putMessage return null");}return response;}/*** DefaultMessageStore*/public PutMessageResult putMessage(MessageExtBrokerInner msg) {if (this.shutdown) {log.warn("message store has shutdown, so putMessage is forbidden");return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}/** broker slave 不能到这里 */if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) { /** 控制warn log的频率*/log.warn("message store is slave mode, so putMessage is forbidden ");}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}/** 不可写 */if (!this.runningFlags.isWriteable()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}else {this.printTimes.set(0);}if (msg.getTopic().length() > Byte.MAX_VALUE) {log.warn("putMessage message topic length too long " + msg.getTopic().length());return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);}if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);}long beginTime = this.getSystemClock().now();/*** 消息写入 commit log*/PutMessageResult result = this.commitLog.putMessage(msg);long eclipseTime = this.getSystemClock().now() - beginTime;if (eclipseTime > 1000) {log.warn("putMessage not in lock eclipse time(ms) " + eclipseTime);}this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}return result;}/*** CommitLog 写消息*/public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = 
null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TransactionNotType//|| tranType == MessageSysFlag.TransactionCommitType) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = ScheduleMessageService.SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock = 0;MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();synchronized (this) {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();// Here settings are stored timestamp, in order to ensure an orderly globalmsg.setStoreTimestamp(beginLockTimestamp);if (null == mapedFile || mapedFile.isFull()) {mapedFile = this.mapedFileQueue.getLastMapedFile();}if (null == mapedFile) {log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}/** 写消息到内存映射区 */result = mapedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// Create a new file, re-write the messagemapedFile = this.mapedFileQueue.getLastMapedFile();if (null == mapedFile) {// XXX: warn and notify melog.error("create maped file2 error, 
topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result = mapedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} // end of synchronizedif (eclipseTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock eclipse time(ms) " + eclipseTimeInLock);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());GroupCommitRequest request = null;/** Synchronization flush 如果是同步刷盘策略 */if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (msg.isWaitStoreMsgOK()) {request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()+ " client address: " + msg.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}} else { /** Asynchronous flush 异步刷盘只需要唤醒对应的线程*/this.flushCommitLogService.wakeup();}/** Synchronous write double 如果当前broker是master,则同步双写*/if (BrokerRole.SYNC_MASTER == 
this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service = this.defaultMessageStore.getHaService();if (msg.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {if (null == request) {request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());}service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK =// TODOrequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "+ msg.getTags() + " client address: " + msg.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}} else {// Slave problem, Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}return putMessageResult;}/*** 获得一个映射区,如果不存在则创建*/public MapedFile getLastMapedFile(final long startOffset, boolean needCreate) {long createOffset = -1;MapedFile mapedFileLast = null;{this.readWriteLock.readLock().lock();if (this.mapedFiles.isEmpty()) {createOffset = startOffset - (startOffset % this.mapedFileSize);}else { /** 获得最近正在使用的 mapped file */mapedFileLast = this.mapedFiles.get(this.mapedFiles.size() - 1);}this.readWriteLock.readLock().unlock();}if (mapedFileLast != null && mapedFileLast.isFull()) { /** 正在使用的 mapped file 满了 */createOffset = mapedFileLast.getFileFromOffset() + this.mapedFileSize;}if (createOffset != -1 && needCreate) {/** 可以看到这里文件命名的规则,以 起始offset作为文件名,便于定位索引 */String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);String nextNextFilePath =this.storePath + File.separator + UtilAll.offset2FileName(createOffset + this.mapedFileSize);MapedFile mapedFile = null;if (this.allocateMapedFileService != null) {mapedFile 
=this.allocateMapedFileService.putRequestAndReturnMapedFile(nextFilePath,nextNextFilePath, this.mapedFileSize);}else {try {mapedFile = new MapedFile(nextFilePath, this.mapedFileSize);}catch (IOException e) {log.error("create mapedfile exception", e);}}if (mapedFile != null) {this.readWriteLock.writeLock().lock();if (this.mapedFiles.isEmpty()) { /** 标识为该message queue的第一个创建的mapped file */mapedFile.setFirstCreateInQueue(true);}this.mapedFiles.add(mapedFile); /** 加入链表 */this.readWriteLock.writeLock().unlock();}return mapedFile;}return mapedFileLast;}/*** 把消息追加到 mapped file 中,采用回调的方式*/public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {assert msg != null;assert cb != null;int currentPos = this.wrotePostion.get();if (currentPos < this.fileSize) {ByteBuffer byteBuffer = this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);this.wrotePostion.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: "+ this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}/*** 消息追加实现,同时维护映射关系 (topic-queueId, offset)供消费者使用*/public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;// PHY OFFSETlong wroteOffset = fileFromOffset + byteBuffer.position();String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(), wroteOffset);// Record ConsumeQueue informationString key = msgInner.getTopic() + "-" + msgInner.getQueueId();Long queueOffset = CommitLog.this.topicQueueTable.get(key);if (null == queueOffset) {queueOffset = 0L; /** 表示第一条 
*/CommitLog.this.topicQueueTable.put(key, queueOffset);}/** Transaction messages that require special handling 事务类型的消息 */final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch (tranType) {// Prepared and Rollback message is not consumed, will not enter the consumer queuecase MessageSysFlag.TransactionPreparedType:case MessageSysFlag.TransactionRollbackType:queueOffset = 0L;break;case MessageSysFlag.TransactionNotType:case MessageSysFlag.TransactionCommitType:default:break;}/** Serialize message */final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);if (propertiesData.length > Short.MAX_VALUE) {log.warn("putMessage message properties length too long. length={}", propertiesData.length);return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);}final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length;final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);final int topicLength = topicData == null ? 0 : topicData.length;final int bodyLength = msgInner.getBody() == null ? 
0 : msgInner.getBody().length;final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);// Exceeds the maximum messageif (msgLen > this.maxMessageSize) {CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength+ ", maxMessageSize: " + this.maxMessageSize);return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);}// Determines whether there is sufficient free spaceif ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.resetMsgStoreItemMemory(maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);// 3 The remaining space may be any value// Here the length of the specially set maxBlankbyteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),queueOffset);}/** Initialization of storage space 根据协议格式写入 */this.resetMsgStoreItemMemory(msgLen);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(msgLen);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode);// 3 BODYCRCthis.msgStoreItemMemory.putInt(msgInner.getBodyCRC());// 4 QUEUEIDthis.msgStoreItemMemory.putInt(msgInner.getQueueId());// 5 FLAGthis.msgStoreItemMemory.putInt(msgInner.getFlag());// 6 QUEUEOFFSETthis.msgStoreItemMemory.putLong(queueOffset);// 7 PHYSICALOFFSETthis.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());// 8 SYSFLAGthis.msgStoreItemMemory.putInt(msgInner.getSysFlag());// 9 BORNTIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());// 10 BORNHOSTthis.msgStoreItemMemory.put(msgInner.getBornHostBytes());// 11 STORETIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());// 12 STOREHOSTADDRESSthis.msgStoreItemMemory.put(msgInner.getStoreHostBytes());// 13 RECONSUMETIMESthis.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());// 14 Prepared Transaction 
Offsetthis.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());// 15 BODYthis.msgStoreItemMemory.putInt(bodyLength);if (bodyLength > 0)this.msgStoreItemMemory.put(msgInner.getBody());// 16 TOPICthis.msgStoreItemMemory.put((byte) topicLength);this.msgStoreItemMemory.put(topicData);// 17 PROPERTIESthis.msgStoreItemMemory.putShort(propertiesLength);if (propertiesLength > 0)this.msgStoreItemMemory.put(propertiesData);// Write messages to the queue bufferbyteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);AppendMessageResult result =new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(),queueOffset);switch (tranType) {case MessageSysFlag.TransactionPreparedType:case MessageSysFlag.TransactionRollbackType:break;case MessageSysFlag.TransactionNotType: /** 非事务类型 或 事务提交 就更新queue offset,下一个消息存储的位置 */case MessageSysFlag.TransactionCommitType:// The next update ConsumeQueue informationCommitLog.this.topicQueueTable.put(key, ++queueOffset);break;default:break;}return result;}

/*** broker处理消息pull请求*/private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);final PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.readCustomHeader();final PullMessageRequestHeader requestHeader =(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);response.setOpaque(request.getOpaque());SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());if (null == subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));return response;}if (!subscriptionGroupConfig.isConsumeEnable()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());return response;}final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());final long suspendTimeoutMillisLong = hasSuspendFlag ? 
requestHeader.getSuspendTimeoutMillis() : 0;/** */TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (null == topicConfig) {log.error("the topic " + requestHeader.getTopic() + " not exist, consumer: "+ RemotingHelper.parseChannelRemoteAddr(channel));response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}if (!PermName.isReadable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");return response;}if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {String errorInfo ="queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :"+ requestHeader.getTopic() + " topicConfig.readQueueNums: "+ topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();log.warn(errorInfo);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorInfo);return response;}SubscriptionData subscriptionData = null;if (hasSubscriptionFlag) {try {subscriptionData =FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getSubscription());}catch (Exception e) {log.warn("parse the consumer's subscription[{}] failed, group: {}",requestHeader.getSubscription(),//requestHeader.getConsumerGroup());response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);response.setRemark("parse the consumer's subscription failed");return response;}}else {ConsumerGroupInfo consumerGroupInfo =this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());if (null == consumerGroupInfo) {log.warn("the consumer's group info not exist, group: {}", 
requestHeader.getConsumerGroup());response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);response.setRemark("the consumer's group info not exist"+ FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));return response;}if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the consumer group[" + requestHeader.getConsumerGroup()+ "] can not consume by broadcast way");return response;}subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());if (null == subscriptionData) {log.warn("the consumer's subscription not exist, group: {}", requestHeader.getConsumerGroup());response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);response.setRemark("the consumer's subscription not exist"+ FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));return response;}if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {log.warn("the broker's subscription is not latest, group: {} {}",requestHeader.getConsumerGroup(), subscriptionData.getSubString());response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);response.setRemark("the consumer's subscription not latest");return response;}}final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(),requestHeader.getMaxMsgNums(), subscriptionData);if (getMessageResult != null) {response.setRemark(getMessageResult.getStatus().name());responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());responseHeader.setMinOffset(getMessageResult.getMinOffset());responseHeader.setMaxOffset(getMessageResult.getMaxOffset());if (getMessageResult.isSuggestPullingFromSlave()) {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());log.warn("consume message too slow, suggest pulling 
from slave. group={}, topic={}, subString={}, queueId={}, offset={}",requestHeader.getConsumerGroup(), requestHeader.getTopic(),subscriptionData.getSubString(), requestHeader.getQueueId(),requestHeader.getQueueOffset());}else {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());}switch (getMessageResult.getStatus()) {case FOUND:response.setCode(ResponseCode.SUCCESS);if (this.hasConsumeMessageHook()) {ConsumeMessageContext context = new ConsumeMessageContext();context.setConsumerGroup(requestHeader.getConsumerGroup());context.setTopic(requestHeader.getTopic());context.setClientHost(RemotingHelper.parseChannelRemoteAddr(channel));context.setStoreHost(this.brokerController.getBrokerAddr());context.setQueueId(requestHeader.getQueueId());final SocketAddress storeHost =new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(),brokerController.getNettyServerConfig().getListenPort());Map<String, Long> messageIds =this.brokerController.getMessageStore().getMessageIds(requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(),requestHeader.getQueueOffset() + getMessageResult.getMessageCount(),storeHost);context.setMessageIds(messageIds);context.setBodyLength(getMessageResult.getBufferTotalSize()/ getMessageResult.getMessageCount());this.executeConsumeMessageHookBefore(context);}break;case MESSAGE_WAS_REMOVING:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case NO_MATCHED_LOGIC_QUEUE:case NO_MESSAGE_IN_QUEUE:if (0 != requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",//requestHeader.getQueueOffset(), //getMessageResult.getNextBeginOffset(), //requestHeader.getTopic(),//requestHeader.getQueueId(),//requestHeader.getConsumerGroup()//);}else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;case 
NO_MATCHED_MESSAGE:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case OFFSET_FOUND_NULL:response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_OVERFLOW_BADLY:response.setCode(ResponseCode.PULL_OFFSET_MOVED);log.info("the request offset: " + requestHeader.getQueueOffset()+ " over flow badly, broker max offset: " + getMessageResult.getMaxOffset()+ ", consumer: " + channel.remoteAddress());break;case OFFSET_OVERFLOW_ONE:response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_TOO_SMALL:response.setCode(ResponseCode.PULL_OFFSET_MOVED);log.info("the request offset too small. group={}, topic={}, requestOffset{}, brokerMinOffset={}, clientIp={}",requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueOffset(), getMessageResult.getMinOffset(), channel.remoteAddress());break;default:assert false;break;}switch (response.getCode()) {case ResponseCode.SUCCESS:this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),getMessageResult.getMessageCount());this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),getMessageResult.getBufferTotalSize());this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());if(this.brokerController.getBrokerConfig().isTransferMsgByHeap()){final byte[] r = this.readGetMessageResult(getMessageResult);response.setBody(r);}else {try {FileRegion fileRegion =new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {getMessageResult.release();if (!future.isSuccess()) {log.error("transfer many message by pagecache failed, " + channel.remoteAddress(),future.cause());}}});} catch (Throwable e) {log.error("transfer many message by pagecache 
exception", e);getMessageResult.release();}response = null;}break;case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}PullRequest pullRequest =new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), requestHeader.getQueueOffset());this.brokerController.getPullRequestHoldService().suspendPullRequest(requestHeader.getTopic(), requestHeader.getQueueId(), pullRequest);response = null;break;}case ResponseCode.PULL_RETRY_IMMEDIATELY:break;case ResponseCode.PULL_OFFSET_MOVED:if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {MessageQueue mq = new MessageQueue();mq.setTopic(requestHeader.getTopic());mq.setQueueId(requestHeader.getQueueId());mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());OffsetMovedEvent event = new OffsetMovedEvent();event.setConsumerGroup(requestHeader.getConsumerGroup());event.setMessageQueue(mq);event.setOffsetRequest(requestHeader.getQueueOffset());event.setOffsetNew(getMessageResult.getNextBeginOffset());this.generateOffsetMovedEvent(event);log.warn("PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(),event.getOffsetNew(), responseHeader.getSuggestWhichBrokerId());}else {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);log.warn("PULL_OFFSET_MOVED:none correction. 
topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",requestHeader.getTopic(), requestHeader.getConsumerGroup(),requestHeader.getQueueOffset(), responseHeader.getSuggestWhichBrokerId());}break;default:assert false;}}else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store getMessage return null");}boolean storeOffsetEnable = brokerAllowSuspend;storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;storeOffsetEnable = storeOffsetEnable&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;if (storeOffsetEnable) {this.brokerController.getConsumerOffsetManager().commitOffset(requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());}return response;}/**
 * Pulls messages of one queue from the store, starting at a consume-queue index.
 *
 * Returns null (after a warning log) when the store is shut down or not readable.
 * Otherwise walks the index entries obtained from getIndexBuffer(offset) in steps of
 * ConsumeQueue.CQStoreUnitSize, reading (commit-log offset, size, tagsCode) from each one;
 * entries accepted by messageFilter.isMessageMatched are fetched from the commit log and
 * added to the result until isTheBatchFull(...) reports true. The index buffer is released
 * in the finally block.
 *
 * NOTE(review): this excerpt replaces the boundary-case branches with a placeholder comment,
 * so the if/else around that placeholder is not complete Java.
 * NOTE(review): findConsumeQueue as shown never returns null, so the NO_MATCHED_LOGIC_QUEUE
 * branch looks unreachable from here - confirm.
 *
 * @param group consumer group; in this method only passed to recordDiskFallBehind
 * @param topic topic to read from
 * @param queueId queue of the topic
 * @param offset index of the first consume-queue entry to read
 * @param maxMsgNums passed to isTheBatchFull to bound the batch
 * @param subscriptionData passed to the message filter together with each tagsCode
 * @return result carrying status, next begin offset and min/max queue offsets, or null
 */public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,final SubscriptionData subscriptionData) {if (this.shutdown) {log.warn("message store has shutdown, so getMessage is forbidden");return null;}if (!this.runningFlags.isReadable()) {log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());return null;}long beginTime = this.getSystemClock().now();GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;long nextBeginOffset = offset;long minOffset = 0;long maxOffset = 0;GetMessageResult getResult = new GetMessageResult();final long maxOffsetPy = this.commitLog.getMaxOffset();/** consume queue: the logical consume queue, i.e. the index information of the messages */ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);if (consumeQueue != null) {minOffset = consumeQueue.getMinOffsetInQuque();maxOffset = consumeQueue.getMaxOffsetInQuque();{// (handling of the various boundary cases)}else { /** use the pull offset to obtain the byte buffer of the corresponding index entries */SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);if (bufferConsumeQueue != null) {try {status = GetMessageStatus.NO_MATCHED_MESSAGE;long nextPhyFileStartOffset = Long.MIN_VALUE;long 
maxPhyOffsetPulling = 0;int i = 0;final int MaxFilterMessageCount = 16000;final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {/** read the fields following the consume queue index entry layout: CommitLogOffset-Size-TagsCode */long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // (actual offset) int sizePy = bufferConsumeQueue.getByteBuffer().getInt();// (actual size) long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();maxPhyOffsetPulling = offsetPy;if (nextPhyFileStartOffset != Long.MIN_VALUE) {if (offsetPy < nextPhyFileStartOffset)continue;}boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),isInDisk)) {break;}/** message filtering on the broker side, when required */if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {/** actually read the message from the commit log */SelectMapedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);if (selectResult != null) {this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();getResult.addMessage(selectResult); /** build up the result to return */status = GetMessageStatus.FOUND;nextPhyFileStartOffset = Long.MIN_VALUE;}else {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.MESSAGE_WAS_REMOVING;}nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);}}else {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}}}if (diskFallRecorded) {long fallBehind = maxOffsetPy - maxPhyOffsetPulling;brokerStatsManager.recordDiskFallBehind(group, topic, queueId, fallBehind);}nextBeginOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);long diff = maxOffsetPy - maxPhyOffsetPulling;long memory =(long) (StoreUtil.TotalPhysicalMemorySize * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));getResult.setSuggestPullingFromSlave(diff > memory);}finally 
{bufferConsumeQueue.release();}}else {status = GetMessageStatus.OFFSET_FOUND_NULL;nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "+ maxOffset + ", but access logic queue failed.");}}}else {status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);}if (GetMessageStatus.FOUND == status) {this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();}else {this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();}long eclipseTime = this.getSystemClock().now() - beginTime;this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);getResult.setStatus(status);getResult.setNextBeginOffset(nextBeginOffset);getResult.setMaxOffset(maxOffset);getResult.setMinOffset(minOffset);return getResult;}/**
 * (topic + queueId) -> ConsumeQueue.
 *
 * Original author's note: the consume queue info of all topics is loaded when the broker
 * starts; a queue that does not exist at runtime is created here.
 *
 * Both the per-topic map and the queue are inserted with putIfAbsent; when an entry is
 * already present, that existing instance is used and the freshly built one is not.
 */public ConsumeQueue findConsumeQueue(String topic, int queueId) {ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) { /** check again: a mapping was already present */map = oldMap;}else {map = newMap;}}ConsumeQueue logic = map.get(queueId); // (one logical queue) if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(//topic,//queueId,//StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),//this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;}else {logic = newLogic;}}return logic;}/**
 * Returns the mapped-buffer region for the index entries starting at startIndex.
 * The byte offset is startIndex * CQStoreUnitSize; null is returned when that offset is
 * below getMinLogicOffset() or no mapped file is found for it.
 * NOTE(review): getMessage calls this on a ConsumeQueue, so it presumably belongs to that class.
 */public SelectMapedBufferResult getIndexBuffer(final long startIndex) {int mapedFileSize = this.mapedFileSize;long 
offset = startIndex * CQStoreUnitSize; // (each index entry is 20 bytes) if (offset >= this.getMinLogicOffset()) {MapedFile mapedFile = this.mapedFileQueue.findMapedFileByOffset(offset);if (mapedFile != null) {SelectMapedBufferResult result = mapedFile.selectMapedBuffer((int) (offset % mapedFileSize));return result;}}return null;}

RocketMQ的消息以顺序写的方式追加到commitlog文件,然后利用consume queue文件作为索引。RocketMQ采用零拷贝(mmap+write)的方式来响应Consumer的请求,并宣称大部分请求都会在Page Cache层得到满足,所以消息过多时也不会因为磁盘读而使性能下降。这里自己的理解是:在64bit机器下,虚存地址空间(vm_area_struct)不是问题,所以相关的文件都会被映射到内存中(有定期删除文件的操作);即使某一页此刻不在内存,操作系统也会通过缺页异常将其换入。虽然地址空间不是问题,但是一个进程可以拥有的内存映射区域个数(/proc/sys/vm/max_map_count)是有限的,所以可能在这里发生OOM。

Broker存储目录(默认路径是 $HOME/store):
Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提高系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘。如果是同步刷盘,Broker把消息写到CommitLog映射区后,会等待刷盘完成;如果是异步刷盘,则只是唤醒对应的刷盘线程,不保证执行的时机。
需要业务层自己决定哪些消息应该顺序到达,然后在发送的时候通过规则(hash)把它们映射到同一个队列,因为没有谁比业务自己更清楚消息顺序的特点。这样的顺序是相对顺序、局部顺序:发送方只保证把这些消息顺序地发送到broker上的同一队列,但是不能保证其他Producer不会也发送消息到那个队列,所以需要Consumer在拉到消息后做一些过滤。
消息过滤类似于重复数据删除技术(Data Deduplication),可以在源端做,也可以在目的端实现,本质上是网络和存储的权衡。如果在Broker端做消息过滤,就需要逐一比对consume queue 的 tagsCode 字段(hashcode),符合则传输给消费者。因为比对的是 hashcode,存在哈希冲突导致的误判,所以需要在 Consumer 接收到消息后再进行字符串级别的过滤,确保准确性。