ActiveMQ学习笔记
2021-06-21 15:29:26 0 举报
AI智能生成
ActiveMQ学习笔记
作者其他创作
大纲/内容
第一部分:JMS基础
01. ActiveMQ相关背景
中间件
让异构的组件协同工作,通信层
MOM
面向消息的中间件(Message-Oriented Middleware)
四元素
消息传递提供者
目的地
客户端(发送方或接收方)
消息
JMS
Java消息服务(Java Message Service)
Java的一套API标准
ActiveMQ
JMS 的一个实现
02. JMS客户端编程模型
消息传送模式
PTP
点对点(Point-to-Point)
Pub/Sub
发布/订阅(Publish/Subscribe)
JMS API
统一域和特定于域的API
编程模型
对象简介
连接工厂(ConnectionFactory)
客户端使用连接工厂对象(ConnectionFactory)创建连接
连接(Connection)
连接对象 (Connection) 表示客户端与代理之间的活动连接
会话(Session)
会话标记客户端与代理之间的单次对话
消息(Message)
消息封装一次通信的内容。消息由三部分组成:消息头、属性和主体
消息生成方(MessageProducer)
由Session创建,负责发送Message到目的地
消息使用方(MessageConsumer)
由Session创建,负责从目的地中消费Message
目的地(Destination)
JMS Provider负责维护,用于对Message进行管理的对象
03. 消息的接收方式
同步接收
主线程阻塞式等待下一个消息的到来,可以设置timeout,超时则返回null
获取MessageConsumer实例之后调用API
receive():Message
获取下一个消息。这个调用将导致无限期的阻塞,直到有新的消息产生。
receive(long timeout):Message
获取下一个消息。这个调用可能导致一段时间的阻塞,直到超时或者有新的消息产生。超时则返回null
receiveNoWait():Message
获取下一个消息。这个调用不会导致阻塞,如果没有下一个消息,直接返回null
异步接收
主线程设置MessageListener,然后继续做自己的事,子线程负责监听
获取MessageConsumer实例之后调用API
setMessageListener(MessageListener):void
设置消息监听器
MessageListener.onMessage(Message message):void
这是一个回调方法,当有新的消息产生,这个方法会被自动调用
注意:暂停退出
主线程挂起来,子线程才能继续监听
04. 消息选择器
为Consumer指定基于Message属性的筛选条件
javax.jms.Message 的API
setIntProperty(String name, int value):void
setStringProperty(String name, String value):void
setObjectProperty(String name, Object value):void
接收消息eg
05. 客户端模板化
06. 可靠消息传送
故障分析
1.两个跃点
发送跃点
接收跃点
2.三个隐患阶段
发送阶段
Producer发送消息到目的地,可能会失败
消息缓存阶段
JMS Provider异常导致目的地中缓存的数据丢失
接收阶段
Consumer从目的地接收消息,可能会失败
可靠传送保障机制
1.Acknowledge - 确认
1.1.对于Producer
send()方法会被阻塞,直到JMS Provider发回确认
透明
1.2.对于Consumer
消费者从目的地接收Message之后,发送接收确认给JMS Provider
JMS Provider收到确认后,将从目的地中删除该消息,发送处理确认给消费者
3种确认机制
AUTO_ACKNOWLEDGE
自动确认
每次通信
CLIENT_ACKNOWLEDGE
客户端确认
所有之前
DUPS_OK_ACKNOWLEDGE
消息可重复确认
1.3.确认机制的设置
javax.jms.Connection.createSession(boolean transacted, int acknowledgeMode):Session
1.4.确认机制的获取
2.Transaction - 事务
PK
确认机制
本质
确保消费者能够获取到消息,在收到消费者的确认之后,才会将消息从目的地移除
客户端编程
在Producer端是透明的,只能在Consumer端进行控制
功能性
确认消息已收到
事务机制
本质
将多个操作绑在一起,成为原子操作,从而有相同的操作结果(成功或失败)
客户端编程
可以在Producer和Consumer端实施控制
功能性
事务的commit,提供类似于批量确认的功能
互斥
Connection.createSession(boolean transacted, int acknowledgeMode)
第一个参数为true
启用事务,第二个参数就会被忽略(建议传入Session.SESSION_TRANSACTED)
第一个参数为false
不启用事务,第二个参数用于设置确认模式
3.持久化模式
目的
用来处理消息在目的地阶段的安全隐患
3.1.持久化设置
javax.jms.MessageProducer
设置此producer实例的默认递送模式
setDeliveryMode(int):void
以默认的deliveryMode发送消息
send(Message message):void
以指定的deliveryMode发送消息
send(Message message, int deliveryMode, int priority, long timeToLive):void
javax.jms.DeliveryMode
非持久化模式
static int NON_PERSISTENT = 1
持久化模式
static int PERSISTENT = 2
07. 持久性订阅
第二部分:Spring-JMS
08. Spring-JmsTemplate之发送
必需的资源
ConnectionFactory资源
凭借JmsTemplete提供的构造器或setter方法
Destination资源
提供一个默认的Destination
public void setDefaultDestination(Destination destination)
供一个明确的Destination
public void send(Destination destination, MessageCreator messageCreator)
基于Destination解析器、Destination类型、Destination名字的获取
3种发送
基本的发送
调用API
将消息发送到指定的Destination
public void send(Destination destination, MessageCreator messageCreator)
将消息发送到指定的destinationName
public void send(String destinationName, MessageCreator messageCreator)
将消息发送到defaultDestination
public void send(MessageCreator messageCreator)
提前调用
setDefaultDestination(Destination destination)
setDefaultDestinationName(String destinationName)
使用MessageCreator创建消息
javax.jms.Message
BytesMessage
MapMessage
ObjectMessage
StreamMessage
TextMessage
javax.jms.Session创建消息
Message createMessage(Session session)
转换并发送
转换接口
org.springframework.jms.support.converter.MessageConverter
Message toMessage(Object object, Session session)
Object fromMessage(Message message)
JmsTemplate设置Converter
public void setMessageConverter(MessageConverter messageConverter)
Spring 自动赋值一个SimpleMessageConverter
调用API
public void convertAndSend(Destination destination, Object message)
public void convertAndSend(String destinationName, Object message)
public void convertAndSend(Object message)
转换、后处理再发送
目的
为javax.jms.Message消息添加头部信息或属性
实现接口
org.springframework.jms.core.MessagePostProcessor
调用API
public void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor)
public void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor)
public void convertAndSend(Object message, MessagePostProcessor postProcessor)
09. Spring-JmsTemplate之接收
基本的接收
接收并转换
带有选择器的接收
选择接收并转换
10. Spring-JmsTemplate之浏览
概述
浏览是指获取消息而消息依然保持在broker中
仅Queue
JMS API
javax.jms.Session
QueueBrowser createBrowser(Queue queue)
QueueBrowser createBrowser(Queue queue, java.lang.String messageSelector)
javax.jms.QueueBrowser
java.util.Enumeration getEnumeration()
void close()
JmsTemplate的API
基本的浏览
带有选择器的浏览
11. Spring-JmsTemplate之执行
关于回调接口
JmsTemplate接管了整个过程,但在敏感的点给我们自主行为的机会
发送
MessageCreator
MessagePostProcessor
扩展增强
BrowserCallback
ProducerCallback
SessionCallback
执行的方法
JmsTemplate
public <T> T execute(ProducerCallback<T> action)
public <T> T execute(Destination destination, ProducerCallback<T> action)
public <T> T execute(String destinationName, ProducerCallback<T> action)
public <T> T execute(SessionCallback<T> action)
public <T> T execute(SessionCallback<T> action, boolean startConnection)
12. Spring-JmsTemplate特性设置
消息的递送模式
PK
非持久化
高吞吐,易丢失
持久化
低吞吐,不易丢失
JmsTemplate相关API
public void setDeliveryMode(int deliveryMode)
设置是否持久化要发送的消息:1-非持久化;2-持久化
public int getDeliveryMode()
获取持久化模式的设置:1-非持久化;2-持久化
public void setDeliveryPersistent(boolean deliveryPersistent)
设置是否持久化要发送的消息,true-持久化;false-非持久化
消息的优先级
JMS API为消息定义了10个优先级,0最低,9最高
0-4作为一般的优先级
5-9作为加速优先级
JmsTemplate相关API
public void setPriority(int priority)
public int getPriority()
消息的存活时间
默认
永久有效
JmsTemplate相关API
public void setTimeToLive(long timeToLive)
public long getTimeToLive()
服务质量开关
开启
消息的递送模式、优先级和存活时间的设置--才有效
JmsTemplate相关API
public void setExplicitQosEnabled(boolean explicitQosEnabled)
public boolean isExplicitQosEnabled()
接收等待时间
默认
一直阻塞等待,直到接收到了消息
JmsTemplate相关API
public void setReceiveTimeout(long receiveTimeout)
public long getReceiveTimeout()
超时后返回null
13. Spring-jms的配置
第三部分:实践方案
14. 基于ZooKeeper + ActiveMQ + replicatedLevelDB的主从部署
15. 基于ActiveMQ的统一日志服务
第四部分:测试记录
16. 确认机制的测试
默认
重发6次(共发7次)
然后移到ActiveMQ.DLQ队列;DLQ - dead letter queue
配置
在brokerUrl中指定参数jms.redeliveryPolicy.maximumRedeliveries=3
重发3次(共4次)
17. 事务的测试
18. 持久化的测试
第五部分:其他
19. 问题解决 - 控制Atomikos的日志输出
20. 使用Apache ActiveMQBrowser监控ActiveMQ
21. 异步发送
默认
用异步发送的模式async
未使用事务的前提下发送持久化的消息
同步sync
适用场景
高性能的应用,允许在失败的情况下有少量的数据丢失
异步发送配置
在连接的URI中配置
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
在ConnectionFactory层配置
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
在Connection层配置
((ActiveMQConnection)connection).setUseAsyncSend(true);
0 条评论
下一页