1. 核心功能
1.1 MQ介绍
RocketMQ基本组件
- Topic: 消息归类的基本单元
- Queue: 消息队列
- Producer
- Consumer
- ConsumerGroup:
- NameServer:可以理解为注册中心,负责更新和发现Broker
- Broker集群:Broker 可以有一个或多个,每一个Brocker就是一个Kafka实例(RacketMQ实例)
1.2 RocketMQ环境搭建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| docker pull apache/rocketmq:5.1.0
docker network create rocketmq
mkdir -p /data/rocketmq/nameserver/{bin,logs}
chmod 777 -R /data/rocketmq/nameserver/*
docker run -d \ --privileged=true --name rmqnamesrv \ apache/rocketmq:5.1.0 sh mqnamesrv
docker cp rmqnamesrv:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh /data/rocketmq/nameserver/bin/
docker rm -f rmqnamesrv
docker run -d --network rocketmq \ --privileged=true --restart=always \ --name rmqnamesrv -p 9876:9876 \ -v /data/rocketmq/nameserver/logs:/home/rocketmq/logs \ -v /data/rocketmq/nameserver/bin/runserver.sh:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh \ apache/rocketmq:5.1.0 sh mqnamesrv
1. -e "MAX_HEAP_SIZE=256M" 设置最大堆内存和堆内存初始大小 2. -e "HEAP_NEWSIZE=128M" 设置新生代内存大小
docker logs -f rmqnamesrv
mkdir -p /data/rocketmq/broker/{store,logs,conf,bin} chmod 777 -R /data/rocketmq/broker/*
vim /data/rocketmq/broker/conf/broker.conf
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
namesrvAddr = 192.168.56.2:9876
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
brokerIP1 = 192.168.56.2
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
deleteWhen = 04
fileReservedTime = 72
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
tlsTestModeEnable = false
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
docker run -d \ --name rmqbroker --privileged=true \ apache/rocketmq:5.1.0 sh mqbroker
docker cp rmqbroker:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh /data/rocketmq/broker/bin
docker rm -f rmqbroker
docker run -d --network rocketmq \ --restart=always --name rmqbroker --privileged=true \ -p 10911:10911 -p 10909:10909 \ -v /data/rocketmq/broker/logs:/root/logs \ -v /data/rocketmq/broker/store:/root/store \ -v /data/rocketmq/broker/conf/broker.conf:/home/rocketmq/broker.conf \ -v /data/rocketmq/broker/bin/runbroker.sh:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh \ -e "NAMESRV_ADDR=rmqnamesrv:9876" \ apache/rocketmq:5.1.0 sh mqbroker --enable-proxy -c /home/rocketmq/broker.conf
docker logs -f rmqbroker
docker pull apacherocketmq/rocketmq-dashboard:latest
docker run -d \ --restart=always --name rmq-dashboard \ -p 8080:8080 --network rocketmq \ -e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \ apacherocketmq/rocketmq-dashboard
docker logs -f rmq-dashboard
|
整合springboot
1.3 RocketMQ高可用集群搭建
1.3.1 集群各个角色介绍
1.3.2 双主双从搭建
1.4 消息发送样例
1.4.1 同步消息
同步消息,发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式

1.4.2 异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知
1.4.3 单向消息
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送
1.4.4 延迟消息
消息放入mq后,过一段时间,才会被监听到,然后消费
比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存
1.4.5 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。
可能大家会有疑问,mq不就是FIFO吗?
rocketMq的broker的机制,导致了rocketMq会有这个问题. 因为一个broker中对应了四个queue
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。
一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列。
1.4.6 批量消息
Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费
1.4.7 过滤消息
Rocketmq提供消息过滤功能,通过tag或者key进行区分
我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待
1.4.8 事务消息
它可以被认为是一个两阶段的提交消息实现,以确保分布式系统的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。
2. 项目实战
2.1 项目背景介绍 - 电商高可用MQ实战
环境搭建
springboot
Dubbo
Zookeeper
RocketMQ
MySQL
2.2 功能分析
2.3 下单功能, 保证各个服务的数据一致性
2.4 确认下单功能,通过消息进行数据分发
2.5 整体联调
3. 高级功能和源码分析