springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

(1) 發(fā)送方向 MQ 服務(wù)端發(fā)送消息。(2) MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。(3) 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。(4) 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會(huì)接受該消息。(5) 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過(guò)固定時(shí)間后MQ Server 將對(duì)該消息發(fā)起消息回查。(6) 發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。(7) 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對(duì)半消息進(jìn)行操作。
2 工程
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent> <properties><java.version>1.8</java.version> </properties> <dependencies><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope></dependency><dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.2</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version></dependency><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version></dependency> </dependencies> <build><plugins> <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.0.RELEASE</version> </plugin> <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration> <source>1.8</source> <target>1.8</target></configuration> </plugin></plugins> </build>2.2 application.yml
rocketmq: name-server: 192.168.38.50:9876 producer: group: transcation-group2.3 TransactionListenerImpl
@RocketMQTransactionListener(txProducerGroup = 'transaction-producer-group')@Slf4jpublic class TransactionListenerImpl implements RocketMQLocalTransactionListener { private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>(); /** * 執(zhí)行業(yè)務(wù)邏輯 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try { System.out.println('用戶A賬戶減500元.'); System.out.println('用戶B賬戶加500元.'); STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) { e.printStackTrace();}STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.UNKNOWN; } /** * 回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);log.info('回查消息 -> transId ={} , state = {}', transId, STATE_MAP.get(transId));return STATE_MAP.get(transId); }}2.4 SpringTransactionProducer
@Component@Slf4jpublic class SpringTransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 發(fā)送消息 * */ public void sendMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.sendMessageInTransaction('transaction-producer-group', topic, message, null);log.info('發(fā)送成功'); }}2.5 SpringTxConsumer
@Component@RocketMQMessageListener(topic = 'pay_topic',consumerGroup = 'transaction-consumer-group',selectorExpression = '*')@Slf4jpublic class SpringTxConsumer implements RocketMQListener<String> { @Override public void onMessage(String msg) {log.info('接收到消息 -> {}', msg); }}2.6 ProducerController
@RestController@RequestMapping('/producer')public class ProducerController { @Autowired private SpringTransactionProducer springTransactionProducer; @GetMapping('/sendMsg') public String sendMsg() {springTransactionProducer.sendMsg('pay_topic', '用戶A賬戶減500元,用戶B賬戶加500元。');return '發(fā)送成功'; }}2.7 RocketApplication
@SpringBootApplicationpublic class RocketApplication { public static void main(String[] args) {SpringApplication.run(RocketApplication.class); }}3 測(cè)試3.1 正常消費(fèi)測(cè)試
描述: 正常啟動(dòng)及可。


描述: 執(zhí)行本地事務(wù)時(shí)添加異常,重啟測(cè)試,發(fā)現(xiàn)消費(fèi)者沒(méi)有收到消息。



到此這篇關(guān)于springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)springboot 分布式事務(wù)內(nèi)容請(qǐng)搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
1. CentOS郵箱服務(wù)器搭建系列——SMTP服務(wù)器的構(gòu)建( Postfix )2. PHP基礎(chǔ)之生成器4——比較生成器和迭代器對(duì)象3. ASP新手必備的基礎(chǔ)知識(shí)4. Docker 啟動(dòng)Redis 并設(shè)置密碼的操作5. asp文件用什么軟件編輯6. 通過(guò)IEAD+Maven快速搭建SSM項(xiàng)目的過(guò)程(Spring + Spring MVC + Mybatis)7. JS中6個(gè)對(duì)象數(shù)組去重的方法8. vue+element開發(fā)一個(gè)谷歌插件的全過(guò)程9. 利用CSS制作3D動(dòng)畫10. Vue axios獲取token臨時(shí)令牌封裝案例

網(wǎng)公網(wǎng)安備