ActiveMQ知识架构梳理
2021-09-26 16:52:32 0 举报
AI智能生成
ActiveMQ知识架构梳理
作者其他创作
大纲/内容
安装
Linux
ActiveMQ是什么
应用场景
**异步通信**
**缓冲**
**解耦**
**冗余**
扩展性
过载保护
数据流处理
异构平台
常用消息中间件对比
ActiveMQ
RabbitMQ
RocketMQ
Kafka
JMS中的一些角色
角色
Broker
消息服务器,一个实例,一个节点
provider
消息提交者,生产者
Consumer
消息消费者
消息模型
p2p
基于点对点的消息模型
pub/sub
基于订阅/发布的消息模型
ConnectionFactory
连接工厂,jms中用它创建连接
连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。
连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。
Connection
支持多钟连接协议
JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。
Destination
消息的目的地
Queue
消息队列
队列存储,常用与点对点消息模型
默认只能由唯一的一个消费者处理。一旦处理消息删除。
默认只能由唯一的一个消费者处理。一旦处理消息删除。
Topic
主题
主题存储,用于订阅/发布消息模型
主题中的消息,会发送给所有的消费者同时处理。
只有在消息可以重复处 理的业务场景中可使用。
主题中的消息,会发送给所有的消费者同时处理。
只有在消息可以重复处 理的业务场景中可使用。
Session
Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
JMS的消息格式
JMS的消息由
以下三部分组成的
以下三部分组成的
消息头
JMS消息头使用的所有方法
自动头信息
手动头信息
消息属性
如果需要除消息头字段以外的值,那么可以使用消息属性。
消息体
JMS定义的消息类型有
TextMessage、
MapMessage、
BytesMessage、
StreamMessage和ObjectMessage。
TextMessage、
MapMessage、
BytesMessage、
StreamMessage和ObjectMessage。
消息类型
TextMessage
文本消息
MapMessage
k/v
BytesMessage
字节流
StreamMessage
java原始的数据流
ObjectMessage
序列化的java对象
特性
消息可靠性机制
确认 JMS消息
应答模式 ACKNOWLEDGE Mode
参数有3个可选值
参数有3个可选值
AUTO_ACKNOWLEDGE
当客户端成功的从receive方法返回的时候,或者从MessageListener.onMessage
方法成功返回的时候,会话自动确认客户收到的消息
方法成功返回的时候,会话自动确认客户收到的消息
CLIENT_ACKNOWLEDGE
客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
DUPS_ACKNOWLEDGE
该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS Provider必须把消息头的JMSRedelivered字段设置为true。
持久化
JMS 支持以下两种消息提交模式:
PERSISTENT
指示JMS Provider持久保存消息,以保证消息不会因为JMS Provider的失败而丢失。
NON_PERSISTENT
不要求JMS Provider持久保存消息。
优先级
可以使用消息优先级来指示JMS Provider首先提交紧急的消息。
优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。
需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。
优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。
需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。
消息过期
可以设置消息在一定时间后过期,默认是永不过期。
临时目的地
可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。
它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。
它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。
持久订阅
首先消息生产者必须使用PERSISTENT提交消息。
客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,
该方法的第一个参数必须是一个topic,第二个参数是订阅的名称。
该方法的第一个参数必须是一个topic,第二个参数是订阅的名称。
JMS Provider会存储发布到持久订阅对应的topic上的消息。
如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、
相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。
相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。
JMS Provider会象客户发送客户处于非激活状态时所发布的消息。
持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。
本地事务
commit
rollback
Broker
存储 persistenceAdapter
KahaDB
B-Tree
db.data
db.redo
db-.log
APPEND的方式
lock
AMQ
JDBC
表字段解释
activemq_msgs
activemq_acks
activemq_lock
dbcp连接池
数据源
LevelDB
LevelDB持久化性能高于KahaDB,虽然目前默认的持久化方式仍然是KahaDB。并且,在ActiveMQ 5.9版本提供 了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。 但是在ActiveMQ官网对LevelDB的表述:LevelDB官方建议使用以及不再支持,推荐使用的是KahaDB
Memory
顾名思义,基于内存的消息存储,就是消息存储在内存中。persistent=”false”,
表示不设置持 久化存储,直接存储到内存中 在broker标签处设置。
表示不设置持 久化存储,直接存储到内存中 在broker标签处设置。
JDBC Message store with ActiveMQ Journal
连接协议 transportConnector
TCP
阻塞式
NIO
异步
UDP
SSL
Http(s)
VM
HelloWorld开发流程
下载
http://activemq.apache.org/
安装启动
解压后直接执行
`bin/win64/activemq.bat`
`bin/win64/activemq.bat`
web控制台
http://localhost:8161/
通过8161端口访问
通过8161端口访问
修改访问端口
修改 ActiveMQ 配置文件:/usr/local/activemq/conf/jetty.xml
**jettyport节点**
配置文件修改完毕,保存并重新启动 ActiveMQ 服务。
**jettyport节点**
配置文件修改完毕,保存并重新启动 ActiveMQ 服务。
开发
maven坐标
sender
Receiver
Active MQ的安全机制
web控制台安全
功能
Queues
Topics
Subscribers
Connections
Network
Scheduled
Send
ActiveMQ服务监控 Hawtio
独立jar包的形式运行
集成ActiveMQ
消息安全机制
开发
consumer.receive()
同步接收阻塞式
消息监听器
异步接收
API
事务
session.commit();
session.rollback();
优先级
producer.setPriority
修改配置文件
消息超时/过期
producer.setTimeToLive
整合SpringBoot
pom
yml
连接池设置
对象序列化 信任包
packages:
trust
Config类
@JmsListener
jmsMessagingTemplate
jmsTemplate
收
发
集群
主备集群
Shared File System Master Slave
**JDBC Master Slave**
**Replicated LeveDB Store**
需要使用zookeeper
高性能集群
静态网络配置
multicast协议动态网络配置
failover 故障转移协议
**可配置选项**
Transport Options
消息回流
replayWhenNoConsumers
消息副本
高级特性
死信队列
修改死信队列名称
让非持久化的消息也进入死信队列
过期消息不进死信队列
独占消费者
API
设置优先级
消息发送原理
同步与异步
强行异步发送
producer的producerWindowSize
brokerUrl
destinationUri
延迟消息投递
配置文件
api
延迟发送
带间隔的重复发送
Cron表达式定时发送
消息过滤
selector
queue browser
批量确认
消费缓冲与消息积压/慢消费者 prefetchSize
消息到底是推还是拉?
EIP Camel
Request/Response模型实现
QueueRequestor 同步消息
**JMSReplyTo**
JMSCorrelationID
TemporaryQueue
Topic加强 可追溯消息
消费端设置
保留固定字节的消息 fixedSizedSubscriptionRecoveryPolicy
保留固定数量的消息 fixedCountSubscriptionRecoveryPolicy
保留时间 timedSubscriptionRecoveryPolicy
保留最后一条 lastImageSubscriptionRecoveryPolicy
下一代 ActiveMQ 6?Artemis
常见问题
ActiveMQ如何防止消息丢失?会不会丢消息?
如何防止重复消费
如何保证消费顺序?
Out of memory
慢速消费
SlowConsumerStrategy
直接中断 **AbortSlowConsumerStrategy abortConnection**
设置阈值 AbortSlowConsumerStrategy maxTimeSinceLastAck
消息限制策略 PendingMessageLimitStrategy
constantPendingMessageLimitStrategy
prefetchRatePendingMessageLimitStrategy
消息堆积内存上涨
prefetchSize影响消费倾斜
prefetchSize造成消费者内存溢出
AUTO_ACKNOWLEDGE造成消息丢失/乱序
exclusive 和selector有可能造成消息堆积
0 条评论
下一页