RocketMQ+SpringBoot的基础使用
2025-03-02 20:52:35 0 举报
AI智能生成
RocketMQ+SpringBoot的基础使用,包括生产者、消费者、本地事务、broker回查事务等
作者其他创作
大纲/内容
生产者
普通模式
rocketMQTemplate.convertAndSend(topic, msg)
将msg字符串转换为Message对象,然后发送
topic: 订阅主题
msg: 消息对象
msg: 消息对象
rocketMQTemplate.syncSend(topic, msg)
同步发送,通过返回结果SendResult,获取发送状态,效率低,逐条执行
rocketMQTemplate.asyncSend(topic, msg, SendCallback)
异步发送,效率高,执行结果可以在SendCallback中判断
事务模式
在类上加注解@RocketMQTransactionListener,并实现RocketMQLocalTransactionListener接口
实现本地事务方法,用于处理半提交后的业务(与rabbitMQ服务器握手成功)
实现broker回查时执行的方法,用于实现和消费者的联动
如果存在多个事务处理机制,可以实现多个RocketMQTemplate,然后在注解上指定template即可@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
注意:在发送消息时,要使用extRocketMQTemplate
注意:在发送消息时,要使用extRocketMQTemplate
消费者
在spring容器管理的类中,增加注解@RocketMQMessageListener,并实现RocketMQListener<T>接口的onMessage方法
注解参数
topic = 订阅的消息主题,
consumerGroup = 消费者组,
selectorType = SelectorType.TAG, // 指定消息过滤的类型,基于消息标签(TAG)进行过滤,也可以用sql,但是效率低,一般不用
selectorExpression = "TagA", // 指定过滤表达式,此处只接收TagA的数据,需要和selectorType配合
consumeMode = ConsumeMode.ORDERLY // 设置顺序消费,CONCURRENTLY多个线程同时消费消息,ORDERLY逐条顺序消费
consumerGroup = 消费者组,
selectorType = SelectorType.TAG, // 指定消息过滤的类型,基于消息标签(TAG)进行过滤,也可以用sql,但是效率低,一般不用
selectorExpression = "TagA", // 指定过滤表达式,此处只接收TagA的数据,需要和selectorType配合
consumeMode = ConsumeMode.ORDERLY // 设置顺序消费,CONCURRENTLY多个线程同时消费消息,ORDERLY逐条顺序消费
onMessage方法参数
Message:可以指定tag,事务id等属性
0 条评论
下一页