2026/4/18 14:32:13
网站建设
项目流程
做企业网站好处,如何做内部网站,广州seo做得比较好的公司,糖果网站建设策划书模板RocketMq基础篇整体栏目 内容链接地址【一】环境搭建、基本使用、可视化界面https://zhenghuisheng.blog.csdn.net/article/details/147481401【二】rocketmq集群搭建(docker版-2主2从)https://zhenghuisheng.blog.csdn.net/article/details/154921615【三】dashboard安装搭建和…RocketMq基础篇整体栏目内容链接地址【一】环境搭建、基本使用、可视化界面https://zhenghuisheng.blog.csdn.net/article/details/147481401【二】rocketmq集群搭建(docker版-2主2从)https://zhenghuisheng.blog.csdn.net/article/details/154921615【三】dashboard安装搭建和启动详解https://zhenghuisheng.blog.csdn.net/article/details/155371854【四】rocketmq基本使用https://zhenghuisheng.blog.csdn.net/article/details/155679428【五】精通rocketmq核心组件https://zhenghuisheng.blog.csdn.net/article/details/156546721如需转载请附上链接: https://blog.csdn.net/zhenghuishengq/article/details/156546721精通rocketmq核心组件一精通rocketmq核心组件1简述消息的一生2Producer生产者2.1Producer发送流程3Topic消息主题4CommitLog存储4.1commitlog概念4.2与messageQueue关系4.3segment过期删除5ConsumeQueue存储的索引5.1consumeQueue 概念5.2consumeQueue存储内容5.3配合offset使用5.4总结consumeQueue6MessageQueue消息队列6.1messageQueue概念6.2messageQueue与topic关系6.3messageQueue和consumeQueue的关联6.4为什么messageQueue是逻辑概念6.5总结messageQueue7ConsumerGroup消费者组8Consumer8.1consumer消费流程9Offset偏移量9.1offset核心概念10核心组件设计10.1组件设计动机10.2设计动机和技术收益11总结一精通rocketmq核心组件前面几篇文章已经讲解了rocketmq的集群搭建dashboard可视化界面的使用并且通过案例打通了整个链路包括消息的生产、发送以及集群的同步等已经形成了一个闭环整体测试都没问题。在基础环境搭建好之后接下来这篇正式对rocketmq相关组件的介绍 用一条消息的“人生轨迹”串起所有基础概念把Topic、Queue、Message、Producer、Consumer、ConsumerGroup、Offset、commitLog全部讲清楚。在学习之前还是不能脱离官方文档官方文档–领域模型 。1简述消息的一生在学习rocketmq相关组件之前先从消息的角度来看消息的生命周期到底是怎么样的其主要经历以下的流程:消息最初开始从producer发送并且指定对应的nameserver和topicnameserver即注册中心将topic以及queue信息返回给producerproducer拿到nameserver注册中心信息之后决定将消息发往哪个broker以及queuebroker 将消息顺序写入 CommitLog(磁盘文件)后台线程ReputService通过异步扫描CommitLog给每个Queue构建逻辑索引ConsumeQueueConsumer按Queue拉取ConsumeQueue 里的消息索引再去 CommitLog读消息体业务代码消费成功后随后提交消费进度 OffsetBroker再根据Offset偏移量判断哪些消息已经消费过哪些还需要投递再达到保留时间/空间策略后旧消息被物理删除结束一生2Producer生产者在前面我们已经通过一张“鸟瞰图”看过了一条消息的一生无论后面的存储结构多复杂、消费模型多精巧所有消息的起点只有一个Producer。 简单来说Producer 是消息的生产者负责把业务消息送进 RocketMQ 系统。如下图来自官网producer就是整个架构的起点2.1Producer发送流程接下来站在开发者的角度通过一段代码来体现在创建Producer到发送消息前整个流程到底做了哪些事件// 1. 创建生产者Producer Group 名称DefaultMQProducerproducernewDefaultMQProducer(zhsProducerGroup);// 所有参数中该参数必须绑定producer.setNamesrvAddr(192.168.1.246:9876);// 2. 启动 Producer不启动会报错producer.start();// 3. 构建消息MessagemsgnewMessage(zhsTopicV1,TagA,测试);// 4. 发送消息producer.send(msg)// 5. 关闭 Producerproducer.shutdown();首先第一步是指定生产者组的名称可以用于事务消息、故障恢复等场景第二步需要指定nameservernameserver类似于一个注册中心里面记录topicmessageQueue以及broker之间的关系拿到这些信息之后这样producer就知道应该往哪个broker下的哪个topic下的哪个messageQueue发送消息一条消息只会发送到一个messageQueue消息队列中第三步就开始真正的发送消息首先会将消息发送到对应的broker中随后等待broker的回应一个broker就是一个物理机器的结点的意思。返回结构主要有三种只要Producer收到发送成功的返回结果那么就可以认为这条消息被broker接管了发送成功发送失败发送超时至于后面消息如何被存储、怎么消费等等producer不需要关心。3Topic消息主题接下来讲解的核心概念topictopic是一个逻辑概念用于对消息进行分类。message消息具体不是存储在topic中而是落盘存储在commitlog磁盘中 messageQueue消息队列也不存储真实数据而是一个逻辑队列用于路由、顺序、消费维度的划分这个后面重点会讲。topic的作用就是决定这条消息属于哪个业务因此就可以通过不同的topic之间实现业务与业务之间的隔离用一个比较贴切的案例来讲解一下topic在企业中的不同功能或者业务可能会拉不同的群比如用户注册群、订单处理群、广告投放群等等那么这些群名称就是可以真实的对应我们的topic如下图标群聊名称topic名称含义userRegisterChatGroupuserRegisterTopic用户注册userOrderChatGroupuserOrderTopic订单adChatGroupadTopic广告投放群名其本身也不存储任何消息我们需要往哪个群发消息时只需要找到对应的群发即可不需要具体的关心消息是否发送成功触达成功如何存储等。topic的角色就是这里的群名发消息需要确定哪个群(指定topic)收消息也只能看到自己所加入群的消息(消费者订阅的topic)在开发中创建一条消息时需要指定好topic这样就能保证这条消息会发送到哪个topic中。即我要发送一个今日订单统计到订单群那么只需要找到这个订单群聊并且往里面发送消息即可MessagemsgnewMessage(userOrderTopic,uuid.getBytes());topic在整个分布式架构中分布如下一个broker会存在会存在多个topic一个topic也可以存在多个broker中。如下面有四个broker每个broker都有用户注册主题、订单主题、广告投放主题每个主题都会分布在这四个broker上面当然上面这是属于比较理想的状态前面我们有用那个自动创建topic的方式那种方式就是只会只在一个broker中创建topic但是在实际开发中可以通过这个mqadmin中创建或者运维脚本提前创建最后总结topic只负责对消息进行业务分类决定某一条消息属于哪一类的业务本身不存储消息而并且不需要关心数据的存储、消费进度、存储细节等。在producer消息发送前就会指定对应的topic因此topic本身也不需要关心数据混乱的问题。4CommitLog存储4.1commitlog概念接下来先讲解commitlog当消息从producer生产者发到broker的时候消息会携带topicmessageQueue队列等参数随后需要做的重点的事情就是先将消息持久化到磁盘。那么此时就需要将消息持久化到commitlog中即我们的磁盘。上面这幅图就是表示的queue队列和commitlog的真实关系也就是说commitlog是真实存储数据的地方接下来对这幅图做一个详细的解释commitlog顺序追加写他不区分topic和队列他会将所有的数据都通过顺序追加写的方式写到commitlog中。通过上图也可以得知单个topic下的多个队列在commitlog中并不能保证有序性因为消息在producer发送前就会指定messageQueue因此在messageQueue内部是有序。但是在producer中确实是按轮询顺序发送消息但是在实际业务场景中是producer由多个线程发送消息的即使确定了发送往broker的顺序但是由于并发的场景下后面的线程后发但是会先到broker因此无法保证 Topic 级别的全局有序。如下面这个例子时间点 t1线程T1 发送 A1Queue0到 Broker时间点 t2线程T2 发送 B1Queue1到 Broker时间点 t3线程T2 发送 B2Queue1到 Broker时间点 t4线程T1 发送 A2Queue0到 Broker实际情况在broker中可能接收到的顺序是A1–B1–B2–A2 。这里主要强调一点就是producer是顺序发commitlog也是顺序追加写但是由于并发的问题导致谁先到broker谁写写入到commitlog中因此无法保证不了topic的全局有序commitlog磁盘文件通过多段segment组成一个整体数据都会通过追加写的方式写入内部。commitlog本身也存在过期当 segment 停止写入后且该 segment 中最后一条消息距离当前时间超过保留时间时会被删除默认是72小时或者是磁盘使用率超过一定的阈值时就会被删broker会以segment文件为单位清理过期的commitlogcommitlog本身不限制逻辑大小实际受限于磁盘容量和清理策略。注意图中的 id1 并不是一条真实消息而是一个抽象表示用于说明 MessageQueue 中记录的是一条指向 CommitLog 的索引位置在 RocketMQ 中对应的是真实的 CommitLogOffset物理偏移量。4.2与messageQueue关系需要再讲解一个重点再broker的数据的真实流转中虽然commitlog我写在了MessageQueue队列后面但是实际情况是先有commitlog的数据存储再有messageQueue的映射因为messageQueue存储的是commitlog数据的具体索引按理来说也是先有数据存储再有具体的位置所以说先有commitlog再有messageQueue的view视图这里的id和上面的一样也是一个抽象概念具体是存的真实的偏移量。4.3segment过期删除最后再提一下commitlog的组成commitlog由多段的segment组成每段segment为1g的磁盘容量如果commitlog过期被删除时就是以segment为单位删除删除条件如下删除的第一要素文件在停止写入的情况下否则不能删除在满足停止写入条件之后删除主要两个要素判断一个是停止写后72小时没有数据往里面写那么这段segment会被删除或者当磁盘使用率超过阈值(默认75%)时Broker 会优先删除最老、且已停止写入的 segment5ConsumeQueue存储的索引ok上面讲解完了commitlogcommitlog主要是通过顺序追加写的方式将数据存储上面也举了一个例子commitlog可以类似于我们的mysql中的表(这里只是举例)用于存储数据但是随着时间的增长数据量会越来越大而且所有的topic以及队列都会全部追加在commitlog上那么随着时间的推移数据量就会越来越大后面需要找数据也是一个难题。就比如consumer消费者在消费数据之后需要将offset偏移量提交给broker这样消费者在下次消费时就可以通过上次记录的偏移量接着消费。但是这里就出了一个难题假设数据有100w此时消费者消费记录到10w那么定位对应的下一条消息就需要全量从头到尾扫描一次这样效率肯定会大大降低一次在rocketmq内部就给commitlog增加了一个索引就是本章节需要讲的consumeQueue5.1consumeQueue 概念commitlog是一张顺序追加写的大表那么consumeQueue就是用来快速定位消息的索引里面不存储消息内容只存储如何找到消息的信息consumeQueue队列是按照TopicMessageQueue维度进行维护的为什么需要这一层因为在producer中说了在消息发送前会先从nameServer中路由相关信息就已经确定了这条消息会落到哪个topic对应的哪个messageQueue中并且保证了消息时一定会落盘到commitlog。在这个基础上Broker会为每一个messageQueue维护一份对应的consumeQueue用于记录该messageQueue下的消息在commitlog中的物理位置。commitlog负责存储真实数据messageQueue用于决定消费和顺序的逻辑边界consumeQueue按messageQueue维度建立commitlog的索引通过这种方式就可以将确定的messageQueue和不确定的commitlog位置给关联。在后面消费者消费时比如根据offset偏移量得知已经消费到了10万条那么下一条可以直接通过consumeQueue去定位而不需要对commitlog进行全量扫描类似于mysql的索引。5.2consumeQueue存储内容在mysql索引的b树中非叶子结点不会存数据而是存指向叶子结点的idconsumeQueue也与之类似不会存整条消息的内容而是存储找到commitlog中这条内容的视图索引加快查询减少全量扫描。consumeQueue主要存储三个核心的字段分别是commitlogOffset物理偏移量、msgSize消息长度、tagHashCode标签哈希值commotlogOffset表示这条消息在commitlog文件中的起始位置有了这个偏移量就可以直接定位commitlog的起始位置。有点类似于mysql的主键id可以快速定位某条数据msgSize表示这条消息的长度从上面的commitlogOffset为开始连续读取到少个字节这样就能获取到一条完整的消息tagHashCode辅助优化的字段消息tag的hash值某些hash过滤的场景下可以判断是否符合订阅条件。通过这个consumeQueue的索引视图这样就能快速的定位commitlog存储位置如弟N条消息在commitlog的第X个字节开始长度是Y。每个索引固定占用20个字节因此consumeQueue的文件非常小。5.3配合offset使用offset在后续会详讲解这里大概讲一下consumeQueue和offset如何配合使用offset就是偏移量的意思一个消费者组消费了某个topic的消息之后消费者会往broker中提交消费记录这个就是offset偏移量消费者下次消费就可以继续消费下一条消息即可。offset中记录的内容结构如下分别是topic、messageQueueconsumerGroup显而易见消费者在消费下一条消息时为了可以快速的定位到commitlog的位置就可以利用这个consumeQueue快速定位即可。(ConsumerGroup,Topic,MessageQueue)-ConsumeOffset定位到消息之后通过consumeQueue获取起始位置消息长度从而在commitlog中获取到完整消息并且在消费者完成消费之后继续提交的offset就是加上这段消息大小即可。(Topic,MessageQueue,ConsumerGroup)-OffsetNbatchSize完整消费流程如下首先消费者从broker中读取这三元组Topic、MessageQueue和上一次提交的offset偏移量大小NN代表已经消费的条数随后Broker去consumerQueue中找到对应的第N条索引索引中记录了消息的commitlogOffset起始位置和msgSize消息长度通过起始位置和消息长度读取commitlog中的数据随后完成消费消费成功之后提交offset偏移量回brokeroffset N 这次消费条数5.4总结consumeQueueconsumeQueue主要是为了实现消费进度和真实物理存储的关键桥梁是以messageQueue为维度建立的真实存储commitlog的一个视图索引类似于b树的一个高效稳定索引总结一下几点实现快速定位commitlog不需要全量扫描commitlog每个索引占20个字节本身轻量可以支撑海量信息可以精确恢复消费进度支撑高并发消费consumeQueue就是下图中的中间层MessageQueue中的M1代表和commitlog关联其本身不存储消息本身CQ代表为每个topic中的messageQueue维度所建立的索引真实的数据都存储在commitlog中用data表示。因此消费者的offset就可以搞笑的从consumeQueue中定位到commitlog。6MessageQueue消息队列6.1messageQueue概念上面讲解了commitlog数据存储和commitlog的索引consumeQueue接下来讲解rocketmq的核心组件messageQueue上面也有提到messageQueue相关producer 在发送消息前会显式指定 Topic随后从 NameServer 获取该 Topic 下的 MessageQueue 列表并根据内置路由算法(如轮询、一致性哈希等)在客户端本地选择一个具体的 MessageQueue 作为发送目标。consumeQueue是以messageQueue维度设置的commitlog索引每一条消息在逻辑上都会归属于某一个 MessageQueue等结合官方的话说就是MessageQueue消息的最小存储单元。也就是说消息的路由、分配、消费顺序、消费进度的推进、消费的rebalance等都是以 MessageQueue 为维度来进行的。消息的落盘位置不是在MessageQueue队列中而是在commitlog中所以说MessageQueue本身不存储真实数据它更多是一个逻辑队列视图概念在Broker端会为每一个MessageQueue维护对应的索引结构用来记录消息在 CommitLog 中的物理位置但是我们真实的操作消息的时候比如消费等环节还是以messageQueue维度进行。6.2messageQueue与topic关系如下图一个broker中会存在多个topic每个topic和messageQueue的关系如下创建topic时默认会创建4个messageQueue当然这个队列的个数可以修改。所以说topic就是一个逻辑概念一个逻辑分类 messageQueue 并不是物理存储结构而是消息在 Broker 侧的逻辑队列视图producer发送消息前会先去nameserver中获取topic信息以及对应的全部queue列表随后数据在commitlog落盘后然后根据默认的轮询算法通过子线程异步将存储地址存到messageQueue队列中消息地址映射只会进入一个queue队列是消息的最小存储单位。创建topic时不管时默认创建消息队列还是官方推荐都是建议建立多个queue首先是解决吞吐量问题多个queue可以增加读写能力不管是生产者投递还是消费者消费都可以对单个queue实现读/写热点分散。接下来是消息顺序的问题首先queue队列的天然特性就是先进先出因此在单个队列中就已经实现了排队和顺序当然在不同队列之间显然是不能保证有序的所以说rocketmq的顺序消息只能保证单个队列有序即局部有序。如果业务允许的情况下比如整体系统业务吞吐量不高并且一定要全局有序的话那么直接设计一个单queue队列的topic即可6.3messageQueue和consumeQueue的关联在讲解这个问题之前可以先看一下官网提供的这张图: consumeQueue存储 可以看到TopicTest文件下面存在4个物理队列文件每个文件其实就是对应着4个messageQueue。换句话说messageQueue就是一个逻辑概念可以理解为逻辑上的消息文件夹也就是说磁盘中真实存在的只有commitlog存储文件和consumerQueue索引文件messageQueue只是一个逻辑概念除了可以快速通过队列下标定位到consumeQueue之外还用于定义消息的归属、顺序消费的边界以及消费进度等因此MesssageQueue和ConsumeQueue其实就是根据消息的个数一一对应如下图M1对应的就是CQ1因为在单个messageQueue队列中的消息本身是有序的所以第N个messageQueue对应的commitlog索引就是在对应consumeQueue队列的第N条中。可以直接用表格来展示二者关系其内容如下consumeQueue含义index 0MessageQueue中的第1条消息index 1MessageQueue中的第2条消息index 2MessageQueue中的第3条消息index NMessageQueue中的第N1条消息消费者下一次拉取消息的流程如下消费者offset偏移量N之后这个N是上一次提交的消息最后消息的结束位置也是下一条消息的起始位置随后定位到对应consumeQueue的第N个索引从索引中获取到对应的commitlogOffset和msgSize根据偏移量和消息大小获取到commitlog中的内容消费者消费成功后再提交offset为N1到broker中。offset是以消息条数为单位的逻辑偏移量而不是commitlog中的物理字节偏移量。6.4为什么messageQueue是逻辑概念首先谈一下什么是逻辑概念首先指的是在实际磁盘中是不存在的或者说是一个磁盘文件的一个映射物并且也不负责真实数据的持久化等而是真实数据的一个抽象视图主要用于主要用于路由、分组、顺序、消费等模型。上面的topic是逻辑概念相当于是给messageQueue的划分核心主要是对开发中不同的业务进行划分和隔离但是为什么这个messageQueue也要设计成一个逻辑概念呢顺序写在开发中如果一层解决不了那就加一层而这个messageQueue就是新加的一层逻辑概念层这样我们可以不用直接去操作commitlog以及consumeQueue比如顺序写直接写commitlog即可如果直接写messageQueue那么在一个topic的4个messageQueue中就要轮询写如messageQueue顺序写就会变成多文件切换的随机写那么顺序写的功能没了吞吐量大大下降。ConsumerGroup扩容缩容如果messageQueue是物理文件消费者的rebalance机制也会出现问题正常来说是一个消费者对应一个队列如果消费者挂了那么就需要查询rebalance某个messageQueue到某个消费者中如messageQueue0对应的consumerA但是consumerA服务挂了因此需要rebalance队列messageQueue0到consumerB上面如果messageQueue是物理文件那么对应的物理文件也要对应的迁移锁定等操作数据迁移是大工程也是不符合rocketmq设计的初衷的。如果只是逻辑概念那么只需要messageQueue重新rebalance即可内部的存储数据和索引是可以不用动的Broker扩容这个问题和上面的consumerGroup问题是一样的假设增加一个broker由于保证一条消息只存在一个messageQueue的原则那么就需要将一半的数据挪到另一个broker中。如果messageQueue是物理概念那么也需要涉及到数据迁移问题需要将messageQueue对应的历史消息数据迁移如果只是逻辑概念那么只需要对这个逻辑概念进行迁移即可不需要迁移大量的数据6.5总结messageQueue综上MessageQueue 被设计为逻辑概念而非物理存储结构是 RocketMQ 在性能、扩展性与可运维性之间权衡后的必然结果CommitLog负责真实消息的顺序写入物理存储ConsumeQueue按 MessageQueue 维度建立索引物理索引)MessageQueue定义消息归属、顺序边界与消费模型逻辑视图到目前为止rocketmq中的topic和messageQueue都是逻辑概念真实的物理概念目前只有commitlog和consumeQueue。7ConsumerGroup消费者组上面已经讲解完了product发送消息commitlog存储消息等接下来讲解的就是消息如何被消费下图来自官网在讲解consumer之前先讲解rocketmq里面的另一个组件consumerGroup因为ConsumerGroup是规则定义者consumer只是执行者。 用官网的话说消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。在rocketmq整个架构设计中消费是以ConsumerGroup为主体从而解决在多实力环境中出现相互重复消费的问题。Offset是以ConsumerGroup为维度从而实现消费进度统一Rebalance 是以 ConsumerGroup为单位解决一些容灾扩容等问题consumerGroup更加强调的是消息如何被分配而不是具体由哪个消费者来消费如一些集群或者广播消费模式MessageQueue消息由哪个消费者组中的消费者消费每个消费者组的offset偏移量的问题某个消费者挂如何实现Rebalance的问题等。如集群消费和广播消费这些都是属于consumerGroup的属性而不是consumer的属性。集群消费一般用的比较普遍一条消息只会被一个消费者组中的消费者消费一次广播消费会被组内的所有consumer消费者消费。如前面讲解的逻辑messageQueue消息队列consumerGroup会分配组内的一个消费者进行消费如UserTopic对应的MessageQueue0这个消息队列只会被consumerGroupA中consumerA消费在正常集群消费情况下1个MessageQueue对应着一个Consumer接下来就是offset偏移量的问题offset也是针对于真个消费者组的消费进度在某个messageQueue消费的偏移量。offset是consumeGroup在对应messageQueue上消费的游标最后一个就是rebalance问题正常来说是一个消费者对于一个messageQueue 当某个 Consumer 下线时ConsumerGroup 会触发 Rebalance将原本分配给该 Consumer 的 MessageQueue 重新分配给组内其他存活的 Consumer。总而言之ConsumerGroup不负责消息的具体消费而是更加偏向于规则的制定决定消息如何被分配偏移量如何推进扩展等问题8Consumer上面已经讲解了consumerGroup是消息消费规则的定义者那么接下来要讲解的就是消息消费的具体执行者-consumer。不用关心决定消费哪个messageQueue也不要关心从哪个offset起开始消费真正需要关心的主要有下面几件事与broker建立连接按照consumerGroup的分配结果拉取对应messageQueue执行消息消费完成消费后提交结果和偏移量接下来看下面这段消费代码需要指定consumerGroup消费者组NameServer注册中心然后订阅对应的topic以及可以设置tag标签(可选)等设置监听器用于监听订阅的topic最后完成消费// 1. 创建 Consumer必须填写消费组名称DefaultMQPushConsumerconsumernewDefaultMQPushConsumer(zhsConsumerGroup);// 2. 指定 NameServerconsumer.setNamesrvAddr(192.168.1.246:9876);// 3. 订阅 Topic 和 TagAconsumer.subscribe(zhsTopicV2,TagA);// 4. 注册消息监听器并发消费consumer.registerMessageListener((MessageListenerConcurrently)(msgs,context)-{for(MessageExtmsg:msgs){System.out.println(收到消息newString(msg.getBody()), queueIdmsg.getQueueId(), msgIdmsg.getMsgId());}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5. 启动消费者consumer.start();System.out.println(Consumer 启动成功);System.in.read();8.1consumer消费流程上面的代码看起来会稍微比较简单绝大多数核心流程都在rocketmq内部完成在完成建立连接订阅消息和监听消息之后其核心消费流程如下首先会或获取consumerGroup的分配结果正常来说是一个消费者会对应一个messageQueue但是在某个消费者挂了之后会触发rebalance这样就有可能一个消费者会消费多个messageQueue但是这个主要由consumerGroup进行分配consumer主要拉取分配结果即可。再获取到指定的messageQueue之后会根据在consumerGroup中维护的对应messageQueue的offset对messageQueue进行消费上面讲解过messageQueue是一个逻辑队列所以consumer会根据messageQueue获取到真实的物理队列consumerQueue通过对应的consumerQueue中获取commitlogOffset起始位置、msgSize消息大小、以及一些tagHashCode再根据consumerQueue索引获取的信息获取commitlog的真实数据通过这个consumerQueue减少commitlog的全量扫描consumer根据获取的对应消息进行消费开发人员可以对这些数据进行相应的业务处理比如发送im消息等最后消费完成之后consumer会向broker提交新的offsetconsumer本身是无状态的不会对这些offset数据保存offset的更新是以消息的条数为基数单位。9Offset偏移量上面基本已经将所有的流程讲完了比如生产者发送消息commitlog存储消息consumeGroup定制消费规则consumer执行消费消息consumeQueue快速定位消息messageQueue决定消息顺序和边界等。在消息被消费的时候消费者如何知道消息已经消费到了哪个位置呢那么就需要这个offset偏移量来记录。9.1offset核心概念在整个rocketmq设计中offset是整个消费系统的核心组成通过这个offset实现高可用和高性能。offset的核心本质就是一个指针用于记录消息消费的位置上面也谈到了消息的messageQueue和consumerQueue都是以每一条消息为单位边界因此显而易见在offset的单位也是以消息的条数为单位比如某个topic下面的某个messageQueue消费到了第N条消息那么这个offsetNoffset是以topic、messageQueue、consumerGroup为单位进行划分这三者为一个基本单位如下面的例子只要这三者任意一个不一样那么在broker中就得记录相应的offset(Topic,MessageQueue,ConsumerGroup)-Offset# 订单主题 orderTopic ├─MessageQueue-0│ ├─ConsumerGroup-A:Offset100│ └─ConsumerGroup-B:Offset0├─MessageQueue-1│ ├─ConsumerGroup-C:Offset150│ └─ConsumerGroup-D:Offset50offset属于consumerGroup而不属于某一个consumerconsumer可能会挂掉或者动态的扩容缩容等因此offset属于consumerGroup才能保证其连续、稳定以及可恢复等功能。offset统一由broker保管而不保存在consumer端本地防止consumer端出现丢失问题因此每次消息被成功消费之后consumer端需要提交offset偏移量到broker中。如果consumer中出现rebalancemessageQueue可能会归属于不同的consumer但是对应的offset是不会变化即使是有了新的consumer也会继续按照这个offset位置进行消费。10核心组件设计10.1组件设计动机组件类型是否物理存在主要职责为什么这样设计如果反过来会发生什么CommitLog物理存储✅ 磁盘文件存储真实消息内容Body 属性顺序追加写最大化磁盘吞吐减少随机 IO拆成多队列文件会导致随机写、吞吐骤降ConsumeQueue物理索引✅ 磁盘文件按 MessageQueue 维度索引 CommitLog快速定位消息避免扫描大 CommitLog无索引将导致消费性能不可接受MessageQueue逻辑概念❌ 不直接存储定义消息归属、顺序边界、消费单元支撑顺序消费、Rebalance、扩容若为物理文件Rebalance/扩容需搬数据Topic逻辑概念❌ 不直接存储业务分类、消息隔离解耦业务与底层存储若存数据会导致存储与业务强耦合Offset逻辑状态❌元数据记录消费进度精确恢复消费位置无 offset 只能全量重复消费ConsumerGroup逻辑概念❌定义消费语义集群/广播支撑多消费者协同10.2设计动机和技术收益设计目标采用的结构带来的收益高吞吐写入CommitLog 顺序写单机百万级 TPS快速定位消息ConsumeQueue 索引O(1) 定位无需扫描顺序消费MessageQueue单队列天然 FIFO高效 RebalanceMessageQueue逻辑只调整关系不搬数据Broker 可扩展Topic Queue 路由横向扩容低成本消费可恢复Offset精确断点续消费11总结上面已经系统的讲述了rocketmq的核心组件以及每个核心组件的作用和原理通过这些组件为整个系统带来高可用和高性能。接下来对这些组件做一个最终总结namaserver作为注册中心记录所有topic和messageQueue的信息producer作为生产者获取nameserver相关信息指定naveserver和topic将消息发送到brokertopic作为一个逻辑概念相当于对所有messageQueue进行业务分类实现业务与业务之间的隔离commitlog用于存储所有的真实消息数据不会区分具体的topic和messageQueue以顺序追加写的方式存储consumeQueue是commitlog的视图索引以messageQueue为维度设计的可以用于快定位存储在commitlog的数据messageQueue是consumeQueue的逻辑写照messageQueue和consumeQueue根据队列索引一一对应用于定义消息顺序和边界consumerGroup消费者组不执行具体的消息消费而是用于制定消费规则管理offset、定义消费模式、指定consumer和messageQueue之间的关系consumer是消息的真正执行者获取consumerGroup相关信息之后通过offset偏移量获取相关信息随后执行消息消费、提交偏移量。