The Aggregate executes commands, encapsulates domain logic, and produces events.
An Aggregate is the transaction boundary. This means consistency is only guaranteed for state transitions within one aggregate.
The only way to address an Aggregate is by the unique Aggregate Id. It is not possible to load an Aggregate by a value of the Aggregate State.
Aggregate State
The Aggregate State is the data that has been applied to the aggregate from the Event Payloads.
The Aggregate State should only contain data that is relevant for state transitions during command execution.
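To illustrate how state is built up from event payloads, the following minimal sketch replays a list of events over a default state. The event shapes and the `applyEvent` and `replay` helpers are assumptions made for this example; they are not part of the Codebricks API.

```typescript
// Hypothetical, simplified types for illustration only.
type SketchStatus = "Open" | "Completed";

interface SketchTaskState {
    status: SketchStatus;
}

type SketchTaskEvent =
    | { name: "TaskAdded"; payload: { title: string } }
    | { name: "TaskCompleted"; payload: Record<string, never> };

const initialState: SketchTaskState = { status: "Open" };

// Returns a new state for a single event; the input state is not mutated.
function applyEvent(state: SketchTaskState, event: SketchTaskEvent): SketchTaskState {
    switch (event.name) {
        case "TaskAdded":
            // The title is not needed for any later decision, so it is not kept in the state.
            return { ...state, status: "Open" };
        case "TaskCompleted":
            return { ...state, status: "Completed" };
    }
}

// Folds all events, in order, into the current state.
function replay(events: SketchTaskEvent[]): SketchTaskState {
    return events.reduce(applyEvent, initialState);
}

const replayedState = replay([
    { name: "TaskAdded", payload: { title: "Write docs" } },
    { name: "TaskCompleted", payload: {} },
]);
```

In this sketch the title from the payload is deliberately dropped, because only the status influences later state transitions.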
Create: This option will create a new aggregate with a new unique ID.
Load: This will load an existing aggregate by ID and will throw an error if the aggregate is not found.
Upsert: This will try to load an aggregate by ID and will create a new aggregate with that ID if the aggregate is not found.
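The difference between the three options can be sketched with an in-memory store. Everything here (`resolveAggregate`, `aggregateStore`, `nextId`, `AggregateNotFoundError`) is a hypothetical stand-in for illustration; the framework performs the actual loading for you.

```typescript
// Hypothetical stand-ins; not the Codebricks implementation.
interface StoredAggregate {
    id: string;
    version: number;
}

class AggregateNotFoundError extends Error {}

type LoadType = "Create" | "Load" | "Upsert";

// In-memory store keyed by aggregate ID; lookup by state values is not possible.
const aggregateStore = new Map<string, StoredAggregate>();

let idCounter = 0;
// Placeholder ID generator for the sketch; a real system would use UUIDs.
function nextId(): string {
    idCounter += 1;
    return `task-${idCounter}`;
}

function resolveAggregate(loadType: LoadType, id?: string): StoredAggregate {
    if (loadType === "Create") {
        // Create: always a fresh aggregate with a new unique ID.
        const created: StoredAggregate = { id: nextId(), version: 0 };
        aggregateStore.set(created.id, created);
        return created;
    }
    if (id === undefined) {
        throw new Error(`${loadType} requires an aggregate ID`);
    }
    const existing = aggregateStore.get(id);
    if (existing) {
        return existing;
    }
    if (loadType === "Load") {
        // Load: a missing aggregate is an error.
        throw new AggregateNotFoundError(`Aggregate ${id} not found`);
    }
    // Upsert: fall back to creating an aggregate with the given ID.
    const upserted: StoredAggregate = { id, version: 0 };
    aggregateStore.set(id, upserted);
    return upserted;
}
```

Load and Upsert both need an ID up front, which is why they require a data source for the aggregate ID.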
For the Aggregate Load Types Load and Upsert, you need to connect a data source to the aggregate ID in the aggregate load condition. See Load Condition.
For more information on how to set a data source, see Data Flow.
The aggregate load condition is configured
To change the Load Condition, start again with Step 1.
Schema
The properties of the Aggregate method are planned with the Codebricks Planner.
These properties of the Signature are Value Objects.
For more details, see Schema.
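The generated aggregate methods read each property through `.value` (for example `properties.title.value`). As a minimal sketch of what such a Value Object might look like, the class below validates its input on construction and exposes the raw value. The validation rule, the `equals` method, and the local `ValidationError` stand-in are assumptions for illustration; the generated Value Objects may differ.

```typescript
// Local stand-in for the framework's ValidationError, so the sketch is self-contained.
class ValidationError extends Error {}

// Hypothetical value object: validates on construction and exposes the raw value via `.value`.
class TitleValueObject {
    readonly value: string;

    constructor(value: string) {
        const trimmed = value.trim();
        // Assumed rule for illustration: a title must not be empty.
        if (trimmed.length === 0) {
            throw new ValidationError("Title must not be empty");
        }
        this.value = trimmed;
    }

    // Value objects are compared by value, not by identity.
    equals(other: TitleValueObject): boolean {
        return this.value === other.value;
    }
}

const exampleTitle = new TitleValueObject("  Write docs ");
```

Because invalid input is rejected in the constructor, an aggregate method that receives a Value Object can rely on its `.value` being well-formed.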
Value Sources
The Aggregate method Property Value Sources define how each property is initialized.
For more details, see Data Flow.
The State properties build the data schema for the aggregate state.
For more details, see Schema.
The aggregate class is located at: Task/src/shared/domain/aggregate/TaskAggregate.ts
```typescript
import { Aggregate, Event, OverwriteProtectionBody, UuidGenerator, Clock, ConflictError, PreconditionFailedError, ValidationError } from "@codebricks/typebricks";
import { TaskAggregateState, defaultState } from "./TaskAggregateState";
import { AddTaskProperties, CompleteTaskProperties } from "./taskMethodsProperties";
import { TaskAddedEvent } from "shared/domain/events/TaskAddedEvent";
import { TaskStatusEnum } from "shared/domain/enums/TaskStatusEnum";
import { TaskCompletedEvent } from "shared/domain/events/TaskCompletedEvent";

export class TaskAggregate extends Aggregate<TaskAggregateState> {
    static readonly aggregateName: string = 'Task';
    readonly applyMethods = {
        [TaskAddedEvent.name]: this.applyTaskAddedEvent.bind(this),
        [TaskCompletedEvent.name]: this.applyTaskCompletedEvent.bind(this)
    };

    constructor(id: string) {
        super(id, 0, defaultState);
    }

    @OverwriteProtectionBody(false)
    apply(event: Event<any>): TaskAggregateState {
        return this.applyMethods[event.constructor.name](event);
    }

    @OverwriteProtectionBody(false)
    async addTask(properties: AddTaskProperties): Promise<void> {
        const taskAddedEvent = new TaskAddedEvent(
            {
                id: UuidGenerator.uuid(),
                aggregateId: this.id,
                aggregateVersion: this.version + 1,
                payload: {
                    title: properties.title.value,
                    description: properties.description.value,
                    assigneeId: properties.assigneeId.value,
                },
                occurredAt: Clock.now()
            }
        );
        this.addEvent(taskAddedEvent);
    }

    @OverwriteProtectionBody(false)
    applyTaskAddedEvent(event: TaskAddedEvent): TaskAggregateState {
        const newAggregateState: TaskAggregateState = { ...this.state };

        newAggregateState.status = TaskStatusEnum.Open;

        return newAggregateState;
    }

    @OverwriteProtectionBody(false)
    async completeTask(properties: CompleteTaskProperties): Promise<void> {
        const taskCompletedEvent = new TaskCompletedEvent(
            {
                id: UuidGenerator.uuid(),
                aggregateId: this.id,
                aggregateVersion: this.version + 1,
                payload: {
                },
                occurredAt: Clock.now()
            }
        );
        this.addEvent(taskCompletedEvent);
    }

    @OverwriteProtectionBody(false)
    applyTaskCompletedEvent(event: TaskCompletedEvent): TaskAggregateState {
        const newAggregateState: TaskAggregateState = { ...this.state };

        newAggregateState.status = TaskStatusEnum.Completed;

        return newAggregateState;
    }
}
```
Properties and Methods:
Constructor: Initializes the aggregate with its id, version 0, and the default state (from Aggregate State).
Aggregate methods (addTask, completeTask): Implement business logic and create events.
Apply methods (applyTaskAddedEvent, applyTaskCompletedEvent): Handle state changes when events are applied to the aggregate.
The aggregate method for a command is the place where business logic is implemented. Aggregate methods create events based on the command and apply them to the aggregate.
The addTask method:
```typescript
@OverwriteProtectionBody(false)
async addTask(properties: AddTaskProperties): Promise<void> {
    const taskAddedEvent = new TaskAddedEvent({
        id: UuidGenerator.uuid(),
        aggregateId: this.id,
        aggregateVersion: this.version + 1,
        payload: {
            title: properties.title.value,
            description: properties.description.value,
            assigneeId: properties.assigneeId.value,
        },
        occurredAt: Clock.now()
    });
    this.addEvent(taskAddedEvent);
}
```
The AddTaskProperties interface of the addTask method:
```typescript
export interface AddTaskProperties {
    title: TitleValueObject;
    description: DescriptionValueObject;
    assigneeId: AssigneeIdValueObject;
}
```
Aggregate methods should throw the errors ConflictError, PreconditionFailedError, and ValidationError to ensure robust command execution, as these errors are rendered for you by default.
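As a rough sketch of how such an error might be raised, the stripped-down aggregate below rejects completing a task twice. `SketchTaskAggregate`, its fields, and the business rule are hypothetical. The local `PreconditionFailedError` class only stands in for the one exported by `@codebricks/typebricks`, and the assumption that its constructor accepts a message has not been checked against the library.

```typescript
// Local stand-in for the framework error class, so the sketch runs on its own.
class PreconditionFailedError extends Error {}

type SketchTaskStatus = "Open" | "Completed";

// Hypothetical, stripped-down aggregate; not the generated TaskAggregate.
class SketchTaskAggregate {
    taskStatus: SketchTaskStatus = "Open";
    readonly pendingEvents: string[] = [];

    completeTask(): void {
        // Business rule (assumed for illustration): a task can only be completed once.
        if (this.taskStatus === "Completed") {
            throw new PreconditionFailedError("Task is already completed");
        }
        // Record the event first, then reflect it in the state.
        this.pendingEvents.push("TaskCompleted");
        this.taskStatus = "Completed";
    }
}

const sketchAggregate = new SketchTaskAggregate();
sketchAggregate.completeTask();
```

The check runs before any event is created, so a rejected command leaves no trace in the aggregate.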
Apply methods handle state transitions when an event is applied. For example, the applyTaskAddedEvent method:
```typescript
@OverwriteProtectionBody(false)
applyTaskAddedEvent(event: TaskAddedEvent): TaskAggregateState {
    const newAggregateState: TaskAggregateState = { ...this.state };
    // Modify newAggregateState based on event payload
    return newAggregateState;
}
```
The aggregate state class is located at: Task/src/shared/domain/aggregate/TaskAggregateState.ts
```typescript
import { Clock } from "@codebricks/typebricks";
import { TaskStatusEnum } from "shared/domain/enums/TaskStatusEnum";

export interface TaskAggregateState {
    status: TaskStatusEnum;
}

export const defaultState: TaskAggregateState = {
    status: TaskStatusEnum.Open,
};
```
Properties:
TaskAggregateState interface: Defines the values persisted inside the aggregate state.
defaultState constant: Sets the default state for your aggregate.