消息中间件之ActiveMQ
2022-06-21 08:50:09 1 举报
AI智能生成
为你推荐
查看更多
消息中间件之ActiveMQ
作者其他创作
大纲/内容
1.在何种场景下需要使用消息中间件
2.为什么要在系统里引入消息中间件
...
前言
系统之间接口耦合比较严重
面对大流量并发时,容易被冲垮
等待同步存在性能问题
你如何解决
系统之间直接调用实际工程落地和存在的问题
上述问题引出的产生背景
需要有一种东东能摆平上述情况
从生活Case到实际生产案例
定义
采用异步处理模式
发送者和接受者不必了解对方,只需要确认消息
发送者和接受者不必同时在线
应用系统之间解耦合
分支主题
整体架构
特点
是什么
解耦
削峰
异步
能干嘛
http://activemq.apache.org/
ActiveMQ官网
去哪下
实现高可用,高性能,可伸缩,易用和安全的企业级面向消息服务的系统
最重要的功能
异步消息的消费和处理
控制消息的消费顺序
可以和Spring或者SpringBoot整合简化代码
配置集群容错的MQ集群
怎么玩
1.入门概述
官网下载
默认端口为61616
普通启动
普通关闭
带日志启动
Linux安装
http://127.0.0.1:8161/admin
默认的用户名和密码是admin/admin
Apache ActiveMQ控制台
ActiveMQ采用61616端口提供JMS服务
ActiveMQ采用8161端口提供管理控制台服务
备注
2.ActiveMQ安装和控制台
IDEA创建Maven工程
POM.xml文件
回忆一下以前的JDBC编码套路
JMS编码总体架构
两大模式特性
粗说目的地Destination队列(Queue)和主题(Topic)
代码
控制台
消息生产者
控制台说明
代码1-阻塞式消费者
代码2-异步监听式消费者
消息消费者
JMS开发的基本步骤
两种消费方式
编码小总结
上手案例
发布主题生产者
订阅主题消费者
两大模式特效
两大模式比较
小总结
3.Java编码实现ActiveMQ通讯
JavaEE
Java Message Service(Java消息服务是JavaEE中的一个技术
JMS
消息队列的详细比较
MQ中间件的其他落地产品
实现JMS接口和规范的消息中间件,也就是我们说的MQ服务器
JMS Provider
消息生产者,创建和发送JMS消息的客户端应用
JMS Producer
消息消费者,接收和处理JMS消息的客户端应用
JMS Consumer
消息发送的目的地,主要是指Queue和Topic
JMSDestination
JMSDeliveryMode
JMSExpiration
JMSPriority
唯一标识每个消息的标识由MQ产生。
JMSMessageID
消息头
封装具体的消息数据
普通字符串消息,包含一个String
TxtMessage
一个Map类型的消息,key为Strng类型,而值为Java基本类型
MapMessage
二进制数组消息,包含一个byte[]
BytesMessage
Java数据流消息,用标准流操作来顺序填充和读取
StreamMessage
对象消息,包含一个可序列化的Java对象
ObjectMessage
5种消息格式
发送和接收的消息体类型必须一致对应
消息属性
如果需要除消息字段以外的值,那么可以使用消息属性
识别/去重/重点标注等操作非常有用的方法
消息体
JSM Message
JMS的组成结构和特点
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
非持久化:当服务器宕机,消息不存在。
非持久
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
持久化:当服务器宕机,消息依然存在。
持久
Queue默认是持久
参数设置说明
演示
结论
持久的Queue
先启动定阅消费者再启动定阅生产者
订阅者在线
订阅者不在线
持久的发布主题生产者
持久的定阅主题消费者
类似微信公众号订阅发布
持久的Topic
PERSISTENT:持久性
只要执行send,就进入到队列中
关闭事务,那第2个签收参数的设置需要有效
false
先执行send再执行commit,消息才被真正提交到队列中
消息需要需要批量提交,需要缓冲处理
true
producer提交时的事务
事务偏生产者/签收偏消费者
生产者
消费者
Transaction:事务
Session.AUTO_ACKNOWLEDGE
自动签收(默认)
Session.CLIENT_ACKNOWLEDGE
客户端调用acknowledge方法手动签收
手动签收
允许重复消息
非事务
生产事务开启,只有commit后才能将全部消息变为已消费
事务
签收和事务的关系
Acknowledge:签收
JMS的可靠性
JMS的点对点总结
非持久订阅
持久订阅
当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅
用哪个?
JMS的发布订阅总结
4.JMS规范和落地产品
回忆阳哥之前讲的Redis,不同config配置文件来模拟不同的实例
POM.XML
Embedoker
队列验证
嵌入式Broker
5.ActiveMQ的Broker
Maven修改,需要添加Spring支持JMS的包
Spring配置文件
队列(Queue)
生产者和消费者都可以通过jmsTemplate对象实时设置目的地等其他信息
主题(Topic)
说明
需要写一个类来实现消息监听
消费者配置了自动监听,就相当于在spring里面后台运行,有消息就运行我们实现监听类里面的方法
在Spring里面实现消费者不启动,直接通过配置监听完成
6.Spring整合ActiveMQ
boot_mq_producer
工程名
com.atguigu.boot.activemq
包名
1.新建Maven工程并设置包名类名
2.POM文件
3.Yml文件
类似于Spring的ApplicationContext.xml文件
4.配置bean
5.Queue_Producer
6.主启动类
7.测试单元
修改Queue_Produce
修改主启动类的MainApp_Producer
队列生产者
1.新建Mavaen工程并设置包名类名
@JmsListener
方法
4.springboot的消息监听注解
队列消费者
5.Topic_Producer
Topic生产者
4.Topic_Consumer
5.主启动类
非持久版
配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅
@JmsListener(destination = \"${myTopicName}\
5.Topic_Consumer
持久版
Topic消费者
主题发布订阅(Topic)
7.SpringBoot整合ActiveMQ
默认的61616端口如何更改
你生产上的连接协议如何配置的?使用tcp吗?
面试题
http://activemq.apache.org/configuring-version-5-transports.html
官网
1.Transmission Control Protocol(TCP)默认
2.New I/O API Protocol(NIO)
3.AMQP协议
4.Stomp协议
5.Secure Sockets Layer Protocol(SSL)
扩展Github
6.MQTT协议
7.WS协议(websocket)
8.小总结
有哪些
修改配置文件
生产和消费两端协议代码修改
运行验证
nio案例演示
上诉NIO性能不错了,如何进一步优化?
URI格式以\"nio\"开头,代表这个端口使用TCP协议为基础的NIO网络模型。但是这样的设置方式,只能使这个端口支持Openwire协议。
问题
我们怎么能够让这个端口既支持NIO网络模型,又让他支持多个协议呢?
使用auto关键字
使用\"+\"符号来为端口设置多种特性
解决
nio案例演示增强
8.ActiveMQ的传输协议
http://activemq.apache.org/persistence
ActiveMQ持久化机制
RDB
AOF
回忆Redis持久化机制有几种
一句话:ActiveMQ宕机了,消息不会丢失的机制。
基于文件的存储方式,是以前的默认消息存储,现在不用了
AMQ Mesage Store(了解)
基于日志文件,从ActiveMQ5.4开始默认的持久化插件
kahaDB
验证
http://activemq.aache.org/kahadb
KahaDB的存储原理
KahaDB消息存储(默认)
消息基于JDBC存储的
JDBC消息存储
LevelDB消息存储(了解)
JDBC Message Store with ActiveMQ Journal
1.MQ+MySQL
2.添加mysql数据库的驱动包到lib文件夹
3.jdbcPersistenceAdapter配置
4.数据库连接池配置
建一个名为activemq的数据库
ACTIVEMQ_MSGS
ACTIVEMQ_ACKS
ACTIVEMQ_LOCK
三张表的说明
如果新建数据库ok,上述配置ok,代码运行ok,3张表会自动生成
如果表没生成,可能需要自己创建
5.建库SQL和创表说明
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
一定要开启持久化
队列
主题
6.代码运行验证
点到点
发布/订阅
一旦运行生产code
queue
topic
mysql
7.数据库情况
9.开发有坑
JDBC存储消息
配置
以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用
JDBC Message store with ActiveMQ Journal
ActiveMQ持久化机制小总结
9.ActiveMQ的消息存储和持久化
引入消息中间件后如何保证其高可用
基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。
基于shareFileSystem共享文件系统(KahaDB)
基于JDBC
基于可复制的LevelDB
三种集群方式对比
ShareFileSystem
http://activemq.apache.org/replicated-leveldb-store
解释
官网集群原理图
1.环境和版本
2.关闭防火墙并保证各个服务器能够ping通
配置集群教程
3.要求具备zk集群并可以成功启动
4.集群部署规划列表
就是一台电脑复制三份ActiveMQ
5.创建3台集群目录
就是ActiveMQ后台管理页面的访问端口
6.修改管理控制台端口
7.hostname名字映射(如果不映射只需要吧mq配置文件的hostname改成当前主机ip)
配置文件里面的BrokerName要全部一致
持久化配置(必须)
01/02/03路径
8.ActiveMQ集群配置
真实的三台机器不用管
9.修改各个节点的消息端口
先启动Zk 在启动ActiveMQ
3台Zk连接任意一台验证三台ActiveMQ是否注册上了Zookeeper
查看Master
11.zk集群节点状态说明
部署规划和步骤
集群可用性测试
本次案例采用ZK+Replicated LevelDB Store
zookeeper+replicated-leveldb-store的主从集群
10.ActiveMQ的多节点集群
zookeeper+Replicated LevelDB
引入消息队列之后该如何保证其高可用性
http://activemq.apache.org/async-sends
异步投递
官网配置
JmsProduce_AsyncSend
异步消息如何确定发送成功?
异步投递Async Sends
http://activemq.apache.org/delay-and-schedule-message-delivery.html
四大属性
官网说明
要在activemq.xml中配置schedulerSupport属性为true
Java代码里面封装的辅助消息类型:ScheduledMessage
案例演示
延迟投递和定时投递
分发策略
http://activemq.apache.org/redelivery-policy
重发整合Spring
属性说明
ActiveMQ消息重试机制
IndividualDeadLetterStrategy
SharedDeadLetterStrategy
自动删除过期消息
使用
死信队列
如果保证消息不被重复消费呢?幂等性问题
11.高级特性和大厂常考重点
ActiveMQ
收藏
0 条评论
回复 删除
下一页