Skip to main content

Publishing

In an event-driven system, publishing events accurately and reliably is crucial for maintaining the consistency and integrity of the system state. This section addresses the challenges and implementation details of publishing events in a way that ensures correct event sequencing and reliability.

Challenges

  1. Concurrent Writes: We need to ensure that concurrent writes do not result in multiple instances of the same aggregate version being written.
  2. Event Ordering: To maintain a consistent order, events must be numbered consecutively within each stream.

Implementation

Persistence of Events

After the execution of a command, it's essential to persist all recorded events associated with an aggregate. The persistence step involves saving three entities:

  1. Event: The actual event that occurred.
  2. Aggregate State: The current state of the aggregate.
  3. Outbox Entry: An entry in the outbox for eventual publishing.

These entities are saved together in a transaction, with table locking applied to the aggregate ID to maintain consistency. During this process, the event sequence number (no) is not set to allow for parallel inserts of multiple aggregates.

/**
* Persists an aggregate, saves its pending events to the event stream and outbox.
*
* @param aggregate - The aggregate to persist.
*/
async save(aggregate: TAggregate): Promise<void> {
if (!aggregate.pendingEvents.length) {
throw new ConflictError('No events to persist.');
}

try {
await this.datasource.manager.transaction(async (transactionalEntityManager: EntityManager) => {
const findOptions: FindManyOptions<EventStreamEntity> = {
where: {
aggregateId: aggregate.id,
aggregateVersion: Between(
aggregate.pendingEvents[0].aggregateVersion,
aggregate.pendingEvents[aggregate.pendingEvents.length - 1].aggregateVersion
)
}
};

const result: TEventStreamEntity[] = await transactionalEntityManager.find(
this.eventStreamEntity,
findOptions as FindManyOptions<TEventStreamEntity>
);

if (result.length) {
console.error("Race condition while persisting aggregate.");
throw new Error("Race condition while persisting aggregate.");
}

await Promise.all(aggregate.pendingEvents.map(async (pendingEvent: Event<any>) => {
const event: TEventStreamEntity = new this.eventStreamEntity(pendingEvent);
await transactionalEntityManager.save(this.eventStreamEntity, event);

const eventMessage: EventMessage = new EventMessage({
streamName: this.streamName,
id: pendingEvent.id,
aggregateId: pendingEvent.aggregateId,
aggregateVersion: pendingEvent.aggregateVersion,
name: pendingEvent.name,
payload: JSON.stringify(pendingEvent.payload),
occurredAt: pendingEvent.occurredAt,
});

const outbox: TOutBoxEntity = new this.outBoxEntity({
id: pendingEvent.id,
name: eventMessage.name,
message: JSON.stringify(eventMessage)
});

await transactionalEntityManager.save(this.outBoxEntity, outbox);
}));
});

aggregate.pendingEvents = [];

await this.datasource.manager.save(
new this.aggregateStateEntity({
aggregateId: aggregate.id,
aggregateVersion: aggregate.version,
state: JSON.stringify(aggregate.state)
})
);

} catch (error) {
console.error('Error during transaction saving of Aggregate: ' + error);
throw error;
}
}

Publishing of Events

The event publishing mechanism involves the following steps:

  1. Set Sequence Numbers: Assign sequence numbers to events to ensure they are processed in the correct order.
  2. Publish to Event Bus: Send the events to the event bus.
  3. Remove Outbox Entry: Remove the processed events from the outbox.
import { EventBridgeClient, PutEventsCommand, PutEventsResponse } from "@aws-sdk/client-eventbridge";
import { DataSource, EntityManager, ObjectType } from "typeorm";
import { OutboxEntity } from "./OutboxEntity";
import { EventMessage } from "../persistence/aggregate/EventMessage";

/**
* Publishes events to EventBridge from the outbox.
*
* * Demos:
* - [Publishing](https:/codebricks.tech/docs/publishing)
*/
export class Publisher<T extends OutboxEntity> {

/**
* Initializes Publisher.
*
* @param appDataSource - TypeORM datasource
* @param eventStreamEntity - Event stream TypeORM entity
* @param outBoxEntity - Event outbox TypeORM entity
* @param aggregateName - Aggregate's name
* @param contextName - Context's name
* @param client - EventBridge client
*/
constructor(
readonly appDataSource: DataSource,
readonly eventStreamEntity: ObjectType<T>,
readonly outBoxEntity: ObjectType<T>,
readonly aggregateName: string,
readonly contextName: string,
readonly client: EventBridgeClient = new EventBridgeClient({})
) {}

/**
* Publishes events from the outbox.
*
* @returns A promise that resolves when the events have been published.
*/
async publish(): Promise<void> {
await this.initDataSource();
await this.setSequenceNumbers();
await this.publishEvents();
}

private async initDataSource(): Promise<void> {
if (!this.appDataSource.isInitialized) {
await this.appDataSource.initialize()
.catch((err) => {
console.error("Error during Data Source initialization", err);
});
}
}

private async setSequenceNumbers() {
await this.appDataSource.manager.transaction(async (transactionalEntityManager: EntityManager) => {
const events = await transactionalEntityManager
.getRepository(this.eventStreamEntity)
.createQueryBuilder('events')
.where('events.no IS NULL')
.orderBy('events.occurredAt')
.setLock("pessimistic_write")
.limit(10)
.getMany();

const result = await transactionalEntityManager
.getRepository(this.eventStreamEntity)
.createQueryBuilder()
.select("MAX(no)", "max")
.getRawOne() ?? { max: 0 };

let maxEventNo = result.max;

for (const event of events) {
await transactionalEntityManager.createQueryBuilder()
.update(this.eventStreamEntity)
.set({ no: maxEventNo + 1 })
.where('id = :id', { id: event.id })
.execute();

await transactionalEntityManager.createQueryBuilder()
.update(this.outBoxEntity)
.set({
no: maxEventNo + 1,
message: () => `jsonb_set(message::jsonb, '{no}', '${maxEventNo + 1}', true)`
})
.where('id = :id', { id: event.id })
.execute();

maxEventNo++;
}
});
}

private async publishEvents() {
await this.appDataSource.manager.transaction("READ COMMITTED", async (transactionalEntityManager: EntityManager) => {
const outboxEvents = await transactionalEntityManager
.getRepository(this.outBoxEntity)
.createQueryBuilder(this.outBoxEntity.name)
.setLock("pessimistic_write")
.setOnLocked("skip_locked")
.where('no is not null')
.limit(10)
.getMany();

await Promise.all(outboxEvents.map(async (outboxEvent: T) => {
const inboxEventMessage: EventMessage = new EventMessage(JSON.parse(outboxEvent.message));
const eventMessage: EventMessage = inboxEventMessage;

if (!await this.sendEvent(eventMessage.name, JSON.stringify(eventMessage))) {
throw new Error('Failed to publish event message');
}
}));

await transactionalEntityManager
.getRepository(this.outBoxEntity)
.remove(outboxEvents);
});
}

private async sendEvent(name: string, message: string): Promise<boolean> {
const source: string = `${this.contextName}.${this.aggregateName}`;

const published: PutEventsResponse = await this.client.send(
new PutEventsCommand({
Entries: [{
EventBusName: String(process.env.EVENT_BUS_NAME),
Source: source,
DetailType: name,
Detail: message,
}],
})
);

return published.FailedEntryCount === 0;
}
}