上篇大致介绍过了领域驱动的主要概念,内容并不详尽,相关方面的知识大家可以参考园子里汤雪华和陈晴阳的博客,上篇有说过,领域驱动设计重点是建立正确的领域模型,这取决于对业务的理解和抽象能力,本篇将以一个简单的订单流程来实践领域驱动设计,希望能够给想实践DDD的人提供一种实现思路。
这是一个简化了的订单流程,实际情况还有很多细节要考虑。但这不妨碍本文的一个演示目的。
图中的发布事件即为发布消息至消息队列,为了达到EventSourcing的目的会在每次发布消息前将其持久化到数据库。
示例源码在本文最下面。
我们以领域驱动设计的经典分层架构来搭建我们的解决方案。如下图
Applicaiton:应用层,在这里我们用ServiceStack实现的Web服务来作为应用层(实际情况该层承担的应该是应用功能的划分和协调,但作为示例将其合并在同一层次)。
Domain:领域层,包含了业务所涉及的领域对象(实体、值对象),技术无关性。
Infrastructure:基础设施层,数据库持久化,消息队列实现,业务无关性。
SampleTests:在这里我们用单元测来作为表现层。
1:首先定义出领域模型
领域模型有一个聚合根的概念,定义模型之前我们先定义一个聚合根的接口。
class="c#:firstline[1]:showcolumns">namespace Infrastructure.Database { /// <summary> /// 聚合根 /// </summary> public interface IAggregateRoot { /// <summary> // 每个聚合根必须拥有一个全局的唯一标识,往往是GUID。 /// </summary> Guid Id { get; set; } } }我们为该聚合根定义一个抽象的实现类,通过使用[Key, DatabaseGenerated(DatabaseGeneratedOption.Identity)]可以使主键按排序规则生成。
namespace Infrastructure.Database { /// <summary> /// 实体基类 /// </summary> public abstract class EntityBase<TKey> { [Key, DatabaseGenerated(DatabaseGeneratedOption.Identity)] public TKey Id { get; set; } } /// <summary> /// 实体基类(GUID) /// </summary> public abstract class EntityBase :EntityBase<Guid>, IAggregateRoot { } }
2:CQRS接口及定义
首先是Command
namespace Infrastructure.Commands { public interface ICommand : IReturn<CommandResult> { Guid CommandId { get; } } } namespace Infrastructure.Commands { public interface ICommandHandler<in TCommand> : IHandler<TCommand> where TCommand : ICommand { } } namespace Infrastructure.Commands { public class CommandResult { public CommandResult() { } public CommandResult(bool result = true,string msg = "") { this.Result = result; this.Msg = msg; } public bool Result { get; set; } public string Msg { get; set; } } } namespace Infrastructure.Commands { public abstract class CommandHandlerBase { protected async Task DoHandle<TMessage>(Func<TMessage, Task> handlerAction, TMessage message) where TMessage : ICommand { try { await handlerAction.Invoke(message); } //catch MoreException catch (Exception e) { throw new Exception(e.Message); } } } }然后是Event
namespace Infrastructure.Events { public interface IEvent : IReturnVoid { Guid EventId { get; } } } namespace Infrastructure.Events { public interface IEventHandler<in TEvent> : IHandler<TEvent> where TEvent : IEvent { } } namespace Infrastructure.Events { public abstract class EventHandlerBase { public virtual async Task DoHandle<TMessage>(Func<TMessage, Task> handlerAction, TMessage message) where TMessage : IEvent { try { await handlerAction.Invoke(message); } //catch MoreException catch (Exception e) { throw new Exception(e.Message); } } } }最后是Bus
namespace Infrastructure.Bus { public interface IEventBus { void Publish<T>(T message) where T : IEvent; } } namespace Infrastructure.Bus { public interface ICommandBus { CommandResult Excute<T>(T command) where T : ICommand; Task<CommandResult> ExcuteAsync<T>(T command) where T : ICommand; } }
基础设施层到这里就算完成了,还有个仓储的实现上篇有说明,有点要说明的是本文的示例Domain层中并没有做到真正的纯净,譬如数据库持久化采用的EF实现,且把上下文放置在了Domain层,若作为开发框架是不建议这样做的,要达到完全解耦可以参考陈晴阳的开源项目Apworks。
首先定义出所需要的领域模型
namespace Domain.Entitys { /// <summary> /// 订单实体类 /// </summary> public class Order : EntityBase { public string OrderNo { get; set; } public decimal OrderAmount { get; set; } public DateTime OrderTime { get; set; } public string ProductNo { get; set; } public string UserIdentifier { get; set; } public bool IsPaid { get; set; } } /// <summary> /// 订单支付实体类 /// </summary> public partial class PayOrder : EntityBase { public decimal PayAmount { get; set; } public string PayResult { get; set; } public string OrderNo { get; set; } } }此处订单模型继承抽象类,如此可以保持模型的纯净,你甚至可以根据业务差异性定义多个基类,通常我们会将通用的一些属性及方法定义在基类中,如IsDeleted【逻辑删除】、CreateTime、Timestamp【并发控制】等。
为简化流程示例中仅包含两个操作,【生成订单】和【支付订单】,我们将其定义在领域服务内。
namespace Domain.DomainServices { public interface IOrderService { Task OrderBuild(Order order); Task Pay(Order order); } } namespace Domain.DomainServices { public class OrderService : IOrderService { public IRepository<Order> OrderRepository { private get; set; } public IRepository<PayOrder> PayOrderRepository { private get; set; } public IRepository<EventStore> EventStoreRepository { private get; set; } public IEventBus EventBus { private get; set; } public async Task OrderBuild(Order order) { //生成订单 await OrderRepository.AddAsync(order); //toEventStore await EventStoreRepository.AddAsync(order.ToBuildOrderReadyEvent().ToEventStore()); //发布生成订单事件 EventBus.Publish(order.ToBuildOrderReadyEvent()); } public async Task Pay(Order order) { var payOrder = new PayOrder { OrderNo = order.OrderNo, PayAmount = order.OrderAmount, PayResult = "pay success!" }; //支付成功 await PayOrderRepository.AddAsync(payOrder); //更新订单 var findOrder = await OrderRepository.GetByKeyAsync(order.Id); findOrder.IsPaid = true; await OrderRepository.UpdateAsync(findOrder); //toEventStore await EventStoreRepository.AddAsync(payOrder.ToPaySuccessReadyEvent().ToEventStore()); //发布支付成功事件 EventBus.Publish(payOrder.ToPaySuccessReadyEvent()); } } }要驱动整个流程的订单发起,我们需要定义一个Command【OrderBuild】,它通常是根据调用端数据DTO转化而来。
namespace Domain.Commands { [Route("/BuildOrder", "Post")] public class BuildOrder : Command { public string OrderNo { get; set; } public decimal OrderAmount { get; set; } public string ProductNo { get; set; } public string UserIdentifier { get; set; } public Order ToOrder() { return new Order { OrderNo = OrderNo, OrderAmount = OrderAmount, OrderTime = DateTime.Now, ProductNo = ProductNo, UserIdentifier = UserIdentifier, IsPaid = false }; } } }有了Command,接着定义出该命令的处理程序
namespace Domain.Commands.Handlers { public class OrderCommandHandler :CommandHandlerBase, ICommandHandler<BuildOrder> { public IOrderService OrderService { private get; set; } public async Task Handle(BuildOrder command) { await DoHandle(async c => { await OrderService.OrderBuild(command.ToOrder()); }, command); } } }由上面定义的领域服务中可见,OrderBuild和Pay中都发布有事件,事件及其处理程序如下
namespace Domain.Events { public class BuildOrderReady : Event { public Order Entity { get; set; } public EventStore ToEventStore() { return new EventStore { Timestamp = DateTime.Now, Body = JsonConvert.SerializeObject(Entity) }; } } } namespace Domain.Events { public class PaySuccessReady : Event { public PayOrder Entity { get; set; } public EventStore ToEventStore() { return new EventStore { Timestamp = DateTime.Now, Body = JsonConvert.SerializeObject(Entity) }; } } } namespace Domain.Events.Handlers { public class OrderEventHandler : EventHandlerBase, IEventHandler<BuildOrderReady>, IEventHandler<PaySuccessReady> { public IOrderService OrderService { private get; set; } public async Task Handle(BuildOrderReady @event) { await DoHandle(async c => { await OrderService.Pay(@event.Entity); }, @event); } public async Task Handle(PaySuccessReady @event) { //Send Email.. //Send SMS.. } } }可以看到在两个Event中都包含有ToEventStore方法,此处仅为模拟出将当前Event序列化保存,以供EventSourcing使用。这里有较成熟的框架可以使用,如NEventStore、http://geteventstore.com/。
开头有说过应用层采用ServiceStack实现的Web服务,优点有3
1:ServiceStack强调数据交换需定义出RequestDto及ResponseDto,这很符合我们CQRS的一个Command机制
2:示例中Event即消息,Publish的Event将在MQ中,通过订阅去消费,示例采用的消息队列是RabbitMq(跨平台),这样一来可以使用其他平台的语言去订阅该消息并消费,ServiceStack将Rabbitmq的部分功能集成在内。
3:其实是第二点的衍生,当事件经过MQ,有些消息我们可以消费即ACK掉,有些消息我们可以将其存储在队列中,如此一来我们可以基于订阅MQ来实现系统对业务的一个分析和数据处理,如下图
ServiceStack中服务的定义只需要继承ServiceStack.Service或者IService,如下
namespace Application.Services { public partial class CommandService : Service { public async Task<CommandResult> Any(BuildOrder command) { return await Handler(command); } } } namespace Application.Services { public partial class EventService : Service { public async Task Any(BuildOrderReady @event) { await Handler(@event); } public async Task Any(PaySuccessReady @event) { await Handler(@event); } } }关于方法名定义成Any是推荐的做法,若要控制其Post或Get等可以在其RequestDto上以 [Route("/BuildOrder", "Post")]标签的形式拓展。
因ServiceStack要求RequestDto定义的同时须要指定其ResponseDto,以继承IReturn<ResponseDto>接口来声明。
示例中我们的Command都是继承自IReturn<CommandResult>,Event都是继承自IReturnVoid,如下
namespace Infrastructure.Commands { public interface ICommand : IReturn<CommandResult> { Guid CommandId { get; } } } namespace Infrastructure.Events { public interface IEvent : IReturnVoid { Guid EventId { get; } } }在ServiceStack中需要定义一个继承自AppHostBase的服务宿主类(姑且这样叫吧),通常取名叫AppHost,如下
namespace Application.Services.Config { public class AppHost : AppHostBase { public AppHost() : base("CQRS Demo", typeof(AppHost).Assembly) { } public override void Configure(Funq.Container container) { //SwaggerUI配置用于调试 AddPlugin(new SwaggerFeature()); //IOC配置 ServiceLocatorConfig.Configura(container); //rabbitmq配置 var mq = new RabbitMqServer(ConfigurationManager.AppSettings.Get("EventProcessorAddress")) { AutoReconnect = true, DisablePriorityQueues = true, RetryCount = 0 }; container.Register<IMessageService>(c => mq); var mqServer = container.Resolve<IMessageService>(); //注册eventHandler mq.RegisterHandler<BuildOrderReady>(ServiceController.ExecuteMessage, 1); mq.RegisterHandler<PaySuccessReady>(ServiceController.ExecuteMessage, 1); mqServer.Start(); } } }构造函数中的两个参数分别代表,服务显示名称和指定当前服务定义所在的程序集。
SwaggerUI用于调试服务接口是非常方便的,内置的依赖注入框架Funq功能也不错。
另外就是rabbitmq的使用需要在Nuget中另外安装,全称是:ServiceStack.RabbitMq。值得一提的是服务启动时,ServiceStack会在你指定的Rabbitmq服务端创建对应的队列,通常是根据你定义的Event创建如下。
可以看到每个Event创建了4个队列,分别代表的意思是:
dlq:没有对应的处理程序或处理失败的消息。
inq:还未被消费的消息。
outq:处理完毕的消息。
priorityq:优先队列。
更详细的可以到ServiceStack Wiki上查看。
优点:在注册EventHandler时可以指定处理线程个数,如上面指定的是1,此时若同样的服务有两个,分别部署在不同服务器上且都订阅相同消息时,将根据线程数来消费MQ中的消息来达到负载均衡的目的。
//注册eventHandler mq.RegisterHandler<BuildOrderReady>(ServiceController.ExecuteMessage, 1); mq.RegisterHandler<PaySuccessReady>(ServiceController.ExecuteMessage, 1);但其实针对Rabbitmq封装一个Client并不麻烦,我们可以按项目需要去实现其Exchanges和Queues,并且可以很灵活的控制Ack等。
在单元测试中我们按如下方式调试。
也可以使用SwaggerUI调试,服务运行之后将打开如下页面
点击SwaggerUI打开调试页面
点击Try it out!按钮
此时数据库中应包含一条Order记录一条PayOrder记录和两条EventStore记录
RabbitMq中
源码地址:https://github.com/yanghongjie/DomainDrivenDesignSample