Skip to content

Extract durable projections into Projection class with minimal Consumer/EventStore wiring#320

Draft
Copilot wants to merge 9 commits into
mainfrom
copilot/extend-consumer-projection-function
Draft

Extract durable projections into Projection class with minimal Consumer/EventStore wiring#320
Copilot wants to merge 9 commits into
mainfrom
copilot/extend-consumer-projection-function

Conversation

Copy link
Copy Markdown
Contributor

Copilot AI commented May 29, 2026

This change introduces a dedicated Projection concept and keeps durable projection restore with HMAC verification, while cleanly separating projection logic (how events are interpreted) from consumer iteration/persistence (what is consumed and where).

  • New Projection API

    • Added Projection class for projection execution and optional persistence/restore of projection definitions.
    • Supports:
      • reducer function: (state, event) => state
      • per-type reducer map: { [eventType]: (state, event) => state }
    • Supports optional matcher, typeAccessor, and hmac.
    • Added projection.apply(...), projection.handle(stream), projection.persist(...), and restore helpers.
  • Composable projections

    • Added CompositeProjection to combine projections into a single composed read model.
    • Combined projection exposes merged types and nested composed state.
  • Consumer ↔ Projection integration

    • consumer.project(projection) and projection.subscribe(consumer) now behave equivalently for wiring and durability:
      • attach projection application as consumer 'data' handling
      • persist projection sidecar if not already persisted
      • remain idempotent for restore/reconnect flows.
    • API/docs align both paths as equivalent entrypoints depending on user workflow.
  • EventStore integration

    • eventstore.getProjection(...) now supports positional arguments:
      • eventstore.getProjection(name, handlers, initialState, matcher)
      • restores persisted projections when handlers is omitted.
    • EventStore-level projection defaults are derived directly from storage/runtime config:
      • projection HMAC uses storage.hmac
      • projection type accessor mapping is initialized in the EventStore constructor.
    • Updated eventstore.getConsumer(...) to restore and reconnect an existing consumer-side projection sidecar automatically.
  • Projection persistence + trusted restore

    • Projection definitions continue to be persisted to projection sidecar files.
    • Function handlers/matchers are serialized with buildMetadataForMatcher(...).
    • On restore, HMAC is verified before deserialization.
  • Shared file persistence and path-safety utilities

    • Extracted/reused safeUnlink and writeFileAtomic in src/utils/fsUtil.js.
    • Added reusable filename/path safety helpers for safe relative names and root-bounded path resolution.
    • Consumer initialization now validates/sanitizes indexName/identifier and resolves state file paths within the expected root directory.
    • EventStore stream-name/type validation now reuses the shared filename safety utility.
    • isProjectionDefinitionObject now uses shared isPlainObject().
  • Consumer file handling hardening

    • Consumer cleanup handles only consumer state temp files.
    • Projection persistence/cleanup responsibility stays within Projection.
    • Added setState idempotency fast-path (if (this.state === newState) return;).
  • Docs and behavioral coverage

    • Updated API/consumer docs to reflect the minimal API path and updated getProjection signature.
    • Added consumer docs example including a predefined Projection instance in composition.
    • Added a state-shape hint for CompositeProjection results in consumer docs.
    • Removed consumer.createProjection(...) usage and docs.
    • Added/updated tests for projection attachment/restore parity, EventStore projection defaults, composition behavior, HMAC-protected restore, and new fs/path safety helpers.
    • Added method docblocks to Projection/CompositeProjection.
const projection = eventstore.getProjection('user-count', {
  UserCreated: (state) => state + 1,
  UserDeleted: (state) => state - 1
}, 0);

const consumer = eventstore.getConsumer('users', 'user-count', projection.initialState);
consumer.project(projection);
// equivalent durable path:
// projection.subscribe(consumer);

// After process restart/reopen, EventStore restores and reconnects the projection automatically.

Copilot AI changed the title Add durable consumer projections with trusted restore Add durable consumer projections with persisted, HMAC-verified restore May 29, 2026
Copilot finished work on behalf of albe May 29, 2026 17:26
Copilot AI requested a review from albe May 29, 2026 17:26
@albe
Copy link
Copy Markdown
Owner

albe commented May 30, 2026

@copilot I'd rather like to extract the Projection concept into an own class that should only encapsulate the execution, and optional persisting and restoring of the projection definition. It holds the concept of how events are projected into state, as opposed to the consumer that holds the concept of what events should be consumed (through stream and persistent position). Or put differently, a projection is an in-memory model for interpretation of events, while the consumer is a persistent iterator over events. Combining both gives a full read model that can be defined at runtime but stays persistent. Alone, the projection can be used for ad-hoc decision models. Also, a Projection bridges the gap between Consumer and DCB, where not a single stream is the relevant point of
A projection should be composable, so it should be possible to create a new projection out of a combination of other projections.
The projector should receive a name, the definition together with initial state and an optional matcher, and options which include an hmac and an (optional) typeAccessor function.
We then need a clean API to connect a Consumer to a Projection, possibly through the EventStore which provides the type accessor and hmac from the storage.

Here's some examples of how the usage should look (taking inspiration from https://dcb.events/topics/projections/#basic-functionality)

const userCount = new Projection('userCount', {
	initialState: 0,
	handlers: {
		UserCreated: (state, event) => state + 1
		UserDeleted: (state, event) => state - 1
	},
	matcher: {}	// an optional stream matcher predicate like in query() and getEventStream()
}, {
	typeAccessor: (event) => event.payload?.type
});
// OR
const userCount = eventStore.getProjection('userCount', {
	initialState: 0,
	handlers: {
		UserCreated: (state, event) => state + 1
		UserDeleted: (state, event) => state - 1
	},
	matcher: {}	// an optional stream matcher predicate like in query() and getEventStream()
});	// Passes the typeAccessor from event store config

const newStateAfterCreate = userCount.apply(currentState, new UserCreated(...));	// apply a single event

// Ad-hoc usage
const { stream, condition } = eventStore.query(userCount.types, userCount.matcher);
const count = userCount.handle(stream); // reset state to initialState and handle all events in a stream, calling projection.state = projection.apply(projection.state, event) for each
userCount.state; // 4 === count
// t.b.d. what if the userCount has projected some events before and we have the same events here again. There's two options: handle = reset to initialState before processing, making a Projection handle call a real ad-hoc one shot path
// OR Projection remembers last seen global event position and user needs to do eventStore.query(userCount.types, userCount.matcher, userCount.position)
if (count === X) {
   eventStore.commit('user-monitoring', new UserLimitReached(), condition);
}

// Connect a projection to a durable consumer to keep a continuous model
const userCountConsumer = eventStore.getConsumer('userCount');
userCountConsumer.project(userCount); // Initializes Projection with initialState and registers the apply method as the consumer 'data' handler like this: on('data', event => consumer.setState(state => projection.apply(state, event)))
userCountConsumer.state; // 4
// Next time the Consumer is fetched via getConsumer('userCount'), the projection is restored and registered automatically
// Alternatively, the other way around:
userCount.subscribe(userCountConsumer); // Will make this projection durable and continue from where the consumer was last persisted
userCount.state; // 4

userCount.persist(hmac);	// stores the projection definition, using the name as file name and buildMetadataForMatcher to safely store the handler functions and definition. All handler functions are toString()'d before saving, so the definition object can directly be JSON.stringify'd
// restores the projection definition
const userCount = Projection.restore('userCount', hmac, {
	typeAccessor: (event) => event.payload?.type
});

// Projection composition
const userOverview = new CompsiteProjection('userOverview', {
	// composition of existing projections
	userCount,
	lastUser,
	...,
	// inline projection, maybe not needed
	userLoss: {
		initialState: {},
		handlers: {
			UserDeleted: (state, event) => (period) => ({ [period]: (state[period] || 0) + 1 })(YearMonth(event.payload.createdAt))
		}
	}
}, {
	typeAccessor: (event) => event.payload?.type
});
userOverview.handle(stream); // delegates to the respective sub-projection handlers or inline projection handlers
userOverview.types; // combination of all projections' event types
userOverview.state; // { userCount: 4, lastUser: { name: 'John Doe', createdAt: '2026-05-29T14:35:13' }, userLoss: { '2026-05': 1 } }

Copy link
Copy Markdown
Contributor Author

Copilot AI commented May 30, 2026

@copilot I'd rather like to extract the Projection concept into an own class that should only encapsulate the execution, and optional persisting and restoring of the projection definition. It holds th...

Implemented this in commit 2e6f849.

Projection logic is now extracted into Projection/CompositeProjection classes (execution + optional persist/restore), Consumer now supports project(projection) and createProjection(...) delegates through Projection, Projection.subscribe(consumer) is available, and EventStore.getProjection(name, definition?, options?) provides store-level defaults for typeAccessor/HMAC. I also added tests and updated API/consumer docs for the new usage.

Copilot AI changed the title Add durable consumer projections with persisted, HMAC-verified restore Extract durable projections into Projection class with Consumer/EventStore integration May 30, 2026
Copilot finished work on behalf of albe May 30, 2026 20:03
Copy link
Copy Markdown
Owner

@albe albe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot address the comments.

Comment thread src/Consumer.js
Comment thread src/Consumer.js Outdated
Comment thread src/Consumer.js Outdated
Comment thread src/Consumer.js Outdated
Comment thread src/EventStore.js Outdated
}
const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since);
const projectionTypeAccessor = this.typeAccessor
? (event) => this.typeAccessor(event?.payload || event)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have an ambiguity here between EventStore typeAccessor, which receives the "domain event" (because we need it before persisting a domain event to ensure existence of a Type stream), while the Projection receives "stored events" containing payload and metadata. I think this needs to be cleaned up a bit, because we also have the matcher in the query and EventStream, which acts on pure domain events but receives the storage event metadata as additional parameter. At the consumer, which acts upon the Storage layer, we see the "stored event", which implies some knowledge about the storage schema, that is suboptimal. But since the 'data' event on the Consumer is supposed to be compatible with the nodejs Stream.Readable interface, it receives only one parameter. So the Consumer 'data' event needs to stay low level, but maybe there is a second layer missing, which destructures this into an (payload, metadata) event and then Projection in turn could subscribe to that? It would bypass the autostart of Stream.Readable though.
So for now we stay with this.

Comment thread src/EventStore.js Outdated
Copy link
Copy Markdown
Contributor Author

Copilot AI commented May 31, 2026

@copilot address the comments.

Done in commit 2f46add. I applied the @copilot-tagged review comments and updated code/tests/docs accordingly.

Copilot AI changed the title Extract durable projections into Projection class with Consumer/EventStore integration Extract durable projections into Projection class with minimal Consumer/EventStore wiring May 31, 2026
Copilot finished work on behalf of albe May 31, 2026 11:08
Copilot AI requested a review from albe May 31, 2026 11:08
Copy link
Copy Markdown
Owner

@albe albe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot I think I would like the slightly change the API of eventstore.getProjection to receive single parameters for initialState, handlers and matcher, to more closely resemble the existing APIs of getEventStream and getConsumer. The order should probably put handlers first, since those are the deciding part of the Projection. Then maybe the matcher and last the initialState, or maybe the other way around. I'm uncertain which is more important/frequent. Assuming a Projection is connected to an already filtered eventstream, then probably initialState is more important.

Comment thread docs/consumers.md
Comment thread src/EventStore.js Outdated
Comment thread src/Projection.js Outdated
Comment thread src/Projection.js Outdated
Comment thread docs/api.md Outdated
Comment thread src/EventStore.js Outdated
Copy link
Copy Markdown
Contributor Author

Copilot AI commented Jun 4, 2026

@copilot I think I would like the slightly change the API of eventstore.getProjection to receive single parameters for initialState, handlers and matcher, to more closely resemble the existing APIs of...

Updated in commits a222f4c and f9e7fdd: eventstore.getProjection now supports positional parameters as getProjection(name, handlers, initialState, matcher), while still restoring persisted projections when handlers are omitted.

Copilot finished work on behalf of albe June 4, 2026 17:10
Copilot AI requested a review from albe June 4, 2026 17:10
Copy link
Copy Markdown
Owner

@albe albe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot see the comments

Comment thread src/Consumer.js Outdated
Comment thread src/Consumer.js Outdated
Comment thread src/EventStore.js Outdated
Comment thread docs/consumers.md
Comment thread src/Consumer.js
Comment thread src/Projection.js
Comment thread src/Consumer.js
Copy link
Copy Markdown
Contributor Author

Copilot AI commented Jun 4, 2026

@copilot see the comments

Addressed the requested threads in commit 30a73db.

Copilot finished work on behalf of albe June 4, 2026 20:17
Copilot AI requested a review from albe June 4, 2026 20:17
@albe
Copy link
Copy Markdown
Owner

albe commented Jun 4, 2026

@copilot resolve the merge conflicts in this pull request

Copy link
Copy Markdown
Contributor Author

Copilot AI commented Jun 4, 2026

@copilot resolve the merge conflicts in this pull request

Resolved in 143d9b9. I merged origin/main into this branch, resolved the conflicts in index.js, src/Consumer.js, src/EventStore.js, and test/EventStore.spec.js, and verified with npm test plus validation checks.

Comment thread src/Consumer.js Outdated
Comment thread src/Projection.js Outdated
Copilot finished work on behalf of albe June 5, 2026 19:24
Copilot AI requested a review from albe June 5, 2026 19:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants