MQ之ActiveMQ
2021-07-07 21:35:14 0 举报
AI智能生成
ActiveMQ入门到精通
作者其他创作
大纲/内容
入门概述
前言
1),在何种情景下使用了消息中间件?
2),为什么要在系统中引入消息中间件?
从生活到实际生产案例
上述问题引出的产生背景
系统之间直接调用实际工程落地和存在的问题
系统之间的直接调用,耦合比较严重
面对大流量并发时,容易被冲垮
等待同步存在性的问题
你如何解决呢?
需要有一种东西能摆平上述情况
是什么?
定义
面向消息的中间件 (message-oriented middleware) MOM
是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并给予数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型在分布式环境下提供应用的解耦,弹性伸缩,冗余存储,流量削峰,异步通信,数据同步等功能
执行流程如下
发送者把消息发送给消息服务器,消息服务器消息放在若干 队列(一对一)/主题(topic 一对多)中,在合适的时候,消息服务器会把消息转发给接受者,在这个过程中,发送者和接受者是异步的,也就是发送无需等待,而且发送者和接受者的声明周期也没有必然关系,尤其在发布 pub/订阅 sub模式下,也可以完成一对多的通信
特点
采用异步处理模式
消息发送者可以发送一个消息而无需等待,消息发送者将消息发送到一个虚拟的通道(主题/队列)上,消息接收者则订阅或监听该通道,一条消息可能最终发送给一个或多个消息接收者,这些接收者都无需对消息发送者做出同步回应,整个过程是异步的
应用系统之间解耦
发送者和接受者不必了解对方,只需要确认消息即可
发送者海和接受者不必同时在线
能干嘛?
解耦
削峰
异步
在哪下?
activeMQ
怎么玩?
主要的功能
实现高可用,高性能,可伸缩,易用和安全的企业
异步消息的消费和处理
控制消息的消费顺序
可以和spring/springboot整合简化编码
配置集群容错的mq集群
...
ActiveMQ(默认端口61616)安装和控制台
普通启动activeMQ
进入/bin/目录 ./activemq start
查看 activemq 是否启动的几种方式
ps -ef|grep activemq
netstat -anp |grep 61616
lsof -i:61616
关闭avtivemq
/bin/目录下 ./activemq stop
带日志的启动的方式
在 activemq目录创建日志文件logs
在/bin/启动带日志方式
./activemq start > /opt/apache-activemq-5.16.0/logs/run_activemq.log
activemq 控制台
ip:8161
默认用户名和密码都是 8161
如果打开了防火墙的8161端口还是无法访问,则需 到 conf文件夹的jetty.xml进行修改
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="127.0.0.1"/>#127.0.0.1 修改为虚拟机的ip
<property name="port" value="8161"/>
</bean>
<!-- the default port number for the web console -->
<property name="host" value="127.0.0.1"/>#127.0.0.1 修改为虚拟机的ip
<property name="port" value="8161"/>
</bean>
备注
采用了 61616 端口提供的JMS服务
采用 8161 端口提供了管理控制台服务
Java编码实现ActiveMQ通讯
idea 创建 maven工程
pom.xml
JMS 编码总体架构
jms -java message Service 注意: Destination 目的地有俩种 (队列或者主题)
jms 和 jdbc 连接相似
粗说目的地 Destionation队列和主题
队列
queue (一对一)
主题
topic (一对多)
在点对点的消息传递中,目的地被称为队列(Queue)
生产者
消费者
receive(4000L) 带时间和 receive()不带时间的区别
同步阻塞 receive() ,订阅者或接受者调用 MessageConsumer 的 receive() 方法来接收消息, receive 方法在接受消息之前(或超时之前)将一直阻塞
消费者监听
System.in.read();
如果不加这句话 可能还没有连接上activemq 就已经执行关闭连接的代码,这句话就是一直在那监听着 ,有消息了消费,没消费了等待
编码小总结
JMS 开发的基本步骤
1),创建 connectionFactory
2), 通过 connectionFactory 创建 connection
3), 启动 connection
4), 听过 conection 创建 session
5), 创建 jms distiation
6), 创建 jms producer 或者创建 jms message 并设置 distiation
7), 创建jms consumer 或者注册一个 jms message listener
8), 发送或者接受 jms message
9), 关闭所有的 jms 资源
俩种消费方式
同步阻塞方式 receive()
订阅者或接受者调用 messageConsumer 的receive() 方法来接收消息,receive 方法能够接受到消息之前(或超时之前)将一直阻塞
异步非阻塞方式(监听器 onMessage())
订阅者或接受者通过messageConsumer的 setMessageListener (MessageListener listener)注册一个消息监听器
当消息到达之后,系统自动调用监听器 messageListener的onMessage(Message message)方法
在发布订阅消息传递中,目的地被称为主题(topic)
发布主题生产者
发布主题消费者
先启动订阅在启动生产,不然发送的消息是废消息
控制台
总结
两大模式特性
两大模式比较
比较项目
工作模式
有无状态
传递完整性
处理效率
topic
订阅发布模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者将会收到消息
无状态
如果没有订阅者将会被丢弃
由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异
queue
负载均衡模式,如果当前没有消费者,消息不会丢失,如果有多个消费者,那么一条消息只能发送给一个消费者,并且要求消费者ack信息
Queue数据默认会在mq服务器上以文件形式存储
消息不会被丢弃
由于一条消息只发送一个消费者,所以就算消费者再多,性能也不会有明显降低,当然不同消息协议的具体性能也有差异的
JMS 规范和落地产品
是什么?
什么是JavaEE
JavaEE 是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准
JMS只是JavaEE的一个子模块
什么是Java消息服务
Java消息服务指的是 两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用的接口,包括创建,发送,读取消息等,用于支持JAVA应用程序的开发
在JavaEE中,当两个应用程序使用 JMS进行通信时,它们之间并不是直接通信的,而是通过发送一个共同的消息收发服务组件关联起来已达到解耦/异步/削峰的效果
MQ中间件的其它落地产品
activeMQ
RabbitMQ
Kafka
RocketMQ
JMS的组成结构和特点
JMS provider
实现JMS 接口和规范的消息中间件,也就是我们的 MQ服务器
JMS producer
消息生产者,创建和发送JMS消息的客户端应用
JMS consumer
消息接收者,接收和处理JMS消息的客户端使用
JMS message
消息头
JMSDestination
消息发送的目的地,主要是指 Queue 和 Topic
JMSDeliveryMode
持久模式和非持久模式,一条持久性的消息: 应该被传递 一次仅仅一次,这就意味着如果 JMS提供者出现故障,该数据并不会丢失,它会在服务器回复之后再次传递
一次非持久化的消息: 最多会传一次,这意味着服务器出现故障和宕机,该消息将永久消息
JMSExpiration
可以设置消息在一定时间后过期,默认是用不过期的,消息过期时间,灯油 Destination 的 send 方法中的 timeToLive 值加上发送时刻的 GMT时间值
如果 timeToLive 值等于0,则 JMSExpiration 被设为0 ,表示该消息永不过期
如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息已经被清除
JMSPriority
消息优先级.从0-9十个级别,0-4是普通消息,5-9是加急消息
JMS不要求MQ严格遵照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达,默认4级
JMSMessageID
唯一识别每个消息的表示由 MQ产生
消息体
封装具体的消息数据
5种消息体格式
TextMessage
普通字符串消息,包含一个String
MapMessage
一个Map类型消息,key为String类型,而值为Java的基本数据类型
StreamMessage
Java数据流消息,用标准流操作来顺序填充和读取
ObjectMessage
对象信息,包含一个可序列化的Java对象
BytesMessage
二进制数组消息,包含一个byte[]
发送和接受的消息类型必须一致对应
消息属性
如果需要除消息头字段意外的值,那么可以使用消息属性
识别/去重/重点标注等操作非常有用的方法
是什么
JMS的可靠性
PERSISTENT 持久化
参数设置说明
非持久
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
非持久化: 当服务器宕机,消息不存在
持久
textMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
持久化: 当服务器宕机,消息依然存在
持久的 Queue
这是队列的默认传送模式,此模式保证这些消息只能被传递一次和成功使用一次,对于这些消息,可靠性是优先考虑的因素
可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些信息
持久的 Topic
代码
先启动订阅再启动生产
持久的发布主题生产者
持久的订阅注意消费者
总结
1),一定要先运行一次消费者,等于向 MQ注册,类似于我订阅了这个主题
2), 然后再运行生产者发送信息,此时
3),无论消费者是否在线,都会收到,不在线的话,下次连接的时候,会把没有收过的信息都接受过来
类似于weiixn公众号订阅发布
Transaction 事务
producer 提交时的事物
false
只要执行 send ,就进入到队列中
关闭事务,那第2个签收参数的设置需要有效
true
先执行 send 再执行 commit ,消息才被真正的提交到队列中
消息需要批量发送,需要缓冲区处理
事务偏生产者/签收偏消费者
Acknowledge : 签收
非事务
自动签收(默认)
Session.AUTO_ACKNOWLEDGE
手动签收
Session.CLIENT_ACKNOWLEDGE
acknowledge() 进行签收
允许重复消息
Session.DUPS_OK_ACKNOWLEDGE
事务
生产者事务开启,只有commit后才能将全部消息变为消费
消息生产者
消息消费者
签收和事务关系
在事务性会话中,当一个事务被成功提交则消息被自动签收
如果事务回滚,则消息会被再次传递
非事务性会话中,消息何时被确认取决于创建会话时的应答模式
JMS的点对点总结
点对点模型是基于 队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输称为可能,和我们平时发短信类似。
1: 如果在session关闭时有部分消息已被收到但是还没有被签收(ack),那当消费者下次连接到队列时,这些消息会被再次接收
2: 队列可以长久地保存消息直到消费者接收消息,消费者不需要因为担心消息会丢失而时刻和队列保持激活状态,充分体现了异步传输模式的优势
JMS的发布订阅总结
非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和 MQ保持连接状态才能收到发送到某个主题的消息
如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到
一句话: 先要订阅注册才能接受到发布,只给订阅者发布消息
持久订阅
客户端首先向 MQ注册一个自己的身份ID识别号,当这个客户端处于离线状态时,生产者会为这个ID保存所有发送到主题的消息,当这个客户端再次连接到MQ时会根据消费者的ID得到所有当自己处于离线时发送到主题的消息
非持久订阅状态下,不能恢复或重新派送一个未签收消息
持久状态下,才能恢复或重新派发一个未签收的消息
用哪个?
当所有的消息必须被接收,用持久订阅
当丢失消息能够容忍,则使用非持久订阅
ActiveMQ的Broker
是什么?
相当于一个ActiveMQ服务器实例,说白了,Broker 其实就是实现了用代码的形式启动 ActiveMQ将MQ嵌入到Java代码中,以便随时启动,在用的时候再去启动可以节省资源,也保证了可靠性
类似 redis 启动多个端口的redis实例 6379, 6380和 6381
cp activemq.xml activemq02.xml
./activemq start xbean:file:/opt/apache-activemq-5.16.0/conf/activemq02.xml
嵌入式Broker
理论
用ActiveMQ Broker 作为独立的消息服务器来构建Java应用
ActiveMQ 也支持在vm 中通信基于嵌入式的 broker ,能够无缝的集成其它Java 应用
pom.xml
EmbedBroker
idea使用 jps -l 查看正在运行的线程
队列验证
Spring整合 ActiveMQ
maven 修改,添加 Spring 支持的 JMS 包
Spring配置文件
队列
生产者
消费者
主题
只需修改 applicationContext.xml添加一个 topic实例bean
在Spring 里面实现消费者不启动,直接通过配置监听完成
Spring 配置文件
新建的监听类
只需要启动生产者,消费者不用启动,自动会监听记录
SpringBoot 整合 ActiveMQ
队列
生产者
pom.xml
application.yml
配置类ConfigBean
生产者类
Test测试类
主启动类
新需求: 要求每隔3s 向 MQ推送消息,一下定时发送 Case
producerMsgScheduled()
消费者
pom.xml 和生产者pom.xml相同
application.yml 和 生产者 application.yml 相同
消费者类
主启动类
主题/发布订阅
topic 生产者
application.yml
pub-sub-domain: true # false 是队列 true 是主题 不写就是默认就是false队列
生产类 new ActiveMQTopic(myTopic)
topic 消费者
application.yml
pub-sub-domain: true # false 是队列 true 是主题 不写就是默认就是false队列
其它不变
ActiveMQ的传输协议
面试题
默认的 61616 端口如何修改
你生产上的链接协议如何配置的? 使用的是 tcp?
官网
activemq官网
是什么
activemq 支持 client-broker 通讯协议: TCP , NIO , UDP , SSL , HTTP , VM\
其中配置 Transport Connation 的文件在 activeMQ 的 conf/activemq.xml 中的 <transportConnectors> 标签中
URL 描述信息的头部都采用协议名称例如
1), 描述amqp 协议监听端口时,采用URL描述格式为 amqp://......
2), 描述stomp协议监听端口时,采用URL描述格式为 stomp://......
3), 唯独openwire协议监听端口时,URL采用确实tcp://...... (这是因为 AcyiveMQ 中默认的消息就是 openwire)
有哪些
Transmission Control Protocol (TCP) 默认
1), 这个是默认的 Broker 配置,TCP 的 Client 监听端口为 61616
2), 在网络传输数据前,必须要序列化数据,消息通过一个叫 wire ptotocol 来序列化成字节流
默认情况下activemq把 wire protocol 叫作 openwire 它的目的上是促进网络上的效率和数据快速交互
3), TCP 连接的 url形式为: tcp://hostname:port?key=value
4), TCP传输的优点 :
TCP协议传输可靠性,稳定性强
高效性,字节流方式传递,效率很高
有效性,可用性,应用广泛,支持任何平台
New I/O api protocol (NIO)
1), NIO协议和 TCP 协议类似但 NIO 更侧重于底层的访问操作,它允许开发人员对同一份资源有更多的client 调用和服务端有更多的负载
2), 适合使用NIO的场景
有大量的Client 去连接到 Broker 上,一般情况下,大量的 Client 去连接 Broker 是被操作系统的线程有所限制,因此,NIO的实现比TCP 需要更少的线程去运行,所以建议使用NIO协议
可能对于Broker 有一个很迟钝的网络协议,NIO比 TCP 提供更好的性能
3), nio://hostname:port?key=value
AMQP协议
一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端.中间件不同产品,不同开发语言限值
stomp 协议
流文本定向协议,是一种为 MOM(message oriented Midleware ),面向消息中间件设置的简单文本协议
SSL 协议
连接url形式为 ssl://localhost:61616?trace=false
mqtt 协议
是IBM 开发的一个即时通信协议,有可能成为物联网的重要组成部分,该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当时做传感器和制动器,(比如通过 twitter 让房屋联网) 的通信协议
NIO 案例演示
修改 conf/activemq.xml
<transportConnectors><transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/></transportConnectors>
private static final String ACTIVE_MQ = "nio://192.168.247.111:61618";
NIO案例演示增强
上述NIO性能不错了,如何进一步优化?
问题
URL 格式头以 "nio" 开头,表示这个端口使用以 tcp协议为基础的nio网络 io 模型 但是这样的设置方式,只能使这个端口支持 openwire 协议。
那么我们怎么既让这个端口支持 NIO 网络的IO 模型,又让它支持多个协议呢?
解决
使用 auto 关键字
修改 activemq.xml
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&
wireFormat.maxFrameSize=104857600&
org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&
org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50" />
wireFormat.maxFrameSize=104857600&
org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&
org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50" />
使用 + 符号来为端口设置多特性
如果我们即需要某一个端口支持NIO 网络 IO 模型,又需要它支持多个协议
idea 生产者者和消费者配置
private static final String ACTIVE_MQ = "nio://192.168.247.111:61608";
private static final String ACTIVE_MQ = "tcp://192.168.247.111:61608";
其它协议可能出现错误,因为代码不同
ActiveMQ的消息存储和持久化
官网
是什么
面试题
activeMQ 持久化机制
LevelDB
KahaDB
说明
一句话: ActiveMQ 宕机了,消息不会丢失的机制
为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都采用 持久化机制
ActiveMQ 的消息持久化机制,有 JDBC , AMQ , KAHADB , 和 LevelDB ,无论采取哪一种持久化机制,消息的存储逻辑都是一致的
就是在发送者将消息发送后,消息中心首先将消息存储在本地数据文件,内存数据库或者远程数据库等再试图将消息发送给接受者,成功则将消息从存储中删除,失败则继续尝试发送
消息中心启动以后首先要检查指定的 存储位置,如果有未发送成功的消息,则需要把消息发送出去
有哪些
AMQ Message Store
以文件存储,具有写入速度快和容易恢复的特点,消息存储在一个文件中,文件的默认大小为 32 M ,当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为 可删除,在下一个清除阶段,这个文件被删除, AMQ 适用于 activeMQ 5.3之前的版本
KahaDB (默认)
基于日志文件,从 activeMQ 5.4开始默认的持久化插件
kahadb 官网
验证 KahaDB 是持久化存储 /conf/activemq.xml
说明
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址
KahaDB 是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化
数据被追加到 data logs 中,当不需要log文件中的数据时,log文件将会被丢弃
/data/kahadb/里面文件是啥
db -[number].log
存储消息到预定大小的数据记录文件中,当数据文件(>32M)时一个新的文件随之创建,number数值也随之变大,当不再有其它引用到数据文件中的任何消息时,文件会被删除和归档
db.data
包含了持久化的 BTree 索引,索引了消息记录中的消息,它是消息的索引文件,使用 BTree 作为索引指向了 db-[number].log 里面存储的信息
db.free
当前 db.data 文件那些页面是空闲的,文件具体内容是所有空闲页的ID
db.redo
用来进行消息恢复,如果KahaDB 消息存储在强制退出后启动,用于恢复BTree 索引
lock
文件锁,表示当前获得 kahadb 读写权限的 broker
JDBC
消息基于 JDBC 存储
LevelDB
这种文件系统是从 activemq 5.8之后引进的,它和 kahadb 非常相似,也是基于文件的本地数据库存储形式,但是它提供了比 kahandb 更快的持久化
但它不使用自定义的 btree 实现索引预写日志,而是使用 基于levelDB 的索引
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
JDBC Message store with activemq journal
JDBC消息存储
1), 把mysql jar包放在 lib/mysql-connector-java-8.0.22.jar
2), 在 conf/acvtivemq.xml 文件中删掉 kahadb 添加jdbc 连接
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#my-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#my-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
3), 在 conf/activemq.xml 添加 my-ds bean (</broker> 结束之后 和 <import> 之前)
4), sql和建表说明
create database activemq
表
activemq_acks
activemq_lock
activemq_msgs
5), activemq运行测试
如果出现错误 查看activemq 日志 data/activemq.log
6),代码运行验证
一定要开持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
队列
生产
消费
topic
生产
消费
7),数据库情况
一旦运行生产code
点到点(队列)
当 DeliveryMode 设置为 NON_PERSISTENT 时, 消息被保存在内存中
当 DeliveryMode 设置为 PERSISTENT 时,消息保存在 broker 的相应的文件或者 数据库中
而且点对点类型中消息一旦被 consumer 消费就从 broker 中删除
发布/订阅类型
先启动消费者,然后在启动生产者
8),小总结
如果是 Queue ,在没有消费者启动的情况下会把小修存在 activemq_msgs ,表中,只要有任意消费者已经消费过了,消费之后这些数据将会立即被删除
如果是 topic 一般是先启动消费订阅然后再启动生产的情况下会将消息保存在 activemq_acks中
JDBC Message store with ActiveMQ Journal
说明
这种方式克服了 JDBC store 的不足,JDBC每消息过来,都需要去写库和读库
activemq journal 使用高速缓存写入技术,大大提高了效率
当消费者的消费速度能够及时跟上生产者的消息时,journal 文件能够大大减少写入到DB中的消息
怎么玩
修改conf/activemq.xml 文件
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataDirectory="activemq-data"
dataSource="#my-ds"/>
</persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataDirectory="activemq-data"
dataSource="#my-ds"/>
</persistenceFactory>
重新启动activemq start
activemq 持久化总结
持久化消息主要指
mq所在服务器down 了消息不会丢失的机制
activemq 的消息持久化机制
kahadb
基于日志文件
jdbc
第三方数据库
leveldb
基于文件的本地数据库存储
ActiveMQ多节点集群
面试题
引入消息队列之后该如何保证其高可用性
是什么
基于 Zookeeper 和 LevelDB 搭建 activemq 集群,集群仅提供主备方式的高可用集群功能,避免单点故障
zookeeper + replicated + leveldb -store 的主从集群
三种集群方式对比
基于 Shared File System 共享文件系统 kahaDB默认
基于JDBC
基于可复制的LevelDB
本次案例采用 zk+replicated levelDB store
是什么
从 activemq5.9开始,activemq的集群方式取消了传统 master-slave方式,增加了基于 zookeeper+leveldb 的master-slave 实现方式,从5.9版本以后也是官网的推荐
基于 zookeeper 和 leveldb 搭建集群,集群仅提供主备方式的高可用集群功能,避免单点故障
官网集群原理图
使用 zookeeper 集群注册所有的 activemq broker 但只有其中一个 broker 可以提供服务它将被视为 master ,其它的 broker 处于待机状态被视为 slave
如果 master 因故障而不能提供服务 zookeeper 会从 slave 中选举出来一个 broker 充当 master ,slave 连接 master 并同它们的存储过程,slave 不接受客户端连接,所有的存储操作都将被复制到连接至 master 的 slaves
如果master 宕机得到了最新更新的 slave 会成为master ,故障节点在恢复后会重新加入到集群中并连接master 进行slave 模式
所有需要同步的消息操作都将等待存储状态被复制到其它法定节点的操作完成才能完成
所以,,如果你设置了 replicas =3 ,那么法定大小是 (3/2)+1=2,master 将会存储并更新然后等待 (2-1)=1 个slave 存储和更新完成,才汇报 success
有一个 node 要做为观察员存在,当一个新的 master 被选中,你需要至少保证一个法定node 在线以能够找到拥有最新状态的node 这个node 才可以成为新的 master
因此 推荐运行至少3个 replica nodes 以防止一个node 失败后服务中断
部署规划和步骤
1),环境版本
Centos7
JDK1.8
apache-activemq-5.16.0
apache-zookeeper-3.5.8-bin
2), 关闭防火墙并保证 windows 可以ping 通activeMQ服务器
3), 要求具备ZK集群并可以成功启动
参考 zookeeper 集群的创建参考
主机
zookeeper集群端口
amq集群bind 端口
amq消息tcp端口
管理控制台端口
amq节点的安装目录
192.168.247.111
2181
bind=tcp://0.0.0.0:63631
61616
8161
/opt/mq_cluster/mq_node01
192.168.247.111
2182
bind=tcp://0.0.0.0:63632
61617
8162
/opt/mq_cluster/mq_node02
192.168.247.111
2183
bind=tcp://0.0.0.0:63633
61618
8163
/opt/mq_cluster/mq_node03
4),创建3台集群目录(在/opt/目录下)
mkdir mq_cluster
cp -Rf apache-activemq-5.16.0/ mq_node01
cp -Rf mq_node01/ mq_node02
cp -Rf mq_node01/ mq_node03
5),修改管理控制台端口(8161)
mq_node01 文件不做修改使用默认的
修改 mq_node02/conf/jetty.xml 修改为 8162
修改 mq_node03/conf/jetty.xml 修改为 8163
6), hostname 名字映射
vim /etc/hosts
7), activemq 集群配置
修改第一台activemq在 vim /conf/activemq.xml
3个节点的 BrokerName 要求全部一致(这里剩下2个省略)
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
3个节点持久化配置(bind 端口修改)
1),把activemq.xml的kahaDB 持久化注释掉
2),第一台61616配置如下
3),第一台61617配置如下
4),第一台61618配置如下
8),修改各个节点的消息端口(61616 的tcp端口修改)
61616默认不修改
修改mq_node02 和 mq_node03 的conf/activemq.xml
9),按顺序启动3个activemq节点,到这一步的前提是zk集群已经成功启动
创建 activemq 的shell脚本
start
vim activemq_start.sh
chmod u+x activemq_start.sh
./activemq_start.sh
stop
vim activemq_stop.sh
chmod u+x activemq_stop.sh
./ activemq_stop.sh
10), zk节点状态说明
3台zk集群连接任意连接一台
进入 zookeeper的bin目录执行 ./zkCli.sh
ls / 查看里面是否有注册成功的activemq
[activemq, zookeeper]
ls /activemq/leveldb-stores
[00000000000, 00000000001, 00000000002]
如何查看 那一台是master 主机 ,那几个是从机呢
get /activemq/leveldb-stores/00000000000
{"id":"zzyymq","container":null,"address":"tcp://zookeeper-activemq-zgx:63632","position":-1,"weight":1,"elected":"0000000000"}
get /activemq/leveldb-stores/00000000001
{"id":"zzyymq","container":null,"address":null,"position":-1,"weight":1,"elected":null}
get /activemq/leveldb-stores/00000000002
{"id":"zzyymq","container":null,"address":null,"position":-1,"weight":1,"elected":null}
说明: elected 的值不为空,说明这个节点时 master ,其它;两个是 slave
集群可用性测试
activemq 的客户端只能访问 master 的 broker ,其它处于 slave 的broker 不能访问,所以客户端连接的broker 应该使用 failover 协议(失败转移)
当一个activemq 节点挂掉或者一个 zookeeper 节点挂掉,activemq 服务依然正常运转,如果仅剩一个 activemq 节点,由于不能选举出 master ,所以 activemq 不能正常运行
同样的
如果zookeeper 仅剩一个节点,不管activemq 各节点存活,activemq也不能提供正常的服务,(activemq集群的高可用,依赖于zookeeper 集群的高可用)
代码测试
生产者/消费者
static final String ACTIVE_MQ = "failover:(tcp://192.168.247.111:61616,tcp://192.168.247.111:61617,tcp://192.168.247.111:61618)";
private static final String QUEUE_NAME = "queue_closter";
private static final String QUEUE_NAME = "queue_closter";
干掉一台activemq节点,它会自动切换到另外一个活的activemq服务器
3台机器中的activemq 只会有一个mq可以被客户端连接使用,在测试时可以把master 关掉,然后在重试客户端消息发送和消费还可以正常使用,则说明集群搭建正常
如何查看那个activemq 是master 呢
使用 lsof -i:8161(你自己设置的端口)
java 80506 root 167u IPv6 894034 0t0 TCP zookeeper-activemq-zgx:patrol-snmp (LISTEN)
则说明是master主机
杀掉正在运行的 8161端口
kill -9
高级特性和大厂常考重点
1),引入消息队列之后该如何保证其高可用性
zookeeper +leavedb 集群
2),异步投递Async Sends
异步投递
官网
对于一个 slow(慢) consumer ,使用同步发送消息可能出现 producer 堵塞等情况,慢消费适合异步发送
是什么
activemq 支持同步,异步两种发送到 broker ,模式的选择对发送延时有巨大的影响,producer 能达到怎样的产出率 (产出率=发送数据总量/时间) 主要受发送延时有影响,使用异步发送可以显著的提高发送的性能
activemq 默认使用异步发送的模式,除非明确指定使用同步发送的方式和在未使用事务的前提下发送持久化的消息,这2中情况下都是同步发送的
如果你没有使用事务且发送的持久化的消息,每一次发送都是同步发送的且会阻塞 producer 直到 broker, 返回一个确定,表示消息已经被安全的持久化磁盘,确认机制提供了安全的保障,但同时会阻塞客户端带来了很大的延迟
很多高性能的应用,允许在失败的情况下有少量的数据丢失,如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化消息
异步发送
它可以最大化 producer 端的发送效率,我们通常在发送消息比较密集的情况下使用异步发送,它可以很大的提升 producer 性能,不过这也带来了额外的问题
就是需要消耗较多的 client 端内存同时也会导致 broker 端的性能消耗增加
此外它不能有效确保消息发送成功,在 useAsyncSend=true 的情况下客户端需要容忍消息丢失的可能
官网配置
1), cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
2), ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
3), ((ActiveMQConnection)connection).setUseAsyncSend(true);
面试题追问
异步消息如何确认发送成功?
异步发送丢失消息的场景是: 生产者设置 UseAsyncSend=true ,使用 producer.send(msg) 持续发送消息
由于消息不阻塞,生产者会认为所有的 send 的消息均被成功发送至 mq
如果mq 突然宕机,此时生产者端内存中尚未被发送至 mq 的消息都会丢失
所以,正确的异步发送方法是需要接受回调的
同步发送和异步发送的区别就在此
同步发送需要等 send 不阻塞了就表示一定发送成功
异步发送需要接受回执并由客户端再判断一次是否发送成功
代码
3), 延时投递和定时投递
官网
4大属性
AMQ_SCHEDULED_DELAY
延迟投递时间
AMQ_SCHEDULED_PERIOD
重复投递的时间间隔
AMQ_SCHEDULED_REPEAT
重复投递的次数
AMQ_SCHEDULED_CRON
Linux Crom 表达式
案例演示
conf/activemq.xml 文件修改如下
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
在Java 代码里面封装的辅助消息类型 SecheduledMessage
延时投递的producer
延时投递的consumer
4),分发策略
5), activemq 消费重试机制
面试题
具体哪些情况下会引起消息重发
Client 用了 transactions 且在 session 中调用了 rollback()
Client 用了 transactions 且在调用 commit()之前关闭或者没有 commit()
Client 在 Client_AckNowLedge 的传递模式下,在session 中调用了 receover()
请说说消息重发时间间隔和重发次数吗
间隔 1s 次数:6
有毒 Poison ack 谈谈你的理解
一个消息被 redelivedred 超过默认的最大重发次数(默认6次)时,消费端会给mq 发送一个 poison ack 表示这个消息有毒,告诉 broker 不要再发了,这个时候 broker 会把这个消息放到 DLQ(死信队列)
记得给学生讲解2次,上述回答
官网
属性说明
maximumRedeliveries
最大重传次数,达到最大重连次数后抛出异常,为-1 时不限制次数为0时表示不尽兴重传,默认值为6
initialRedeliveryDelay
初始重发延迟时间,默认1000L
代码测试
消费者测试
我们查看localhost:61616 在消费者开启事物不尽兴提交的情况下1次失败的情况下在重发6次会出现死信队列
ActiveMQ.DLQ
如何改变默认重发策略
//1 连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ);
//2 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(queuePolicy);
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ);
//2 建立连接
Connection connection = activeMQConnectionFactory.createConnection();
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(queuePolicy);
6), 死信队列
官网
是什么?
activemq 中引入了 死信队列(Dead letter queue) 的概念,即一条消息再被重发了多次后(默认重发6次 redeliveryCount=6)将会被activemq 移入死信队列,开发人员可以在这个queue中查看出错的消息,进行人工干预
死信队列的使用,处理失败的消息
一般生产环境中使用 activemq 的时候设计两个队列,1个是核心业务队列,一个是死信队列
核心业务队列,例如来让订单系统发送订单信息,然后另一个死信队列就是用来处理异常情况
假设第三方物流系统故障此时无法请求,那么仓储系统每次消费一条订单信息,尝试通知发货和配送都会遇到对方的接口报错,此时仓储啥系统就可以把这条消息拒绝访问或者标识失败,一旦标志这条消息处理失败后,mq就会把这条消息转入提前设置好的死信队列
activemq 死信队列的配置介绍
sharedDeadLetterStrategy
将所有的 deadLetter 保存在一个共享队列中,这是activemq broker 端默认的策略
共享队列默认为 ActiveMQ.DLQ 可以通过 deadLetterQueue 属性来定
individualDeadLetterStrategy
把 deadLetter 放入各自的死信队列中
对于queue 而言 ,死信队列通道的前缀默认为 ActiveMQ.DLQ.Queue
对于topic 而言,死信队列的前缀为 ActiveMQ.DLQ.Topic
比如队列 Order 那么它的死信队列 通道为 ActiveMQ.DLQ.Queue.Order
我们使用 queuePerfix 和 topicPrefix 来指定上述前缀
配置案例
自动删除过期消息
存放非持久消息到死信队列中
7),如何保证消息不被重复消费?幂等性问题你谈谈
网络延迟传输中,会造成mq重试,在重试过程中,可能会造成重复消费
1),如果消息是做数据库的插入操作.给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据
2),如果上面两种情况还不行,准备一个第三方服务来做消费记录,以redis 为例,给消息分配一个全局id ,只要消费过该记录,将 id message 以key-val 形式写入redis 那么消费者开始消费前,先去redis 中查询有没有消费记录即可
0 条评论
下一页
为你推荐
查看更多