开发者

RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

目录
  • RocketMq消息处理
  • 1. 处理PULL_MESSAGE请求
  • 2. 获取消息
  • 3. 挂起请求:PullRequestHoldService#suspendPullRequest
    • 3.1 处理挂起请求的线程:PullRequestHoldService
    • 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup
    • 3.3 消息分发中唤醒consumer请求
  • 总结

    RocketMq消息处理

    RocketMq消息处理整个流程如下:

    本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

    RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

    • 消息接收:消息接收是指接收producer的消息,处理类是SendMessageProcessor,将消息写入到commigLog文件后,接收流程处理完毕;
    • 消息分发:broker处理消息分发的类是ReputMessageService,它会启动一个线程,不断地将commitLong分到到对应的consumerQueue,这一步操作会写两个文件:consumerQueueindexFile,写入后,消息分发流程处理 完毕;
    • 消息投递:消息投递是指将消息发往consumer的流程,consumer会发起获取消息的请求,broker收到请求后,调用PullMessageProcessor类处理,从consumerQueue文件获取消息,返回给consumer后,投递流程处理完毕。

    以上就是rocketMq处理消息的流程了,接下来我们就从源码来分析消息投递的实现。

    1. 处理PULL_MESSAGE请求

    producer不同,consumerbroker拉取消息时,发送的请求codePULL_MESSAGEprocessorPullMessageProcessor,我们直接进入它的processRequest方法:

    @Override
    public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        // 调用方法
        return this.processRequest(ctx.channel(), request, true);
    }
    

    这个方法就只是调用了一个重载方法,多出来的参数true表示允许broker挂起请求,我们继续,

    /**
     * 继续处理
     */
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, 
            boolean brokerAllowSuspend)throws RemotingCommandException {
        RemotingCommand response = RemotingCommand
            .createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader 
            = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) 
            request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
        response.setOpaque(request.getOpaque());
        // 省略权限校验流程
        // 1. rocketMq 可以设置校验信息,以阻挡非法客户端的连接
        // 2. 同时,对topic可以设置DENY(拒绝)、ANY(PUB 或者 SUB 权限)、PUB(发送权限)、SUB(订阅权限)等权限,
        //    可以细粒度控制客户端对topic的操作内容
        ...
        // 获取订阅组
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager()
            .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        ...
        // 获取订阅主题
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager()
            .selectTopicConfig(requestHeader.getTopic());
        ...
        // 处理filter
        // consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag与sql92
        // 这里我们重点关注拉取消息的流程,具体的过滤细节后面再分析
        ...
        // 获取消息
        // 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件
        // 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容
        final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), 
            requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
        if (getMessageResult != null) {
            // 省略一大堆的校验过程
            ...
            switch (response.getCode()) {
                // 表示消息可以处理,这里会把消息内容写入到 response 中
                case ResponseCode.SUCCESS:
                    ...
                    // 处理消息消息内容,就是把消息从 getMessageResult 读出来,放到 response 中
                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                        final long beginTimeMills = this.brokerController.getMessageStore().now();
                        // 将消息内容转为byte数组
                        final byte[] r = this.readGetMessageResult(getMessageResult, 
                            requestHeader.getConsumerGroup(), requestHeader.getTopic(), 
                            requestHeader.getQueueId());
                        ...
                        response.setBody(r);
                    } else {
                        try {
                            // 消息转换
                            FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(
                                getMessageResult.getBufferTotalSize()), getMessageResult);
                            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                                ...
                            });
                        } catch (Throwable e) {
                            ...
                        }
                        response = null;
                    }
                    break;
                // 未找到满足条件的消息
                case ResponseCode.PULL_NOT_FOUND:
                    // 如果支持挂起,就挂起当前请求
                    if (brokerAllowSuspend && hasSuspendFlag) {
                        ...
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData, 
                            messageFilter);
                        // 没有找到相关的消息,挂起操作
                        this.brokerController.getPullRequestHoldService()
                            .suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }
                // 省略其他类型的处理
                ...
                    break;
                default:
                    assert false;
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store getMessage return null");
        }
        ...
        return response;
    }
    

    在源码中,这个方法也是非常长,这里我抹去了各种细枝末节,仅留下了一些重要的流程,整个处理流程如下:

    • 权限校验:rocketMq 可以设置校验信息,以阻挡非法客户端的连接,同时也可以设置客户端的发布、订阅权限,细节度控制访问权限;
    • 获取订阅组、订阅主题等,这块主要是通过请求消息里的内容获取broker中对应的记录
    • 创建过滤组件:consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tagsql92
    • 获取消息:先是根据 topicqueueId 获取 ConsumerQueue 文件,根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容,消息的过滤操作也是发生在这一步
    • 转换消息:如果获得了消息,就是把具体的消息内容,复制到reponse
    • 挂起请求:如果没获得消息,而当前请求又支持挂起,就挂起当前请求

    以上代码还是比较清晰的,相关流程代码中都作了注释。

    以上流程就是整个消息的获取流程了,在本文中,我们仅关注与获取消息相关的步骤,重点关注以下两个操作:

    • 获取消息
    • 挂起请求

    2. 获取消息

    获取消息的方法为DefaultMessageStore#getMessage,代码如下:

    public GetMessageResult getMessage(final String group, final String topic, final int queueId, 
            final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
        // 省略一些判断
        ...php
        // 根据topic与queueId一个ConsumeQueue,consumeQueue记录的是消息在commitLog的位置
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();
            if (...) {
                // 判断 offset 是否符合要求
                ...
            } else {
                // 从 consumerQueue 文件中获取消息
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    ...
                    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; 
                        i += Consume编程Queue.CQ_STORE_UNIT_SIZE) {
                        // 省略一大堆的消息过滤操作
                        ...
                        // 从 commitLong 获取消息
                        SelectMappedBufferResult selectResult 
                                = this.commitLog.getMessage(offsetPy, sizePy);
                        if (null == selectResult) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                            }
                            nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                            continue;
                        }
                        // 省略一大堆的消息过滤操作
                        ...
                    }
                }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }
        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        long elapsedTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
        getResult.setStatus(status);
        // 又是处理 offset
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }
    

    这个方法不是比较长的,这里仅保留了关键流程,获取消息的关键流程如下:

    • 根据topicqueueId找到ConsumerQueue
    • ConsumerQueue对应的文件中获取消息信息,如taghashCode、消息在commitLog中的位置信息
    • 根据位置信息,从commitLog中获取完整的消息

    经过以上步骤,消息就能获取到了,不过在获取消息的前后,会进行消息过滤操作,即根据tagsql语法来过滤消息,关于消息过滤的一些细节,我们留到后面消息过滤相关章节作进一步分析。

    3. 挂起请求:PullRequestHoldService#suspendPullRequest

    broker无新消息时,consumer拉取消息的请求就会挂起,方法为PullRequestHoldService#suspendPullRequest

    public class PullRequestHoldService extends ServiceThread {
        private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
            new ConcurrentHashMap<String, ManyPullRequest>(1024);
        public void suspendPullRequest(final String topic, final int queueId, 
                final PullRequest pullRequest) {
            String key = this.buildKey(topic, queueId);
            ManyPullRequest mpr = this.pullRequestTable.get(key);
            if (null == mpr) {
                mpr = new ManyPullRequest();
                ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
                if (prev != null) {
                    mpr = prev;
                }
            }
            mpr.addPullRequest(pullRequest);
        }
        ...
    }
    

    suspendPullRequest方法中,所做的工作仅是把当前请求放入pullRequestTable中了。从代码中可以看到,pullRequestTable是一个ConcurrentMapkeytopic@queueIdvalue 就是挂起的请求了。

    请求挂起后,何时处理呢?这就是PullRequestHoldService线程的工作了。

    3.1 处理挂起请求的线程:PullRequestHoldService

    看完PullRequestHoldService#suspendPullRequest方法后,我们再来看看PullRequestHoldService

    PullRequestHoldServiceServiceThread的子类(上一次看到ServiceThread的子类还是ReputMessageService),它也会启动一个新线程来处理挂起操作。

    我们先来看看它是在哪里启动PullRequestHoldService的线程的,在BrokerController的启动方法start()中有这么一行:

    BrokerController#start

    public void start() throws Exception {
        ...
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
        ...
    }
    

    这里就是启动pullRequestHoldService的线程操作了。

    为了探究这个线程做了什么,我们进入PullRequestHoldService#run方法:

    @Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                // 等待中
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(
                        this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }
                long beginLockTimestamp = this.systemClock.now();
                // 检查操作
             编程客栈   this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        log.info("{} service end", this.getServiceName());
    }
    

    从代码来看,这个线程先是进行等待,然后调用PullRequestHoldService#checkHoldRequest方法,看来关注就是这个方法了,它的代码如下:

    private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0];
                int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore()
                    .getMaxOffsetInQueue(topic, queueId);
                try {
                    // 调用notifyMessageArriving方法操作
                    this.notifyMessageArriving(topic, queueId, offset);
                } catch (Throwable e) {
                    log.error(...);
                }
            }
        }
    }
    

    这个方法调用了PullRequestHoldService#notifyMessageArriving(...),我们继续进入:

    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {编程
        // 继续调用
        notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
    }
    /**
     * 这个方法就是最终调用的了
     */
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, 
        final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();
                for (PullRequest request : requestList) {
                    // 判断是否有新消息到达,要根据 comsumerQueue 的偏移量与request的偏移量判断
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore()
               编程客栈             .getMaxOffsetInQueue(topic, queueId);
                    }
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }
                        if (match) {
                            try {
                                // 唤醒操作
                                this.brokerController.getPullMessageProcessor()
                                    .executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }
                    // 超时时间到了
                    if (System.currentTimeMillis() >= 
                            (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            // 唤醒操作
                            this.brokerController.getPullMessageProcessor()
                                .executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                    replayList.add(request);
                }
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }
    

    这个方法就是用来检查是否有新消息送达的操作了,方法虽然有点长,但可以用一句话来总结:如果有新消息送达,或者pullRquest hold住的时间到了,就唤醒pullRquest(即调用PullMessageProcessor#executeRequestWhenWakeup方法)。

    • 在判断是否有新消息送达时,会获取comsumerQueue文件中的最大偏移量,与当前pullRquest中的偏移量进行比较,如果前者大,就表示有新消息送达了,需要唤醒pullRquest
    • 前面说过,当consumer请求没获取到消息时,brokerhold这个请求一段时间(30s),当这个时间到了,也会唤醒pullRquest,之后就不会再hold住它了

    3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup

    我们再来看看 PullMessageProcessor#executeRequestWhenWakeup 方法:

    public void executeRequestWhenWakeup(final Channel channel,
        final RemotingCommand request) throws RemotingCommandException {
        // 关注 Runnable#run() 方法即可
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    // 再一次调用 PullMessageProcessor#processRequest(...) 方法
                    final RemotingCommand response = PullMessageProcessor.this
                        .processRequest(channel, request, false);
                    ...
                } catch (RemotingCommandException e1) {
                    log.error("excuteRequestWhenWakeup run", e1);
                }
            }
        };
        // 提交任务
        this.brokerController.getPullMessageExecutor()
            .submit(new RequestTask(run, channel, request));
    }
    

    这个方法准备了一个任务,然后将其提交到线程池中执行,任务内容很简单,仅是调用了PullMessageProcessor#processRequest(...) 方法,这个方法就是本节一始提到的处理consumer拉取消息的方法了。

    3.3 消息分发中唤醒consumer请求

    在分析消息分发流程时,DefaultMessageStore.ReputMessageService#doReput方法中有这么一段:

    private void doReput() {
        ...
        // 分发消息
        DefaultMessageStore.this.doDispatch(dispatchRequest);
        // 长轮询:如果有消息到了主节点,并且开启了长轮询
        if (BrokerRole.SLAVE != DefaultMessageStore.this
                .getMessageStoreConfig().getBrokerRole()
                &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
            // 调用NotifyMessageArrivingListener的arriving方法
            DefaultMessageStore.this.messageArrivingListener.arriving(
                dispatchRequest.getTopic(),
                dispatchRequest.getQueueId(), 
                dispatchRequest.getConsumeQueueOffset() + 1,
                dispatchRequest.getTagsCode(), 
                dispatchRequest.getStoreTimestamp(),
                dispatchRequest.getBitMap(), 
                dispatchRequest.getPropertiesMap());
        }
        ...
    }
    

    这段就是用来主动唤醒hold住的consumer请求的,我们进入NotifyMessageArrivingListener#arriving方法:

     @Override
    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
            msgStoreTime, filterBitMap, properties);
    }
    

    最终它也是调用了 PullRequestHoldService#notifyMessageArriving(...) 方法。

    总结

    本文主要分析了broker处理PULL_MESSAGE请求的流程,总结如下:

    • broker处理PULL_MESSAGEprocesso开发者_Go学习rPullMessageProcessorPullMessageProcessorprocessRequest(...)就是整个消息获取流程了
    • broker在获取消息时,先根据请求的topicqueueId找到consumerQueue,然后根据请求中的offset参数从consumerQueue文件中找到消息在commitLog的位置信息,最后根据位置信息从commitLog中获取消息内容
    • 如果broker中没有当前consumerQueue的消息,broker会挂起当前线程,直到超时(默认30s)或收到新的消息时再唤醒

    参考  

    RocketMQ源码分析专栏

    以上就是RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析的详细内容,更多关于RocketMQ broker 消息投递的资料请关注我们其它相关文章!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜