MQ之ActiveMQ
2021-07-07 21:35:14 0 举报
AI智能生成
ActiveMQ入门到精通
作者其他创作
大纲/内容
MQ之ActiveMQ
入门概述
前言
从生活到实际生产案例
上述问题引出的产生背景
系统之间直接调用实际工程落地和存在的问题
font color=\"#c41230\
等待同步存在性的问题
你如何解决呢?
需要有一种东西能摆平上述情况
是什么?
定义
面向消息的中间件 (message-oriented middleware) MOM
通过提供消息传递和font color=\"#0076b3\
执行流程如下
特点
采用异步处理模式
应用系统之间解耦
发送者海和接受者不必同时在线
能干嘛?
解耦
削峰
异步
在哪下?
activeMQ
http://activemq.apache.org/
怎么玩?
主要的功能
异步消息的消费和处理
控制消息的消费顺序
可以和spring/springboot整合简化编码
配置集群容错的mq集群
...
ActiveMQ(默认端口61616)安装和控制台
普通启动activeMQ
进入/bin/目录 ./activemq start
查看 activemq 是否启动的几种方式
ps -ef|grep activemq
netstat -anp |grep 61616
lsof -i:61616
关闭avtivemq
/bin/目录下 ./activemq stop
带日志的启动的方式
在 activemq目录创建日志文件logs
在/bin/启动带日志方式
./activemq start > /opt/apache-activemq-5.16.0/logs/run_activemq.log
activemq 控制台
ip:8161
默认用户名和密码都是 8161
<bean id=\"jettyPort\" class=\"org.apache.activemq.web.WebConsolePort\" init-method=\"start\"> <!-- the default port number for the web console --> <property name=\"host\" value=\"127.0.0.1\"/>#127.0.0.1 修改为虚拟机的ip <property name=\"port\" value=\"8161\"/> </bean>
备注
采用了 61616 端口提供的JMS服务
采用 8161 端口提供了管理控制台服务
Java编码实现ActiveMQ通讯
idea 创建 maven工程
pom.xml
JMS 编码总体架构
jms -java message Service 注意: Destination 目的地有俩种 (队列或者主题)
jms 和 jdbc 连接相似
粗说目的地 Destionation队列和主题
队列
queue (一对一)
主题
topic (一对多)
font color=\"#0076b3\
生产者
消费者
receive(4000L) 带时间和 receive()不带时间的区别
消费者监听
System.in.read();
编码小总结
JMS 开发的基本步骤
俩种消费方式
同步阻塞方式 receive()
异步非阻塞方式(监听器 onMessage())
订阅者或接受者通过messageConsumer的 setMessageListener (MessageListener listener)注册一个消息监听器
发布主题生产者
发布主题消费者
控制台
总结
两大模式特性
两大模式比较
比较项目
工作模式
有无状态
传递完整性
处理效率
topic
无状态
如果没有订阅者将会被丢弃
queue
Queue数据默认会在mq服务器上以文件形式存储
消息不会被丢弃
JMS 规范和落地产品
什么是JavaEE
JavaEE 是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准
JMS只是JavaEE的一个子模块
什么是Java消息服务
MQ中间件的其它落地产品
RabbitMQ
Kafka
RocketMQ
JMS的组成结构和特点
JMS provider
JMS producer
JMS consumer
JMS message
消息头
JMSDestination
JMSDeliveryMode
JMSExpiration
JMSPriority
消息优先级.从0-9font color=\"#c41230\
JMSfont color=\"#c41230\
JMSMessageID
唯一识别每个消息的表示由 MQ产生
消息体
封装具体的消息数据
5种消息体格式
TextMessage
MapMessage
StreamMessage
ObjectMessage
BytesMessage
发送和接受的消息类型必须一致对应
消息属性
识别/去重/重点标注等操作非常有用的方法
是什么
JMS的可靠性
PERSISTENT 持久化
参数设置说明
非持久
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
持久
textMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
持久的 Queue
持久的 Topic
代码
先启动订阅再启动生产
持久的发布主题生产者
持久的订阅注意消费者
类似于weiixn公众号订阅发布
Transaction 事务
producer 提交时的事物
false
true
事务偏生产者/签收偏消费者
Acknowledge : 签收
非事务
自动签收(默认)
Session.AUTO_ACKNOWLEDGE
手动签收
Session.CLIENT_ACKNOWLEDGE
acknowledge() 进行签收
允许重复消息
Session.DUPS_OK_ACKNOWLEDGE
事务
消息生产者
消息消费者
签收和事务关系
JMS的点对点总结
JMS的发布订阅总结
非持久订阅
一句话: font color=\"#c41230\
持久订阅
用哪个?
ActiveMQ的Broker
cp activemq.xml activemq02.xml
./activemq start xbean:file:/opt/apache-activemq-5.16.0/conf/activemq02.xml
嵌入式Broker
理论
用ActiveMQ Broker 作为独立的消息服务器来构建Java应用
EmbedBroker
idea使用 jps -l 查看正在运行的线程
队列验证
Spring整合 ActiveMQ
Spring配置文件
只需修改 applicationContext.xml添加一个 topic实例bean
Spring 配置文件
新建的监听类
SpringBoot 整合 ActiveMQ
application.yml
配置类ConfigBean
生产者类
Test测试类
主启动类
新需求: font color=\"#c41230\
producerMsgScheduled()
pom.xml 和生产者pom.xml相同
application.yml 和 生产者 application.yml 相同
消费者类
主题/发布订阅
topic 生产者
pub-sub-domain: true # false 是队列 true 是主题 不写就是默认就是false队列
生产类 new ActiveMQTopic(myTopic)
topic 消费者
其它不变
ActiveMQ的传输协议
面试题
默认的 61616 端口如何修改
你生产上的链接协议如何配置的? 使用的是 tcp?
官网
activemq官网
其中配置 Transport Connation 的文件在 activeMQ 的 conf/activemq.xml 中的 <transportConnectors> 标签中
URL 描述信息的头部都采用协议名称例如
有哪些
Transmission Control Protocol (TCP) 默认
默认情况下activemq把 wire protocol 叫作 openwire 它的目的上是促进网络上的效率和数据快速交互
New I/O api protocol (NIO)
AMQP协议
stomp 协议
SSL 协议
连接url形式为 ssl://localhost:61616?trace=false
mqtt 协议
NIO 案例演示
修改 conf/activemq.xml
<transportConnectors><transportConnector name=\"nio\" uri=\"nio://0.0.0.0:61618?trace=true\"/></transportConnectors>
private static final String ACTIVE_MQ = \"nio://192.168.247.111:61618\";
NIO案例演示增强
问题
URL 格式头以 \"nio\
解决
使用 auto 关键字
修改 activemq.xml
<transportConnector name=\"auto+nio\" uri=\"auto+nio://0.0.0.0:61608?maximumConnections=1000& wireFormat.maxFrameSize=104857600& org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20& org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50\" />
使用 + 符号来为端口设置多特性
idea 生产者者和消费者配置
private static final String ACTIVE_MQ = \"nio://192.168.247.111:61608\";
private static final String ACTIVE_MQ = \"tcp://192.168.247.111:61608\";
ActiveMQ的消息存储和持久化
http://activemq.apache.org/persistence
activeMQ 持久化机制
LevelDB
KahaDB
说明
一句话:font color=\"#c41230\
消息中心启动以后首先要检查指定的 font color=\"#c41230\
AMQ Message Store
KahaDB (默认)
kahadb 官网
http://activemq.apache.org/kahadb
验证 KahaDB 是持久化存储 /conf/activemq.xml
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址
/data/kahadb/里面文件是啥
db -[number].log
db.data
db.free
db.redo
lock
JDBC
消息基于 JDBC 存储
<persistenceAdapter><kahaDB directory=\"activemq-data\" journalMaxFileLength=\"32mb\"/></persistenceAdapter>
JDBC Message store with activemq journal
JDBC消息存储
<persistenceAdapter> <jdbcPersistenceAdapter dataSource=\"#my-ds\" createTablesOnStartup=\"true\"/> </persistenceAdapter>
create database activemq
表
activemq_acks
activemq_lock
activemq_msgs
如果出现错误 查看activemq 日志 data/activemq.log
一定要开持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
生产
消费
topic
一旦运行生产code
点到点(队列)
而且点对点类型中消息一旦被 consumer 消费就从 broker 中删除
发布/订阅类型
如果是 topic 一般是先启动消费订阅然后再启动生产的情况下会将消息保存在 activemq_acks中
JDBC Message store with ActiveMQ Journal
怎么玩
修改conf/activemq.xml 文件
<persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles=\"4\" journalLogFileSize=\"32768\" useJournal=\"true\" useQuickJournal=\"true\" dataDirectory=\"activemq-data\" dataSource=\"#my-ds\"/> </persistenceFactory>
重新启动activemq start
activemq 持久化总结
持久化消息主要指
mq所在服务器down 了消息不会丢失的机制
activemq 的消息持久化机制
kahadb
基于日志文件
jdbc
第三方数据库
leveldb
基于文件的本地数据库存储
ActiveMQ多节点集群
引入消息队列之后该如何保证其高可用性
zookeeper + replicated + leveldb -store 的主从集群
三种集群方式对比
http://activemq.apache.org/masterslave
基于 Shared File System 共享文件系统 kahaDB默认
基于JDBC
基于可复制的LevelDB
本次案例采用 zk+replicated levelDB store
官网集群原理图
使用 zookeeper 集群注册所有的 activemq broker 但只有font color=\"#c41230\
所有需要同步的消息操作都将等待存储状态被复制到其它法定节点的操作完成才能完成
因此 推荐运行至少3个 replica nodes 以防止一个node 失败后服务中断
部署规划和步骤
Centos7
JDK1.8
apache-activemq-5.16.0
apache-zookeeper-3.5.8-bin
参考 zookeeper 集群的创建参考
主机
zookeeper集群端口
amq集群bind 端口
amq消息tcp端口
管理控制台端口
amq节点的安装目录
192.168.247.111
2181
bind=tcp://0.0.0.0:63631
61616
8161
/opt/mq_cluster/mq_node01
2182
bind=tcp://0.0.0.0:63632
61617
8162
/opt/mq_cluster/mq_node02
2183
bind=tcp://0.0.0.0:63633
61618
8163
/opt/mq_cluster/mq_node03
mkdir mq_cluster
cp -Rf apache-activemq-5.16.0/ mq_node01
cp -Rf mq_node01/ mq_node02
cp -Rf mq_node01/ mq_node03
mq_node01 文件不做修改使用默认的
修改 mq_node02/conf/jetty.xml 修改为 8162
修改 mq_node03/conf/jetty.xml 修改为 8163
vim /etc/hosts
修改第一台activemq在 vim /conf/activemq.xml
3个节点的 BrokerName 要求全部一致(这里剩下2个省略)
<broker xmlns=\"http://activemq.apache.org/schema/core\" brokerName=\"localhost\" dataDirectory=\"${activemq.data}\">
3个节点持久化配置(bind 端口修改)
61616默认不修改
修改mq_node02 和 mq_node03 的conf/activemq.xml
创建 activemq 的shell脚本
start
vim activemq_start.sh
chmod u+x activemq_start.sh
./activemq_start.sh
stop
vim activemq_stop.sh
chmod u+x activemq_stop.sh
./ activemq_stop.sh
3台zk集群连接任意连接一台
进入 zookeeper的bin目录执行 ./zkCli.sh
ls / 查看里面是否有注册成功的activemq
[font color=\"#c41230\
ls /activemq/leveldb-stores
如何查看 font color=\"#c41230\
get /activemq/leveldb-stores/00000000000
{\"id\":\"zzyymq\
get /activemq/leveldb-stores/00000000001
get /activemq/leveldb-stores/00000000002
说明: font color=\"#c41230\
集群可用性测试
同样的
代码测试
生产者/消费者
static final String ACTIVE_MQ = \"font color=\"#c41230\
如何查看那个activemq 是master 呢
使用 lsof -i:8161(你自己设置的端口)
java 80506 root 167u IPv6 894034 0t0 TCP zookeeper-activemq-zgx:patrol-snmp (LISTEN)
则说明是master主机
杀掉正在运行的 8161端口
kill -9
高级特性和大厂常考重点
zookeeper +leavedb 集群
异步投递
http://activemq.apache.org/async-sends
如果你没有使用事务且发送的持久化的消息span style=\"font-size: inherit;\
异步发送
就是需要消耗较多的 client 端内存同时也会导致 broker 端的性能消耗增加
官网配置
面试题追问
异步消息如何确认发送成功?
同步发送和异步发送的区别就在此
同步发送需要等 send 不阻塞了就表示一定发送成功
异步发送需要接受回执并由客户端再判断一次是否发送成功
4大属性
AMQ_SCHEDULED_DELAY
延迟投递时间
AMQ_SCHEDULED_PERIOD
重复投递的时间间隔
AMQ_SCHEDULED_REPEAT
重复投递的次数
AMQ_SCHEDULED_CRON
Linux Crom 表达式
案例演示
conf/activemq.xml 文件修改如下
<broker xmlns=\"http://activemq.apache.org/schema/core\" brokerName=\"localhost\" dataDirectory=\"${activemq.data}\" schedulerSupport=\"true\">
在Java 代码里面封装的辅助消息类型 SecheduledMessage
延时投递的producer
延时投递的consumer
具体哪些情况下会引起消息重发
Client 用了 transactions 且在 session 中调用了 rollback()
Client 用了 transactions 且在调用 commit()之前关闭或者没有 commit()
请说说消息重发时间间隔和重发次数吗
间隔 1s 次数:6
有毒 Poison ack 谈谈你的理解
http://activemq.apache.org/redelivery-policy
属性说明
maximumRedeliveries
initialRedeliveryDelay
消费者测试
我们查看localhost:61616 在消费者开启事物不尽兴提交的情况下1次失败的情况下在重发6次会出现死信队列
ActiveMQ.DLQ
如何改变默认重发策略
//1 连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ); //2 建立连接 Connection connection = activeMQConnectionFactory.createConnection(); RedeliveryPolicy queuePolicy = new RedeliveryPolicy(); queuePolicy.setMaximumRedeliveries(2); activeMQConnectionFactory.setRedeliveryPolicy(queuePolicy);
activemq 死信队列的配置介绍
sharedDeadLetterStrategy
共享队列默认为 ActiveMQ.DLQ 可以通过 deadLetterQueue 属性来定
individualDeadLetterStrategy
把 deadLetter 放入各自的死信队列中
比如队列 Order 那么它的死信队列 通道为 ActiveMQ.DLQ.Queue.Order
我们使用 queuePerfix 和 topicPrefix 来指定上述前缀
配置案例
自动删除过期消息
存放非持久消息到死信队列中
0 条评论
回复 删除
下一页