Aggregate

The Aggregate executes commands, encapsulates domain logic, and produces events.

An Aggregate is the transaction boundary. This means consistency is only guaranteed for state transitions within a single Aggregate.

The only way to address an Aggregate is by the unique Aggregate Id. It is not possible to load an Aggregate by a value of the Aggregate State.
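The two rules above can be pictured with a minimal in-memory sketch. `StoredEvent`, `InMemoryEventStore`, and the version check are hypothetical names chosen for illustration and are not part of the Codebricks API. The sketch only assumes that every event carries an `aggregateId` and an `aggregateVersion`, as the event constructors later on this page do.

```typescript
// Hypothetical event shape: only the fields needed for this sketch.
interface StoredEvent {
    aggregateId: string;
    aggregateVersion: number;
    eventName: string;
}

// Hypothetical in-memory store; events are grouped by aggregate id only.
class InMemoryEventStore {
    private readonly streams = new Map<string, StoredEvent[]>();

    // The only lookup offered is by aggregate id.
    load(aggregateId: string): StoredEvent[] {
        return this.streams.get(aggregateId) ?? [];
    }

    // Appending succeeds only if the event continues this aggregate's own
    // version sequence, so consistency is checked per aggregate.
    append(storedEvent: StoredEvent): void {
        const stream = this.load(storedEvent.aggregateId);
        if (storedEvent.aggregateVersion !== stream.length + 1) {
            throw new Error(`Version conflict for aggregate ${storedEvent.aggregateId}`);
        }
        this.streams.set(storedEvent.aggregateId, [...stream, storedEvent]);
    }
}

const eventStore = new InMemoryEventStore();
eventStore.append({ aggregateId: "task-1", aggregateVersion: 1, eventName: "TaskAdded" });
eventStore.append({ aggregateId: "task-2", aggregateVersion: 1, eventName: "TaskAdded" });
eventStore.append({ aggregateId: "task-1", aggregateVersion: 2, eventName: "TaskCompleted" });
```

Each aggregate id has its own version sequence, so writes to `task-1` and `task-2` never conflict with each other. A second event with version 2 for `task-1` would be rejected.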

Aggregate State

The Aggregate State is the data that the Aggregate has built up by applying Event Payloads.

The Aggregate State should only contain data that is relevant for state transitions during command execution.
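As a minimal sketch of this rule, assume a task whose creation event records a title, description, and assignee, while later commands only ever need to check the status. The type and function names below are illustrative stand-ins, not the generated Codebricks classes.

```typescript
// Illustrative payload: everything the event records.
interface TaskAddedPayload {
    title: string;
    description: string;
    assigneeId: string;
}

// Illustrative state: only what later commands need in order to decide.
interface MinimalTaskState {
    status: "Open" | "Completed";
}

// Nothing from the payload is copied into the state, because no command
// decision in this sketch depends on title, description, or assignee.
function applyTaskAdded(current: MinimalTaskState, _payload: TaskAddedPayload): MinimalTaskState {
    return { ...current, status: "Open" };
}

// A command check that reads only the state.
function canComplete(current: MinimalTaskState): boolean {
    return current.status === "Open";
}

const afterAdd = applyTaskAdded(
    { status: "Open" },
    { title: "Write docs", description: "Aggregate page", assigneeId: "user-1" }
);
```

The title and description remain available in the stored event, so nothing is lost by leaving them out of the state.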

Plan Aggregate Method

Load Type

  • Create: This option will create a new aggregate with a new unique ID.

  • Load: This will load an existing aggregate by ID and will throw an error if the aggregate is not found.

  • Upsert: This will try to load an aggregate by ID and will create a new aggregate with that ID if the aggregate is not found.

    For Aggregate Load Types Load and Upsert, you need to connect a data source to the aggregate id in the aggregate load condition. See Load Condition.

For more information on how to set a data source, see Data Flow.
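The difference between the three load types can be sketched as a single lookup function. This is an illustration under assumptions, not the Codebricks runtime: `LoadedAggregate`, `knownAggregates`, and `resolveAggregate` are hypothetical names, and the plain `Error` stands in for whatever error the platform raises.

```typescript
// Hypothetical stand-in for an aggregate: just an id and a version.
interface LoadedAggregate {
    id: string;
    version: number;
}

type LoadType = "Create" | "Load" | "Upsert";

// Hypothetical in-memory lookup table keyed by aggregate id.
const knownAggregates = new Map<string, LoadedAggregate>([
    ["a1b2", { id: "a1b2", version: 3 }],
]);

// Resolves an aggregate according to the chosen load type.
// `newId` stands in for a freshly generated unique id (Create);
// `conditionId` stands in for the value connected in the load condition.
function resolveAggregate(loadType: LoadType, conditionId: string, newId: string): LoadedAggregate {
    if (loadType === "Create") {
        return { id: newId, version: 0 };
    }
    const existing = knownAggregates.get(conditionId);
    if (existing !== undefined) {
        return existing;
    }
    if (loadType === "Load") {
        throw new Error(`Aggregate ${conditionId} not found`);
    }
    // Upsert: fall back to a new aggregate that keeps the requested id.
    return { id: conditionId, version: 0 };
}
```

Create ignores the load condition entirely, which is why only Load and Upsert need a data source connected to the aggregate id.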

Load Condition

1. Start adding a condition
  • Click plus to add the condition.
2. Select a Value Source
  • The Value Source is a property of the Command or a Source Readmodel.
Only properties of the type UUID are valid Value Sources for the load condition.

The aggregate load condition is configured

To change the Load Condition, start again with Step 1.
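The UUID restriction on Value Sources can be pictured as a runtime check on the property that feeds the aggregate id. How Codebricks enforces this is not described here, so the following is only a sketch: `CompleteTaskCommand`, `taskId`, `isUuid`, and `aggregateIdFrom` are hypothetical names, and the pattern checks the canonical 8-4-4-4-12 hexadecimal text form of a UUID.

```typescript
// Canonical textual UUID form: 8-4-4-4-12 hexadecimal digits.
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

function isUuid(value: string): boolean {
    return UUID_PATTERN.test(value);
}

// Hypothetical command shape with a UUID property usable as a load condition.
interface CompleteTaskCommand {
    taskId: string;
}

// Returns the aggregate id taken from the command, or throws if the
// property does not hold a UUID.
function aggregateIdFrom(command: CompleteTaskCommand): string {
    if (!isUuid(command.taskId)) {
        throw new Error("taskId must be a UUID to be used as aggregate id");
    }
    return command.taskId;
}
```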

Signature

Schema

The properties of the Aggregate method are planned with the Codebricks Planner.

These properties of the Signature are Value Objects.

For more details, see Schema.

Value Sources

The Value Sources of the Aggregate method's properties define how each property is initialized.

For more details, see Data Flow.

Plan Aggregate State

State

The State properties build the data schema for the aggregate state.

For more details, see Schema.

Implement Aggregate

Aggregate

The aggregate class is located at: Task/src/shared/domain/aggregate/TaskAggregate.ts

import { Aggregate, Event, OverwriteProtectionBody, UuidGenerator, Clock, ConflictError, PreconditionFailedError, ValidationError } from "@codebricks/typebricks";
import { TaskAggregateState, defaultState } from "./TaskAggregateState";
import { AddTaskProperties, CompleteTaskProperties } from "./taskMethodsProperties";
import { TaskAddedEvent } from "shared/domain/events/TaskAddedEvent";
import { TaskStatusEnum } from "shared/domain/enums/TaskStatusEnum";
import { TaskCompletedEvent } from "shared/domain/events/TaskCompletedEvent";

export class TaskAggregate extends Aggregate<TaskAggregateState> {
    static readonly aggregateName: string = 'Task';
    readonly applyMethods = {
        [TaskAddedEvent.name]: this.applyTaskAddedEvent.bind(this),
        [TaskCompletedEvent.name]: this.applyTaskCompletedEvent.bind(this)
    };

    constructor(id: string) {
        super(id, 0, defaultState);
    }

    @OverwriteProtectionBody(false)
    apply(event: Event<any>): TaskAggregateState {
        return this.applyMethods[event.constructor.name](event);
    }

    @OverwriteProtectionBody(false)
    async addTask(properties: AddTaskProperties): Promise<void> {
        const taskAddedEvent = new TaskAddedEvent(
            {
                id: UuidGenerator.uuid(),
                aggregateId: this.id,
                aggregateVersion: this.version + 1,
                payload: {
                    title: properties.title.value,
                    description: properties.description.value,
                    assigneeId: properties.assigneeId.value,
                },
                occurredAt: Clock.now()
            }
        );
        this.addEvent(taskAddedEvent);
    }

    @OverwriteProtectionBody(false)
    applyTaskAddedEvent(event: TaskAddedEvent): TaskAggregateState {
        const newAggregateState: TaskAggregateState = { ...this.state };

        newAggregateState.status = TaskStatusEnum.Open;

        return newAggregateState;
    }

    @OverwriteProtectionBody(false)
    async completeTask(properties: CompleteTaskProperties): Promise<void> {
        const taskCompletedEvent = new TaskCompletedEvent(
            {
                id: UuidGenerator.uuid(),
                aggregateId: this.id,
                aggregateVersion: this.version + 1,
                payload: {
                },
                occurredAt: Clock.now()
            }
        );
        this.addEvent(taskCompletedEvent);
    }

    @OverwriteProtectionBody(false)
    applyTaskCompletedEvent(event: TaskCompletedEvent): TaskAggregateState {
        const newAggregateState: TaskAggregateState = { ...this.state };

        newAggregateState.status = TaskStatusEnum.Completed;

        return newAggregateState;
    }
}

Properties and Methods:

  • aggregateName property: Required to persist your events correctly.
  • applyMethods property: Maps events to their respective apply methods.
  • Constructor: Initializes the aggregate with an id, version 0, and the default state (from Aggregate State).
  • Command methods (e.g., addTask, completeTask): Implement business logic and create events.
  • Apply methods (e.g., applyTaskAddedEvent): Handle state changes when events are applied to the aggregate.
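The role of the `applyMethods` map can be sketched without the framework: each event class name points to a function that computes the next state, and applying events in order yields the current state. The sketch assumes that state is rebuilt by applying events one after another, which this page does not spell out. `TaskAdded`, `TaskCompleted`, `ReplayState`, `applyByName`, and `replay` are hypothetical stand-ins for the generated classes.

```typescript
// Hypothetical event classes; the generated ones carry more fields.
class TaskAdded {}
class TaskCompleted {}
type TaskEvent = TaskAdded | TaskCompleted;

interface ReplayState {
    status: "Open" | "Completed";
}

// Maps each event class name to the function that computes the next state,
// mirroring the role of applyMethods.
const applyByName: Record<string, (current: ReplayState) => ReplayState> = {
    [TaskAdded.name]: (current) => ({ ...current, status: "Open" }),
    [TaskCompleted.name]: (current) => ({ ...current, status: "Completed" }),
};

// Applies the events in order, starting from a default state.
function replay(events: TaskEvent[]): ReplayState {
    let current: ReplayState = { status: "Open" };
    for (const taskEvent of events) {
        const applyFn = applyByName[taskEvent.constructor.name];
        if (applyFn === undefined) {
            throw new Error(`No apply method for ${taskEvent.constructor.name}`);
        }
        current = applyFn(current);
    }
    return current;
}

const rebuilt = replay([new TaskAdded(), new TaskCompleted()]);
```

Because the lookup key is the class name, an event class without an entry in the map cannot be applied; the sketch makes that case fail loudly instead of silently skipping the event.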

Aggregate Method

The Aggregate method for a command is where the business logic is implemented. It creates events based on the command and applies them to the aggregate.

The addTask method:

@OverwriteProtectionBody(false)
async addTask(properties: AddTaskProperties): Promise<void> {
    const taskAddedEvent = new TaskAddedEvent({
        id: UuidGenerator.uuid(),
        aggregateId: this.id,
        aggregateVersion: this.version + 1,
        payload: {
            title: properties.title.value,
            description: properties.description.value,
            assigneeId: properties.assigneeId.value,
        },
        occurredAt: Clock.now()
    });
    this.addEvent(taskAddedEvent);
}

The AddTaskProperties interface of the addTask method:

export interface AddTaskProperties {
    title: TitleValueObject;
    description: DescriptionValueObject;
    assigneeId: AssigneeIdValueObject;
}

Aggregate methods should signal failures by throwing ConflictError, PreconditionFailedError, or ValidationError, because these errors are rendered for you by default.
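A guard inside a command method might look like the following sketch. The `PreconditionFailedError` class here is a local stand-in for the class of the same name in `@codebricks/typebricks`, whose constructor signature is not shown on this page and is therefore assumed. `GuardedTask` and the rule that a completed task cannot be completed again are invented for illustration.

```typescript
// Local stand-in for the framework error class; the message-only
// constructor is an assumption.
class PreconditionFailedError extends Error {}

type GuardedStatus = "Open" | "Completed";

// Hypothetical, simplified aggregate: a status plus a list of pending event names.
class GuardedTask {
    status: GuardedStatus = "Open";
    readonly pendingEvents: string[] = [];

    completeTask(): void {
        // Business rule (illustrative): a task can only be completed once.
        if (this.status === "Completed") {
            throw new PreconditionFailedError("Task is already completed");
        }
        // The rule passed, so the event is recorded and the state changes.
        this.pendingEvents.push("TaskCompleted");
        this.status = "Completed";
    }
}
```

The check runs before any event is created, so a rejected command leaves no trace in the aggregate.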

Apply Method

Apply methods handle state transitions when an event is applied. For example, the applyTaskAddedEvent method:

@OverwriteProtectionBody(false)
applyTaskAddedEvent(event: TaskAddedEvent): TaskAggregateState {
    const newAggregateState: TaskAggregateState = { ...this.state };
    // Modify newAggregateState based on event payload
    return newAggregateState;
}

Aggregate State

The aggregate state class is located at: Task/src/shared/domain/aggregate/TaskAggregateState.ts

import { Clock } from "@codebricks/typebricks";
import { TaskStatusEnum } from "shared/domain/enums/TaskStatusEnum";

export interface TaskAggregateState {
    status: TaskStatusEnum;
}

export const defaultState: TaskAggregateState = {
    status: TaskStatusEnum.Open,
};

Properties:

  • TaskAggregateState interface: Defines the values persisted inside the aggregate state.
  • defaultState constant: Sets the default state for your aggregate.

Best Practices

  • Keep aggregates small: Include only the state and behavior that logically belong to the aggregate.
  • Avoid dependencies: Aggregates should not directly depend on other aggregates.
  • Consistency boundary: Ensure all business rules and invariants are enforced within the aggregate boundary.
