ActiveMQ
2020-02-20 10:12:42 0 举报
ActiveMQ 源码解析
作者其他创作
大纲/内容
run()线程中异步执行
失败重试次数 > span style=\
broker n
Destination
oneway无作用
deliveredMessages
EXPIRED_ACK_TYPE消息过期
deliveredMessages放到集合中(表示未确认 ACK),对事务性会话进行 ACK 提交
TransportFactory
broker 2
push 模式(默认)
create
ActiveMQMessageProducerSupport
TcpTransportFactory
oneway实现异步请求
send设置消息属性,发送
findTransportFactoryspan style=\
N
ActiveMQ
Y
broker 1
消息数量是否达到 0.65* prefetchSize 或者超过了 optimizeAcknowledgeTimeOut 超时时间
单个确认
MessageDispatch
①
ActiveMQSession
TransportFilter
oneway格式转换
省略了一些包装类 ...
send发送消息
消息过期
transport
beforeMessageIsConsumed()消息消费之前的一些准备工作
从 deliveredMessages 集合中获取值
unconsumedMessages
DUPS_OK_ACKNOWLEDG 且不是队列
②
doConnect()
Connection
消费成功
TransportThreadSupport
createTransport()创建 transport,其就是操作底层 socket
是否开启优化提交
TcpTransport
message
POSION_ACK_TYPE错误消息,会放到死信队列中
Sesssion
send
ActiveMQConnection
broker
asyncSendPacket异步发送消息
ActiveMQConnectionFactory
pull 模式
createActiveMQConnection()创建 ActiveMQ 连接
队列为空
Connection Factory
createTransport()创建 transport
block 阻塞获取
start()省略了包装类
prefetchSizeSize == 0&&unconsumedMessages.isEmpty()
DELIVERED_ACK_TYPE 接收未处理
doStart()启动 TcpTransport 线程
receive
onCommand()对不同的 command 进行不同的处理
MutexTransport
oneway实现写锁
consumer.span style=\
INDIVIDUAL_ACK_TYPE只确认单条消息
REDELIVERED_ACK_TYPE重发消息
抛出异常
doStart()连接 socket启动传输层
STANDARD_ACK_TYPE处理成功,broker 可以删除了
readCommand()从 socket 中获取对象封装成 command
afterMessageIsConsumed()根据策略判断是否需要进行 ACK 提交
ResponseCorrelator
AUTO_ACKNOWLEDGE 或者DUPS_OK_ACKNOWLEDGE且是队列
Consumer
WireFormatNegotiator
oneway通过 SocketOutputStream 将数据写出
ActiveMQMessageProducer
事务会话
...
CLIENT_ACKNOWLEDGE 或者INDIVIDUAL_ACKNOWLEDGE
configureConnection()配置 Connection 参数
结束
connect()
0 条评论
下一页