文章首发于公众号《程序员果果》
地址 : https://mp.weixin.qq.com/s/dYqGd9zi2mNelsNNLIribg
导入依赖:
class="language-markup"><dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
application.yml:
rocketmq:
name-server: 172.16.250.129:9876
producer:
group: myGroup
原理:
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
应用场景:
这种可靠性同步地发送方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
示例代码:
public void sendMsg() throws Exception {
Message message = new Message(
// 普通消息所属的Topic
"Topic-Normal",
// Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列 RocketMQ 的服务器过滤。
"TagA",
// Message Body可以是任何二进制形式的数据。
"Hello MQ".getBytes()
);
rocketMQTemplate.getProducer().send( message );
// 等同于上面的方式(常用)
//rocketMQTemplate.convertAndSend("Topic-Normal:TagA","Hello MQ".getBytes());
}
原理:
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。RocketMQ异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
应用场景:
异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
示例代码:
public void sendAsyncMsg() {
Map<String , Object> map = new HashMap<>();
map.put( "name" , "zs" );
map.put( "age" , 20);
rocketMQTemplate.asyncSend( "Topic-Normal", map , new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功。
log.info( "async send success" );
}
@Override
public void onException(Throwable throwable) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
log.info( "async send fail" );
}
} );
}
public void sendOrderlyMsg() {
//根据指定的hashKey按顺序发送
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
// 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
// 全局顺序消息,该字段可以设置为任意非空字符串。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = rocketMQTemplate.syncSendOrderly( "Topic-Order", "send order msg".getBytes(), shardingKey );
// 发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success . msgId is:" + sendResult.getMsgId());
}
}
catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed");
e.printStackTrace();
}
}
}
概念:
Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
适用场景:
消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
示例代码:
public void sendDelayMsg() {
rocketMQTemplate.syncSend( "Topic-Delay",
MessageBuilder.withPayload( "Hello MQ".getBytes() ).build(),
3000,
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
//messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
3 );
}
概念:
分布式事务消息的优势:
消息队列RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
典型场景:
在电商购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅消息队列RocketMQ的交易订单消息,做相应的业务处理,即可保证最终的数据一致性。
事务消息交互流程如下图所示:
事务消息发送步骤如下:
事务消息回查步骤如下:
示例代码:
发送事务消息包含以下两个步骤:
/**
* 事务消息
*/
public void sendTransactionMsg() {
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
"Topic-Tx:TagA",
MessageBuilder.withPayload( "Hello MQ transaction===".getBytes() ).build(),
null );
SendStatus sendStatus = transactionSendResult.getSendStatus();
LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState();
System.out.println( new Date() + " Send mq message status "+ sendStatus +" , localTransactionState "+ localTransactionState );
}
@Component
@RocketMQTransactionListener
public class TxProducerListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
System.out.println("TX message listener execute local transaction");
RocketMQLocalTransactionState result;
try {
// 业务代码( 例如下订单 )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
System.out.println("execute local transaction error");
result = RocketMQLocalTransactionState.UNKNOWN;
}
return result;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务( 例如检查下订单是否成功 )
System.out.println("TX message listener check local transaction");
RocketMQLocalTransactionState result;
try {
//业务代码( 根据检查结果,决定是COMMIT或ROLLBACK )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 异常就回滚
System.out.println("check local transaction error");
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
}
@Component
@Slf4j
@RocketMQMessageListener(topic = "Topic-Tx",consumerGroup = "consumer-tx-group")
public class TxConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Receive message:{}" , message);
}
}
https://github.com/gf-huanchupk/SpringBootLearning
RocketMQ 简介
RocketMQ 安装
?
?