Building blocks for CQRS/ES, inspired by Lokad.CQRS.
CQRS and Event Sourcing are simple in a single process (example), but a minefield in the cloud.
node-cqrs handles the "boring but hard" distributed plumbing - concurrency, message delivery, projections, and rehydration - so you can focus on your domain logic.
- Reliable Consistency: Per-aggregate FIFO handling and conflict-safe writes with optimistic concurrency.
- Resilient Projections: Restart-safe views with checkpoints, readiness gates, and locking.
- Fast Rehydration: Automatic snapshotting and selective event restores.
- Distributed Sagas: Built-in event correlation and origin propagation for complex workflows.
- Smart Pipelines: Pluggable dispatching with back-pressure and concurrency limits.
- Pluggable by Design: Thin interfaces on every component - swap any piece, without patching the library or your domain code.
The core is infrastructure-agnostic, but the heavy lifting for common stacks is done, so you can mix and match sub-modules to fit your environment:
- node-cqrs/sqlite - Embedded per-process event storage and/or views.
- node-cqrs/mongodb - Distributed event storage and persistent projection views for multi-process deployments.
- node-cqrs/rabbitmq - Robust, distributed command and event bus.
- node-cqrs/redis - Redis-backed persistent projection views for distributed deployments.
- Overview
- Installation
- ContainerBuilder
- Commands
- Write Model (Aggregates)
- Read Model (Projections and Views)
- Sagas
- Infrastructure Modules
- OpenTelemetry
- Examples
Domain logic lives in three building blocks:
- Aggregates - handle commands and emit events
- Sagas - manage processes by reacting to events and enqueueing follow-up commands
- Projections - consume events and update views
Commands and events are loosely typed objects implementing the IMessage interface:
interface IMessage<TPayload = unknown> {
/** Event or command type */
type: string;
/** Target aggregate identifier for commands, originating aggregate identifier for events */
aggregateId?: Identifier;
/** Aggregate version at the time of the message */
aggregateVersion?: number;
/** Starter event ids of sagas associated with this message, keyed by saga descriptor */
sagaOrigins?: Record<string, string>;
/** Business data */
payload: TPayload;
/** Optional metadata/context (e.g. auth info, request id); set on commands, copied to events */
context?: any;
}

Message delivery is handled by the following components, in order:
- Command Bus - routes commands to handlers
- Aggregate Command Handler - restores aggregate state and executes commands
- Event Store - runs the event dispatch pipeline (e.g. encoding, persistence), then publishes events to the event bus for delivery to all subscribers
- Saga Event Handler - restores saga state and applies events
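
The delivery order above can be pictured with a few in-memory stand-ins. This is only a sketch: `MiniBus`, `eventLog`, and `trace` are hypothetical names invented for illustration, not node-cqrs classes, and the real event store does considerably more (dispatch pipeline, concurrency checks, saga handling).

```typescript
// Illustrative only: hypothetical stand-ins, not node-cqrs classes.
interface Message<T = unknown> {
  type: string;
  aggregateId?: string;
  aggregateVersion?: number;
  payload: T;
}

type Handler = (message: Message) => void;

// A tiny bus: routes a message to every handler registered for its type.
class MiniBus {
  private readonly handlers = new Map<string, Handler[]>();

  on(type: string, handler: Handler): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  publish(message: Message): void {
    for (const handler of this.handlers.get(message.type) ?? [])
      handler(message);
  }
}

const commandBus = new MiniBus();
const eventBus = new MiniBus();
const eventLog: Message[] = []; // stands in for event storage
const trace: string[] = [];     // records the order of the steps

// "Aggregate command handler": turns a command into an event,
// persists it, then publishes it to subscribers.
commandBus.on('createUser', command => {
  trace.push('command handled');
  const event: Message = {
    type: 'userCreated',
    aggregateId: command.aggregateId,
    aggregateVersion: eventLog.length,
    payload: command.payload
  };
  eventLog.push(event);
  trace.push('event persisted');
  eventBus.publish(event);
});

// "Projection": consumes the published event.
eventBus.on('userCreated', () => trace.push('event projected'));

commandBus.publish({
  type: 'createUser',
  aggregateId: 'u1',
  payload: { username: 'alice' }
});
```

After the last call, `trace` holds the three steps in the order command handled, event persisted, event projected, which mirrors the component order listed above.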
src/, tests/, and examples/ are good entry points - the codebase is intentionally small and readable.
npm install node-cqrs

Node.js 16+ and browsers are supported.
Wire buses, the event store, and your domain components with dependency injection:
const builder = new ContainerBuilder();
builder.register(InMemoryEventStorage); // implements IEventStorageReader, IDispatchPipelineProcessor, and IIdentifierProvider
builder.registerAggregate(UserAggregate);
builder.registerProjection(UsersProjection, 'usersView');
builder.registerSaga(WelcomeEmailSaga);
const { commandBus, eventStore, usersView } = builder.container();

Manual setup (without DI container)
const commandBus = new InMemoryMessageBus();
const eventBus = new InMemoryMessageBus();
const eventStorage = new InMemoryEventStorage();
const eventStore = new EventStore({
eventStorageReader: eventStorage,
identifierProvider: eventStorage,
eventDispatchPipeline: [eventStorage],
eventBus
});
const aggregateCommandHandler = new AggregateCommandHandler({ eventStore, aggregateType: UserAggregate });
aggregateCommandHandler.subscribe(commandBus);
const projection = new UsersProjection();
projection.subscribe(eventStore);
projection.restore(eventStore);
const users = projection.view;

Commands represent intent. Send them via commandBus:
commandBus.send('signupUser', undefined, { payload: { profile, password } });
// or
commandBus.send({ type: 'signupUser', payload: { profile, password } });

Commands are handled by Aggregates and may also be enqueued by Sagas.
Aggregates handle commands, validate business rules, and emit events. Minimal contract (IAggregate):
interface IAggregate {
/**
* Applies a single event to update the aggregate's internal state.
*
* This method is used primarily when rehydrating the aggregate
* from the persisted sequence of events
*
* @param event - The event to be applied
*/
mutate(event: IEvent): void;
/**
* Processes a command by executing the aggregate's business logic,
* resulting in new events that capture the state changes.
* It serves as the primary entry point for invoking aggregate behavior
*
* @param command - The command to be processed
* @returns A set of events produced by the command
*/
handle(command: ICommand): IEventSet | Promise<IEventSet>;
}

AbstractAggregate is the recommended base class. Public method names are matched to command types - createUser() handles createUser:
class UserAggregate extends AbstractAggregate<void> {
createUser(payload: CreateUserCommandPayload) {
this.emit('userCreated', { username: payload.username });
}
}

Override static get handles() to declare command types explicitly.
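
The name-matching rule can be pictured as a method lookup keyed by the command type. The sketch below is a simplified illustration under that assumption: `MiniAggregate` is a hypothetical class written for this example and is not how AbstractAggregate is actually implemented.

```typescript
// Illustrative only: MiniAggregate is a hypothetical stand-in for the idea
// behind name-matched handlers, not the library's base class.
interface Command { type: string; payload?: unknown }
interface DomainEvent { type: string; payload?: unknown }

abstract class MiniAggregate {
  private emitted: DomainEvent[] = [];

  protected emit(type: string, payload?: unknown): void {
    this.emitted.push({ type, payload });
  }

  // Find a method whose name equals the command type and invoke it.
  handle(command: Command): DomainEvent[] {
    // Base-class members (handle, emit, constructor, ...) are never handlers.
    if (command.type in MiniAggregate.prototype)
      throw new Error(`"${command.type}" is not a command handler`);

    const handler = (this as unknown as Record<string, unknown>)[command.type];
    if (typeof handler !== 'function')
      throw new Error(`No handler for command "${command.type}"`);

    this.emitted = [];
    handler.call(this, command.payload);
    return this.emitted;
  }
}

class UserAggregate extends MiniAggregate {
  createUser(payload: { username: string }): void {
    this.emit('userCreated', { username: payload.username });
  }
}

const events = new UserAggregate().handle({
  type: 'createUser',
  payload: { username: 'alice' }
});
```

Here `events` contains a single userCreated event. An explicit list of handled types, as static get handles() provides, avoids relying on method names alone.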
Keep state separate from command handlers - derive it by projecting the aggregate's own events:
class UserAggregateState {
passwordHash: string;
passwordChanged(event: IEvent<PasswordChangedEventPayload>) {
this.passwordHash = event.payload.passwordHash;
}
}
class UserAggregate extends AbstractAggregate<UserAggregateState> {
protected readonly state = new UserAggregateState();
changePassword(payload: ChangePasswordCommandPayload) {
if (md5(payload.oldPassword) !== this.state.passwordHash)
throw new Error('Invalid password');
this.emit('passwordChanged', { passwordHash: md5(payload.newPassword) });
}
}

State must not throw - all validation belongs in the aggregate command handler.
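
To make the "derive state by projecting events" idea concrete, here is a minimal replay loop. It is a sketch under stated assumptions: `rehydrate` and `UserState` are hypothetical names for this example, and the library's own restore logic (snapshots, selective event restores) is not shown.

```typescript
// Illustrative only: a hypothetical rehydrate() helper, not a node-cqrs API.
interface DomainEvent<T = unknown> { type: string; payload: T }

class UserState {
  passwordHash = '';
  passwordChanges = 0;

  // Named after the event type it projects.
  passwordChanged(event: DomainEvent<{ passwordHash: string }>): void {
    this.passwordHash = event.payload.passwordHash;
    this.passwordChanges += 1;
  }
}

// Replay events in order, calling the state method named after each event type.
// Events without a matching method are skipped, so replay never throws
// because of an unknown event.
function rehydrate<TState extends object>(
  state: TState,
  events: ReadonlyArray<DomainEvent>
): TState {
  for (const event of events) {
    const handler = (state as unknown as Record<string, unknown>)[event.type];
    if (typeof handler === 'function')
      handler.call(state, event);
  }
  return state;
}

const history: DomainEvent[] = [
  { type: 'userCreated', payload: { username: 'alice' } },
  { type: 'passwordChanged', payload: { passwordHash: 'hash-1' } },
  { type: 'passwordChanged', payload: { passwordHash: 'hash-2' } }
];

const state = rehydrate(new UserState(), history);
```

After the replay, `state.passwordHash` is 'hash-2' and the userCreated event has been ignored, since the state class declares no handler for it.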
Constructor arguments are injected automatically by the DI container:
class UserAggregate extends AbstractAggregate {
constructor({ id, authService }) {
super({ id });
this._authService = authService;
}
async signupUser(payload) {
await this._authService.registerUser(payload);
}
}
builder.register(AuthService).as('authService');
builder.registerAggregate(UserAggregate);

Projections listen to events and update views. Minimal contract (IProjection):
interface IProjection<TView> extends IObserver {
readonly view: TView;
/** Subscribe to new events */
subscribe(eventStore: IObservable): Promise<void> | void;
/** Restore view state from not-yet-projected events */
restore(eventStore: IEventStorageReader): Promise<void> | void;
/** Project new event */
project(event: IEvent): Promise<void> | void;
}

AbstractProjection follows the same name-matching rule as AbstractAggregate - userCreated() handles the userCreated event:
class UsersProjection extends AbstractProjection<Map<string, {
username: string
}>> {
constructor() {
super();
this.view = new Map();
}
userCreated(event: IEvent<UserCreatedEventPayload>) {
this.view.set(event.aggregateId, {
username: event.payload.username
});
}
}

Override static get handles() to declare event types explicitly.
For persistent views and safe restarts, implement IViewLocker and IEventLocker on the projection view to enable catch-up and last-processed checkpoints.
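
The role of a last-processed checkpoint can be sketched in memory. The names below (`CheckpointedUsersView`, `apply`, `catchUp`) are assumptions made for illustration and do not reproduce the IViewLocker or IEventLocker contracts; a persistent view would keep the same bookkeeping in its backing store rather than in process memory.

```typescript
// Illustrative only: hypothetical names, not the IEventLocker contract.
interface StoredEvent {
  id: string;
  aggregateId: string;
  payload: { username: string };
}

class CheckpointedUsersView {
  readonly users = new Map<string, string>();
  private readonly projected = new Set<string>(); // event ids already applied
  lastEventId: string | undefined;                // checkpoint used for catch-up

  // Returns false when the event was already projected (duplicate delivery).
  apply(event: StoredEvent): boolean {
    if (this.projected.has(event.id))
      return false;
    this.users.set(event.aggregateId, event.payload.username);
    this.projected.add(event.id);
    this.lastEventId = event.id;
    return true;
  }
}

// Catch-up: replay only the events stored after the checkpoint.
// If the checkpoint is unknown to the log, replay from the start and
// rely on deduplication.
function catchUp(view: CheckpointedUsersView, log: ReadonlyArray<StoredEvent>): number {
  const start = view.lastEventId === undefined
    ? 0
    : log.findIndex(e => e.id === view.lastEventId) + 1;
  let applied = 0;
  for (const event of log.slice(start))
    if (view.apply(event)) applied += 1;
  return applied;
}

const e1: StoredEvent = { id: 'e1', aggregateId: 'u1', payload: { username: 'alice' } };
const e2: StoredEvent = { id: 'e2', aggregateId: 'u2', payload: { username: 'bob' } };
const e3: StoredEvent = { id: 'e3', aggregateId: 'u1', payload: { username: 'alice2' } };
const log = [e1, e2, e3];

const view = new CheckpointedUsersView();
view.apply(e1);                         // projected before a "restart"
const firstRun = catchUp(view, log);    // applies e2 and e3 only
const secondRun = catchUp(view, log);   // nothing left to apply
```

The first catch-up applies the two events after the checkpoint, and the second applies none, which is the restart-safe behavior the lockers are meant to provide.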
// optional interface for container typing
interface IMyContainer extends IContainer {
usersView: UsersView;
}
const builder = new ContainerBuilder<IMyContainer>();
builder.registerProjection(UsersProjection, 'usersView');
const { usersView } = builder.container();

For projections that manage and need to expose multiple views:
builder.registerProjection(UsersProjection).as('usersProjection');
builder.register(c => c.usersProjection.users).as('usersView');
builder.register(c => c.usersProjection.connections).as('connectionsView');

Sagas coordinate multi-step processes by reacting to events and enqueueing follow-up commands.
class WelcomeEmailSaga extends AbstractSaga {
userSignedUp(event) {
this.enqueue('sendWelcomeEmail', undefined, {
email: event.payload.email
});
}
}
builder.register(EventIdAugmentor).as('eventIdAugmenter'); // required: adds event.id
builder.registerSaga(WelcomeEmailSaga);

- Handler methods are named after event types (userSignedUp handles userSignedUp)
- this.enqueue(commandType, aggregateId, payload) produces commands
- EventIdAugmentor must be in the dispatch pipeline - starter events use event.id as the saga origin
- static sagaDescriptor (optional) - stable key for message.sagaOrigins, defaults to class name
handle(event) runs the handler before mutate(event), so handlers always see the previous state.
Saga context is tracked in message.sagaOrigins[sagaDescriptor], storing the starter event id. A saga starts when sagaOrigins[sagaDescriptor] is absent and continues when it is present. A single event type can start multiple saga types.
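
The start-or-continue rule described above can be written as a small pure function. This is a sketch only: `routeSagaEvent` and `withOrigin` are hypothetical helpers invented for this example, and the descriptor names are placeholders; the library's internal routing is not shown here.

```typescript
// Illustrative only: hypothetical helpers, not node-cqrs internals.
interface SagaEvent {
  id: string;
  type: string;
  sagaOrigins?: Record<string, string>;
}

interface SagaRoute {
  action: 'start' | 'continue';
  originId: string;
}

// A saga starts when its descriptor has no origin on the event,
// and continues when the origin is already present.
function routeSagaEvent(event: SagaEvent, sagaDescriptor: string): SagaRoute {
  const originId = event.sagaOrigins?.[sagaDescriptor];
  return originId === undefined
    ? { action: 'start', originId: event.id } // starter event id becomes the origin
    : { action: 'continue', originId };
}

// Messages produced by the saga carry the origin forward,
// alongside origins of any other sagas already involved.
function withOrigin(
  origins: Record<string, string> | undefined,
  sagaDescriptor: string,
  originId: string
): Record<string, string> {
  return { ...origins, [sagaDescriptor]: originId };
}

const starter: SagaEvent = { id: 'evt-1', type: 'userSignedUp' };
const started = routeSagaEvent(starter, 'WelcomeEmailSaga');

const followUp: SagaEvent = {
  id: 'evt-2',
  type: 'welcomeEmailSent',
  sagaOrigins: withOrigin(undefined, 'WelcomeEmailSaga', started.originId)
};
const continued = routeSagaEvent(followUp, 'WelcomeEmailSaga');

// The same follow-up event carries no origin for another saga type,
// so that saga would start from it.
const other = routeSagaEvent(followUp, 'AuditSaga');
```

In this run `started` is a start with origin evt-1, `continued` is a continue with the same origin, and `other` is a start with origin evt-2, showing how one event can belong to one saga while starting another.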
Optional: explicit startsWith/handles
By default, the saga starts on any handled event that does not have sagaOrigins[sagaDescriptor] and continues when it does.
For strict, explicit routing:
- static startsWith: event types allowed to start a saga
- static handles: additional event types to subscribe to
Manual wiring (without DI container)
const commandBus = new InMemoryMessageBus();
const eventBus = new InMemoryMessageBus();
const eventStorage = new InMemoryEventStorage();
const eventStore = new EventStore({
eventStorageReader: eventStorage,
identifierProvider: eventStorage,
eventDispatchPipeline: [
new EventIdAugmentor({ identifierProvider: eventStorage }),
eventStorage
],
eventBus
});
SignupAggregate.register(eventStore, commandBus);
WelcomeEmailSaga.register(eventStore, commandBus);

Minimal contract (ISaga):
interface ISaga {
/**
* Apply a historical event to restore saga state.
*/
mutate(event: IEvent): unknown | Promise<unknown>;
/**
* Process an incoming event.
*
* @returns Commands produced by the saga in response to the event
*/
handle(event: IEvent): ReadonlyArray<ICommand> | Promise<ReadonlyArray<ICommand>>;
}

Swap implementations by registering different classes in the DI container. All modules below implement the same interfaces - pick what fits your deployment.
| Module | Event storage | Object view storage | Projection wiring / execution | Message buses |
|---|---|---|---|---|
| node-cqrs | InMemoryEventStorage, InMemorySnapshotStorage | InMemoryView, InMemoryLock | AbstractProjection | InMemoryMessageBus |
| node-cqrs/sqlite | SqliteEventStorage | SqliteObjectView, SqliteObjectStorage, SqliteViewLocker, SqliteEventLocker | AbstractSqliteObjectProjection, AbstractSqliteView | - |
| node-cqrs/mongodb | MongoEventStorage | MongoObjectView, MongoObjectStorage, MongoViewLocker, MongoEventLocker | AbstractMongoObjectProjection, AbstractMongoView | - |
| node-cqrs/redis | - | RedisView, RedisObjectStorage, RedisViewLocker, RedisEventLocker | AbstractRedisProjection | - |
| node-cqrs/rabbitmq | - | - | - | RabbitMqGateway, RabbitMqCommandBus, RabbitMqEventBus |
| node-cqrs/workers | - | - | AbstractWorkerProjection, WorkerProxyProjection | - |
Where aggregate events are persisted and replayed from.
| Implementation | Import | Peer deps | Notes |
|---|---|---|---|
| InMemoryEventStorage | node-cqrs | - | Dev/test only; data lost on restart (example) |
| SqliteEventStorage | node-cqrs/sqlite | better-sqlite3 | Embedded, single-process (example) |
| MongoEventStorage | node-cqrs/mongodb | mongodb | Distributed, multi-process (example) |
Where projections store and query their read-side state. Each persistent backend provides the same layered set of building blocks:
| Layer | Purpose |
|---|---|
| Object storage | Key/value CRUD with optimistic concurrency |
| View locker | Prevents concurrent schema-migration rebuilds - only one process rebuilds at a time; others wait |
| Event locker | Per-event deduplication and last-projected checkpoint |
| Composite view | Combines the above into a single view object |
| Base projection | Wires locking, checkpointing, and error handling automatically |
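
As an illustration of what "key/value CRUD with optimistic concurrency" means in the object storage layer, the sketch below rejects a write when the caller's version is stale. `VersionedStore` and its method signatures are assumptions made for this example, not the API of any node-cqrs storage class.

```typescript
// Illustrative only: a hypothetical class, not a node-cqrs storage API.
interface Versioned<T> { value: T; version: number }

class VersionedStore<T> {
  private readonly records = new Map<string, Versioned<T>>();

  get(key: string): Versioned<T> | undefined {
    return this.records.get(key);
  }

  // The write succeeds only if the caller saw the latest version.
  // expectedVersion 0 means "the key must not exist yet".
  put(key: string, value: T, expectedVersion: number): boolean {
    const currentVersion = this.records.get(key)?.version ?? 0;
    if (currentVersion !== expectedVersion)
      return false; // someone else wrote first: reload and retry
    this.records.set(key, { value, version: currentVersion + 1 });
    return true;
  }
}

const store = new VersionedStore<{ username: string }>();
const created = store.put('u1', { username: 'alice' }, 0);

// Two writers read the same version, then both try to update.
const seenByA = store.get('u1')?.version ?? 0;
const seenByB = store.get('u1')?.version ?? 0;
const writeA = store.put('u1', { username: 'alice-a' }, seenByA);
const writeB = store.put('u1', { username: 'alice-b' }, seenByB);
```

Writer A succeeds and bumps the version, so writer B's update is rejected and the stored value stays 'alice-a' until B reloads and retries.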
| Class | Notes |
|---|---|
| InMemoryLock | Simple in-process lock |
| InMemoryView | Simple Map-backed view; restores from events on each restart |
| Class | Role |
|---|---|
| SqliteObjectStorage | Key/value object storage with version-based concurrency |
| SqliteViewLocker | Prevents concurrent schema-migration rebuilds via SQLite row lock |
| SqliteEventLocker | Event deduplication and last-event checkpoint |
| AbstractSqliteView | Base class for relational (non-object) SQLite views with view and event locks embedded |
| SqliteObjectView | Composite view combining the above |
| AbstractSqliteObjectProjection | Base projection wired to SqliteObjectView |
See src/sqlite for additional documentation, and examples/sqlite for runnable project examples.
Experimental - the MongoDB view classes below are not yet validated in production. APIs may change in minor versions.
| Class | Role |
|---|---|
| MongoObjectStorage | Document storage with version-based optimistic concurrency |
| MongoViewLocker | Prevents concurrent schema-migration rebuilds; auto-prolongs lock via token + TTL |
| MongoEventLocker | Event deduplication and last-event checkpoint |
| AbstractMongoView | Base class combining MongoViewLocker + MongoEventLocker |
| MongoObjectView | Composite view combining the above |
| AbstractMongoObjectProjection | Base projection wired to MongoObjectView |
See src/mongodb for additional documentation, and examples/mongodb-views for runnable projection examples.
Experimental - the Redis view classes below are not yet validated in production. APIs may change in minor versions.
| Class | Role |
|---|---|
| RedisObjectStorage | Key/value object storage backed by Redis hashes |
| RedisViewLocker | Prevents concurrent schema-migration rebuilds; auto-prolongs lock via PEXPIRE |
| RedisEventLocker | Event deduplication and last-event checkpoint |
| RedisView | Composite view combining the above |
| AbstractRedisProjection | Base projection wired to RedisView |
See src/redis for additional documentation, and examples/redis for runnable projection examples.
How commands and events move between producers and consumers.
| Implementation | Import | Peer deps | Notes |
|---|---|---|---|
| InMemoryMessageBus | node-cqrs | - | Single-process; used as both command and event bus (example) |
| RabbitMqEventBus | node-cqrs/rabbitmq | amqplib | Fanout delivery to all subscribers (instructions) |
| RabbitMqCommandBus | node-cqrs/rabbitmq | amqplib | Point-to-point via durable queue (instructions) |
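
The difference between fanout and point-to-point delivery in the table above can be shown with two in-memory channels. Both classes are hypothetical stand-ins written for this example; they do not use RabbitMQ or the node-cqrs bus classes, and the round-robin choice of consumer is an assumption made to keep the sketch short.

```typescript
// Illustrative only: in-memory stand-ins for the two delivery styles.
type Listener<T> = (message: T) => void;

// Fanout: every subscriber receives every message (event bus style).
class FanoutChannel<T> {
  private readonly listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  publish(message: T): void {
    for (const listener of this.listeners) listener(message);
  }
}

// Point-to-point: each message goes to exactly one consumer (command bus style).
// Consumers are picked round-robin in this sketch.
class QueueChannel<T> {
  private readonly consumers: Listener<T>[] = [];
  private next = 0;

  consume(listener: Listener<T>): void {
    this.consumers.push(listener);
  }

  send(message: T): void {
    if (this.consumers.length === 0)
      throw new Error('No consumers');
    const consumer = this.consumers[this.next % this.consumers.length];
    this.next += 1;
    consumer?.(message);
  }
}

const eventsSeen: string[] = [];
const events = new FanoutChannel<string>();
events.subscribe(e => eventsSeen.push(`projection:${e}`));
events.subscribe(e => eventsSeen.push(`saga:${e}`));
events.publish('userCreated');

const commandsSeen: string[] = [];
const commands = new QueueChannel<string>();
commands.consume(c => commandsSeen.push(`worker1:${c}`));
commands.consume(c => commandsSeen.push(`worker2:${c}`));
commands.send('createUser');
commands.send('changePassword');
```

The single published event reaches both subscribers, while each command is handled by only one of the two workers.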
| Implementation | Import | Notes |
|---|---|---|
| InMemorySnapshotStorage | node-cqrs | Aggregate snapshot cache in memory, resets on process restart |
| AbstractWorkerProjection | node-cqrs/workers | Run projections in worker threads (instructions, example) |
Experimental - the Workers module is new and has not been validated in production. APIs may change in minor versions.
Optional distributed tracing via OpenTelemetry. Requires @opentelemetry/api peer dependency. Register a tracerFactory in the container to enable automatic span creation across CQRS components:
import { trace } from '@opentelemetry/api';
builder.register(() => (name: string) => trace.getTracer(`cqrs.${name}`)).as('tracerFactory');

See examples/telemetry/index.ts for a full working example.
- examples/user-domain-framework-free - minimal, no-framework CQRS/ES in one file
- examples/user-domain-ts - TypeScript with DI container
- examples/user-domain-cjs - CommonJS
- examples/redis - Redis-backed persistent projection
- examples/sagas-simple - simple saga
- examples/sagas-overlaps - overlapping sagas, multi-step flow
- examples/sqlite - SQLite-backed object storage view
- examples/browser - browser smoke test
- examples/workers-projection - worker thread projection
- examples/mongodb-eventstore - MongoDB event storage with DI container and manual wiring
- examples/mongodb-views - MongoDB-backed projection views with object storage and locking
- examples/telemetry - OpenTelemetry tracing with multiple exporters
TypeScript examples can be run with Node.js 24+ without transpiling.