Skip to content

feat: Phase 1 — Core framework + KurrentDB integration#1

Merged
alexeyzimarev merged 32 commits intomainfrom
feature/phase1-implementation
Mar 20, 2026
Merged

feat: Phase 1 — Core framework + KurrentDB integration#1
alexeyzimarev merged 32 commits intomainfrom
feature/phase1-implementation

Conversation

@alexeyzimarev
Copy link
Copy Markdown
Contributor

Summary

Complete Go port of the Eventuous Event Sourcing library — Phase 1 covering core framework and KurrentDB integration.

What's included

Core module (core/) — near-zero external deps:

  • Domain: Aggregate[S] with fold pattern, Apply, guards (EnsureNew/EnsureExists)
  • Persistence: EventReader/EventWriter/EventStore interfaces, LoadState, LoadAggregate, StoreAggregate
  • Serialization: TypeMap (bidirectional type registry), Codec interface, JSON implementation
  • Command services: Functional Service[S] (primary) and AggregateService[S] (optional DDD pattern)
  • Subscriptions: EventHandler, middleware chain (WithConcurrency, WithPartitioning, WithLogging), CheckpointCommitter with gap detection
  • Test infrastructure: In-memory store, store conformance suite (22 tests), subscription conformance suite, shared Booking test domain

KurrentDB module (kurrentdb/) — EventStoreDB/KurrentDB integration:

  • Store implementing full EventStore interface (append, read forward/backward, delete, truncate)
  • Catch-up subscriptions (stream + $all) with checkpoint support
  • Persistent subscriptions (stream + $all) with ack/nack
  • Integration tests via testcontainers (no manual Docker setup needed)

OTel module (otel/) — OpenTelemetry observability:

  • TracedCommandHandler decorator (tracing spans + duration histogram + error counter)
  • TracingMiddleware for subscriptions

CI pipeline (.github/workflows/ci.yml):

  • Lint (go vet + gofmt)
  • Build (parallel across modules)
  • Core + OTel unit tests (no infra)
  • KurrentDB integration tests (testcontainers)

Design principles

  • Functional-first: Pure functions over OOP. Type switch fold instead of handler registration.
  • Idiomatic Go: Composition over inheritance. Middleware chains. context.Context + errors. Functional options.
  • Multi-module: core/ has near-zero deps. Each integration is a separate Go module.

Spec & plan

  • Design spec: docs/specs/2026-03-20-phase1-design.md
  • Implementation plan: docs/specs/2026-03-20-phase1-plan.md

Test plan

  • cd core && go test -race ./... — 40+ unit tests across all core packages
  • cd otel && go test -race ./... — 4 OTel tracing/metrics tests
  • cd kurrentdb && go test -race -timeout 300s ./... — integration tests against real KurrentDB via testcontainers
  • CI pipeline passes on GitHub Actions

🤖 Generated with Claude Code

alexeyzimarev and others added 27 commits March 20, 2026 14:49
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements StreamName (typed string with Category/ID split on first dash),
ExpectedVersion and ExpectedState constants, Metadata map with immutable
WithCorrelationID/WithCausationID helpers, and sentinel error values.
All behaviour is covered by six unit tests (TDD green).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements TypeMap in core/codec with Register[E], TypeName, and
NewInstance. Uses reflect.Type as the key for the forward map so both
pointer and value forms of a type resolve to the same entry.
Concurrent-safe via sync.RWMutex. Idempotent registration; errors on
conflicts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Introduce the Codec interface (Encode/Decode) and JSONCodec backed by
TypeMap for event type resolution. Codec.Decode dereferences the
*T returned by TypeMap.NewInstance so callers always receive a value,
not a pointer.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements the generic Aggregate[S] type that tracks state via a fold
function, accumulates pending changes for optimistic concurrency, and
provides EnsureNew/EnsureExists guards for creation and mutation commands.
Includes all 10 mirrored tests from the .NET Eventuous aggregate test suite.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements a thread-safe in-memory EventStore (memstore.Store) under
core/test/memstore/ that satisfies the store.EventStore interface.
Provides optimistic-concurrency checks, backwards reads, stream
deletion, and truncation — no infrastructure required.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ory store

Implements storetest.RunAll with sub-suites for Append, Read, ReadBackwards,
OptimisticConcurrency, StreamExists, DeleteStream, and TruncateStream. Fixes
a uint64 overflow bug in memstore.ReadEventsBackwards where passing ^uint64(0)
as the start sentinel was cast to int(-1), bypassing the bounds clamp.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements store helper functions for the functional command service and
aggregate command service. LoadState reads all events from a stream,
folds them into state, and handles IsNew/IsExisting/IsAny semantics.
LoadAggregate wraps LoadState to reconstruct an Aggregate[S].
StoreAggregate appends pending changes with optimistic concurrency control.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ut.Domain

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements Service[S] with On[C,S]() registration, Handle() pipeline
(load state → act → append → fold), and 8 tests covering IsNew/IsExisting/IsAny,
no-op, and handler-not-found scenarios.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implement AggregateService[S] and OnAggregate registration in core/command/aggservice.go.
The service loads an aggregate via store.LoadAggregate, enforces IsNew/IsExisting/IsAny guards,
invokes the handler's Act function (which records events via agg.Apply), and persists changes
with store.StoreAggregate. Covers the full test matrix: OnNew success/conflict, OnExisting
success/missing, OnAny new and existing streams, handler not found, and no-op (no Apply called).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements the subscription package with Subscription interface,
EventHandler/HandlerFunc, ConsumeContext, and Middleware chain
including WithLogging, WithConcurrency, and WithPartitioning.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements CheckpointStore interface and CheckpointCommitter that safely
batches checkpoint writes while ensuring no position is committed until
all prior sequence numbers have been processed, preventing data loss
under concurrent/out-of-order event processing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds subtest package with TestConsumeProducedEvents and TestResumeFromCheckpoint
exported test functions any subscription implementation can call to verify conformance,
along with collectingHandler and memCheckpointStore helpers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add KurrentDB event store implementation that satisfies the
store.EventStore interface using the KurrentDB Go client. All 21
conformance tests pass against a live KurrentDB instance.

Also update storetest to use per-process unique stream names and
export a NewCodec helper so external store implementations can
reuse the conformance test codec.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace hardcoded localhost:2113 connection with testcontainers-go to
start an ephemeral EventStoreDB container per test run, removing the
docker-compose dependency for tests. Also fix isStreamNotFound to add
a string-based fallback for errors from rs.Recv() that don't unwrap
into *kurrentdb.Error.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements CatchUp subscription that consumes events from KurrentDB
via either a single stream or the $all stream. Supports checkpoint-
based resumption, middleware chaining, server-side filters, and
link resolution. Includes integration tests for stream consumption,
$all consumption, and checkpoint resume.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implement server-managed persistent subscriptions for KurrentDB with
auto-creation of subscription groups, ack on handler success, and nack
with retry on handler failure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implements TracedCommandHandler wrapping any CommandHandler[S] with OTel
span creation and duration/error metrics, and TracingMiddleware providing
a subscription.Middleware that creates a span per consumed event.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…st package

Wire the subscription conformance suite from core/test/subtest/ to the
KurrentDB catch-up subscription. Export TestEvent type and add NewCodec()
to the subtest package so external modules can build a matching codec.
Add per-process runID to stream names for test isolation against
persistent stores.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implements TracedCommandHandler wrapping command.CommandHandler with OTel spans,
duration histogram, and error counter; TracingMiddleware for subscriptions adds
per-event spans with OK/error status. Adds missing error-path test and success
span.SetStatus(Ok) to subscription middleware.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Exercises the full stack: codec → KurrentDB store → functional command
service (BookRoom + RecordPayment) → catch-up subscription, using the
shared testdomain types and a real KurrentDB container.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Lint: go vet + gofmt check across all modules
- Build: parallel build of core, kurrentdb, otel
- Test core: unit tests with race detector (no infra needed)
- Test otel: unit tests with race detector (no infra needed)
- Test kurrentdb: integration tests via testcontainers (Docker)

Runs on PRs and pushes to main/dev.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds copyright and license headers to all 55 .go files matching
the .NET Eventuous convention. Also adds the full LICENSE file.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 55c75accbc

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +64 to +68
defer func() {
<-sem // release slot
wg.Done()
}()
_ = next.HandleEvent(ctx, msg)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Wait for concurrent handlers before returning

When WithConcurrency is used under kurrentdb.CatchUp.Start or Persistent.Start, those loops checkpoint or ack the event immediately after HandleEvent returns. This middleware launches next.HandleEvent in a goroutine, drops its error, and then returns nil unless the context is already canceled, so the subscription can mark an event as processed before the handler has actually finished. A crash/restart or handler failure during that background work will therefore skip or lose events.

Useful? React with 👍 / 👎.

Comment on lines +255 to +257
opts := kurrentdb.AppendToStreamOptions{
StreamState: kurrentdb.Any{},
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Honor optimistic concurrency when truncating streams

TruncateStream takes an expected version just like the rest of the EventStore API, but this implementation always calls SetStreamMetadata with StreamState: kurrentdb.Any{}. In a concurrent writer scenario, a stale caller can still advance $tb after someone else has appended new events, so truncation succeeds even though the expected version no longer matches and newer data may be hidden unexpectedly.

Useful? React with 👍 / 👎.

Comment on lines +16 to +17
env:
GO_VERSION: "1.23"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Raise the CI Go version to match module requirements

I checked .github/workflows/ci.yml: every build/test job installs Go 1.23, but kurrentdb/go.mod and otel/go.mod both declare go 1.25.0. The go directive is the minimum required toolchain for a module, so the build, test-kurrentdb, and test-otel jobs will fail on GitHub Actions before they even compile those modules.

Useful? React with 👍 / 👎.

expected = eventuous.ExpectedVersion(agg.OriginalVersion())
}

return writer.AppendEvents(ctx, stream, expected, events)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Update aggregate state after a successful store

After a successful append this helper returns immediately without mutating agg, so the aggregate keeps its old OriginalVersion() and previously persisted Changes(). Any caller that reuses the same aggregate instance for another command will resend already-written events and usually hit ErrOptimisticConcurrency even when no concurrent writer touched the stream.

Useful? React with 👍 / 👎.

alexeyzimarev and others added 2 commits March 20, 2026 17:45
- WithConcurrency: block until handler completes and return its error,
  preventing checkpoint/ack before processing finishes
- TruncateStream: validate expected version against data stream before
  writing metadata, honoring optimistic concurrency
- StoreAggregate: ClearChanges now advances originalVersion so the
  aggregate can be reused without stale version conflicts
- CI: bump Go version to 1.25 to match kurrentdb/otel module requirements

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- TestWithConcurrency_ReturnsHandlerError: verify handler errors are
  propagated, not swallowed (catches fire-and-forget bug)
- TestWithConcurrency_BlocksUntilHandlerCompletes: verify HandleEvent
  blocks until the inner handler finishes (catches premature ack bug)
- TestClearChanges_AdvancesVersion: verify ClearChanges bumps
  OriginalVersion so the aggregate is reusable
- TestClearChanges_ThenApply: verify Apply after ClearChanges produces
  correct versions for a second command cycle
- TestStoreAggregate_AggregateReusableAfterStore: verify the aggregate
  can be stored twice without ErrOptimisticConcurrency

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
alexeyzimarev and others added 3 commits March 20, 2026 17:47
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tterns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add paths-ignore to the push trigger (was only on pull_request).
Also exclude LICENSE and .gitignore from triggering builds.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@alexeyzimarev alexeyzimarev merged commit b042d15 into main Mar 20, 2026
4 checks passed
@alexeyzimarev alexeyzimarev deleted the feature/phase1-implementation branch March 20, 2026 16:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant