CQRS之旅——旅程3(订单和注册限界上下文)

2022-10-20,,,,

旅程3:订单和注册限界上下文

cqrs之旅的第一站

“寓言家和鳄鱼是一样的,只是名字不同” --约翰·劳森

描述:

订单和注册上下文有一部分职责在会议预订的过程中,在此上下文中,一个人(注册者)可以购买特定会议的座位。还可以为已购买的座位分配与会者的名称(这在第5章“准备发布v1版本”中进行了描述)。

这是我们cqrs旅程的第一站,因此团队决定实现一个核心的、但自包含的系统部分——订单和注册。对与会者来说,注册过程必须尽可能地轻松。该流程必须确保业务客户能够预订到尽可能多的座位,并为他们提供灵活的,在会议上为不同类型的座位设置价格的功能。

因为这是团队处理的第一个限界上下文,所以我们还实现了系统的一些基础设施来支持领域域的功能。包括命令和事件消息总线以及聚合的持久化机制。

备注:本章描述的contoso会议管理系统并不是该系统的最终版本。本此旅程描述的是一个过程,因此一些设计决策和实现细节在过程的后期会发生变化。这些变化将在后面的章节中描述。

在将来的某个旅程中,对这个限界上下文的改进计划包括支持等待列表(如果没有足够的座位可用,对座位的请求将放在等待列表中),以及允许业务客户为座位类型设置各种类型的折扣。

备注:在这个版本中没有实现等待列表,但是社区成员正在开发这个特性和其他特性。任何带外发布和更新都将在“cqrs之旅”网站上公布。

本章的工作术语定义:

本章使用了一些术语,我们将在后面定义它们。有关更多细节和可能的替代定义,请参阅参考指南中的“深入cqrs和es”。

  • command(命令):命令是要求系统执行更改系统状态的操作。命令是必须服从(执行)的一种指令,例如:makeseatreservation。在这个限界上下文中,命令要么来自用户发起请求时的ui,要么来自流程管理器(当流程管理器指示聚合执行某个操作时)。单个接收方处理一个命令。命令总线(command bus)传输命令,然后命令处理程序将这些命令发送到聚合。发送命令是一个没有返回值的异步操作。

    gary(cqrs专家)发言:

    有一些讨论是关于优化的可能性,这涉及到命令不同的定义,这些不同点是微小的。请参阅第6章“”。

  • event(事件):事件就是系统中发生的一些事情,通常是一个命令的结果。领域模型中的聚合会引发(raise)事件。多个事件订阅者(subscribers)可以处理特定的事件。聚合将事件发布到事件总线, 处理程序订阅特定类型的事件,事件总线(event bus)将事件传递给订阅者。在这个限界上下文中,唯一的订阅者是流程管理器。

  • 流程管理器。在这个限界上下文中,流程管理器是一个协调领域域中聚合行为的类。流程管理器订阅聚合引发的事件,然后遵循一组简单的规则来确定发送一个或一组命令。流程管理器不包含任何业务逻辑,它唯一的逻辑是确定下一个发送的命令。流程管理器被实现为一个状态机,因此当它响应一个事件时,除了发送一个新命令外,还可以更改其内部状态。

    gregor hohpe和bobby woolf合著的《enterprise integration patterns: designing, building, and deploying messaging solutions》(addison-wesley professional, 2003)书中312页讲述了流程管理器实现模式。我们的流程管理器就是依照这个模式实现的。

    markus(软件开发人员)发言:

    对于刚接触代码的人来说,跟踪命令和事件在系统中的流动是很困难的。第4章“”中的“对测试的影响”一节,讨论了怎样可以帮助你搞清楚它们。

    在这个限界上下文中,流程管理器可以接收命令,也可以订阅事件。

    gary(cqrs专家)发言:

    在订单和注册限界上下文中,起初,提到流程管理器,团队把它当作一个saga(一种可用于处理事务的模式),要了解后来为什么我们决定更改术语,请参阅本章后面的“模式和概念”一节。

    注:参考指南里包含了cqrs相关术语的附加定义和解释。

领域定义(通用语言)

下面的列表定义了团队在开发此订单和注册有界上下文时使用的关键领域相关术语。

  • 与会者:与会者是有权参加会议的人。与会者可以与系统交互,比如管理议程、打印徽标以及会后提供反馈。与会者也可以是不花钱参加会议的人,比如志愿者、演讲者或享受100%折扣的人。一位与会者可以有多种相关的与会者类型(如演讲者、学生、志愿者、track chair等等)。

  • 注册者:注册者是与系统交互下订单并为这些订单付款的人。注册者还创建与订单关联的注册。注册者也可以是与会者。

  • 用户:用户是与会议相关的参会者、注册者、演讲者或志愿者。每个用户都有一个惟一的记录定位器代码,用户可以使用该代码访问系统中特定于用户的信息。例如,注册者可以使用记录定位器代码访问她的订单,与会者可以使用记录定位器代码访问他的个性化会议议程。

    carlos(领域专家)发言:

    我们刻意实现了一个记录定位器机制,以通过该机制返回以前提交的订单数据。这消除了用户必须注册,登录系统才能获得访问权限的烦人要求。我们的客户坚定要求这样。

  • 座位分配:座位分配将与会者和按确定顺序排列的座位相关联。一个订单可能有一个或多个与其相关的座位分配。

  • 订单:当一位注册者与系统交互时,系统创建一个订单来管理预订、付款和注册。当注册者已成功支付订单项下的款项时,订单将被确认。订单包含一个或多个订单项。

  • 订单项:订单项包含座位类型和数量,并与订单关联。订单项有三种状态:创建、保留或拒绝。订单项最初处于创建状态。如果系统保留了登记人要求的座位类型的座位数量,则订单项处于保留状态。如果系统无法保留登记人要求的座位类型的座位数量,则订单项处于拒绝状态。

  • 座位:一个座位代表着被允许参加一个会议或进入一个特定会议的权利,如鸡尾酒会、教学或研讨会。业务客户可以更改每次会议的座位配额。业务客户还可以更改每个小型会议的座位配额。

  • 预订:预订是一个或多个座位的临时预订。订单程序将创建预订。当注册者开始订购时,系统会根据注册者要求的座位数量进行预订。因此,其他注册者无法预订这些座位。预订将保留n分钟,在此期间,注册者可以通过支付这些座位的费用来完成订购过程。如果注册者在n分钟内没有付款,系统将取消预订,其他登记人可以预订座位。

  • 座位可用性:每个会议都将追踪每种类型座位的可用性。最初,所有的座位都可以预订和购买。当一个座位被预订时,该类型的可用座位数量将减少。如果系统取消预订,则增加该类型的可用座位数量。业务客户可以定义每种可用座位类型的初始数量,这是会议的一个特点。会议所有者可以根据不同的座位类型调整数量。

  • 会议网站:您可以使用唯一的url访问系统中定义的每个会议。注册者可以从这个网站开始订购过程。

这里定义的每个术语都是通过开发团队和领域专家之间的积极讨论制定的。下面是开发人员和领域专家之间的一个示例对话,演示了团队如何定义术语。

开发人员1:这里是对与会者定义的初步尝试。与会者是指花钱参加会议的人。与会者可以与系统交互,比如管理议程、打印徽标以及会后提供反馈。”

领域专家1:并不是所有与会者都会付费参加会议。例如,一些会议会有志愿者,而且演讲者通常不付钱。而且,在某些情况下,出席者可以获得100%的折扣。

领域专家1:别忘了付费的不是与会者。这是由注册者完成的。

开发者1:所以我们需要说与会者是被授权参加会议的人?

开发人员2:我们需要注意这里的用词。授权这个术语会让一些人想到安全性、身份验证和授权。

开发人员1:如何命名?

领域专家1:当系统执行诸如打印徽标之类的任务时,它需要知道徽标是用于哪种类型的与会者。例如,演讲者、志愿者、付费参与者等等。

开发人员1:现在我们有了这个定义,它捕获了我们讨论过的所有内容。与会者是有权参加会议的人。与会者可以与系统交互,比如管理议程、打印徽标以及会后提供反馈。与会者也可以是不花钱参加会议的人,比如志愿者、演讲者或享受100%折扣的人。与会者可以有多种相关的与会者类型(演讲者、学生、志愿者、track chair等等)。

创建订单的需求

一位注册者是指在会议上预订座位并支付(订座)费用的人。订购过程分为两个阶段:首先,注册者预订一些座位,然后支付座位的费用来确认预订。如果注册者没有完成付款,预定的座位将在一段固定时间后过期,系统将为其他注册者预留座位。

下图展示了了团队用于探索座位预定的一些早期ui原型图。

订单功能的用户界面原型图

这些ui原型图在几个方面帮助了团队,允许他们:

  • 将核心团队对系统的愿景传达给第三方公司独立团队中的ui设计师。
  • 向开发人员传达领域专家的知识。
  • 使用通用语言提炼,细化术语的定义。
  • 探索“如果发生xxx又怎样xxx”的场景,研究替代方案。
  • 构建基础的系统验收测试套件。

架构

此应用程序设计为部署到microsoft azure。到旅程的这个阶段,应用程序将包含一个asp.net mvc web应用程序和消息处理程序以及领域模型对象。应用程序使用azure sql database实例进行数据存储,读和写两者都包括。应用程序使用azure service bus来进行消息传递。

译者注:鉴于azure国内版瘸腿,国际版速度奇慢。而且都价格喜人。后续的实战中,架构会根据当前的实际情况进行调整。主要是学习原文的思想。

在研究和测试解决方案时,可以在本地运行它,可以使用azure compute emulator,也可以直接运行mvc web应用程序,并运行承载消息处理程序和领域域对象的控制台应用程序。在本地运行应用程序时,可以使用本地sql server express数据库,并使用一个在sql server express数据库实现的简单的消息传递基础设施。

有关运行应用程序的选项的更多信息,请参见附录1“”。

gary(cqrs专家)发言:

cqrs模式的一个经常被援引的优势是,它使您能够独立的伸缩应用程序的读端和写端,以支持不同的使用模式。然而,在这个限界上下文中,来自ui的读操作的数量不太可能超过写操作的数量:这个限界上下文中关注的是创建订单的注册者。因此,读端和写端将部署到同一个azure工作者角色,而不是部署到两个可以独立伸缩的独立工作者角色。

模式和概念

为了保持简单,团队决定在不使用事件源(event sourcing)的情况下先实现第一个限界上下文。当然,他们也确定,如果将来确定事件源能为这个限界上下文带来特定的好处,那么他们将重新考虑这个决定。

备注:有关event sourcing如何与cqrs模式关联的描述,请参阅参考指南中的“”。

小组进行的一项重要讨论是选择它们将实现的聚合和实体。以下来自团队白板的图片说明了他们最初的一些想法,以及他们通过一个替代方法(一个真实的会议座位预定场景)来尝试理解这里有什么优缺点。

“我认为开发人员需要收获一个观念,那就是把对象的属性存储在关系型数据库中是不重要的。教会他们避免将领域模型作为关系存储,我认为这样将会更容易介绍和理解领域驱动设计(ddd)和cqrs” --josh elster, cqrs advisors mail list

gary(cqrs专家)发言:

这些图刻意排除了系统如何通过命令和事件处理程序处理命令和事件的细节。这些图主要关注领域中的聚合之间的逻辑关系。

此场景考虑当注册者试图在会议上预订多个座位时会发生什么。系统必须:

  1. 检查是否有足够的座位。
  2. 记录注册详情。
  3. 更新会议预订的座位总数。
我们刻意保持场景简单,以避免在团队检查其他方案时分心。这些示例没有描述这个限界上下文的最终实现。

团队考虑的第一种方法(如下图所示)是使用两个分开的聚合。

方法1:两个分开的聚合

图中的数字对应于以下步骤:

  1. 从ui发送一个命令用来注册参会者x和y到157号会议,这个命令被路由到一个新的订单(order)聚合。
  2. 订单聚合引发(raise)一个事件,该事件报告已经创建了一个订单。这个事件被路由到可用座位(seatsavailability)聚合。
  3. id为157的可用座位(seatsavailability)聚合是从数据库中取回还原(re-hydrated)的。
  4. 可用座位(seatsavailability)聚合更新它自己的预定座位总数。
  5. 更新后的可用座位(seatsavailability)聚合被持久化到数据库中。
  6. id为4239的新的订单聚合被持久化到数据库中。

    markus(软件开发人员)发言:

    术语re-hydrated是指从数据库中反序列化聚合实例的过程。

    jana(软件架构师)发言:

    你可以考虑使用memento模式来处理持久化和rehydration。

团队考虑的第二种方法(如下图所示)是使用单个聚合来代替两个聚合。

方法2:单个聚合

图中的数字对应于以下步骤:

  1. 从ui发送一个命令用来注册参会者x和y到157号会议,这个命令被路由到会议(conference)聚合,聚合id为157。
  2. id为157的会议(conference)聚合从数据库中取回还原(rehydrated)。
  3. 订单(order)实体将校验本次预订(它将查询可用座位(seatsavailability)实体以查看是否还有足够的座位),然后调用方法更新在会议(conference)实体上预订的座位数量。
  4. 可用座位(seatsavailability)实体更新自己已预订的座位总数。
  5. 更新后的会议(conference)聚合的被持久化到数据库中。

团队考虑的第三种方法(如下图所示)是使用流程管理器来协调两个聚合之间的交互。

方法3:使用一个流程管理器

图中的数字对应于以下步骤:

  1. 从ui发送一个命令用来注册参会者x和y到157号会议,这个命令被路由到订单(order)聚合。
  2. 这个新的订单(order)聚合,id为4239,被持久化到数据库中
  3. 订单(order)聚合引发(raise)一个事件,这个事件将被registrationprocessmanager类处理
  4. registrationprocessmanager类将发送一个命令到id为157的可用座位(seatsavailability)聚合
  5. 这个可用座位(seatsavailability)聚合从数据库中取回还原(rehydrated)
  6. 可用座位(seatsavailability)聚合更新自己的预定座位总数,然后持久化回数据库

    gary(cqrs专家)发言:

    流程管理器或saga,起初,团队将registrationprocessmanager类看做一个saga,但是,当他们重新阅读hector garcia-molina和kenneth salem合著的《saga》一文中对“saga”的最初定义后,他们修改了自己的决定。主要原因是预定流程并不包含明确的补偿步骤,所以并不需要一个长生命周期的事务。

有关流程管理器和saga的更多信息,请参见参考指南中的第6章“a saga on sagas”

团队还明确了下列问题:

  • 在哪里验证是否有足够的座位可供注册?在订单(order)聚合里还是可用座位(seatsavailability)聚合里?
  • 事务边界在哪里?
  • 当多个注册者试图同时下订单时,该模型如何处理并发问题?
  • 聚合根是什么?

验证

在登记人可以预订座位之前,系统必须检查是否有足够的座位。虽然ui中的逻辑可以在发送命令之前验证是否有足够的可用座位,但是领域中的业务逻辑也必须执行检查。这是因为在ui执行验证之后到系统将命令发送到领域中的聚合时,状态可能会发生变化。

jana(软件架构师)发言:

当我们在这里谈ui验证时,我们指的是模型-视图-控制器(mvc)执行的验证,而不是浏览器前端。

在第一个模型中,验证要么在订单(order)聚合里,要么在可用座位(seatsavailability)聚合里。如果是前者,则订单(order)聚合必须在预订之前和引发事件之前从可用座位(seatsavailability)聚合中检查当前的座位可用性。如果是后者,那么可用座位(seatsavailability)聚合必须以某种方式通知订单(order)聚合它不能预订座位,并且订单(order)聚合必须撤消(或弥补)它迄今为止完成的任何工作。

beth(业务经理)发言:

撤销只是现实生活中发生的许多弥补操作之一,弥补操作不仅仅局限于系统内,甚至可以是系统外的人工操作,例如:一个contoso的职员或客户经理打电话给注册者们,告诉他们系统发生了一个错误,请他们忽略contoso发来的最终确认邮件。

第二个模型的行为类似,除了订单(order)聚合和可用座位(seatsavailability)聚合是在会议(conference)聚合里协作的。

在第三个模型中,使用了流程管理器,聚合通过流程管理器互相传递关于注册者是否可以在当前时间进行预订的消息。

所有这三个模型都需要实体就验证过程进行通信,但是与流程管理器进行通信的第三个模型看起来比其他两个模型更复杂一些。

事务边界

在ddd中,聚合表示一致性边界。因此,具有两个聚合的第一个模型,级别具有两个聚合和一个流程管理器的第三个模型将涉及两个事务:一个在系统持久化新的订单(order)聚合时,另一个在系统持久化更新的可用座位(seatsavailability)聚合时。

备注:术语“一致性边界”指的是你可以假设所有元素始终保持一致的边界。

为了确保注册者创建订单时系统的一致性,两个事务都必须成功。为了保证这一点,我们必须采取步骤,通过确保基础设施可靠地向聚合传递消息,从而确保系统最终是一致的。

在第二个模型中,使用单一聚合,当注册者下订单时,我们只有一个事务。这似乎是三种模型里最简单的一种。

并发

注册过程发生在多用户环境中,许多注册者可以尝试同时购买座位。团队决定使用来解决注册过程中的并发问题。在这种情况下,这意味着为注册者最初保留了座位(然后其他注册者无法使用这些座位)。如注册者在超时时间内完成付款,系统保留预订,否则,系统将取消预订。

此预订系统引入了对附加消息类型的需求,例如,报告注册者已付款的事件,或报告超时发生的事件。

这个超时还要求系统在某个地方添加一个计时器来跟踪预订何时过期。

对这种使用消息序列和需要计时器的复杂模型,最好的办法就是使用流程管理器。

聚合和聚合根

在订单(order)聚合和可用座位(seatsavailability)聚合这种两个聚合里,团队很容易识别出组成聚合的实体和聚合根。在单一聚合的模型中,选择不是很明确:通过seatsavailability实体访问order,或者通过order实体访问seatsavailability,这似乎都不太自然。创建作为聚合根的新实体似乎没有必要。

团队决定采用包含流程管理器的模型,因为它提供了在这个限界上下文中处理并发需求的最佳方法。

实现细节

本节介绍订单和注册限界上下文中实现的一些重要特性。您或许需要获取一份代码的拷贝,这样就可以跟随我们的脚步。您可以从download center下载它,或者在github:上得到它

不要期望代码示例与参考实现中的代码完全匹配。本章描述了cqrs过程中的一个步骤,随着我们了解更多并重构代码,实现可能会发生变化。

高层架构

正如我们在上一节中描述的,团队最初决定使用cqrs模式在会议管理系统中实现预订,但不使用事件源(event sourcing)。下图显示了实现的关键元素:mvc web应用程序、使用azure sql数据库实例实现的数据存储、读写模型和一些基础设施组件。

备注:我们将在本节稍后的部分描述读写模型中发生的事情。

注册限界上下文的高层架构

下面的部分与上图中的数字相关,并提供了关于体系结构中各个元素的更多细节。

  1. 使用读模型(read model)查询数据

    conferencecontroller类包含一个名为display的action,该action创建一个包含特定会议信息的视图(view)。这个控制器类使用以下代码从读模型里查询:

    public actionresult display(string conferencecode)
    {
        var conference = this.getconference(conferencecode);
    
        return view(conference);
    }
    
    private conference.web.public.models.conference getconference(string conferencecode)
    {
        var repo = this.repositoryfactory();
        using (repo as idisposable)
        {
            var conference = repo.query<conference>().first(c => c.code == conferencecode);
    
            var conferencemodel =
                new conference.web.public.models.conference { code = conference.code, name = conference.name, description = conference.description };
    
            return conferencemodel;
        }
    }

读模型(read model)从数据存储中检索信息,并使用数据传输对象(dto)将信息返回给控制器。

  1. 发出命令

    web应用通过命令总线(command bus)向写模型(write model)发送命令。命令总线是系统中的可靠消息传递基础设施组件。在这个场景中,它异步将命令发送给接受者,并且只发送一次。

    registrationcontroller类可以向写模型(write model)发送registertoconference命令,此命令发送一个请求,请求在会议上注册一个或多个席位,然后,registrationcontroller类轮询读模型(read model),以发现注册请求是否成功。参见第6节:“轮询读模型(read model)”以获得更多细节。

    下面的代码示例展示了registrationcontroller如何发送registertoconference命令:

    var viewmodel = this.updateviewmodel(conferencecode, contentmodel);
    
    var command =
        new registertoconference
        {
            orderid = viewmodel.id,
            conferenceid = viewmodel.conferenceid,
            seats = viewmodel.items.select(x => new registertoconference.seat { seattypeid = x.seattypeid, quantity = x.quantity }).tolist()
        };
    
    this.commandbus.send(command);

    备注:所有的命令都是异步发送的,不需要等待返回。

  2. 处理命令

    命令处理程序在命令总线上注册,然后,命令总线可以将命令转发给正确的处理程序。

    ordercommandhandler类处理从ui发送的registertoconference命令。通常,处理程序负责调用领域里的某些业务逻辑,并将某些状态更新持久化到数据存储中。

    下面的代码示例展示了ordercommandhandler类如何处理registertoconference命令:

    public void handle(registertoconference command)
    {
        var repository = this.repositoryfactory();
    
        using (repository as idisposable)
        {
            var seats = command.seats.select(t => new orderitem(t.seattypeid, t.quantity)).tolist();
    
            var order = new order(command.orderid, guid.newguid(), command.conferenceid, seats);
    
            repository.save(order);
        }
    }
  3. 在领域中初始化业务逻辑

    在前面的代码示例中,ordercommandhandler类创建了一个新的order实例。order对象是一个聚合根,它的构造函数包含初始化领域逻辑的代码。有关此聚合根执行哪些操作的详细信息,请参阅下面的“在写模型内部”一节。

  4. 把改动持久化

    在前面的代码示例中,命令处理程序通过调用repository类中的save方法来持久化一个新的订单(order)聚合。这个save方法还将在命令总线(command bus)上发布订单(order)聚合引发的各种事件。

  5. 轮询读模型(read model)

    要向用户提供反馈,ui端必须能够检查registertoconference命令是否成功。与系统中的所有命令一样,此命令异步执行,不返回结果。ui端通过轮询读模型(read model)来检查命令是否成功。

    下面的代码示例展示了一个初始实现,其中registrationcontroller类里的waituntilupdated方法轮询读模型,直到它发现订单已经被持久化成功或超时。

    [httppost]
    public actionresult startregistration(string conferencecode, orderviewmodel contentmodel)
    {
        ...
    
        this.commandbus.send(command);
    
        var draftorder = this.waituntilupdated(viewmodel.id);
    
        if (draftorder != null)
        {
            if (draftorder.state == "booked")
            {
                return redirecttoaction("specifypaymentdetails", new { conferencecode = conferencecode, orderid = viewmodel.id });
            }
            else if (draftorder.state == "rejected")
            {
                return view("reservationrejected", viewmodel);
            }
        }
    
        return view("reservationunknown", viewmodel);
    }

    后来,团队用post-redirect-get模式的实现替换了这种检查系统是否保存订单的机制。下面的代码示例展示了startregistration方法的新版本。

    备注:更多关于post-redirect-get模式的信息,请在wikipedia查看post/redirect/get

    [httppost]
    public actionresult startregistration(string conferencecode, orderviewmodel contentmodel)
    {
        ...
    
        this.commandbus.send(command);
    
        return redirecttoaction("specifyregistrantdetails", new { conferencecode = conferencecode, orderid = command.id });
    }

    新的startregistration action方法现在发送命令后立即重定向到specifyregistrantdetails action。下面的代码示例显示了specifyregistrantdetails action如何在返回视图之前轮询数据库中的订单。

    [httpget]
    public actionresult specifyregistrantdetails(string conferencecode, guid orderid)
    {
        var draftorder = this.waituntilupdated(orderid);
    
        ...
    }

    新方法的优点:使用post-redirect-get模式而不是startregistration post action,能让浏览器的“前进”和“后退”导航按钮工作的更好,并在控制器开始轮询之前给命令处理程序更多时间来处理命令。

    译者注:此文章编写时间较早。文中的ui端指的是asp.net mvc web应用程序。现在流行的方式是前后端分离。后端入口服务一般是一个web api程序。而且基于轮询的方案也不太理想。可以在web api端通过消息总线订阅数据持久化的各类事件,当在数据存储层引发这些事件时。web api中的事件处理程序可以接收到视图模型改变的数据。再将数据通过signalr或web socket方式推送至前端。

在写模型内部

聚合

下面是订单(order)聚合的代码示例:

public class order : iaggregateroot, ieventpublisher
{
    public static class states
    {
        public const int created = 0;
        public const int booked = 1;
        public const int rejected = 2;
        public const int confirmed = 3;
    }

    private list<ievent> events = new list<ievent>();

    ...

    public guid id { get; private set; }

    public guid userid { get; private set; }

    public guid conferenceid { get; private set; }

    public virtual observablecollection<ticketorderline> lines { get; private set; }

    public int state { get; private set; }

    public ienumerable<ievent> events
    {
        get { return this.events; }
    }

    public void markasbooked()
    {
        if (this.state != states.created)
            throw new invalidoperationexception();

        this.state = states.booked;
    }

    public void reject()
    {
        if (this.state != states.created)
            throw new invalidoperationexception();

        this.state = states.rejected;
    }
}

注意类的属性没有全被标记为virtual。在这个类的原始版本中,属性id、userid、conferenceid和state都被标记为virtual。下面是两个开发人员之间的讨论:

  • 开发人员1:我确信你不应该使属性都成为虚拟的,除非对象关系映射(orm)层需要。如果只是出于测试目的,实体和聚合根永远不能用mock测试。如果你需要mock来测试实体和聚合根,那么很明显,设计中有问题。

  • 开发人员2:在默认情况下,我更喜欢开放和可扩展性。你永远不知道将来会出现什么需求,把属性标记为virtual并不费什么事。这当然是有争议的,在.net中有点不标准。这样吧,我们可能只需要给延迟加载的集合属性标记为virtual。

  • 开发人员1:使用cqrs模式通常会使延迟加载的效果消失,所以你也不应该需要它。这样会让代码更简单。

  • 开发人员2:cqrs并没有说要使用事件源(event sourcing),但如果使用包含对象的聚合根,无论如何都需要它,对吗?

  • 开发人员1:这不是关于event sourcing的,而是关于ddd的。当聚合边界正确时,你就不需要延迟加载。

  • 开发人员2:需要明确的是,聚合边界在这里是为了将应该一起更改的内容分组,以保持一致性。延迟加载就意味着已经分组在一起的东西其实并不需要分组。

  • 开发人员1:我同意。我发现在命令端延迟加载意味着建模错误。如果我不需要命令端的值,那么它就不应该在那里。此外,我不喜欢virtual,除非它们有特定的用途(或者对象关系映射(orm)工具的需求)。在我看来,这违反了开闭原则:你以各种可能有意也可能无意的方式敞开了自己接受修改的大门,而且即使发生了什么影响,也可能无法立即发现。

    译者注:orm要求属性必须为虚,java里著名的hibernate就是这么搞得,所以nhibernate也是这样的。

  • 开发人员2:模型中的订单聚合有一个订单项列表。确定我们不需要加载就能把它标记为已订好的吗?我们建立的模型有问题吗?

  • 开发人员1:orderitems列表很长吗?如果是,那么建模可能是错误的,因为你并不一定需要那个级别的事务。通常,较晚的来回获取和更新orderitems的成本可能比预先加载它们要高,你应该评估列表的通常大小,并进行一些性能度量。首先让它变得简单,其次如果需要的话进行优化。

    -感谢jeremie chassaing和craig wilson

聚合和流程管理器

下图展示了写模型(write model)中存在的对象。有两个聚合,order和seatsavailability,每个都包含多个实体类型。此外,还有一个registrationprocessmanager类来管理聚合之间的交互。

下图中的表展示了流程管理器在给定当前状态和特定类型消息时的行为。

写模型中的领域对象

注册会议的过程从ui发送registertoconference命令开始。基础设施将此命令传递给订单(order)聚合。这个命令的结果是:系统创建了一个新的订单(order)聚合实例,并且这个新实例引发了一个orderordered事件。订单(order)聚合类中的构造函数中的以下代码示例展示了这种情况。请注意系统如何使用guid来标识不同的实体。

public order(guid id, guid userid, guid conferenceid, ienumerable<orderitem> lines)
{
    this.id = id;
    this.userid = userid;
    this.conferenceid = conferenceid;
    this.lines = new observablecollection<orderitem>(items);

    this.events.add(
        new orderplaced
        {
            orderid = this.id,
            conferenceid = this.conferenceid,
            userid = this.userid,
            seats = this.lines.select(x => new orderplaced.seat { seattypeid = x.seattypeid, quantity = x.quantity }).toarray()
        });
}

备注:要查看基础设施组件如何传递命令和事件,在后面的图里有。

系统创建一个新的registrationprocessmanager实例来管理新订单。下面来自registrationprocessmanager类的代码示例展示了流程管理器如何处理事件。

public void handle(orderplaced message)
{
    if (this.state == processstate.notstarted)
    {
        this.orderid = message.orderid;
        this.reservationid = guid.newguid();
        this.state = processstate.awaitingreservationconfirmation;

        this.addcommand(
            new makeseatreservation
            {
                conferenceid = message.conferenceid,
                reservationid = this.reservationid,
                numberofseats = message.items.sum(x => x.quantity)
            });
    }
    else
    {
        throw new invalidoperationexception();
    }
}

代码示例显示流程管理器如何更改其状态,并发送一个由seatsavailability聚合处理的新的makeseatreservation命令。代码示例还演示了如何将流程管理器实现为接收消息、更改其状态并发送新消息的状态机。

markus(软件开发人员)发言:

注意我们生成一个新的全局惟一标识符(guid)来标识新的预订。我们使用这些guid将消息关联到正确的流程管理器实例和聚合实例。

当seatsavailability聚合接收到makereservation命令时,如果有足够的可用座位,它将进行预订。下面的代码示例显示了seatsavailability类如何根据是否有足够的座位引发不同的事件。

public void makereservation(guid reservationid, int numberofseats)
{
    if (numberofseats > this.remainingseats)
    {
        this.events.add(new reservationrejected { reservationid = reservationid, conferenceid = this.id });
    }
    else
    {
        this.pendingreservations.add(new reservation(reservationid, numberofseats));
        this.remainingseats -= numberofseats;
        this.events.add(new reservationaccepted { reservationid = reservationid, conferenceid = this.id });
    }
}

流程管理器registrationprocessmanager类处理预订的接收和拒绝事件。这是一个临时的座位预订,让用户有机会进行支付。流程管理器在购买完成或预订超时过期时释放预订。下面的代码示例显示流程管理器如何处理这两种事件。

public void handle(reservationaccepted message)
{
    if (this.state == processstate.awaitingreservationconfirmation)
    {
        this.state = processstate.awaitingpayment;

        this.addcommand(new markorderasbooked { orderid = this.orderid });
        this.commands.add(
            new envelope<icommand>(new expireorder { orderid = this.orderid, conferenceid = message.conferenceid })
            {
                delay = timespan.fromminutes(15),
            });
    }
    else
    {
        throw new invalidoperationexception();
    }
}

public void handle(reservationrejected message)
{
    if (this.state == processstate.awaitingreservationconfirmation)
    {
        this.state = processstate.completed;
        this.addcommand(new rejectorder { orderid = this.orderid });
    }
    else
    {
        throw new invalidoperationexception();
    }
}

如果预订被接受,流程管理器将通过向自身发送expireorder命令启动计时器,并向订单(order)聚合发送markorderasbooked命令。否则,它将向订单(order)聚合发送一条reservationrejected消息。

前面的代码示例显示了流程管理器如何发送expireorder命令。基础设施负责将消息保存在队列中,等待15分钟的延迟。

您可以借鉴seatsavailability和registrationprocessmanager类里的代码,以查看其他消息处理程序是如何实现的。它们都遵循相同的模式:接收消息、执行一些逻辑并发送消息。

jana(软件架构师)发言:

本章展示的代码示例都来自会议管理系统的早期版本。下一章将展示当团队持续探索该领域以及学习了更多cqrs模式的知识之后,设计和实现是如何随之发展的。

基础设施

下面的序列图展示了基础设施组件如何与领域对象交互消息的。

当ui中的mvc控制器使用命令总线发送消息时,典型的交互就开始了。消息发送方异步调用命令总线上的send方法。然后命令总线存储消息,直到消息接收者收到消息并将其转发给适当的处理程序。系统包含许多命令处理程序,这些命令处理程序向命令总线注册,以处理特定类型的命令。例如,ordercommandhandler类为registertoconference、markorderasbooking和rejectorder命令定义了处理程序方法。下面的代码示例显示了markorderasbooking命令的处理程序方法。处理程序方法负责寻找正确的聚合实例,调用该实例上的方法,然后保存该实例。

public void handle(markorderasbooked command)
{
    var repository = this.repositoryfactory();

    using (repository as idisposable)
    {
        var order = repository.find<order>(command.orderid);

        if (order != null)
        {
            order.markasbooked();
            repository.save(order);
        }
    }
}

实现irepository接口的类负责在事件总线上持久化聚合对象并发布聚合里引发的任何事件,所有的这些都是事务的一部分。

carlos(领域专家)发言:

稍后,当团队试图使用azure服务总线作为消息传递基础设施时,发现了一个问题。azure服务总线不支持带有数据库的分布式事务。有关这个问题的讨论,请参阅第5章“准备发布v1版本”。

在注册限界上下文中,惟一的事件订阅者是registrationprocessmanager类。它的router订阅者从订阅事件总线订阅,来处理特定的事件,下面的代码示例展示了registrationprocessmanager类。

我们使用了术语handler来指代处理命令和事件并将它们转发给聚合实例的类,使用术语router来指代处理事件和命令并将它们转发给流程管理器实例的类。
public void handle(reservationaccepted @event)
{
    var repo = this.repositoryfactory.invoke();
    using (repo as idisposable)
    {
        lock (lockobject)
        {
            var process = repo.find<registrationprocessmanager>(@event.reservationid);
            process.handle(@event);

            repo.save(process);
        }
    }
}

通常,事件处理程序方法获取流程管理器实例,将事件传递给流程管理器,然后保存流程管理器实例。在本例中,irepository实例负责持久化流程管理器实例,并负责将任何命令从流程管理器实例发送到命令总线。

使用azure服务总线(service bus)

为了传输命令和事件,团队决定使用azure服务总线来提供底层消息传递基础设施。本节描述了系统如何使用azure服务总线,以及团队在设计阶段考虑的一些替代方案和权衡。

jana(软件架构师)发言:

contoso的开发团队决定使用azure服务总线,因为它为会议管理系统中的消息传递场景提供了开箱即用的支持。这将最小化团队需要编写的代码量,并提供健壮的、可伸缩的消息传递基础设施。该团队计划使用重复消息检测和保证消息排序等功能。要了解azure服务总线和azure队列之间的区别,请参阅msdn上的“microsoft azure queues and microsoft azure service bus queues - compared and contrasted”。

下图显示了命令和事件消息如何在系统中流动。mvc控制器和领域对象使用commandbus和eventbus实例将brokeredmessage消息发送给azure服务总线中的两个topic之一。接收消息时,消息处理类是commandprocessor和eventprocessor实例,commandprocessor类确定哪个处理程序应该接收命令消息,eventprocessor类确定哪些处理程序应该接收事件消息。后者使用subscriptionreceiver类从topic获取事件。处理程序实例负责调用领域对象上的方法。

azure服务总线的topic可以有多个订阅者。azure服务总线将发送到topic的消息传递给它的所有订阅者。因此,一条消息可以有多个接收者。

在最初的实现中,commandbus和eventbus类非常相似。send方法和publish方法之间的惟一区别是,send方法期望消息被包装在envelope类中。envelope类允许发送方指定消息传递的时间延迟。

事件可以有多个接收者。在上图的示例中,reservationrejected事件被发送到registrationprocessmanager、waitlistprocessmanager和另一个目的地。eventprocessor类通过检查已注册的处理程序列表来标识收到事件的处理程序列表。

命令只有一个接收者。在上图中,makeseatreservation被发送到可用座位(seatsavailability)聚合。只有一个为该命令注册的处理程序。commandprocessor类通过检查已注册的处理程序列表来标识收到命令的处理程序。

这一实现带来了一些问题:

  • 如何将命令的传递限制为单个接收?
  • 如果commandbus和eventbus类如此相似,为什么要分别使用它们呢?
  • 这种实现的可伸缩性如何?
  • 这种实现的健壮性如何?
  • 怎么划分topic和订阅的粒度?
  • 命令和事件如何序列化?

下面几节将讨论这些问题。

将命令传递给单个接收者

本讨论假设您已经基本了解了azure服务总线队列和topic之间的区别。有关azure服务总线的介绍,请参阅参考指南中的“”。

使用上图所示的实现,有两件事是必要的,以确保命令只有单个处理程序。首先,azure服务总线中应该保证只有一个会议/命令topic的订阅。请记住,azure服务总线主题是可以有多个订阅者的。其次,commandprocessor应该为它接收到的每个命令调用一个处理程序。azure服务总线中没有办法将主题限制为单个订阅。因此,开发人员必须自己小心的为命令的topic创建单个订阅。

gary(cqrs专家)发言:

另一个问题是确保处理程序从topic获取命令后只处理一次。您必须确保命令是幂等的,或者系统保证只处理命令一次。该团队将在旅程的后期处理这个问题。有关更多信息,请参见旅程第7章“”。

备注:可能会运行多个subscriptionreceiver实例,因为可以同时部署运行多个工作服务。如果多个subscriptionreceiver实例可以接收来自同一主题订阅的消息,那么第一个调用subscriptionclient对象上的receive方法的实例将获取并处理该命令。

另一种方法是使用azure服务总线队列代替topic来传递命令。azure服务总线队列与topic的不同之处在于,它们的设计目的是将消息传递给单个接收者,而不是通过多个订阅传递给多个接收者。开发人员计划更详细的评估这个方案,以便在项目的稍后部分用此方案来实现。

下面来自subscriptionreceiver类的代码示例显示了它如何接收来自topic订阅的消息。

private subscriptionclient client;

...

private void receivemessages(cancellationtoken cancellationtoken)
{
    while (!cancellationtoken.iscancellationrequested)
    {
        brokeredmessage message = null;

        try
        {
            message = this.receiveretrypolicy.executeaction<brokeredmessage>(this.doreceivemessage);
        }
        catch (exception e)
        {
            trace.traceerror("an unrecoverable error occurred while trying to receive a new message:\r\n{0}", e);

            throw;
        }

        try
        {
            if (message == null)
            {
                thread.sleep(100);
                continue;
            }

            this.messagereceived(this, new brokeredmessageeventargs(message));
        }
        finally
        {
            if (message != null)
            {
                message.dispose();
            }
        }
    }
}

protected virtual brokeredmessage doreceivemessage()
{
    return this.client.receive(timespan.fromseconds(10));
}

jana(软件架构师)发言:

此代码示例展示了系统如何使用transient fault handling application block可靠地从topic获取消息。

azure服务总线subscriptionclient类使用peek/lock技术从订阅中获取消息。在代码示例中,receive方法在订阅时锁定消息。当消息被锁定时,其他客户端无法看到它。然后receive方法尝试处理消息。如果客户端成功处理消息,则调用complete方法:这将从订阅中删除消息。否则,如果客户端未能成功处理该消息,则调用abandon方法:这将释放消息上的锁,然后相同的客户端或不同的客户端就可以继续接收它。如果客户端在固定的时间内没有调用complete方法或abandon方法,则也会释放消息上的锁。

messagereceived事件将一个引用传递给subscriptionreceiver实例,以便处理程序在处理消息时可以调用complete方法或abandon方法。

下面来自messageprocessor类的代码示例展示了如何使用brokeredmessage实例作为messagereceived事件的参数以及如何使用它调用complete和abandon方法。

private void onmessagereceived(object sender, brokeredmessageeventargs args)
{
    var message = args.message;

    object payload;
    using (var stream = message.getbody<stream>())
    using (var reader = new streamreader(stream))
    {
        payload = this.serializer.deserialize(reader);
    }

    try
    {
        ...

        processmessage(payload);

        ...
    }
    catch (exception e)
    {
        if (args.message.deliverycount > maxprocessingretries)
        {
            trace.tracewarning("an error occurred while processing a new message and will be dead-lettered:\r\n{0}", e);
            message.safedeadletter(e.message, e.tostring());
        }
        else
        {
            trace.tracewarning("an error occurred while processing a new message and will be abandoned:\r\n{0}", e);
            message.safeabandon();
        }

        return;
    }

    trace.traceinformation("the message has been processed and will be completed.");
    message.safecomplete();
}

备注:本示例使用可靠的transient fault handling application block,并使用扩展方法调用brokeredmessage的complete方法和abandon方法。

为什么分为commandbus和eventbus?

尽管在会议管理系统开发的早期阶段,commandbus和eventbus类的实现非常相似,您可能想知道为什么我们同时拥有这两个分开的类,因为团队预计它们在未来会出现区别。

markus(软件开发人员)发言:

在调用处理程序的方式和为它们捕获什么样的上下文方面可能存在差异:命令可能希望捕获额外的运行时状态,而事件通常不需要这样做。由于这些潜在的未来差异,我不想统一实现。我以前也遇到过这种情况,一旦有进一步的要求时,我就把它们分开。

这个方案的可扩展性如何?

使用这种方案,您可以在不同的azure工作角色实例中运行subscriptionreceiver类的多个实例和各种处理程序,这使您能够扩展您的解决方案。您还可以在不同的azure工作角色实例中拥有commandbus、eventbus和topicsender类的多个实例。

有关扩展azure服务总线基础设施的信息,请参阅msdn上的best practices for performance improvements using service bus brokered messaging

这个方案的健壮性如何?

方案使用azure服务总线的代理消息传递选项来提供异步消息传递。服务总线总是可靠地存储消息,直到用户连接并获取这些消息。

另外,从队列或topic订阅获取消息的peek/lock方法为消息消费者在处理消息失败的场景中增加了可靠性。如果消费者在调用complete方法之前失败,则当消费者重新启动时,任然可以处理该消息。

怎么划分topic和订阅的粒度?

当前的实现是系统中的所有命令都使用一个topic(会议/命令),为系统中的所有事件也使用一个topic(会议/事件)。每个topic都有一个订阅,每个订阅接收发送到该主题的所有消息。commandprocessor和eventprocessor类负责将消息传递给正确的处理程序。

将来,团队会研究使用多个topic,例如,为每个限界上下文使用单独的命令topic和多个订阅(一个事件类型一个订阅)。这些替代方案可以简化代码,并促进扩展应用程序跨多个azure工作角色,来工作。

jana(软件架构师)发言:

使用多个topic、订阅或队列没有额外的成本。azure服务总线是根据发送的消息数量和从azure子区域传输的数据量来进行计费的。

命令和事件如何序列化?

contoso会议管理系统使用json.net来序列化和反序列化。有关应用程序如何使用序列化工具的详细信息,请参阅参考指南中的“”

您应该考虑是否需要为命令使用azure服务总线。命令通常使用在有边界的上下文中,您可能不需要跨进程边界发送它们(在写入端,您可能不需要额外的层),在这种情况下,您可以使用内存队列来传递命令。” -- greg young,与模式与实践团队的对话

对测试的影响

因为这是团队处理的第一个限界上下文,所以关键一点是,如果团队希望采用测试驱动开发(tdd),那么如何进行测试。下面是两名开发人员之间的对话,他们讨论了在没有事件源(es)的情况下实现cqrs模式时如何进行tdd,对话总结了他们的想法:

  • 开发人员1:如果我们使用事件源(es),那么在创建领域对象时使用tdd方法将会很容易。测试的输入将是一个命令(可能起源于ui),然后我们可以测试领域对象是否触发了预期的事件。然而,如果我们不使用事件源,我们就没有任何事件,领域对象的行为是通过orm层将其更改持久化到数据存储中的。

  • 开发人员2:那么我们为什么不发起事件呢?我们没有使用事件源(es)并不意味着我们的领域对象不能引发事件。让领域对象引发事件,然后我们可以按照通常的方法设计测试,以检查响应命令时触发的正确事件。

  • 开发人员1:这难道不是让事情变得比需要的更复杂了吗?使用cqrs的动机之一就是简化事情!现在我们有了领域对象,它们需要使用orm层来持久化它们的状态。然后我们又要引发事件来报告它们所持久化的内容,因为这样我们就可以运行单元测试了。

  • 开发人员2:我明白你的意思。

  • 开发人员1:我们可能在如何进行测试上遇到了瓶颈。也许我们不应该基于领域对象的预期行为来设计测试,而是应该考虑在领域对象处理命令之后测试它们的状态。

  • 开发人员2:这应该很容易做到,毕竟,领域对象把我们想要检查的所有数据都存储在属性中,以便orm可以将正确的信息持久化到存储中。

  • 开发人员1:所以我们只需要考虑在这个场景中使用另一种不同的风格进行测试。

  • 开发人员2:我们还需要考虑这个问题的另一个方面:我们可能有一组测试来测试领域对象,并且所有这些测试都可能通过。我们还可能有一组测试来验证orm层是否能够成功地保存和获取对象。但是,我们还必须测试领域对象在orm层上运行时是否正确。领域对象有可能执行正确的业务逻辑,但无法正确的持久化其状态,这可能是因为orm处理特定数据类型的方式存在问题。

有关这里讨论的两种测试方法的更多信息,请参阅martin fowler的文章“mocks aren't stubs”和steve freeman、nat pryce和joshua kerievsky编写的“point/counterpoint”。

备注:解决方案中包含的测试是使用xunit.net编写的。

下面的代码示例展示了使用上面讨论的行为方法编写的两个测试示例。

markus(软件开发人员)发言:

这些是我们刚开始时使用的测试,但是我们随后用基于状态的测试替换了它们。

public seatsavailability given_available_seats()
{
    var sut = new seatsavailability(seattypeid);
    sut.addseats(10);
    return sut;
}

[testmethod]
public void when_reserving_less_seats_than_total_then_succeeds()
{
    var sut = this.given_available_seats();
    sut.makereservation(guid.newguid(), 4);
}

[testmethod]
[expectedexception(typeof(argumentoutofrangeexception))]
public void when_reserving_more_seats_than_total_then_fails()
{
    var sut = this.given_available_seats();
    sut.makereservation(guid.newguid(), 11);
}

这两个测试共同验证了可用座位(seatsavailability)聚合的行为。在第一个测试中,预期的行为是makereservation方法成功,并且不会抛出异常。在第二个测试中,makereservation方法的预期行为是抛出异常,因为没有足够的空闲座位来完成预订。

如果没有聚合引发事件,则很难以任何其他方式测试行为。例如,检查是否进行了正确的调用以将聚合持久化到数据存储里,如果您试图用这个来测试行为,那么测试就会和数据存储实现耦合(这是一种坏气味):如果希望更改数据存储的实现,那么就需要更改领域模型中对聚合的测试。

下面的代码示例展示了使用被测试对象的状态编写的测试示例。这是在项目中使用的一种测试风格。

public class given_available_seats
{
    private static readonly guid seattypeid = guid.newguid();

    private seatsavailability sut;
    private ipersistenceprovider sutprovider;

    protected given_available_seats(ipersistenceprovider sutprovider)
    {
        this.sutprovider = sutprovider;
        this.sut = new seatsavailability(seattypeid);
        this.sut.addseats(10);

        this.sut = this.sutprovider.persistreload(this.sut);
    }

    public given_available_seats()
        : this(new nopersistenceprovider())
    {
    }

    [fact]
    public void when_reserving_less_seats_than_total_then_seats_become_unavailable()
    {
        this.sut.makereservation(guid.newguid(), 4);
        this.sut = this.sutprovider.persistreload(this.sut);

        assert.equal(6, this.sut.remainingseats);
    }

    [fact]
    public void when_reserving_more_seats_than_total_then_rejects()
    {
        var id = guid.newguid();
        sut.makereservation(id, 11);

        assert.equal(1, sut.events.count());
        assert.equal(id, ((reservationrejected)sut.events.single()).reservationid);
    }
}

这里展示的两个测试在调用makereservation方法后测试可用座位(seatsavailability)聚合的状态。第一个用来测试有足够座位可用的场景。第二个用来测试没有足够的座位可用的场景。第二个测试可以利用可用座位(seatsavailability)聚合的行为,因为如果该聚合拒绝预订,它确实会引发一个事件。

汇总

在旅程的第一阶段,我们探索了实现cqrs模式的一些基础知识,并为下一阶段做了一些准备。

下一章将描述我们如何扩展和增强已经完成的工作,为订单和注册限界上下文添加更多的特性和功能。我们还将研究一些额外的测试技术,以了解它们可能如何帮助我们实现这一目标。

《CQRS之旅——旅程3(订单和注册限界上下文).doc》

下载本文的Word格式文档,以方便收藏与打印。