Skip to main content

Projector

Introduction

In an event-sourced system using CQRS, a Projector plays a critical role in updating read models based on consumed events. When events occur, projectors convert these events into operations (upsert, update, delete) on Projected Entities. This process ensures that the read models reflect the current state as per the events captured.

Implementation

Projector Class

The ActiveTaskOverviewProjector class demonstrates how to implement a projector. It listens for specific events and updates the projected entities accordingly. The implementation can be found at: Task/src/useCases/read/ActiveTaskOverviewProjection/application/ActiveTaskOverviewProjector.ts.

import { Projector, ProjectMethods, OverwriteProtectionBody } from "@codebricks/codebricks-framework";
import { ActiveTaskOverviewEntity } from "shared/application/readmodels/ActiveTaskOverviewEntity";
import { ActiveTaskOverviewRepository, ActiveTaskOverviewRepositoryMethods } from "../infrastructure/ActiveTaskOverviewRepository";
import { TaskAddedEventMessage } from "shared/application/inboundEvents/TaskAddedEventMessage";
import { TaskCompletedEventMessage } from "shared/application/inboundEvents/TaskCompletedEventMessage";

export class ActiveTaskOverviewProjector extends Projector<ActiveTaskOverviewEntity> {
readonly projectionName: string = 'ActiveTaskOverview';
readonly projectMethods: ProjectMethods = {
'Demo.Task.TaskAdded': this.projectDemoTaskTaskAdded.bind(this),
'Demo.Task.TaskCompleted': this.projectDemoTaskTaskCompleted.bind(this)
};
readonly streamNames: string[] = ['Demo.Task'];

constructor(readonly repository: ActiveTaskOverviewRepository = new ActiveTaskOverviewRepository()) {
super(repository);
}

@OverwriteProtectionBody(false)
async projectDemoTaskTaskAdded(eventMessage: TaskAddedEventMessage, methods: ActiveTaskOverviewRepositoryMethods): Promise<void> {
const existingProjectedEntity: ActiveTaskOverviewEntity | null = await methods.getOne({
where: {
taskId: eventMessage.aggregateId,
}
});
const projectedEntity: ActiveTaskOverviewEntity = this.applyDemoTaskTaskAdded(existingProjectedEntity, eventMessage);
await methods.updateOne(projectedEntity);
}

@OverwriteProtectionBody(false)
async projectDemoTaskTaskCompleted(eventMessage: TaskCompletedEventMessage, methods: ActiveTaskOverviewRepositoryMethods): Promise<void> {
await methods.delete(
{
where: {
taskId: eventMessage.aggregateId,
}
}
);
}

@OverwriteProtectionBody(false)
applyDemoTaskTaskAdded(existingProjectedEntity: ActiveTaskOverviewEntity | null, eventMessage: TaskAddedEventMessage): ActiveTaskOverviewEntity {
const newProjectedEntity = existingProjectedEntity ? new ActiveTaskOverviewEntity({...existingProjectedEntity}) : new ActiveTaskOverviewEntity();
newProjectedEntity.taskId = eventMessage.aggregateId;
newProjectedEntity.title = eventMessage.payload.title;
newProjectedEntity.description = eventMessage.payload.description;
newProjectedEntity.assigneeId = eventMessage.payload.assigneeId;

return newProjectedEntity;
}
}

Key Concepts

  • projectionName and streamNames Properties:

    • projectionName: Identifies the specific projection this projector handles.
    • streamNames: Specifies the event streams from which this projector listens for events.
  • projectMethods Property:

    • Maps specific events to their processing methods.
    • For example, projectDemoTaskTaskAdded handles the TaskAdded event, while projectDemoTaskTaskCompleted handles the TaskCompleted event.
  • Project Methods:

    • projectDemoTaskTaskAdded: Fetches existing projected entities, applies updates from the event, and saves the updated entities.
    • projectDemoTaskTaskCompleted: Deletes the projected entity based on the event.
  • Apply Methods:

    • applyDemoTaskTaskAdded: Updates the projected entity with values derived from the event message.

Best Practices

  • Consistency: Ensure projections accurately reflect the current state as per the events processed.
  • Error Management: Implement robust error handling to prevent failures in projection from affecting system stability.
  • Performance: Optimize the projection logic to handle high volumes of events efficiently.
Error Handling

By default, errors are logged to ensure processing continues for subsequent events. If you require stopping processing upon an error, customize the error handling logic in the catch block.