当前位置: 首页 > news >正文

命令模式的深度解析:从标准实现到TPL Dataflow高性能架构

命令模式是对一类对象公共操作的抽象,它们具有相同的方法签名,所以具有类似的操作,可以被抽象出来,成为一个抽象的命令对象。实际操作的调用者就不是和一组对象打交道,它是需要以来这个命令对象的方法签名,并根据这个签名调用相关的方法。

以上是命令模式的大概含义,这里可以联想到事件驱动,command和handler,也可以联想到AOP的思想。联想到数据流的操作我就写了个数据流操作类库。

Snipaste_2025-09-14_15-03-02

Snipaste_2025-09-14_15-03-15

之前写了一些有关AOP的,但是感觉还是差点意思,补上这次的可能在项目中会弥补一些短板回来,就是灵活性。
但是该项目重点是数据流的处理,所以web端来实现只是一个例子,大量数据的处理最主要的是后台任务吧,通过接口调用只是一个实例展示。

有关数据流这块代码核心如下:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Common.Bus.Core;
using Common.Bus.Monitoring;namespace Common.Bus.Implementations
{/// <summary>/// 基于TPL数据流的高性能CommandBus实现/// 支持并行处理、背压控制和监控/// </summary>public class DataflowCommandBus : ICommandBus, IDisposable{private readonly IServiceProvider _provider;private readonly ILogger<DataflowCommandBus>? _logger;private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();// 数据流网络private ActionBlock<DataflowCommandRequest> _commandProcessor = null!;// 背压控制private readonly SemaphoreSlim _concurrencyLimiter;private readonly int _maxConcurrency;// 监控指标private long _processedCommands;private long _failedCommands;private long _totalProcessingTime;public DataflowCommandBus(IServiceProvider serviceProvider, ILogger<DataflowCommandBus>? logger = null, int? maxConcurrency = null){_provider = serviceProvider;_logger = logger;_maxConcurrency = maxConcurrency ?? Environment.ProcessorCount * 2;_concurrencyLimiter = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);// 创建数据流网络
            CreateDataflowNetwork();}private void CreateDataflowNetwork(){// 创建命令处理器_commandProcessor = new ActionBlock<DataflowCommandRequest>(async request =>{try{await _concurrencyLimiter.WaitAsync();var startTime = DateTime.UtcNow;// 执行完整的命令处理管道var result = await ProcessCommandPipeline(request);var processingTime = DateTime.UtcNow - startTime;Interlocked.Add(ref _totalProcessingTime, processingTime.Ticks);Interlocked.Increment(ref _processedCommands);request.TaskCompletionSource.SetResult(result);}catch (Exception ex){Interlocked.Increment(ref _failedCommands);_logger?.LogError(ex, "Command processing failed for {CommandType}", request.CommandType.Name);request.TaskCompletionSource.SetException(ex);}finally{_concurrencyLimiter.Release();}},new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxConcurrency,BoundedCapacity = _maxConcurrency * 2});}public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default) where TCommand : ICommand<TResult>{var commandType = typeof(TCommand);var requestId = Guid.NewGuid();var tcs = new TaskCompletionSource<object>();var request = new DataflowCommandRequest(requestId, commandType, typeof(TResult), command, tcs);// 发送到数据流网络if (!_commandProcessor.Post(request)){throw new InvalidOperationException("Unable to queue command for processing - system may be overloaded");}try{var result = await tcs.Task.WaitAsync(ct);return (TResult)result;}catch (OperationCanceledException) when (ct.IsCancellationRequested){_logger?.LogWarning("Command {CommandType} was cancelled", commandType.Name);throw;}}private async Task<object> ProcessCommandPipeline(DataflowCommandRequest request){// 使用反射调用泛型方法var method = typeof(DataflowCommandBus).GetMethod(nameof(ProcessCommandPipelineGeneric), BindingFlags.NonPublic | BindingFlags.Instance);var genericMethod = method!.MakeGenericMethod(request.CommandType, request.ResultType);var task = (Task)genericMethod.Invoke(this, new object[] { request })!;await task;var resultProperty = task.GetType().GetProperty("Result");return resultProperty?.GetValue(task) ?? throw new InvalidOperationException("Failed to get result from task");}private async Task<TResult> ProcessCommandPipelineGeneric<TCommand, TResult>(DataflowCommandRequest request) where TCommand : ICommand<TResult>{// 获取处理器和行为的工厂函数var handlerFactory = GetCachedHandler<TCommand, TResult>(request.CommandType);var behaviorsFactory = GetCachedBehaviors<TCommand, TResult>(request.CommandType);// 创建处理器和行为的实例var handler = handlerFactory();var behaviors = behaviorsFactory();// 构建处理管道Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);// 按顺序应用管道行为foreach (var behavior in behaviors.Reverse()){var currentBehavior = behavior;var currentPipeline = pipeline;pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);}return await pipeline();}private async Task<object> ExecuteBehavior<TCommand, TResult>(ICommandPipelineBehavior<TCommand, TResult> behavior, TCommand command, Func<Task<TResult>> next) where TCommand : ICommand<TResult>{try{var result = await behavior.Handle(command, next, CancellationToken.None);return result!;}catch (Exception ex){throw new InvalidOperationException($"Error executing behavior {behavior.GetType().Name}: {ex.Message}", ex);}}private Func<ICommandHandler<TCommand, TResult>> GetCachedHandler<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>{return (Func<ICommandHandler<TCommand, TResult>>)_handlerCache.GetOrAdd(commandType, _ =>{return new Func<ICommandHandler<TCommand, TResult>>(() =>{using var scope = _provider.CreateScope();var handler = scope.ServiceProvider.GetService<ICommandHandler<TCommand, TResult>>();if (handler == null)throw new InvalidOperationException($"No handler registered for {commandType.Name}");return handler;});});}private Func<ICommandPipelineBehavior<TCommand, TResult>[]> GetCachedBehaviors<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>{return (Func<ICommandPipelineBehavior<TCommand, TResult>[]>)_behaviorsCache.GetOrAdd(commandType, _ =>{return new Func<ICommandPipelineBehavior<TCommand, TResult>[]>(() =>{using var scope = _provider.CreateScope();var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();return behaviors;});});}private async Task<TResult> ExecuteHandler<TCommand, TResult>(ICommandHandler<TCommand, TResult> handler, TCommand command) where TCommand : ICommand<TResult>{return await handler.HandleAsync(command, CancellationToken.None);}private async Task<object> ExecuteHandler(object handler, object command){var handlerType = handler.GetType();var handleMethod = handlerType.GetMethod("HandleAsync");if (handleMethod == null)throw new InvalidOperationException($"Handler {handlerType.Name} does not have HandleAsync method");var task = (Task)handleMethod.Invoke(handler, new object[] { command, CancellationToken.None })!;await task;var resultProperty = task.GetType().GetProperty("Result");return resultProperty?.GetValue(task) ?? throw new InvalidOperationException("Failed to get result from task");}private Func<object> GetCachedHandler(Type commandType){return _handlerCache.GetOrAdd(commandType, _ =>{// 获取命令类型实现的ICommand<TResult>接口var commandInterface = commandType.GetInterfaces().FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));if (commandInterface == null)throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");var resultType = commandInterface.GetGenericArguments()[0];var handlerType = typeof(ICommandHandler<,>).MakeGenericType(commandType, resultType);// 返回一个工厂函数,而不是直接返回处理器实例return new Func<object>(() =>{using var scope = _provider.CreateScope();var handler = scope.ServiceProvider.GetService(handlerType);if (handler == null)throw new InvalidOperationException($"No handler registered for {commandType.Name}");return handler;});});}private Func<object[]> GetCachedBehaviors(Type commandType){return _behaviorsCache.GetOrAdd(commandType, _ =>{// 获取命令类型实现的ICommand<TResult>接口var commandInterface = commandType.GetInterfaces().FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));if (commandInterface == null)throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");var resultType = commandInterface.GetGenericArguments()[0];var behaviorType = typeof(ICommandPipelineBehavior<,>).MakeGenericType(commandType, resultType);// 返回一个工厂函数,而不是直接返回行为实例return new Func<object[]>(() =>{using var scope = _provider.CreateScope();var behaviors = scope.ServiceProvider.GetServices(behaviorType).Where(b => b != null).ToArray();return behaviors!;});});}// 监控和统计方法public DataflowMetrics GetMetrics(){return new DataflowMetrics{ProcessedCommands = Interlocked.Read(ref _processedCommands),FailedCommands = Interlocked.Read(ref _failedCommands),TotalProcessingTime = TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime)),AverageProcessingTime = _processedCommands > 0 ? TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime) / _processedCommands): TimeSpan.Zero,AvailableConcurrency = _concurrencyLimiter.CurrentCount,MaxConcurrency = _maxConcurrency,InputQueueSize = _commandProcessor.InputCount};}public void ClearCache(){_handlerCache.Clear();_behaviorsCache.Clear();}public void Dispose(){_commandProcessor?.Complete();_concurrencyLimiter?.Dispose();}}// 辅助类internal class DataflowCommandRequest{public Guid Id { get; }public Type CommandType { get; }public Type ResultType { get; }public object Command { get; }public TaskCompletionSource<object> TaskCompletionSource { get; }public DataflowCommandRequest(Guid id, Type commandType, Type resultType, object command, TaskCompletionSource<object> tcs){Id = id;CommandType = commandType;ResultType = resultType;Command = command;TaskCompletionSource = tcs;}}}

 

其他普通或者批量操作就参考其他代码:
exercisebook/AOP/EventBusAOP/AopNew at main · liuzhixin405/exercisebook


一下是项目更详细介绍,如有错误多多指正:

# CommandBus AOP 项目

这是一个基于AOP(面向切面编程)的CommandBus项目,使用TPL Dataflow进行数据流处理优化,支持多种CommandBus实现和实时监控。

## 项目结构

```
AopNew/
├── Common.Bus/                    # 核心库
│   ├── Core/                      # 核心接口和抽象
│   │   ├── ICommand.cs           # 命令接口
│   │   ├── ICommandBus.cs        # 命令总线接口
│   │   ├── ICommandHandler.cs    # 命令处理器接口
│   │   ├── ICommandPipelineBehavior.cs # 管道行为接口
│   │   ├── ICommandProcessor.cs  # 命令处理器接口
│   │   ├── ICommandRequest.cs    # 命令请求接口
│   │   └── CommandBusType.cs     # CommandBus类型枚举
│   ├── Implementations/          # 具体实现
│   │   ├── CommandBus.cs         # 标准CommandBus
│   │   ├── DataflowCommandBus.cs # TPL Dataflow CommandBus
│   │   ├── BatchDataflowCommandBus.cs # 批处理Dataflow CommandBus
│   │   ├── TypedDataflowCommandBus.cs # 类型安全Dataflow CommandBus
│   │   ├── MonitoredCommandBus.cs # 带监控的CommandBus
│   │   └── CommandBusServiceLocator.cs # 服务定位器
│   ├── Monitoring/               # 监控相关
│   │   ├── IDataflowMetrics.cs   # 数据流指标接口
│   │   ├── IMetricsCollector.cs  # 指标收集器接口
│   │   ├── DataflowMetrics.cs    # 数据流指标实现
│   │   └── BatchDataflowMetrics.cs # 批处理指标实现
│   └── Extensions/               # 扩展方法
│       └── ServiceCollectionExtensions.cs # DI扩展方法
└── WebApp/                       # Web应用程序
    ├── Commands/                 # 命令定义
    │   ├── ProcessOrderCommand.cs
    │   ├── CreateUserCommand.cs
    │   └── SendEmailCommand.cs
    ├── Handlers/                 # 命令处理器
    │   ├── ProcessOrderHandler.cs
    │   ├── CreateUserHandler.cs
    │   └── SendEmailHandler.cs
    ├── Behaviors/                # 管道行为
    │   ├── LoggingBehavior.cs
    │   ├── ValidationBehavior.cs
    │   └── TransactionBehavior.cs
    ├── Controllers/              # API控制器
    │   ├── StandardCommandBusController.cs
    │   ├── DataflowCommandBusController.cs
    │   ├── BatchDataflowCommandBusController.cs
    │   ├── TypedDataflowCommandBusController.cs
    │   ├── MonitoredCommandBusController.cs
    │   └── MonitoringController.cs
    ├── Program.cs                # 应用程序入口
    ├── WebApp.csproj            # 项目文件
    └── WebApp.http              # HTTP测试文件
```

## CommandBus实现类型

### 1. Standard CommandBus
- **类型**: `CommandBusType.Standard`
- **特点**: 标准同步处理,适合简单场景
- **控制器**: `StandardCommandBusController`

### 2. Dataflow CommandBus
- **类型**: `CommandBusType.Dataflow`
- **特点**: 基于TPL Dataflow的异步并发处理,适合高并发场景
- **控制器**: `DataflowCommandBusController`

### 3. Batch Dataflow CommandBus
- **类型**: `CommandBusType.BatchDataflow`
- **特点**: 支持批量处理,适合大批量数据场景
- **控制器**: `BatchDataflowCommandBusController`

### 4. Typed Dataflow CommandBus
- **类型**: `CommandBusType.TypedDataflow`
- **特点**: 强类型安全,适合复杂业务场景
- **控制器**: `TypedDataflowCommandBusController`

### 5. Monitored CommandBus
- **类型**: `CommandBusType.Monitored`
- **特点**: 包含性能监控,适合生产环境
- **控制器**: `MonitoredCommandBusController`

## 使用方法

### 1. 依赖注入配置

在`Program.cs`中一次性注册所有CommandBus实现:

```csharp
// 一次性注册所有CommandBus实现
builder.Services.AddAllCommandBusImplementations();

// 注册命令处理器
builder.Services.AddScoped<ICommandHandler<ProcessOrderCommand, string>, ProcessOrderHandler>();
builder.Services.AddScoped<ICommandHandler<CreateUserCommand, int>, CreateUserHandler>();
builder.Services.AddScoped<ICommandHandler<SendEmailCommand, bool>, SendEmailHandler>();

// 注册管道行为
builder.Services.AddScoped(typeof(ICommandPipelineBehavior<,>), typeof(LoggingBehavior<,>));
builder.Services.AddScoped(typeof(ICommandPipelineBehavior<,>), typeof(ValidationBehavior<,>));
builder.Services.AddScoped(typeof(ICommandPipelineBehavior<,>), typeof(TransactionBehavior<,>));
```

### 2. 在控制器中使用

每个控制器直接注入对应的CommandBus实现:

```csharp
public class StandardCommandBusController : ControllerBase
{
    private readonly CommandBus _commandBus;

    public StandardCommandBusController(CommandBus commandBus)
    {
        _commandBus = commandBus;
    }

    [HttpPost("process-order")]
    public async Task<IActionResult> ProcessOrder([FromBody] ProcessOrderCommand command)
    {
        var result = await _commandBus.SendAsync<ProcessOrderCommand, string>(command);
       
        return Ok(new {
            Success = true,
            Result = result,
            BusType = "Standard"
        });
    }
}
```

### 3. 专用控制器端点

每个CommandBus实现都有专门的控制器端点:

```bash
# 标准CommandBus
POST /api/StandardCommandBus/process-order
POST /api/StandardCommandBus/create-user
POST /api/StandardCommandBus/send-email

# Dataflow CommandBus
POST /api/DataflowCommandBus/process-order
POST /api/DataflowCommandBus/create-user
POST /api/DataflowCommandBus/send-email

# 批处理Dataflow CommandBus
POST /api/BatchDataflowCommandBus/process-order
POST /api/BatchDataflowCommandBus/create-user
POST /api/BatchDataflowCommandBus/send-email

# 类型安全Dataflow CommandBus
POST /api/TypedDataflowCommandBus/process-order
POST /api/TypedDataflowCommandBus/create-user
POST /api/TypedDataflowCommandBus/send-email

# 带监控的CommandBus
POST /api/MonitoredCommandBus/process-order
POST /api/MonitoredCommandBus/create-user
POST /api/MonitoredCommandBus/send-email
```

## API端点

### CommandBus专用控制器

每个CommandBus实现都有专门的控制器:

- `StandardCommandBusController` - 标准CommandBus演示
- `DataflowCommandBusController` - TPL Dataflow CommandBus演示
- `BatchDataflowCommandBusController` - 批处理Dataflow CommandBus演示
- `TypedDataflowCommandBusController` - 类型安全Dataflow CommandBus演示
- `MonitoredCommandBusController` - 带监控的CommandBus演示

### 监控控制器

- `MonitoringController` - 实时监控和SSE数据流
  - `GET /api/Monitoring/dashboard` - 监控面板
  - `GET /api/Monitoring/stream` - SSE实时数据流
  - `GET /api/Monitoring/metrics` - 获取当前指标

## 示例请求

### 处理订单

```json
POST /api/TypedDataflowCommandBus/process-order
Content-Type: application/json

{
    "product": "笔记本电脑",
    "quantity": 2,
    "priority": 1
}
```

### 创建用户

```json
POST /api/TypedDataflowCommandBus/create-user
Content-Type: application/json

{
    "name": "张三",
    "email": "zhangsan@example.com",
    "age": 25
}
```

### 发送邮件

```json
POST /api/MonitoredCommandBus/send-email
Content-Type: application/json

{
    "to": "user@example.com",
    "subject": "测试邮件",
    "body": "这是一封测试邮件"
}
```

## 运行项目

1. 克隆项目到本地
2. 在项目根目录运行:
   ```bash
   dotnet build
   dotnet run --project WebApp
   ```
3. 访问 `https://localhost:5056` 查看Swagger文档
4. 访问 `https://localhost:5056/api/Monitoring/dashboard` 查看监控面板

## 技术特性

- **多种CommandBus实现**: 支持标准、Dataflow、批处理、类型安全、监控等多种实现
- **枚举驱动选择**: 通过枚举类型轻松切换不同的CommandBus实现
- **AOP支持**: 内置日志、验证、事务等管道行为
- **实时监控**: 支持SSE实时数据流和性能指标监控
- **类型安全**: 强类型命令和处理器,编译时类型检查
- **高并发**: 基于TPL Dataflow的异步并发处理
- **批量处理**: 支持批量命令处理,提高吞吐量
- **依赖注入**: 完整的DI支持,易于测试和扩展

## 扩展指南

### 添加新的CommandBus实现

1. 实现`ICommandBus`接口
2. 在`ServiceCollectionExtensions.AddAllCommandBusImplementations`中注册
3. 创建对应的控制器

### 添加新的管道行为

1. 实现`ICommandPipelineBehavior<TCommand, TResult>`接口
2. 在`Program.cs`中注册服务
3. 行为将自动应用到所有命令处理

### 添加新的命令和处理器

1. 在`Commands`目录中定义命令
2. 在`Handlers`目录中实现处理器
3. 在`Program.cs`中注册服务

 

 

http://www.wxhsa.cn/company.asp?id=3629

相关文章:

  • JavaWeb
  • 读书笔记:为什么数据在磁盘上的存放顺序如此重要?
  • Rcc_APBPeriphClockCmd()
  • 故障处理:ORA-19809: limit exceeded for recovery files
  • ORA-01555系列:二、ORA-01555的场景分析与解决方案
  • PySimpleGUI常用控件
  • 25.09.14 与其感慨路难行,不如马上出发
  • GCC工具链应用学习笔记
  • 初始化 MCP 环境 创建 MCP Server (一)
  • 博客园格式设置
  • [总结/备赛]备战 CSP-S 2025 初赛总结
  • win11 系统如何进行硬盘分区?固态硬盘怎么分区?SSD 固态硬盘是分区好还是不分区好?
  • 逆序数及其应用
  • 豆豆守护如何下载?
  • Java运行时jar时终端输出的中文日志是乱码
  • ZK2真空发生器日常清理
  • Nacos服务注册与发现
  • 马的遍历
  • 20231310王宏邦《密码系统设计》第1周
  • 新学期第一次随笔:慢慢学,总会有进步
  • 详细介绍:【C语言】第四课 指针与内存管理
  • 知识点错题整理
  • 202311_陇剑杯预赛_tcpdump
  • Linux学习记录(六):添加/删除用户
  • python 链式调用 合并 __setattr__ __getattribute__ in nested object()
  • 分享一个稳定好用的免费云服务——阿贝云体验
  • 年化439%,回撤7%,卡玛比率62.5,附本地运行的完整策略python代码 - 详解
  • 接口测试---PyMysql
  • My First Blog
  • 设置基础软件仓库时出错