MQ
2021-05-19 19:26:29 35 举报
AI智能生成
RibbitMQ
作者其他创作
大纲/内容
MQ(Message Queue)
作用:是在消息传输过程中保存消息的容器,多用于分布式系统之间进行通信
组成
提供者
中间件(broker)
消费者
优势
应用解耦
高可扩展 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
异步提速
高性能 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。
提高了应用程序的响应时间。
提高了应用程序的响应时间。
子主题
削峰填谷
高可用
劣势
系统可用性降低
系统负责度提高
一致性问题
使用条件
生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明
明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能
明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能
容许短暂的不一致性。
确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
AMQP 和 JMS
AMQP
Advanced Message Queuing Protocol(高级消息队列协议)
AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。
这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
JMS
JavaMessage Service(应用程序接口)
是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,
或分布式系统中发送消息,进行异步通信。
或分布式系统中发送消息,进行异步通信。
区别
JMS是定义了统一的接口,来对消息操作进行统一 AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言 AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模式; AMQP的消息模式更加丰富
RabbitMQ
定义:是通过erlang语言编写的,基于AMQP协议的开源的高性能的消息队列产品
作用:系统间的通信
优势:消息延迟性最低(微妙级别)
自由主题
Broker
中间件
Virtual host
虚拟主机 用于数据的隔离
Connection
连接
Channel
通道
Exchange
交换机 分发消息到具体的队列
Queue
队列 暂存信息
Binding
绑定 的动作,用于绑定交换机和队列
rotting key: 路由key
工作队列模式
producer(生产者)
简单模式
模式特征:提供者 -> 队列 -> 单个消费者
Work queues 工作队列模式
模式特征: 提供者 -> 队列 -> 多个消费者
Pub/Sub 订阅模式
模式特征: 提供者 -> 交换机 -> 多个队列 -> 多个消费者
Routing 路由模式
模式特征:提供者 -> 交换机 -> 路由绑定 ->多个队列 -> 多个消费者
Topics 通配符模式
模式特征:提供者 -> 交换机 -> 路由绑定可用通配符(*,#) ->多个队列 -> 多个消费者
代码解释
导入RabbitMQ依赖
创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
设置参数
setUsername
连接用户名
setPassword
连接密码
setHost
主机地址(默认为主机地址)
setPort
连接端口号
setVirtualHost
虚拟主机名称(默认为 /)
创建连接
Connection connection=工厂对象.newConnection()
创建频道(Channel)
Channel channel= 连接对象.createChannel()
声明队列
channel.queueDeclare(参数1,参数2,参数3,参数4,参数5)
设置参数
参数1(String):队列名称
参数2(boolean):是否定义持久化队列
参数3(boolean):是否独占本次连接
参数4(boolean):是否在不使用的时候自动删除队列
参数5(Map):队列其它参数
创建交换机
channel.exchangeDeclare(参数1,参数2,参数3,参数4,参数5,参数6)
设置参数
参数1(String) : 交换机名称
参数2(enum) : 交换机类型
DIRECT("direct") : 定向
FANOUT("fanout") : 扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic") : 通配符的方式
HEADERS("headers") : 参数匹配
DIRECT("direct") : 定向
FANOUT("fanout") : 扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic") : 通配符的方式
HEADERS("headers") : 参数匹配
参数3(boolean) : 是否持久化
参数4(boolean) : 自动删除
参数5(boolean) : 内部使用(比如使用erlang语言进行一些内部的开发)。 一般false
参数6(Map) : 参数列表
绑定交换机
queueBind(参数1,参数2,参数3)
设置参数
参数1(String) : 队列名称
参数2(String) : 交换机名称
参数3(String) : 路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
如果交换机的类型为fanout ,routingKey设置为""
生产者
发送消息
channel.basicPublish(参数1,参数2,参数3,参数4)
设置参数
参数1(String):交换机名称,如果没有指定则使用默认Default Exchage
参数2(String):路由key,简单模式可以传递队列名称
参数3(BasicProperties):消息其它属性
参数4( byte[ ] ):消息内容
消费者
创建消费者;并设置消息处理
Consumer 是个接口 无法直接创建对象
通过new DefaultConsumer 来创建Consumer对象
但是DefaultConsumer是个空实现
所以通过匿名内部类来重写handleDelivery方法
handleDelivery方法 是个回调方法 当收到消息后,会自动执行
通过new DefaultConsumer 来创建Consumer对象
但是DefaultConsumer是个空实现
所以通过匿名内部类来重写handleDelivery方法
handleDelivery方法 是个回调方法 当收到消息后,会自动执行
handleDelivery(参数1,参数2,参数3,参数4)
参数1(String):consumerTag 消息者标签,在channel.basicConsume时候可以指定
参数2(Envelope ):envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
参数3:properties 属性信息
参数4( byte[ ] ):body 消息
监听消息
channel.basicConsume(参数1,参数2,参数3)
设置参数
参数1(String) :队列名称
参数2(boolean) :是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
参数3(Consumer) :消息接收到后回调
路由key
通配符
系统的名字 . 日志的级别
# 多个单词
* 一个单词
0 条评论
下一页