Projector
Introduction
In an event-sourced system using CQRS, a Projector updates read models based on the events it consumes. For each event, the projector performs an operation (upsert, update, or delete) on Projected Entities, so that the read models reflect the current state derived from the recorded events.
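To make the idea concrete, the following is a minimal, framework-free sketch of a projection. The event shapes, the ActiveTaskRow type, and the project function are hypothetical names chosen for illustration; a Map stands in for the read model table.

```typescript
// Hypothetical event shapes, for illustration only.
type TaskEvent =
  | { type: "TaskAdded"; aggregateId: string; payload: { title: string } }
  | { type: "TaskCompleted"; aggregateId: string };

// Hypothetical projected entity: one row per active task.
interface ActiveTaskRow {
  taskId: string;
  title: string;
}

// The read model: a Map standing in for a database table.
type ReadModel = Map<string, ActiveTaskRow>;

// Translate one event into an upsert or a delete on the read model.
function project(readModel: ReadModel, event: TaskEvent): void {
  switch (event.type) {
    case "TaskAdded":
      readModel.set(event.aggregateId, {
        taskId: event.aggregateId,
        title: event.payload.title,
      });
      break;
    case "TaskCompleted":
      readModel.delete(event.aggregateId);
      break;
  }
}

// Replaying the events in order yields the current state of the read model.
const readModel: ReadModel = new Map();
const events: TaskEvent[] = [
  { type: "TaskAdded", aggregateId: "t1", payload: { title: "Write docs" } },
  { type: "TaskAdded", aggregateId: "t2", payload: { title: "Review PR" } },
  { type: "TaskCompleted", aggregateId: "t1" },
];
for (const event of events) {
  project(readModel, event);
}
```

After the replay, only task t2 remains in the read model, because t1 was added and then completed.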
Implementation
Projector Class
The ActiveTaskOverviewProjector class demonstrates how to implement a projector. It listens for specific events and updates the projected entities accordingly. The implementation can be found at: Task/src/useCases/read/ActiveTaskOverviewProjection/application/ActiveTaskOverviewProjector.ts.
import { Projector, ProjectMethods, OverwriteProtectionBody } from "@codebricks/codebricks-framework";
import { ActiveTaskOverviewEntity } from "shared/application/readmodels/ActiveTaskOverviewEntity";
import { ActiveTaskOverviewRepository, ActiveTaskOverviewRepositoryMethods } from "../infrastructure/ActiveTaskOverviewRepository";
import { TaskAddedEventMessage } from "shared/application/inboundEvents/TaskAddedEventMessage";
import { TaskCompletedEventMessage } from "shared/application/inboundEvents/TaskCompletedEventMessage";

export class ActiveTaskOverviewProjector extends Projector<ActiveTaskOverviewEntity> {
    readonly projectionName: string = 'ActiveTaskOverview';
    readonly projectMethods: ProjectMethods = {
        'Demo.Task.TaskAdded': this.projectDemoTaskTaskAdded.bind(this),
        'Demo.Task.TaskCompleted': this.projectDemoTaskTaskCompleted.bind(this)
    };
    readonly streamNames: string[] = ['Demo.Task'];

    constructor(readonly repository: ActiveTaskOverviewRepository = new ActiveTaskOverviewRepository()) {
        super(repository);
    }

    @OverwriteProtectionBody(false)
    async projectDemoTaskTaskAdded(eventMessage: TaskAddedEventMessage, methods: ActiveTaskOverviewRepositoryMethods): Promise<void> {
        const existingProjectedEntity: ActiveTaskOverviewEntity | null = await methods.getOne({
            where: {
                taskId: eventMessage.aggregateId,
            }
        });
        const projectedEntity: ActiveTaskOverviewEntity = this.applyDemoTaskTaskAdded(existingProjectedEntity, eventMessage);
        await methods.updateOne(projectedEntity);
    }

    @OverwriteProtectionBody(false)
    async projectDemoTaskTaskCompleted(eventMessage: TaskCompletedEventMessage, methods: ActiveTaskOverviewRepositoryMethods): Promise<void> {
        await methods.delete(
            {
                where: {
                    taskId: eventMessage.aggregateId,
                }
            }
        );
    }

    @OverwriteProtectionBody(false)
    applyDemoTaskTaskAdded(existingProjectedEntity: ActiveTaskOverviewEntity | null, eventMessage: TaskAddedEventMessage): ActiveTaskOverviewEntity {
        const newProjectedEntity = existingProjectedEntity ? new ActiveTaskOverviewEntity({...existingProjectedEntity}) : new ActiveTaskOverviewEntity();
        newProjectedEntity.taskId = eventMessage.aggregateId;
        newProjectedEntity.title = eventMessage.payload.title;
        newProjectedEntity.description = eventMessage.payload.description;
        newProjectedEntity.assigneeId = eventMessage.payload.assigneeId;

        return newProjectedEntity;
    }
}
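The fetch, apply, save flow of projectDemoTaskTaskAdded can be tried out without the framework or a database. The sketch below is only an approximation under stated assumptions: TaskOverview, TaskAddedMessage, Query, and InMemoryMethods are simplified, hypothetical stand-ins for the generated entity, event message, and repository methods, and their shapes are guessed from the calls in the class above rather than taken from the framework.

```typescript
// Hypothetical, simplified stand-in for the projected entity.
interface TaskOverview {
  taskId: string;
  title: string;
  description: string;
  assigneeId: string;
}

// Hypothetical, simplified stand-in for the TaskAdded event message.
interface TaskAddedMessage {
  aggregateId: string;
  payload: { title: string; description: string; assigneeId: string };
}

interface Query {
  where: { taskId: string };
}

// In-memory substitute for the repository methods (assumed shape, not the framework's).
class InMemoryMethods {
  private rows = new Map<string, TaskOverview>();

  async getOne(query: Query): Promise<TaskOverview | null> {
    return this.rows.get(query.where.taskId) ?? null;
  }

  async updateOne(entity: TaskOverview): Promise<void> {
    this.rows.set(entity.taskId, { ...entity });
  }

  async delete(query: Query): Promise<void> {
    this.rows.delete(query.where.taskId);
  }
}

// Apply step: start from the existing row, if any, then overwrite
// the fields carried by the event. No I/O happens here.
function applyTaskAdded(existing: TaskOverview | null, message: TaskAddedMessage): TaskOverview {
  return {
    ...(existing ?? {}),
    taskId: message.aggregateId,
    title: message.payload.title,
    description: message.payload.description,
    assigneeId: message.payload.assigneeId,
  };
}

// Project step: fetch the current row, apply the event, save the result.
async function projectTaskAdded(message: TaskAddedMessage, methods: InMemoryMethods): Promise<void> {
  const existing = await methods.getOne({ where: { taskId: message.aggregateId } });
  await methods.updateOne(applyTaskAdded(existing, message));
}
```

Keeping the apply step free of I/O means it can be unit-tested with plain objects, while the project step only coordinates reading and writing.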
Key Concepts
- projectionName and streamNames Properties:
  - projectionName: Identifies the specific projection this projector handles.
  - streamNames: Specifies the event streams from which this projector listens for events.
- projectMethods Property:
  - Maps specific events to their processing methods.
  - For example, projectDemoTaskTaskAdded handles the TaskAdded event, while projectDemoTaskTaskCompleted handles the TaskCompleted event.
- Project Methods:
  - projectDemoTaskTaskAdded: Fetches existing projected entities, applies updates from the event, and saves the updated entities.
  - projectDemoTaskTaskCompleted: Deletes the projected entity based on the event.
- Apply Methods:
  - applyDemoTaskTaskAdded: Updates the projected entity with values derived from the event message.
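The role of the projectMethods map, and of the bind(this) calls in it, can be illustrated with a small stand-alone sketch. How the framework dispatches events internally is not shown in this document, so the dispatch method, the MiniProjector class, and the EventMessage shape below are assumptions made purely for illustration.

```typescript
// Hypothetical minimal message shape; real event messages carry more fields.
interface EventMessage {
  name: string; // fully qualified event name, e.g. "Demo.Task.TaskAdded"
  aggregateId: string;
}

type Handler = (message: EventMessage) => Promise<void>;

class MiniProjector {
  // Records which handler ran, so the dispatch can be observed.
  readonly handled: string[] = [];

  // Keys are event names, values are handlers bound to this instance.
  // Without bind(this), `this` inside a handler would not refer to the
  // projector once the handler is looked up and called through the map.
  readonly projectMethods: Record<string, Handler> = {
    "Demo.Task.TaskAdded": this.onTaskAdded.bind(this),
    "Demo.Task.TaskCompleted": this.onTaskCompleted.bind(this),
  };

  async onTaskAdded(message: EventMessage): Promise<void> {
    this.handled.push(`added:${message.aggregateId}`);
  }

  async onTaskCompleted(message: EventMessage): Promise<void> {
    this.handled.push(`completed:${message.aggregateId}`);
  }

  // Look up the handler by event name; return false for events this
  // projector does not handle.
  async dispatch(message: EventMessage): Promise<boolean> {
    const handler = this.projectMethods[message.name];
    if (!handler) {
      return false;
    }
    await handler(message);
    return true;
  }
}
```

In this sketch, events without an entry in the map are skipped, so supporting a new event type only requires a new handler method and one more entry in projectMethods.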
Best Practices
- Consistency: Ensure projections accurately reflect the current state as per the events processed.
- Error Management: Implement robust error handling to prevent failures in projection from affecting system stability.
- Performance: Optimize the projection logic to handle high volumes of events efficiently.
By default, errors are logged so that processing continues with subsequent events. To stop processing when an error occurs, customize the error handling logic in the catch block.
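The difference between the two error handling strategies can be sketched as follows. This is not the framework's processing loop; processEvents, ProcessResult, and the stopOnError flag are hypothetical names used to contrast logging and continuing with stopping on the first failure.

```typescript
type EventHandler<E> = (event: E) => Promise<void>;

interface ProcessResult {
  processed: number;
  failed: number;
}

// Process events in order. With stopOnError = false, a failure is logged and
// the loop moves on to the next event. With stopOnError = true, the first
// failure is rethrown and later events are not processed.
async function processEvents<E>(
  events: E[],
  handler: EventHandler<E>,
  stopOnError = false,
): Promise<ProcessResult> {
  const result: ProcessResult = { processed: 0, failed: 0 };
  for (const event of events) {
    try {
      await handler(event);
      result.processed += 1;
    } catch (error) {
      result.failed += 1;
      console.error("projection failed", error);
      if (stopOnError) {
        throw error;
      }
    }
  }
  return result;
}
```

Logging and continuing keeps the projector running but can leave the read model without the effect of the failed event. Stopping preserves event order at the cost of halting the projection until the cause is fixed. Which trade-off is acceptable depends on the read model.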