一、RabbitMQ的深入理解和最简单的用途说明
如何在新的系统中使用RabbitMQ.
系统设计的两个重大问题.
第一条要满足未来的业务需求的不断变化和增加. 也就是可扩展性.
第二条要满足性能的可伸缩性. 也就是可集群性…通过增加机器能处理更多的请求
第三条要解耦合.
如果不解耦合, 未来业务增加或变更的时候你还在修改3年前写的代码.试问你有多大的把握保证升级好系统不出问题? 如何可以写新的代码而不用修改老代码所带来的好处谁都知道…
第四条简单易懂.
以上4条在任何一个系统中都要遵循的原则. 以前是无法做到的. 自从有了MQ以后. 这些都可以同时做到了.
以前的设计理念是把系统看作一个人,按照工作的指令从上到下的执行.
现在要建立的概念是, 把系统的各个功能看作不同的人. 人与人之间的沟通通过消息进行交流传递信息…
有了MQ以后把一个人的事情分给了不同的人, 分工合作所带来的好处是专业化, 并行化. 当然也引入了一些麻烦,性能开销多一些, 工作任务的完整性不能立即得到反馈.幸好我们可以通过最终一致性.来解决这个麻烦的问题…
下面进入正题.
第一个问题RabbitMQ是如何支持可扩展性的.
如上图, 寄件人P是系统的一个功能模块. 用来发送消息. 一般是在某些重要的业务状态变更时发送消息. 例如: 新订单产生时, 订单已打包时, 订单已出库时, 订单已发出时.
那么当事件 新订单产生时, 我们需要把这个信息告诉谁呢? 给财务? 还是给仓库发货?
这个地方最大的重点是. 当事件产生时. 根本不关心. 该投递给谁.
我只要把我的重要的信息投到这个乱七八糟的MQ系统即可. 其它人你该干嘛干嘛. 反正我的任务完成了. (有没有甩手掌柜的感觉..)
我只要告诉系统,我的事件属于那一类.
例如: “某某省.某某市.某某公司.产生新订单”
那么这个地址就属于 投递地址.. 至于这个地址具体投到哪个邮箱那是邮局的事情.
当然还有一些具体的订单内容也属于要告诉系统的内容.
那么下一个问题来了, 邮局怎么知道 你的这个消息应该投递给谁?
参考我们现实世界中的邮寄系统.是默认的省市县这么投递的. 这是固定思维.
但是我们的MQ系统中不是这样的. 是先有收件人的邮箱. (队列Queue). MQ才能投递. 否则就丢弃这个信息…
所以MQ系统应该先有收件人的邮箱 Queue 也就是队列. 才能接收到信息.
再有邮局
再有发信息的人.
RabbitMQ能实现系统扩展的一个重要功能在于, 可以两个邮箱收同一个地址的信.
翻译成专业的话 RabbitMQ 可以 两个队列Queue订阅同一个RoutingKey的信息..
RabbitMQ在投递的时候,会把一份信息,投递到多个队列邮箱中Queue…
这是系统可扩展性的基础.
第二个问题RabbitMQ如何满足性能的可伸缩性. 也就是可集群性
先上图
从上图, 可以看到. 性能扩展的关键点就在于 订阅人C1, 订阅人C2 轮流收到邮箱队列里面的信息, 订阅人C1和订阅人C2收到的信息内容不同, 但都属于同一类….
所以. 订阅人C1和订阅人C2是干同一种工作的客户端.用来提高处理能力.
上面说完了,如何使用. 下面再分析一下几个关注点.
如果订阅人的down机了. 信息会丢失吗?
事实上是不会的. 只要有邮箱(队列Queue)存在.信息就一直存在, 除非订阅人去取走.
如果订阅人一直down机, 邮箱队列能存多少信息?会不会爆掉?
理论上和实际上都是有上限的不可能无限多. 具体多少看硬盘吧..我没测到过上限.
我这篇文章并不打算讲解邮局的4种投递模式. 有其它文章讲的很好. 我只打算使用topic这种模式. 因为它更灵活一些.
再说一下我的另外两个观点.
不要在业务程序中用代码定义创建 邮局 ExChange. 和邮箱Queue队列 这属于系统设计者要构架的事情. 要有专门独立的程序和规则去创建. 这样可以统一管理事件类型.避免过多的乱七八糟的RoutingKey混乱.
我的理解认为
消息系统的分布式可扩展的实现在于消息广播, 集群性的实现在于邮箱队列.
RabbitMQ是先广播后队列的.
Exchange: 就是邮局的概念等同于 中国邮政和顺丰快递、
routingkey: 就是邮件地址的概念.
queue: 就是邮箱接收软件,但是可以接收多个地址的邮件,通过bind实现。
producer: 消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
二、为什么要使用RabbitMQ?RabbitMQ有什么优点?
原因有三:
1.解耦
2.异步
3.削峰
三、rabbitmq的routingkey的作用
对于消息发布者而言它只负责把消息发布出去,甚至它也不知道消息是发到哪个queue,消息通过exchange到达queue,exchange的职责非常简单,就是一边接收发布者的消息一边把这些消息推到queue中。
而exchange是怎么知道消息应该推到哪个queue呢,这就要通过绑定queue与exchange时的routingkey了,通过代码进行绑定并且指定routingkey,下面有一张关系图,p(发布者) —> x(exchange) bindding(绑定关系也就是我们的routingkey) 红色代表着queue
我们来看代码:
在消息的生产者端:
@Component public class RabbitOrderSender { //自动注入RabbitTemplate模板类 @Autowired private RabbitTemplate rabbitTemplate; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; //回调函数: confirm确认 final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData: " + correlationData); String messageId = correlationData.getId(); if(ack){ //如果confirm返回成功 则进行更新 brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date()); } else { //失败则进行具体的后续操作:重试 或者补偿等手段 System.err.println("异常处理..."); } } }; //发送消息方法调用: 构建自定义对象消息 public void sendOrder(Order order) throws Exception { // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中 rabbitTemplate.setConfirmCallback(confirmCallback); //消息唯一ID CorrelationData correlationData = new CorrelationData(order.getMessageId()); rabbitTemplate.convertAndSend("order-exchange", "order.ABC", order, correlationData); } } 利用rabbitTemplate(import org.springframework.amqp.rabbit.core.RabbitTemplate;需要在pom.xml中导入amqp的依赖)的convertAndSend方法就可以发送,这里order-exchange为交换机exchange,order.ABC为routingKey,并没有指定对应消息需要发往哪个队列,还有指定消息回调。
在消息的消费者端:
@Component public class OrderReceiver { //配置监听的哪一个队列,同时在没有queue和exchange的情况下会去创建并建立绑定关系 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true"), exchange = @Exchange(name="order-exchange",durable = "true",type = "topic"), key = "order.*" ) ) @RabbitHandler//如果有消息过来,在消费的时候调用这个方法 public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws IOException { //消费者操作 System.out.println("---------收到消息,开始消费---------"); System.out.println("订单ID:"+order.getId()); /** * Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag, * 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。 * RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。 */ Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); /** * multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认 * 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认 */ boolean multiple = false; //ACK,确认一条消息已经被消费。不然的话,在rabbitmq首页会有Unacked显示为未处理数1. channel.basicAck(deliveryTag,multiple); } }
本期内容就到这里啦~以上内容均可在 方包博客「http://fang1688.cn」 网站直接搜索名称访问哦。欢迎感兴趣的小伙伴试试,如果本文对您有帮助,也请帮忙点个 赞 + 在看 啦!❤️
欢迎大家加入方包的「优派编程」学习圈子,和多名小伙伴们一起交流学习,向方包 1 对 1 提问、跟着方包做项目、领取大量编程资源等。Q群「891029429」欢迎想一起学习进步的小伙伴~
另外方包最近开发了一款工具类的小程序「方包工具箱」,功能包括:抖音、小红书、快手去水印,天气预报,小说在线免费阅读(内含上万部热门小说),历史今天,生成图片二维码,图片识别文字,ai伪原创文章,数字摇号抽奖,文字转语音MP3功能...
定期分享 it编程干货
⬇️ 点击链接阅读原文直达 方包博客
评论抢沙发