Saga分布式事务框架执行逻辑
📋 目录
- 框架概述
- 核心组件架构
- 数据库表设计
- 完整执行流程
- 节点发现与调用机制
- 精简补偿策略设计
- 总结
框架概述
这是一个基于数据库驱动的Saga分布式事务框架,专门用于解决跨服务间数据同步的一致性问题。框架采用了混合编排模式,结合了集中式任务分解和分布式执行的优势。
核心设计理念
- 🎯 分层解耦: 任务分解与任务执行完全分离
- 🌐 节点自治: 消费端节点独立执行和管理任务
- 📊 状态透明: 完整的执行日志和状态追踪
- 🔄 容错恢复: 失败重试与自动补偿机制
- ⚖️ 负载均衡: 基于节点负载的智能调度
业务场景
- 空间同步开启: 跨服务复制空间、页面、权限等数据
- 增量数据同步: 已开启同步的项目进行增量更新
- 同步关闭清理: 关闭同步时清理相关数据
核心组件架构
``mermaid
graph TBsubgraph "业务触发层"A1[空间同步开启] --> B1[业务端拆解步骤]A2[增量数据更新] --> B1A3[同步关闭清理] --> B1endsubgraph "任务分解层"B1 --> C1[存储distribute_event]C1 --> C2[存储distribute_event_step]C2 --> C3[HTTP发送步骤数据]endsubgraph "消费端接收层"C3 --> D1[消费端接收HTTP请求]D1 --> D2[存储distribute_event_step_log]D2 --> D3[返回接收确认]D3 --> D4[业务端更新状态为待消费]endsubgraph "定时执行层"D4 --> E1[定时任务扫描待执行记录]E1 --> E2[2线程并发控制]E2 --> E3[执行具体业务逻辑]E3 --> E4[HTTP回调通知结果]E4 --> E5[业务端更新状态]end
数据库表设计
📋 核心表结构
1. distribute_event (主事务表)
记录顶层业务事务的基本信息和整体状态。
2. distribute_event_step (步骤表)
记录事务分解后的各个原子步骤信息。
3. distribute_event_step_log (执行日志表) ✨ 完整设计
记录消费端节点的执行日志,实现简洁而强大的幂等性保证、重试机制和通知状态管理。
CREATE TABLE distribute_event_step_log (id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键',step_code VARCHAR(64) NOT NULL COMMENT '关联业务端distribute_event_step.code',job_code VARCHAR(64) NOT NULL COMMENT '主事务编码,关联业务端distribute_event.code',consumer_node VARCHAR(50) NOT NULL COMMENT '消费者节点地址',-- 幂等性保证字段execution_key VARCHAR(128) NOT NULL COMMENT '执行唯一键: {step_code}_{consumer_node}_{yyyyMMdd}',business_key VARCHAR(64) COMMENT '业务唯一键,基于业务数据哈希值',-- 执行状态管理exec_status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '执行状态: PENDING, EXECUTING, SUCCESS, FAILED, RETRYING',retry_count INT DEFAULT 0 COMMENT '当前重试次数',max_retry INT DEFAULT 3 COMMENT '最大重试次数',-- 通知状态管理(新增)notify_status VARCHAR(20) DEFAULT 'NOT_REQUIRED' COMMENT '通知状态: NOT_REQUIRED, PENDING, SUCCESS, FAILED',notify_retry_count INT DEFAULT 0 COMMENT '通知重试次数',max_notify_retry INT DEFAULT 3 COMMENT '最大通知重试次数',next_notify_time TIMESTAMP NULL COMMENT '下次通知时间',notify_url VARCHAR(255) COMMENT '通知回调地址',-- 执行信息payload TEXT COMMENT '执行数据载荷',result_data TEXT COMMENT '执行结果数据',error_message TEXT COMMENT '执行错误信息',notify_error_message TEXT COMMENT '通知错误信息',start_time TIMESTAMP NULL COMMENT '开始执行时间',end_time TIMESTAMP NULL COMMENT '结束执行时间',next_retry_time TIMESTAMP NULL COMMENT '下次执行重试时间',-- 回滚支持rollback_data TEXT COMMENT '回滚数据快照,JSON格式',is_rollback TINYINT(1) DEFAULT 0 COMMENT '是否已回滚',create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',-- 幂等性保证索引UNIQUE KEY uk_execution_key (execution_key),UNIQUE KEY uk_business_key (step_code, business_key),-- 查询优化索引INDEX idx_exec_status (exec_status),INDEX idx_notify_status (notify_status, next_notify_time),INDEX idx_step_code (step_code),INDEX idx_consumer_node (consumer_node),INDEX idx_retry_time (next_retry_time),INDEX idx_job_code (job_code)
);
📊 字段设计详解与使用说明
1. 幂等性保证字段
/*** execution_key: 执行唯一键* 用途: 防止同一步骤在同一节点同一天重复执行* 格式: SPACE_SYNC_001_192.168.1.10:8080_20240316* 使用场景: 消费端接收HTTP请求时检查是否已存在相同的execution_key*/
public String generateExecutionKey(String stepCode, String consumerNode) {String dateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));return String.format("%s_%s_%s", stepCode, consumerNode, dateStr);
}/*** business_key: 业务唯一键* 用途: 基于业务数据内容的去重,防止相同业务数据重复处理* 生成: 对payload业务数据进行MD5哈希* 使用场景: 当业务数据完全相同时避免重复执行*/
public String generateBusinessKey(Object payload) {String payloadJson = JSON.toJSONString(payload);return DigestUtils.md5DigestAsHex(payloadJson.getBytes()).substring(0, 8);
}
2. 状态管理字段
/*** exec_status: 执行状态* PENDING: 待执行 - 刚接收到任务,等待定时器扫描* EXECUTING: 执行中 - 正在执行业务逻辑* SUCCESS: 执行成功 - 业务逻辑执行完成* FAILED: 执行失败 - 重试次数耗尽后的最终失败状态* RETRYING: 重试中 - 执行失败后等待重试的状态*//*** notify_status: 通知状态* NOT_REQUIRED: 无需通知 - 执行中或失败时的默认状态* PENDING: 待通知 - 执行成功后需要通知业务端* SUCCESS: 通知成功 - 业务端已收到通知并确认* FAILED: 通知失败 - 通知重试次数耗尽后的状态*/
3. 重试机制字段
/*** retry_count / max_retry: 执行重试控制* 用途: 控制业务逻辑执行的重试次数,避免无限重试* 逻辑: 失败时retry_count+1,超过max_retry则标记为FAILED*//*** notify_retry_count / max_notify_retry: 通知重试控制* 用途: 控制通知业务端的重试次数,确保通知到位* 逻辑: 通知失败时notify_retry_count+1,采用指数退避策略*//*** next_retry_time / next_notify_time: 重试时间控制* 用途: 指数退避算法的时间调度* 计算: delay = Math.pow(2, retryCount) * baseDelaySeconds*/
4. 数据存储字段
/*** payload: 执行数据载荷* 用途: 存储从业务端接收的步骤执行数据* 格式: JSON字符串,包含业务逻辑执行所需的所有参数*//*** result_data: 执行结果数据* 用途: 存储业务逻辑执行后的结果,用于通知业务端* 格式: JSON字符串,包含执行结果、影响的数据ID等*//*** rollback_data: 回滚数据快照* 用途: 存储执行前的原始数据状态,用于失败回滚* 格式: JSON字符串,包含需要删除的数据ID列表等*/
完整执行流程
🎆 整体流程时序图(数据库分离)
sequenceDiagramparticipant BIZ as 业务服务Aparticipant BIZ_DB as 业务端数据库participant BIZ2 as 业务服务B(消费端)participant CONSUMER_DB as 消费端数据库participant TASK as 定时任务Note over BIZ,TASK: 阶段一: 业务触发与任务分解BIZ->>BIZ: 1. 业务触发(空间同步/增量更新等)BIZ->>BIZ: 2. 执行步骤拆解逻辑BIZ->>BIZ_DB: 3. 存储主事务 distribute_eventBIZ->>BIZ_DB: 4. 存储子步骤 distribute_event_stepNote over BIZ,TASK: 阶段二: HTTP任务分发BIZ->>BIZ2: 5. HTTP请求发送步骤数据BIZ2->>CONSUMER_DB: 6. 存储执行日志 distribute_event_step_logBIZ2->>BIZ: 7. 返回接收确认BIZ->>BIZ_DB: 8. 更新步骤状态为'待消费'Note over BIZ,TASK: 阶段三: 定时消费执行(限流2线程)TASK->>CONSUMER_DB: 9. 扫描待执行状态记录TASK->>TASK: 10. 并发控制(最多2线程)TASK->>CONSUMER_DB: 11. 更新exec_status为'EXECUTING'TASK->>TASK: 12. 执行具体业务逻辑alt 执行成功TASK->>CONSUMER_DB: 13a. 更新exec_status为'SUCCESS'TASK->>CONSUMER_DB: 14a. 设置notify_status为'PENDING'Note over BIZ,TASK: 阶段四: 通知业务端(指数退避重试)TASK->>BIZ: 15a. HTTP回调通知执行结果alt 通知成功BIZ->>TASK: 16a. 返回200状态TASK->>CONSUMER_DB: 17a. 更新notify_status为'SUCCESS'BIZ->>BIZ_DB: 18a. 更新distribute_event_step状态BIZ->>BIZ_DB: 19a. 更新distribute_event主事务状态else 通知失败BIZ->>TASK: 16b. 返回非200状态或网络异常TASK->>CONSUMER_DB: 17b. notify_retry_count+1alt 通知重试次数 < 3TASK->>CONSUMER_DB: 18b. 计算next_notify_time(指数退避)Note right of TASK: 等待指数退避时间后重新通知TASK->>BIZ: 19b. 重新发送通知(循环至16a)else 通知重试次数 >= 3TASK->>CONSUMER_DB: 20b. 更新notify_status为'FAILED'Note right of TASK: 通知失败,需人工介入endendelse 执行失败TASK->>CONSUMER_DB: 13c. 更新retry_count+1alt 执行重试次数 < 3TASK->>CONSUMER_DB: 14c. 更新exec_status为'RETRYING'TASK->>CONSUMER_DB: 15c. 删除当前记录TASK->>CONSUMER_DB: 16c. 重新插入新记录(计算next_retry_time)Note right of TASK: 等待指数退避时间后重新执行else 执行重试次数 >= 3TASK->>CONSUMER_DB: 17c. 更新exec_status为'FAILED'TASK->>BIZ: 18c. 通知执行最终失败BIZ->>BIZ: 19c. 触发回滚逻辑(删除重新开始)endend
整体执行流程图
flowchart TDA[业务触发] --> B[拆解步骤存储]B --> C[HTTP发送步骤数据]C --> D[消费端存储日志]D --> E[更新状态为待消费]E --> F[定时任务扫描]F --> G[2线程并发执行]G --> H{执行结果}H -->|成功| I[HTTP回调通知]H -->|失败| J{重试次数}J -->|<3次| K[指数退避重试]J -->|>=3次| L[触发回滚删除]I --> M[更新主事务状态]L --> N[通知失败完成]
节点发现与调用机制
服务节点通过心跳注册,基于负载权重选择最优节点执行任务。
@Service
public class DistributeNodeRegistry {@Scheduled(fixedRate = 30000) // 每30秒心跳public void heartbeat() {String nodeAddress = getLocalNodeAddress();NodeMetadata metadata = collectNodeMetadata(); // 收集CPU、内存、任务数量等nodeRegistryMapper.upsertNode(NodeRegistryRecord.builder().serviceName(getServiceName()).nodeAddress(nodeAddress).status(1) // 在线.lastHeartbeatTime(new Date()).metadata(JSON.toJSONString(metadata)).build());}// 选择最优节点(基于负载权重)public String selectOptimalNode(String serviceName) {List<NodeRegistryRecord> nodes = nodeRegistryMapper.selectAvailableNodes(serviceName);return nodes.stream().min(Comparator.comparing(this::calculateNodeLoad)).map(NodeRegistryRecord::getNodeAddress).orElseThrow(() -> new NoAvailableNodeException("无可用节点"));}
}
🔄 精简补偿策略设计
执行重试与通知重试机制
@Component
public class RetryAndNotificationService {/*** 处理执行失败 - 重试机制(删除重新插入)*/public void handleExecutionFailure(DistributeEventStepLog stepLog, String errorMessage) {int currentRetry = stepLog.getRetryCount();if (currentRetry < stepLog.getMaxRetry()) {// 还有重试机会 - 删除重新插入executeRetryByReinsert(stepLog, errorMessage);} else {// 重试次数耗尽,触发回滚triggerRollback(stepLog, errorMessage);}}/*** 执行重试逻辑:删除重新插入*/private void executeRetryByReinsert(DistributeEventStepLog stepLog, String errorMessage) {try {// 1. 更新状态为 RETRYINGstepLogMapper.updateStatus(stepLog.getId(), "RETRYING", errorMessage);// 2. 计算下次重试时间(指数退避)long delaySeconds = (long) Math.pow(2, stepLog.getRetryCount() + 1) * 30;Timestamp nextRetryTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);// 3. 删除当前记录stepLogMapper.deleteById(stepLog.getId());// 4. 重新插入新记录DistributeEventStepLog newStepLog = DistributeEventStepLog.builder().stepCode(stepLog.getStepCode()).jobCode(stepLog.getJobCode()).consumerNode(stepLog.getConsumerNode()).executionKey(stepLog.getExecutionKey()).businessKey(stepLog.getBusinessKey()).execStatus("PENDING").retryCount(stepLog.getRetryCount() + 1).maxRetry(stepLog.getMaxRetry()).nextRetryTime(nextRetryTime).payload(stepLog.getPayload()).rollbackData(stepLog.getRollbackData()).build();stepLogMapper.insert(newStepLog);log.info("执行重试安排成功,第{}次重试,下次执行时间: {}", newStepLog.getRetryCount(), nextRetryTime);} catch (Exception e) {log.error("执行重试安排失败: {}", stepLog.getStepCode(), e);}}/*** 处理通知失败 - 指数退避重试*/public void handleNotificationFailure(DistributeEventStepLog stepLog, String notifyErrorMessage) {int currentNotifyRetry = stepLog.getNotifyRetryCount();if (currentNotifyRetry < stepLog.getMaxNotifyRetry()) {// 还有通知重试机会 - 指数退避scheduleNotificationRetry(stepLog, notifyErrorMessage);} else {// 通知重试次数耗尽markNotificationFailed(stepLog, notifyErrorMessage);}}/*** 安排通知重试(指数退避)*/private void scheduleNotificationRetry(DistributeEventStepLog stepLog, String notifyErrorMessage) {try {int nextNotifyRetryCount = stepLog.getNotifyRetryCount() + 1;// 指数退避算法: 2^n * 60秒long delaySeconds = (long) Math.pow(2, nextNotifyRetryCount) * 60;Timestamp nextNotifyTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);stepLogMapper.updateNotificationForRetry(stepLog.getId(),nextNotifyRetryCount,nextNotifyTime,notifyErrorMessage);log.info("通知重试安排成功,第{}次重试,下次通知时间: {}", nextNotifyRetryCount, nextNotifyTime);} catch (Exception e) {log.error("通知重试安排失败: {}", stepLog.getStepCode(), e);}}/*** 回滚机制:删除重新开始*/private void triggerRollback(DistributeEventStepLog stepLog, String errorMessage) {try {// 1. 标记为执行失败stepLogMapper.updateStatus(stepLog.getId(), "FAILED", errorMessage);// 2. 执行回滚操作(删除相关数据)executeRollbackAction(stepLog);// 3. 标记回滚完成stepLogMapper.markAsRollback(stepLog.getId());// 4. 通知业务端失败notifyBusinessFailure(stepLog);} catch (Exception e) {log.error("回滚执行失败: {}", stepLog.getStepCode(), e);alertService.sendRollbackFailureAlert(stepLog, e);}}// 执行具体的回滚操作private void executeRollbackAction(DistributeEventStepLog stepLog) {String rollbackData = stepLog.getRollbackData();if (StringUtils.isBlank(rollbackData)) return;RollbackSnapshot snapshot = JSON.parseObject(rollbackData, RollbackSnapshot.class);switch (snapshot.getStepType()) {case "PAGE_CREATE":pageService.deleteById(snapshot.getEntityId());break;case "DATA_COPY":dataService.deleteBatch(snapshot.getDataIds());break;case "PERMISSION_GRANT":permissionService.revoke(snapshot.getPermissionIds());break;}}
}
总结
主要特点:
✅ 流程清晰: 业务拆解 → HTTP分发 → 定时消费 → 状态同步
✅ 幂等性简单: 一天一次执行保证,避免重复处理
✅ 重试机制: 最多3次,指数退避,失败后智能回滚
✅ 回滚策略: 删除重新开始,简单有效
✅ 并发控制: 2线程限流,避免资源争抢
✅ 状态追踪: 完整的执行链路监控
核心优势:
🎯 设计精简: 去除复杂的多重幂等性策略,采用基于日期的简单方案
💡 实用性强: 回滚即删除,符合业务实际需求
🔧 易于维护: 清晰的代码结构和执行流程
⚡ 性能优化: 合理的并发控制和索引设计
🛡️ 可靠性高: 完善的重试和回滚机制