feat: Phase 1 — Core framework + KurrentDB integration#1
feat: Phase 1 — Core framework + KurrentDB integration#1alexeyzimarev merged 32 commits intomainfrom
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements StreamName (typed string with Category/ID split on first dash), ExpectedVersion and ExpectedState constants, Metadata map with immutable WithCorrelationID/WithCausationID helpers, and sentinel error values. All behaviour is covered by six unit tests (TDD green). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements TypeMap in core/codec with Register[E], TypeName, and NewInstance. Uses reflect.Type as the key for the forward map so both pointer and value forms of a type resolve to the same entry. Concurrent-safe via sync.RWMutex. Idempotent registration; errors on conflicts. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Introduce the Codec interface (Encode/Decode) and JSONCodec backed by TypeMap for event type resolution. Codec.Decode dereferences the *T returned by TypeMap.NewInstance so callers always receive a value, not a pointer. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements the generic Aggregate[S] type that tracks state via a fold function, accumulates pending changes for optimistic concurrency, and provides EnsureNew/EnsureExists guards for creation and mutation commands. Includes all 10 mirrored tests from the .NET Eventuous aggregate test suite. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements a thread-safe in-memory EventStore (memstore.Store) under core/test/memstore/ that satisfies the store.EventStore interface. Provides optimistic-concurrency checks, backwards reads, stream deletion, and truncation — no infrastructure required. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ory store Implements storetest.RunAll with sub-suites for Append, Read, ReadBackwards, OptimisticConcurrency, StreamExists, DeleteStream, and TruncateStream. Fixes a uint64 overflow bug in memstore.ReadEventsBackwards where passing ^uint64(0) as the start sentinel was cast to int(-1), bypassing the bounds clamp. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements store helper functions for the functional command service and aggregate command service. LoadState reads all events from a stream, folds them into state, and handles IsNew/IsExisting/IsAny semantics. LoadAggregate wraps LoadState to reconstruct an Aggregate[S]. StoreAggregate appends pending changes with optimistic concurrency control. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ut.Domain Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements Service[S] with On[C,S]() registration, Handle() pipeline (load state → act → append → fold), and 8 tests covering IsNew/IsExisting/IsAny, no-op, and handler-not-found scenarios. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implement AggregateService[S] and OnAggregate registration in core/command/aggservice.go. The service loads an aggregate via store.LoadAggregate, enforces IsNew/IsExisting/IsAny guards, invokes the handler's Act function (which records events via agg.Apply), and persists changes with store.StoreAggregate. Covers the full test matrix: OnNew success/conflict, OnExisting success/missing, OnAny new and existing streams, handler not found, and no-op (no Apply called). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements the subscription package with Subscription interface, EventHandler/HandlerFunc, ConsumeContext, and Middleware chain including WithLogging, WithConcurrency, and WithPartitioning. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements CheckpointStore interface and CheckpointCommitter that safely batches checkpoint writes while ensuring no position is committed until all prior sequence numbers have been processed, preventing data loss under concurrent/out-of-order event processing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds subtest package with TestConsumeProducedEvents and TestResumeFromCheckpoint exported test functions any subscription implementation can call to verify conformance, along with collectingHandler and memCheckpointStore helpers. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add KurrentDB event store implementation that satisfies the store.EventStore interface using the KurrentDB Go client. All 21 conformance tests pass against a live KurrentDB instance. Also update storetest to use per-process unique stream names and export a NewCodec helper so external store implementations can reuse the conformance test codec. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace hardcoded localhost:2113 connection with testcontainers-go to start an ephemeral EventStoreDB container per test run, removing the docker-compose dependency for tests. Also fix isStreamNotFound to add a string-based fallback for errors from rs.Recv() that don't unwrap into *kurrentdb.Error. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements CatchUp subscription that consumes events from KurrentDB via either a single stream or the $all stream. Supports checkpoint- based resumption, middleware chaining, server-side filters, and link resolution. Includes integration tests for stream consumption, $all consumption, and checkpoint resume. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implement server-managed persistent subscriptions for KurrentDB with auto-creation of subscription groups, ack on handler success, and nack with retry on handler failure. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implements TracedCommandHandler wrapping any CommandHandler[S] with OTel span creation and duration/error metrics, and TracingMiddleware providing a subscription.Middleware that creates a span per consumed event. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…st package Wire the subscription conformance suite from core/test/subtest/ to the KurrentDB catch-up subscription. Export TestEvent type and add NewCodec() to the subtest package so external modules can build a matching codec. Add per-process runID to stream names for test isolation against persistent stores. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implements TracedCommandHandler wrapping command.CommandHandler with OTel spans, duration histogram, and error counter; TracingMiddleware for subscriptions adds per-event spans with OK/error status. Adds missing error-path test and success span.SetStatus(Ok) to subscription middleware. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Exercises the full stack: codec → KurrentDB store → functional command service (BookRoom + RecordPayment) → catch-up subscription, using the shared testdomain types and a real KurrentDB container. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Lint: go vet + gofmt check across all modules - Build: parallel build of core, kurrentdb, otel - Test core: unit tests with race detector (no infra needed) - Test otel: unit tests with race detector (no infra needed) - Test kurrentdb: integration tests via testcontainers (Docker) Runs on PRs and pushes to main/dev. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds copyright and license headers to all 55 .go files matching the .NET Eventuous convention. Also adds the full LICENSE file. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 55c75accbc
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
core/subscription/middleware.go
Outdated
| defer func() { | ||
| <-sem // release slot | ||
| wg.Done() | ||
| }() | ||
| _ = next.HandleEvent(ctx, msg) |
There was a problem hiding this comment.
Wait for concurrent handlers before returning
When WithConcurrency is used under kurrentdb.CatchUp.Start or Persistent.Start, those loops checkpoint or ack the event immediately after HandleEvent returns. This middleware launches next.HandleEvent in a goroutine, drops its error, and then returns nil unless the context is already canceled, so the subscription can mark an event as processed before the handler has actually finished. A crash/restart or handler failure during that background work will therefore skip or lose events.
Useful? React with 👍 / 👎.
| opts := kurrentdb.AppendToStreamOptions{ | ||
| StreamState: kurrentdb.Any{}, | ||
| } |
There was a problem hiding this comment.
Honor optimistic concurrency when truncating streams
TruncateStream takes an expected version just like the rest of the EventStore API, but this implementation always calls SetStreamMetadata with StreamState: kurrentdb.Any{}. In a concurrent writer scenario, a stale caller can still advance $tb after someone else has appended new events, so truncation succeeds even though the expected version no longer matches and newer data may be hidden unexpectedly.
Useful? React with 👍 / 👎.
.github/workflows/ci.yml
Outdated
| env: | ||
| GO_VERSION: "1.23" |
There was a problem hiding this comment.
Raise the CI Go version to match module requirements
I checked .github/workflows/ci.yml: every build/test job installs Go 1.23, but kurrentdb/go.mod and otel/go.mod both declare go 1.25.0. The go directive is the minimum required toolchain for a module, so the build, test-kurrentdb, and test-otel jobs will fail on GitHub Actions before they even compile those modules.
Useful? React with 👍 / 👎.
core/store/aggregate.go
Outdated
| expected = eventuous.ExpectedVersion(agg.OriginalVersion()) | ||
| } | ||
|
|
||
| return writer.AppendEvents(ctx, stream, expected, events) |
There was a problem hiding this comment.
Update aggregate state after a successful store
After a successful append this helper returns immediately without mutating agg, so the aggregate keeps its old OriginalVersion() and previously persisted Changes(). Any caller that reuses the same aggregate instance for another command will resend already-written events and usually hit ErrOptimisticConcurrency even when no concurrent writer touched the stream.
Useful? React with 👍 / 👎.
- WithConcurrency: block until handler completes and return its error, preventing checkpoint/ack before processing finishes - TruncateStream: validate expected version against data stream before writing metadata, honoring optimistic concurrency - StoreAggregate: ClearChanges now advances originalVersion so the aggregate can be reused without stale version conflicts - CI: bump Go version to 1.25 to match kurrentdb/otel module requirements Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- TestWithConcurrency_ReturnsHandlerError: verify handler errors are propagated, not swallowed (catches fire-and-forget bug) - TestWithConcurrency_BlocksUntilHandlerCompletes: verify HandleEvent blocks until the inner handler finishes (catches premature ack bug) - TestClearChanges_AdvancesVersion: verify ClearChanges bumps OriginalVersion so the aggregate is reusable - TestClearChanges_ThenApply: verify Apply after ClearChanges produces correct versions for a second command cycle - TestStoreAggregate_AggregateReusableAfterStore: verify the aggregate can be stored twice without ErrOptimisticConcurrency Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tterns Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add paths-ignore to the push trigger (was only on pull_request). Also exclude LICENSE and .gitignore from triggering builds. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Complete Go port of the Eventuous Event Sourcing library — Phase 1 covering core framework and KurrentDB integration.
What's included
Core module (
core/) — near-zero external deps:Aggregate[S]with fold pattern, Apply, guards (EnsureNew/EnsureExists)EventReader/EventWriter/EventStoreinterfaces,LoadState,LoadAggregate,StoreAggregateTypeMap(bidirectional type registry),Codecinterface, JSON implementationService[S](primary) andAggregateService[S](optional DDD pattern)EventHandler, middleware chain (WithConcurrency,WithPartitioning,WithLogging),CheckpointCommitterwith gap detectionKurrentDB module (
kurrentdb/) — EventStoreDB/KurrentDB integration:Storeimplementing fullEventStoreinterface (append, read forward/backward, delete, truncate)OTel module (
otel/) — OpenTelemetry observability:TracedCommandHandlerdecorator (tracing spans + duration histogram + error counter)TracingMiddlewarefor subscriptionsCI pipeline (
.github/workflows/ci.yml):Design principles
context.Context+ errors. Functional options.core/has near-zero deps. Each integration is a separate Go module.Spec & plan
docs/specs/2026-03-20-phase1-design.mddocs/specs/2026-03-20-phase1-plan.mdTest plan
cd core && go test -race ./...— 40+ unit tests across all core packagescd otel && go test -race ./...— 4 OTel tracing/metrics testscd kurrentdb && go test -race -timeout 300s ./...— integration tests against real KurrentDB via testcontainers🤖 Generated with Claude Code