消息中间件之ActiveMQ
2022-06-21 08:50:09 1 举报
AI智能生成
消息中间件之ActiveMQ
作者其他创作
大纲/内容
1.入门概述
前言
1.在何种场景下需要使用消息中间件
2.为什么要在系统里引入消息中间件
...
从生活Case到实际生产案例
上述问题引出的产生背景
系统之间直接调用实际工程落地和存在的问题
系统之间接口耦合比较严重
面对大流量并发时,容易被冲垮
等待同步存在性能问题
你如何解决
需要有一种东东能摆平上述情况
是什么
定义
特点
采用异步处理模式
应用系统之间解耦合
发送者和接受者不必了解对方,只需要确认消息
发送者和接受者不必同时在线
整体架构
分支主题
能干嘛
解耦
削峰
异步
去哪下
ActiveMQ官网
http://activemq.apache.org/
怎么玩
最重要的功能
实现高可用,高性能,可伸缩,易用和安全的企业级面向消息服务的系统
异步消息的消费和处理
控制消息的消费顺序
可以和Spring或者SpringBoot整合简化代码
配置集群容错的MQ集群
2.ActiveMQ安装和控制台
官网下载
Linux安装
普通启动
默认端口为61616
普通关闭
带日志启动
Apache ActiveMQ控制台
http://127.0.0.1:8161/admin
默认的用户名和密码是admin/admin
备注
ActiveMQ采用61616端口提供JMS服务
ActiveMQ采用8161端口提供管理控制台服务
3.Java编码实现ActiveMQ通讯
IDEA创建Maven工程
POM.xml文件
JMS编码总体架构
回忆一下以前的JDBC编码套路
粗说目的地Destination
队列(Queue)和主题(Topic)
两大模式特性
在点对点的消息传递域中,目的地被称为队列(queue)
上手案例
消息生产者
代码
控制台
控制台说明
消息消费者
代码1-阻塞式消费者
代码2-异步监听式消费者
控制台
编码小总结
JMS开发的基本步骤
两种消费方式
在发布订阅消息传递域中,目的地被称为主题(topic)
上手案例
发布主题生产者
代码
控制台
订阅主题消费者
代码
控制台
先启动订阅者再启动生产者,不然发送的消息是废消息
控制台
小总结
两大模式特效
两大模式比较
4.JMS规范和落地产品
是什么
JavaEE
JMS
Java Message Service(Java消息服务是JavaEE中的一个技术
MQ中间件的其他落地产品
消息队列的详细比较
JMS的组成结构和特点
JMS Provider
实现JMS接口和规范的消息中间件,也就是我们说的MQ服务器
JMS Producer
消息生产者,创建和发送JMS消息的客户端应用
JMS Consumer
消息消费者,接收和处理JMS消息的客户端应用
JSM Message
消息头
JMSDestination
消息发送的目的地,主要是指Queue和Topic
JMSDeliveryMode
JMSExpiration
JMSPriority
JMSMessageID
唯一标识每个消息的标识由MQ产生。
消息属性
封装具体的消息数据
5种消息格式
TxtMessage
普通字符串消息,包含一个String
MapMessage
一个Map类型的消息,key为Strng类型,而值为Java基本类型
BytesMessage
二进制数组消息,包含一个byte[]
StreamMessage
Java数据流消息,用标准流操作来顺序填充和读取
ObjectMessage
对象消息,包含一个可序列化的Java对象
发送和接收的消息体类型必须一致对应
消息体
如果需要除消息字段以外的值,那么可以使用消息属性
识别/去重/重点标注等操作非常有用的方法
是什么
JMS的可靠性
PERSISTENT:持久性
参数设置说明
非持久
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
非持久化:当服务器宕机,消息不存在。
持久
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
持久化:当服务器宕机,消息依然存在。
Queue默认是持久
持久的Queue
演示
结论
持久的Topic
代码
先启动定阅消费者再启动定阅生产者
持久的发布主题生产者
代码
控制台
订阅者在线
订阅者不在线
持久的定阅主题消费者
代码
控制台
订阅者在线
订阅者不在线
控制台
类似微信公众号订阅发布
Transaction:事务
producer提交时的事务
false
只要执行send,就进入到队列中
关闭事务,那第2个签收参数的设置需要有效
true
先执行send再执行commit,消息才被真正提交到队列中
消息需要需要批量提交,需要缓冲处理
事务偏生产者/签收偏消费者
代码
生产者
消费者
Acknowledge:签收
非事务
自动签收(默认)
Session.AUTO_ACKNOWLEDGE
手动签收
Session.CLIENT_ACKNOWLEDGE
客户端调用acknowledge方法手动签收
允许重复消息
事务
生产事务开启,只有commit后才能将全部消息变为已消费
消息生产者
代码
控制台
消息消费者
代码
控制台
签收和事务的关系
JMS的点对点总结
JMS的发布订阅总结
非持久订阅
持久订阅
用哪个?
当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅
5.ActiveMQ的Broker
是什么
回忆阳哥之前讲的Redis,不同config配置文件来模拟不同的实例
嵌入式Broker
POM.XML
Embedoker
队列验证
6.Spring整合ActiveMQ
Maven修改,需要添加Spring支持JMS的包
Spring配置文件
队列(Queue)
生产者
代码
消费者
代码
主题(Topic)
生产者
代码
生产者和消费者都可以通过jmsTemplate对象实时设置目的地等其他信息
消费者
代码
在Spring里面实现消费者不启动,直接通过配置监听完成
说明
Spring配置文件
需要写一个类来实现消息监听
消费者配置了自动监听,就相当于在spring里面后台运行,有消息就运行我们实现监听类里面的方法
7.SpringBoot整合ActiveMQ
队列(Queue)
队列生产者
1.新建Maven工程并设置包名类名
工程名
boot_mq_producer
包名
com.atguigu.boot.activemq
2.POM文件
3.Yml文件
4.配置bean
类似于Spring的ApplicationContext.xml文件
5.Queue_Producer
6.主启动类
7.测试单元
8.新需求:
要求每隔3秒钟,往MQ推送消息
修改Queue_Produce
修改主启动类的MainApp_Producer
直接开启主启动类,间隔发送消息
队列消费者
1.新建Mavaen工程并设置包名类名
2.POM文件
3.Yml文件
4.springboot的消息监听注解
@JmsListener
监听过后会随着springboot一起启动,有消息就执行加了该注解的方法
方法
主题发布订阅(Topic)
Topic生产者
1.新建Maven工程并设置包名类名
2.POM文件
3.Yml文件
4.配置bean
5.Topic_Producer
6.主启动类
先启动消费者,后启动生产者
Topic消费者
非持久版
1.新建Maven工程并设置包名类名
2.POM文件
3.Yml文件
4.Topic_Consumer
5.主启动类
持久版
1.新建Maven工程并设置包名类名
2.POM文件
3.Yml文件
4.配置bean
配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅
5.Topic_Consumer
@JmsListener(destination = "${myTopicName}",containerFactory = "jmsListenerContainerFactory")
6.主启动类
8.ActiveMQ的传输协议
面试题
默认的61616端口如何更改
你生产上的连接协议如何配置的?使用tcp吗?
官网
http://activemq.apache.org/configuring-version-5-transports.html
是什么
有哪些
1.Transmission Control Protocol(TCP)默认
2.New I/O API Protocol(NIO)
3.AMQP协议
4.Stomp协议
5.Secure Sockets Layer Protocol(SSL)
6.MQTT协议
扩展Github
7.WS协议(websocket)
8.小总结
nio案例演示
官网
http://activemq.apache.org/configuring-version-5-transports.html
修改配置文件
生产和消费两端协议代码修改
生产者
消费者
运行验证
nio案例演示增强
上诉NIO性能不错了,如何进一步优化?
问题
URI格式以"nio"开头,代表这个端口使用TCP协议为基础的NIO网络模型。
但是这样的设置方式,只能使这个端口支持Openwire协议。
我们怎么能够让这个端口既支持NIO网络模型,又让他支持多个协议呢?
解决
使用auto关键字
使用"+"符号来为端口设置多种特性
如果我们既需要使用某一个端口支持NIO网络模型,又需要它支持多个协议
9.ActiveMQ的消息存储和持久化
官网
http://activemq.apache.org/persistence
是什么
面试题
ActiveMQ持久化机制
回忆Redis持久化机制有几种
RDB
AOF
说明
一句话:ActiveMQ宕机了,消息不会丢失的机制。
有哪些
AMQ Mesage Store(了解)
基于文件的存储方式,是以前的默认消息存储,现在不用了
KahaDB消息存储(默认)
基于日志文件,从ActiveMQ5.4开始默认的持久化插件
官网
kahaDB
验证
说明
http://activemq.aache.org/kahadb
KahaDB的存储原理
JDBC消息存储
消息基于JDBC存储的
LevelDB消息存储(了解)
JDBC Message Store with ActiveMQ Journal
JDBC存储消息
1.MQ+MySQL
http://activemq.apache.org/persistence
2.添加mysql数据库的驱动包到lib文件夹
3.jdbcPersistenceAdapter配置
4.数据库连接池配置
5.建库SQL和创表说明
建一个名为activemq的数据库
三张表的说明
ACTIVEMQ_MSGS
ACTIVEMQ_ACKS
ACTIVEMQ_LOCK
如果新建数据库ok,上述配置ok,代码运行ok,3张表会自动生成
如果表没生成,可能需要自己创建
6.代码运行验证
一定要开启持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
队列
生产者
消费者
主题
生产者
消费者
7.数据库情况
一旦运行生产code
点到点
发布/订阅
mysql
queue
topic
8.小总结
9.开发有坑
JDBC Message store with ActiveMQ Journal
是什么
说明
配置
以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用
ActiveMQ持久化机制小总结
10.ActiveMQ的多节点集群
面试题
引入消息中间件后如何保证其高可用
是什么
基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。
zookeeper+replicated-leveldb-store的主从集群
三种集群方式对比
基于shareFileSystem共享文件系统(KahaDB)
基于JDBC
基于可复制的LevelDB
本次案例采用ZK+Replicated LevelDB Store
ShareFileSystem
是什么
http://activemq.apache.org/replicated-leveldb-store
官网集群原理图
解释
部署规划和步骤
1.环境和版本
2.关闭防火墙并保证各个服务器能够ping通
3.要求具备zk集群并可以成功启动
配置集群教程
4.集群部署规划列表
5.创建3台集群目录
就是一台电脑复制三份ActiveMQ
6.修改管理控制台端口
就是ActiveMQ后台管理页面的访问端口
7.hostname名字映射(如果不映射只需要吧mq配置文件的hostname改成当前主机ip)
8.ActiveMQ集群配置
配置文件里面的BrokerName要全部一致
持久化配置(必须)
01/02/03路径
9.修改各个节点的消息端口
真实的三台机器不用管
10.按顺序启动3个ActiveMQ节点,到这步前提是zk集群已经成功启动运行
先启动Zk 在启动ActiveMQ
11.zk集群节点状态说明
3台Zk连接任意一台验证三台ActiveMQ是否注册上了Zookeeper
查看Master
集群可用性测试
集群需要使用(failover:(tcp://192.168.10.130:61616,tcp://192.168.10.132:61616,tcp://192.168.10.133:61616))配置多个ActiveMQ
11.高级特性和大厂常考重点
引入消息队列之后该如何保证其高可用性
zookeeper+Replicated LevelDB
异步投递Async Sends
异步投递
http://activemq.apache.org/async-sends
说明
对于一个Slow Consumer,使用同步发送消息可能出现Producer堵塞的情况,慢消费者适合使用异步发送
是什么
官网配置
异步消息如何确定发送成功?
JmsProduce_AsyncSend
延迟投递和定时投递
http://activemq.apache.org/delay-and-schedule-message-delivery.html
官网说明
四大属性
案例演示
要在activemq.xml中配置schedulerSupport属性为true
Java代码里面封装的辅助消息类型:ScheduledMessage
代码
分发策略
ActiveMQ消息重试机制
面试题
官网
http://activemq.apache.org/redelivery-policy
官网有举例代码我就不写了,很简单,创建类就好了
重发整合Spring
属性说明
死信队列
使用
IndividualDeadLetterStrategy
SharedDeadLetterStrategy
自动删除过期消息
如果保证消息不被重复消费呢?幂等性问题
收藏
0 条评论
下一页