当前位置: 首页 > news >正文

Saga分布式事务框架执行逻辑

Saga分布式事务框架执行逻辑

📋 目录

  • 框架概述
  • 核心组件架构
  • 数据库表设计
  • 完整执行流程
  • 节点发现与调用机制
  • 精简补偿策略设计
  • 总结

框架概述

这是一个基于数据库驱动的Saga分布式事务框架,专门用于解决跨服务间数据同步的一致性问题。框架采用了混合编排模式,结合了集中式任务分解和分布式执行的优势。

核心设计理念

  • 🎯 分层解耦: 任务分解与任务执行完全分离
  • 🌐 节点自治: 消费端节点独立执行和管理任务
  • 📊 状态透明: 完整的执行日志和状态追踪
  • 🔄 容错恢复: 失败重试与自动补偿机制
  • ⚖️ 负载均衡: 基于节点负载的智能调度

业务场景

  • 空间同步开启: 跨服务复制空间、页面、权限等数据
  • 增量数据同步: 已开启同步的项目进行增量更新
  • 同步关闭清理: 关闭同步时清理相关数据

核心组件架构

``mermaid

graph TBsubgraph "业务触发层"A1[空间同步开启] --> B1[业务端拆解步骤]A2[增量数据更新] --> B1A3[同步关闭清理] --> B1endsubgraph "任务分解层"B1 --> C1[存储distribute_event]C1 --> C2[存储distribute_event_step]C2 --> C3[HTTP发送步骤数据]endsubgraph "消费端接收层"C3 --> D1[消费端接收HTTP请求]D1 --> D2[存储distribute_event_step_log]D2 --> D3[返回接收确认]D3 --> D4[业务端更新状态为待消费]endsubgraph "定时执行层"D4 --> E1[定时任务扫描待执行记录]E1 --> E2[2线程并发控制]E2 --> E3[执行具体业务逻辑]E3 --> E4[HTTP回调通知结果]E4 --> E5[业务端更新状态]end

数据库表设计

📋 核心表结构

1. distribute_event (主事务表)

记录顶层业务事务的基本信息和整体状态。

2. distribute_event_step (步骤表)

记录事务分解后的各个原子步骤信息。

3. distribute_event_step_log (执行日志表) ✨ 完整设计

记录消费端节点的执行日志,实现简洁而强大的幂等性保证、重试机制和通知状态管理。

CREATE TABLE distribute_event_step_log (id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键',step_code VARCHAR(64) NOT NULL COMMENT '关联业务端distribute_event_step.code',job_code VARCHAR(64) NOT NULL COMMENT '主事务编码,关联业务端distribute_event.code',consumer_node VARCHAR(50) NOT NULL COMMENT '消费者节点地址',-- 幂等性保证字段execution_key VARCHAR(128) NOT NULL COMMENT '执行唯一键: {step_code}_{consumer_node}_{yyyyMMdd}',business_key VARCHAR(64) COMMENT '业务唯一键,基于业务数据哈希值',-- 执行状态管理exec_status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '执行状态: PENDING, EXECUTING, SUCCESS, FAILED, RETRYING',retry_count INT DEFAULT 0 COMMENT '当前重试次数',max_retry INT DEFAULT 3 COMMENT '最大重试次数',-- 通知状态管理(新增)notify_status VARCHAR(20) DEFAULT 'NOT_REQUIRED' COMMENT '通知状态: NOT_REQUIRED, PENDING, SUCCESS, FAILED',notify_retry_count INT DEFAULT 0 COMMENT '通知重试次数',max_notify_retry INT DEFAULT 3 COMMENT '最大通知重试次数',next_notify_time TIMESTAMP NULL COMMENT '下次通知时间',notify_url VARCHAR(255) COMMENT '通知回调地址',-- 执行信息payload TEXT COMMENT '执行数据载荷',result_data TEXT COMMENT '执行结果数据',error_message TEXT COMMENT '执行错误信息',notify_error_message TEXT COMMENT '通知错误信息',start_time TIMESTAMP NULL COMMENT '开始执行时间',end_time TIMESTAMP NULL COMMENT '结束执行时间',next_retry_time TIMESTAMP NULL COMMENT '下次执行重试时间',-- 回滚支持rollback_data TEXT COMMENT '回滚数据快照,JSON格式',is_rollback TINYINT(1) DEFAULT 0 COMMENT '是否已回滚',create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',-- 幂等性保证索引UNIQUE KEY uk_execution_key (execution_key),UNIQUE KEY uk_business_key (step_code, business_key),-- 查询优化索引INDEX idx_exec_status (exec_status),INDEX idx_notify_status (notify_status, next_notify_time),INDEX idx_step_code (step_code),INDEX idx_consumer_node (consumer_node),INDEX idx_retry_time (next_retry_time),INDEX idx_job_code (job_code)
);

📊 字段设计详解与使用说明

1. 幂等性保证字段

/*** execution_key: 执行唯一键* 用途: 防止同一步骤在同一节点同一天重复执行* 格式: SPACE_SYNC_001_192.168.1.10:8080_20240316* 使用场景: 消费端接收HTTP请求时检查是否已存在相同的execution_key*/
public String generateExecutionKey(String stepCode, String consumerNode) {String dateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));return String.format("%s_%s_%s", stepCode, consumerNode, dateStr);
}/*** business_key: 业务唯一键* 用途: 基于业务数据内容的去重,防止相同业务数据重复处理* 生成: 对payload业务数据进行MD5哈希* 使用场景: 当业务数据完全相同时避免重复执行*/
public String generateBusinessKey(Object payload) {String payloadJson = JSON.toJSONString(payload);return DigestUtils.md5DigestAsHex(payloadJson.getBytes()).substring(0, 8);
}

2. 状态管理字段

/*** exec_status: 执行状态* PENDING: 待执行 - 刚接收到任务,等待定时器扫描* EXECUTING: 执行中 - 正在执行业务逻辑* SUCCESS: 执行成功 - 业务逻辑执行完成* FAILED: 执行失败 - 重试次数耗尽后的最终失败状态* RETRYING: 重试中 - 执行失败后等待重试的状态*//*** notify_status: 通知状态* NOT_REQUIRED: 无需通知 - 执行中或失败时的默认状态* PENDING: 待通知 - 执行成功后需要通知业务端* SUCCESS: 通知成功 - 业务端已收到通知并确认* FAILED: 通知失败 - 通知重试次数耗尽后的状态*/

3. 重试机制字段

/*** retry_count / max_retry: 执行重试控制* 用途: 控制业务逻辑执行的重试次数,避免无限重试* 逻辑: 失败时retry_count+1,超过max_retry则标记为FAILED*//*** notify_retry_count / max_notify_retry: 通知重试控制* 用途: 控制通知业务端的重试次数,确保通知到位* 逻辑: 通知失败时notify_retry_count+1,采用指数退避策略*//*** next_retry_time / next_notify_time: 重试时间控制* 用途: 指数退避算法的时间调度* 计算: delay = Math.pow(2, retryCount) * baseDelaySeconds*/

4. 数据存储字段

/*** payload: 执行数据载荷* 用途: 存储从业务端接收的步骤执行数据* 格式: JSON字符串,包含业务逻辑执行所需的所有参数*//*** result_data: 执行结果数据* 用途: 存储业务逻辑执行后的结果,用于通知业务端* 格式: JSON字符串,包含执行结果、影响的数据ID等*//*** rollback_data: 回滚数据快照* 用途: 存储执行前的原始数据状态,用于失败回滚* 格式: JSON字符串,包含需要删除的数据ID列表等*/

完整执行流程

🎆 整体流程时序图(数据库分离)

sequenceDiagramparticipant BIZ as 业务服务Aparticipant BIZ_DB as 业务端数据库participant BIZ2 as 业务服务B(消费端)participant CONSUMER_DB as 消费端数据库participant TASK as 定时任务Note over BIZ,TASK: 阶段一: 业务触发与任务分解BIZ->>BIZ: 1. 业务触发(空间同步/增量更新等)BIZ->>BIZ: 2. 执行步骤拆解逻辑BIZ->>BIZ_DB: 3. 存储主事务 distribute_eventBIZ->>BIZ_DB: 4. 存储子步骤 distribute_event_stepNote over BIZ,TASK: 阶段二: HTTP任务分发BIZ->>BIZ2: 5. HTTP请求发送步骤数据BIZ2->>CONSUMER_DB: 6. 存储执行日志 distribute_event_step_logBIZ2->>BIZ: 7. 返回接收确认BIZ->>BIZ_DB: 8. 更新步骤状态为'待消费'Note over BIZ,TASK: 阶段三: 定时消费执行(限流2线程)TASK->>CONSUMER_DB: 9. 扫描待执行状态记录TASK->>TASK: 10. 并发控制(最多2线程)TASK->>CONSUMER_DB: 11. 更新exec_status为'EXECUTING'TASK->>TASK: 12. 执行具体业务逻辑alt 执行成功TASK->>CONSUMER_DB: 13a. 更新exec_status为'SUCCESS'TASK->>CONSUMER_DB: 14a. 设置notify_status为'PENDING'Note over BIZ,TASK: 阶段四: 通知业务端(指数退避重试)TASK->>BIZ: 15a. HTTP回调通知执行结果alt 通知成功BIZ->>TASK: 16a. 返回200状态TASK->>CONSUMER_DB: 17a. 更新notify_status为'SUCCESS'BIZ->>BIZ_DB: 18a. 更新distribute_event_step状态BIZ->>BIZ_DB: 19a. 更新distribute_event主事务状态else 通知失败BIZ->>TASK: 16b. 返回非200状态或网络异常TASK->>CONSUMER_DB: 17b. notify_retry_count+1alt 通知重试次数 < 3TASK->>CONSUMER_DB: 18b. 计算next_notify_time(指数退避)Note right of TASK: 等待指数退避时间后重新通知TASK->>BIZ: 19b. 重新发送通知(循环至16a)else 通知重试次数 >= 3TASK->>CONSUMER_DB: 20b. 更新notify_status为'FAILED'Note right of TASK: 通知失败,需人工介入endendelse 执行失败TASK->>CONSUMER_DB: 13c. 更新retry_count+1alt 执行重试次数 < 3TASK->>CONSUMER_DB: 14c. 更新exec_status为'RETRYING'TASK->>CONSUMER_DB: 15c. 删除当前记录TASK->>CONSUMER_DB: 16c. 重新插入新记录(计算next_retry_time)Note right of TASK: 等待指数退避时间后重新执行else 执行重试次数 >= 3TASK->>CONSUMER_DB: 17c. 更新exec_status为'FAILED'TASK->>BIZ: 18c. 通知执行最终失败BIZ->>BIZ: 19c. 触发回滚逻辑(删除重新开始)endend

整体执行流程图

flowchart TDA[业务触发] --> B[拆解步骤存储]B --> C[HTTP发送步骤数据]C --> D[消费端存储日志]D --> E[更新状态为待消费]E --> F[定时任务扫描]F --> G[2线程并发执行]G --> H{执行结果}H -->|成功| I[HTTP回调通知]H -->|失败| J{重试次数}J -->|<3次| K[指数退避重试]J -->|>=3次| L[触发回滚删除]I --> M[更新主事务状态]L --> N[通知失败完成]

节点发现与调用机制

服务节点通过心跳注册,基于负载权重选择最优节点执行任务。

@Service
public class DistributeNodeRegistry {@Scheduled(fixedRate = 30000) // 每30秒心跳public void heartbeat() {String nodeAddress = getLocalNodeAddress();NodeMetadata metadata = collectNodeMetadata(); // 收集CPU、内存、任务数量等nodeRegistryMapper.upsertNode(NodeRegistryRecord.builder().serviceName(getServiceName()).nodeAddress(nodeAddress).status(1) // 在线.lastHeartbeatTime(new Date()).metadata(JSON.toJSONString(metadata)).build());}// 选择最优节点(基于负载权重)public String selectOptimalNode(String serviceName) {List<NodeRegistryRecord> nodes = nodeRegistryMapper.selectAvailableNodes(serviceName);return nodes.stream().min(Comparator.comparing(this::calculateNodeLoad)).map(NodeRegistryRecord::getNodeAddress).orElseThrow(() -> new NoAvailableNodeException("无可用节点"));}
}

🔄 精简补偿策略设计

执行重试与通知重试机制

@Component
public class RetryAndNotificationService {/*** 处理执行失败 - 重试机制(删除重新插入)*/public void handleExecutionFailure(DistributeEventStepLog stepLog, String errorMessage) {int currentRetry = stepLog.getRetryCount();if (currentRetry < stepLog.getMaxRetry()) {// 还有重试机会 - 删除重新插入executeRetryByReinsert(stepLog, errorMessage);} else {// 重试次数耗尽,触发回滚triggerRollback(stepLog, errorMessage);}}/*** 执行重试逻辑:删除重新插入*/private void executeRetryByReinsert(DistributeEventStepLog stepLog, String errorMessage) {try {// 1. 更新状态为 RETRYINGstepLogMapper.updateStatus(stepLog.getId(), "RETRYING", errorMessage);// 2. 计算下次重试时间(指数退避)long delaySeconds = (long) Math.pow(2, stepLog.getRetryCount() + 1) * 30;Timestamp nextRetryTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);// 3. 删除当前记录stepLogMapper.deleteById(stepLog.getId());// 4. 重新插入新记录DistributeEventStepLog newStepLog = DistributeEventStepLog.builder().stepCode(stepLog.getStepCode()).jobCode(stepLog.getJobCode()).consumerNode(stepLog.getConsumerNode()).executionKey(stepLog.getExecutionKey()).businessKey(stepLog.getBusinessKey()).execStatus("PENDING").retryCount(stepLog.getRetryCount() + 1).maxRetry(stepLog.getMaxRetry()).nextRetryTime(nextRetryTime).payload(stepLog.getPayload()).rollbackData(stepLog.getRollbackData()).build();stepLogMapper.insert(newStepLog);log.info("执行重试安排成功,第{}次重试,下次执行时间: {}", newStepLog.getRetryCount(), nextRetryTime);} catch (Exception e) {log.error("执行重试安排失败: {}", stepLog.getStepCode(), e);}}/*** 处理通知失败 - 指数退避重试*/public void handleNotificationFailure(DistributeEventStepLog stepLog, String notifyErrorMessage) {int currentNotifyRetry = stepLog.getNotifyRetryCount();if (currentNotifyRetry < stepLog.getMaxNotifyRetry()) {// 还有通知重试机会 - 指数退避scheduleNotificationRetry(stepLog, notifyErrorMessage);} else {// 通知重试次数耗尽markNotificationFailed(stepLog, notifyErrorMessage);}}/*** 安排通知重试(指数退避)*/private void scheduleNotificationRetry(DistributeEventStepLog stepLog, String notifyErrorMessage) {try {int nextNotifyRetryCount = stepLog.getNotifyRetryCount() + 1;// 指数退避算法: 2^n * 60秒long delaySeconds = (long) Math.pow(2, nextNotifyRetryCount) * 60;Timestamp nextNotifyTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);stepLogMapper.updateNotificationForRetry(stepLog.getId(),nextNotifyRetryCount,nextNotifyTime,notifyErrorMessage);log.info("通知重试安排成功,第{}次重试,下次通知时间: {}", nextNotifyRetryCount, nextNotifyTime);} catch (Exception e) {log.error("通知重试安排失败: {}", stepLog.getStepCode(), e);}}/*** 回滚机制:删除重新开始*/private void triggerRollback(DistributeEventStepLog stepLog, String errorMessage) {try {// 1. 标记为执行失败stepLogMapper.updateStatus(stepLog.getId(), "FAILED", errorMessage);// 2. 执行回滚操作(删除相关数据)executeRollbackAction(stepLog);// 3. 标记回滚完成stepLogMapper.markAsRollback(stepLog.getId());// 4. 通知业务端失败notifyBusinessFailure(stepLog);} catch (Exception e) {log.error("回滚执行失败: {}", stepLog.getStepCode(), e);alertService.sendRollbackFailureAlert(stepLog, e);}}// 执行具体的回滚操作private void executeRollbackAction(DistributeEventStepLog stepLog) {String rollbackData = stepLog.getRollbackData();if (StringUtils.isBlank(rollbackData)) return;RollbackSnapshot snapshot = JSON.parseObject(rollbackData, RollbackSnapshot.class);switch (snapshot.getStepType()) {case "PAGE_CREATE":pageService.deleteById(snapshot.getEntityId());break;case "DATA_COPY":dataService.deleteBatch(snapshot.getDataIds());break;case "PERMISSION_GRANT":permissionService.revoke(snapshot.getPermissionIds());break;}}
}

总结

主要特点
流程清晰: 业务拆解 → HTTP分发 → 定时消费 → 状态同步
幂等性简单: 一天一次执行保证,避免重复处理
重试机制: 最多3次,指数退避,失败后智能回滚
回滚策略: 删除重新开始,简单有效
并发控制: 2线程限流,避免资源争抢
状态追踪: 完整的执行链路监控

核心优势
🎯 设计精简: 去除复杂的多重幂等性策略,采用基于日期的简单方案
💡 实用性强: 回滚即删除,符合业务实际需求
🔧 易于维护: 清晰的代码结构和执行流程
性能优化: 合理的并发控制和索引设计
🛡️ 可靠性高: 完善的重试和回滚机制

http://www.wxhsa.cn/company.asp?id=5647

相关文章:

  • 在Android开发中实现两个Intent跳转及数据交换的方法
  • ARC188 做题记
  • AT_arc145_d [ARC145D] Non Arithmetic Progression Set
  • Microsoft AI Genius | 第三集实战课正式开启:用 Copilot Studio 定制你的专属智能体
  • C# 多线程编程核心要点:不只是Thread和lock
  • 基于MATLAB的图像融合拼接GUI系统设计
  • Python使用多线程和异步调用
  • 研究生学术英语读写教程(中国科学院大学出版) Unit10 TextA 原文以及翻译(仅供学习)
  • 基于Python+Vue开发的蛋糕商城管理系统源码+运行步骤
  • 某运营商智慧协同平台——构建高效、敏捷的运营管理新模式
  • go使用反射获取http.Request参数到结构体 - 实践
  • 基于MATLAB/Simulink的TI2000系列DSP模型设计
  • 挖矿木马病毒清理手册
  • nginx 常用参数
  • Python常见函数和代码示例
  • Java开发电脑开荒软件
  • 69-SQLite应用 - 详解
  • mysql 源码下载,从获取到安装的完整指南
  • docker中centos7配置
  • centos7虚拟机下系统环境配置
  • CefSharp高版本问题
  • 前缀和pre,如何求总和:pre(r) - pre(l)(1 = l = r = n),以及|pre(r) - pre(l)|
  • P11537 [NOISG 2023 Finals] Toxic Gene 题解
  • keil5中stm32相关记录
  • centos7中mysql环境配置
  • centos7中php环境配置
  • Symfony学习笔记 - 利用Doctrine开发一个学生信息的增删查改
  • 函数计算进化之路:AI Sandbox 新基座
  • linux通过smb共享文件夹,windows进行连接
  • 强制Apache Web服务器始终使用https