Readmodel

The Readmodel is a prepared view on data that is projected from Events. The data schema of a Readmodel is optimized for the need of the consumer that queries the Readmodel.

It is written via a Projection Usecase.

Plan Readmodel Projection

Add source event

1
Select Projection (P) use case
  • Click the Projection (P) use case on the Plan.
2
Open the use case menu
  • Click the use case menu icon on left side.
3
Select a source event
  • Select a source event and press the add button.
  • Repeat to add multiple events.
Take care to add the event of the right aggregate. Other aggregates can have Events with the same name.
Take care to add the event of the right bounded context. You can add Events from all bounded contexts you have access to.

Your screen should look like that

Projection Operation

Determines the amount of entities and the error behaviour when an event manipulates the Readmodel:

  • upsert one: Insert or update a single entry - throws no error
  • update one: Update a single entry - throws an error when the entry was not found
  • update many: Update zero entries or a single entry or multiple entries - throws no error
  • delete: Remove zero entries or a single entry or multiple entries - throws no error

Projected Entity

Name

The Name is used to name the Projected Entity and Projector. It also sets the folder and infrastructure name for the use case.

For more details, see Naming.

Schema

The properties of the Projected Entity Schema are planned with the Codebricks Planner.

Additionally to the Schema we define the primary key of the Projected entity here.

The Primary Key is a property or a group of properties used to uniquely address an entry in a Readmodel table.

You need to set at least one property to Primary Key.

For more details, see Schema.

Value Sources

The Projected Entity Schema Property Value Sources define how each property is initialized.

For more details, see Data Flow.

The Projected Entity can use the following Metadata Value Sources:

  • Aggregate Id: uuid - Id of the aggregate of the processed event.
  • Aggregate Version: int - Version of the aggregate of the processed event.
  • Event Occurred At: Datetime - Occurred At Timestamp of the processed event.

Projection Conditions

1
Start adding condition
  • Click plus to add a condition
2
Select a Value Target
  • The Value Target is a property of the Readmodel schema.
  • The Value Target provides the property name the projection WHERE condition.
3
Select a Value Source
  • The Value Source is a property of the Event or a Source Readmodel.
  • The Value Source provides the property value the projection WHERE condition.

Your screen should look like that

Determines the addressing when an event manipulates the Readmodel.

The properties build the WHERE condition with an AND connection between all properties in the UPDATE statement.

Implement Readmodel Projection

Projector Class

The TaskOverviewProjector class shows how to implement a projector. It listens for specific events and updates the projected entities accordingly. The implementation can be found at: Task/src/useCases/read/TaskOverviewProjection/application/TaskOverviewProjector.ts.

1import { Projector, ProjectMethods, OverwriteProtectionBody } from "@codebricks/typebricks";
2import { defaultTaskOverview, TaskOverview } from "shared/application/readmodels/TaskOverview";
3import { TaskOverviewRepository, TaskOverviewRepositoryMethods } from "../infrastructure/TaskOverviewRepository";
4import { TaskAddedEventMessage } from "shared/application/inboundEvents/TaskAddedEventMessage";
5import { TaskCompletedEventMessage } from "shared/application/inboundEvents/TaskCompletedEventMessage";
6import { TaskStatusEnum } from "shared/domain/enums/TaskStatusEnum";
7
8export class TaskOverviewProjector extends Projector {
9 readonly projectionName: string = 'TaskOverview';
10 readonly projectMethods: ProjectMethods = {
11 'Taskbricks.Task.TaskAdded': this.projectTaskbricksTaskTaskAdded.bind(this),
12 'Taskbricks.Task.TaskCompleted': this.projectTaskbricksTaskTaskCompleted.bind(this)
13 };
14 readonly streamNames: string[] = ['Taskbricks.Task'];
15
16 constructor(readonly repository: TaskOverviewRepository = new TaskOverviewRepository()) {
17 super(repository);
18 }
19
20 @OverwriteProtectionBody(false)
21 async projectTaskbricksTaskTaskAdded(eventMessage: TaskAddedEventMessage, methods: TaskOverviewRepositoryMethods): Promise<void> {
22 const existingProjectedEntity: TaskOverview | null = await methods.getOne({
23 where: {
24 taskId: eventMessage.aggregateId,
25 }
26 });
27 const projectedEntity: TaskOverview = this.applyTaskbricksTaskTaskAdded(existingProjectedEntity, eventMessage);
28 await methods.updateOne(projectedEntity);
29 }
30
31 @OverwriteProtectionBody(false)
32 async projectTaskbricksTaskTaskCompleted(eventMessage: TaskCompletedEventMessage, methods: TaskOverviewRepositoryMethods): Promise<void> {
33 const existingProjectedEntity: TaskOverview | null = await methods.getOne({
34 where: {
35 taskId: eventMessage.aggregateId,
36 }
37 });
38 const projectedEntity: TaskOverview = this.applyTaskbricksTaskTaskCompleted(existingProjectedEntity, eventMessage);
39 await methods.updateOne(projectedEntity);
40 }
41
42 @OverwriteProtectionBody(false)
43 applyTaskbricksTaskTaskAdded(existingProjectedEntity: TaskOverview | null, eventMessage: TaskAddedEventMessage): TaskOverview {
44 const newProjectedEntity = existingProjectedEntity ?? defaultTaskOverview;
45 newProjectedEntity.taskId = eventMessage.aggregateId;
46 newProjectedEntity.title = eventMessage.payload.title;
47 newProjectedEntity.description = eventMessage.payload.description;
48 newProjectedEntity.assigneeId = eventMessage.payload.assigneeId;
49 newProjectedEntity.status = TaskStatusEnum.Open;
50
51 return newProjectedEntity;
52 }
53
54 @OverwriteProtectionBody(false)
55 applyTaskbricksTaskTaskCompleted(existingProjectedEntity: TaskOverview | null, eventMessage: TaskCompletedEventMessage): TaskOverview {
56 const newProjectedEntity = existingProjectedEntity ?? defaultTaskOverview;
57 newProjectedEntity.status = TaskStatusEnum.Completed;
58
59 return newProjectedEntity;
60 }
61}

projectionName and streamNames Properties:

  • projectionName: Identifies the specific projection this projector handles.
  • streamNames: Specifies the event streams from which this projector listens for events.

projectMethods Property:

  • Maps events to their project methods.
  • For example, projectTaskbricksTaskTaskAdded handles the TaskAdded event, while projectTaskbricksTaskTaskCompleted handles the TaskCompleted event.

Project Methods:

  • projectTaskbricksTaskTaskAdded: Fetches existing projected entities, applies updates from the event, and saves the updated entities.
  • projectTaskbricksTaskTaskCompleted: Deletes the projected entity based on the event.

Apply Methods:

  • applyTaskbricksTaskTaskAdded: Updates the projected entity with values derived from the event message.

Best Practices

  • Consistency: Ensure projections accurately reflect the current state as per the events processed.
  • Error Management: Implement robust error handling to prevent failures in projection from affecting system stability.
  • Performance: Optimize the projection logic to handle high volumes of events efficiently.

Inbound Event

Inbound events are deserialized representations of event messages that are consumed from the event bus. In the context of event-driven architectures, these events represent actions or changes that have occurred in the system, which other services or components need to react to.

Inbound events play a crucial role in the CQRS and event sourcing architecture by ensuring that the system can respond to changes that originate outside the service’s own command or write operations. They help maintain consistency across microservices by allowing services to synchronize their states based on events from other services.

Below is an implementation of an inbound event, found in the following file: Task/src/shared/application/inboundEvents/TaskAddedEventMessage.ts.

1import { InboundEvent } from "@codebricks/typebricks";
2
3export interface TaskAddedEventMessageProperties {
4 streamName: string;
5 no: number;
6 id: string;
7 aggregateId: string;
8 aggregateVersion: number;
9 payload: TaskAddedEventMessagePayload;
10 occurredAt: Date;
11}
12
13export interface TaskAddedEventMessagePayload {
14 title: string;
15 description: string;
16 assigneeId: string;
17}
18
19export class TaskAddedEventMessage extends InboundEvent<TaskAddedEventMessagePayload> {
20 static readonly eventName: string = 'TaskAdded';
21
22 constructor(properties: TaskAddedEventMessageProperties) {
23 super({...properties, name: TaskAddedEventMessage.eventName});
24 }
25}

InboundEvent Class: The TaskAddedEventMessage class extends the InboundEvent class provided by the Codebricks framework. This base class likely provides common functionality for handling inbound events, such as validation, deserialization, and base properties.

TaskAddedEventMessageProperties Interface: This interface defines the structure of the data that makes up the TaskAddedEventMessage. It includes properties such as streamName, no, id, aggregateId, and aggregateVersion, which are essential for identifying and processing the event.

  • streamName: The name of the event stream this event belongs to.

  • no: The sequential number of the event within its stream.

  • id: A unique identifier for the event.

  • aggregateId: The ID of the aggregate (e.g., a specific task) that this event pertains to.

  • aggregateVersion: The version of the aggregate at the time this event was created.

  • payload: The actual data or changes introduced by the event, such as the task's title, description, and assigneeId.

  • occurredAt: The timestamp when the event occurred.

  • TaskAddedEventMessagePayload Interface: This interface defines the structure of the payload property, which contains the specific details of the task that was added.

  • Constructor: The constructor of TaskAddedEventMessage initializes the event by passing all necessary properties to the base InboundEvent class. The event name is hard coded as 'TaskAdded', indicating the specific type of event.

Projected Entity

The projected entity is used to store information that has been projected from one or more events. This allows the system to create materialized views, which are optimized for specific read queries and can significantly improve query performance by denormalizing data structures.

Projected entities are typically used to optimize the read side of a CQRS-based system, where complex joins or transformations would be costly if executed on the fly. By precomputing and storing these views, we can serve queries more efficiently.

The implementation of the projected entity is split into two files. First the projected entity itself and secondly the corresponding TypeORM entity. The first is responsible for defining the shape of the data, while the latter one is responsible for persisting it to the database.

Below is an implementation of a projected entity, found in the following file: Task/src/shared/application/readmodels/TaskOverview.ts.

1import { Clock } from "@codebricks/typebricks";
2import { TaskStatusEnum } from "shared/domain/enums/TaskStatusEnum";
3
4export interface TaskOverview {
5 taskId: string;
6 title: string;
7 description: string;
8 assigneeId: string;
9 status: TaskStatusEnum;
10}
11
12export const defaultTaskOverview: TaskOverview = {
13 taskId: '',
14 title: '',
15 description: '',
16 assigneeId: '',
17 status: TaskStatusEnum.Open,
18 };

Interface Definition: The TaskOverview interface defines the structure of the data that the readmodel will hold. It acts as a contract for the entity’s shape.

Default Value Definition: The defaultTaskOverview constant defines the default values for each property in case the Projector is not setting it.

The next code snippet shows the implementation of the corresponding TypeORM entity, found in the following file: Task/src/shared/infrastructure/persistence/readmodel/TaskOverviewEntity.ts.

1import { Entity, Column, PrimaryColumn, BaseEntity } from "typeorm";
2import { JsonColumnTransformer } from "@codebricks/typebricks";
3import { TaskOverview } from "shared/application/readmodels/TaskOverview";
4import { TaskStatusEnum } from "shared/domain/enums/TaskStatusEnum";
5
6@Entity('task_task_overview')
7export class TaskOverviewEntity extends BaseEntity {
8 @PrimaryColumn({ name: 'task_id', type: 'uuid'})
9 taskId: string;
10 @Column({ name: 'title', type: 'text'})
11 title: string;
12 @Column({ name: 'description', type: 'text'})
13 description: string;
14 @Column({ name: 'assignee_id', type: 'uuid'})
15 assigneeId: string;
16 @Column({ name: 'status', type: 'text'})
17 status: TaskStatusEnum;
18
19 constructor(props?: TaskOverview) {
20 super();
21 if (props){
22 this.taskId = props.taskId;
23 this.title = props.title;
24 this.description = props.description;
25 this.assigneeId = props.assigneeId;
26 this.status = props.status;
27 }
28 }
29}

Entity Definition: The TaskOverviewEntity class is decorated with TypeORM annotations, marking it as a database entity tied to the task_overview table.

Primary Column: The taskId field is marked as the primary key, ensuring each task overview is uniquely identifiable.

Columns: The title, description, and assigneeId fields are annotated as columns, defining their types and how they map to the database schema.

Constructor: The constructor allows for the instantiation of the entity with specific properties.

TypeORM Integration: TypeORM is used to define the schema and interact with the database. The @Entity, @PrimaryColumn, and @Column decorators link the entity's properties to the database structure.


© 2024 Codebricks | All rights reserved.