Skip to content

feat: add multi stream append support #330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ jobs:
name: Tests (CI)
uses: ./.github/workflows/tests.yml
with:
image: ${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES).ci.fullname }}
runtime: ci
secrets: inherit
46 changes: 46 additions & 0 deletions .github/workflows/load-configuration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Load KurrentDB Runtime Configuration
on:
workflow_call:
inputs:
runtime:
description: "The runtime's name. Current options are: `ci`, `previous-lts`, `latest`"
type: string

outputs:
runtime:
description: The runtime's name
value: ${{ inputs.runtime }}

registry:
description: The Docker registry
value: ${{ jobs.load.outputs.registry }}

image:
description: The Docker image
value: ${{ jobs.load.outputs.image }}

tag:
description: The Docker image tag
value: ${{ jobs.load.outputs.tag }}

full_image_name:
description: The full Docker image name (including registry, image, and tag)
value: ${{ jobs.load.outputs.full_image_name }}

jobs:
load:
runs-on: ubuntu-latest
outputs:
registry: ${{ steps.set.outputs.registry }}
image: ${{ steps.set.outputs.image }}
tag: ${{ steps.set.outputs.tag }}
full_image_name: ${{ steps.set.outputs.full_image_name }}

steps:
- name: Set KurrentDB Runtime Configuration Properties
id: set
run: |
echo "registry=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].registry }}" >> $GITHUB_OUTPUT
echo "tag=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].tag }}" >> $GITHUB_OUTPUT
echo "image=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].image }}" >> $GITHUB_OUTPUT
echo "full_image_name=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].fullname }}" >> $GITHUB_OUTPUT
6 changes: 4 additions & 2 deletions .github/workflows/lts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
name: Tests (LTS)
uses: ./.github/workflows/tests.yml
with:
image: ${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES).lts.fullname }}
runtime: lts
secrets: inherit

# Will be removed in the future
Expand All @@ -24,5 +24,7 @@ jobs:

uses: ./.github/workflows/plugins-tests.yml
with:
image: "docker.eventstore.com/eventstore-ee/eventstoredb-commercial:24.2.0-jammy"
registry: docker.eventstore.com/eventstore-ee
image: eventstoredb-commercial
tag: 24.2.0-jammy
secrets: inherit
12 changes: 10 additions & 2 deletions .github/workflows/plugins-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ name: enterprise plugins tests workflow
on:
workflow_call:
inputs:
registry:
required: true
type: string
image:
required: true
type: string
tag:
required: true
type: string

jobs:
single_node:
Expand Down Expand Up @@ -46,7 +52,7 @@ jobs:
- name: Execute Gradle build
run: ./gradlew ci --tests ${{ matrix.test }}Tests
env:
KURRENTDB_IMAGE: ${{ inputs.image }}
KURRENTDB_IMAGE: ${{inputs.registry}}/${{ inputs.image }}:${{ inputs.tag }}
SECURE: true

- uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -81,7 +87,9 @@ jobs:
- name: Set up cluster with Docker Compose
run: docker compose up -d
env:
KURRENTDB_IMAGE: ${{ inputs.image }}
KURRENTDB_DOCKER_REGISTRY: ${{ inputs.registry }}
KURRENTDB_DOCKER_IMAGE: ${{ inputs.image }}
KURRENTDB_DOCKER_TAG: ${{ inputs.tag }}

- name: Generate user certificates
run: docker compose --file configure-user-certs-for-tests.yml up
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/previous-lts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ jobs:
name: Tests (Previous LTS)
uses: ./.github/workflows/tests.yml
with:
image: ${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)['previous-lts'].fullname }}
runtime: previous-lts
secrets: inherit
29 changes: 16 additions & 13 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@ name: tests workflow
on:
workflow_call:
inputs:
image:
runtime:
required: true
type: string

jobs:
load_configuration:
uses: ./.github/workflows/load-configuration.yml
with:
runtime: ${{ inputs.runtime }}

single_node:
needs: load_configuration
name: Single node

strategy:
fail-fast: false
matrix:
test: [Streams, PersistentSubscriptions, Telemetry]
test: [Streams, PersistentSubscriptions, Telemetry, MultiStreamAppend]

runs-on: ubuntu-latest
steps:
Expand All @@ -41,16 +47,10 @@ jobs:
- name: Execute Gradle build
run: ./gradlew ci --tests ${{ matrix.test }}Tests
env:
KURRENTDB_IMAGE: ${{ inputs.image }}

- uses: actions/upload-artifact@v4
if: failure()
with:
name: esdb_logs.tar.gz
path: /tmp/esdb_logs.tar.gz
if-no-files-found: error
KURRENTDB_IMAGE: ${{ needs.load_configuration.outputs.full_image_name }}

secure:
needs: load_configuration
name: Secure

strategy:
Expand Down Expand Up @@ -86,7 +86,7 @@ jobs:
- name: Execute Gradle build
run: ./gradlew ci --tests ${{ matrix.test }}Tests
env:
KURRENTDB_IMAGE: ${{ inputs.image }}
KURRENTDB_IMAGE: ${{ needs.load_configuration.outputs.full_image_name }}
SECURE: true

- uses: actions/upload-artifact@v4
Expand All @@ -96,12 +96,13 @@ jobs:
path: /tmp/esdb_logs.tar.gz

cluster:
needs: load_configuration
name: Cluster

strategy:
fail-fast: false
matrix:
test: [Streams, PersistentSubscriptions]
test: [Streams, PersistentSubscriptions, MultiStreamAppend]

runs-on: ubuntu-latest
steps:
Expand All @@ -117,7 +118,9 @@ jobs:
- name: Set up cluster with Docker Compose
run: docker compose up -d
env:
KURRENTDB_IMAGE: ${{ inputs.image }}
KURRENTDB_DOCKER_REGISTRY: ${{ needs.load_configuration.outputs.registry }}
KURRENTDB_DOCKER_IMAGE: ${{ needs.load_configuration.outputs.image }}
KURRENTDB_DOCKER_TAG: ${{ needs.load_configuration.outputs.tag }}

- name: Set up JDK 8
uses: actions/setup-java@v3
Expand Down
46 changes: 15 additions & 31 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3.5"

services:
volumes-provisioner:
image: hasnat/volumes-provisioner
Expand All @@ -26,71 +24,57 @@ services:
- volumes-provisioner

esdb-node1: &template
image: ${KURRENTDB_IMAGE:-docker.kurrent.io/eventstore/eventstoredb-ee:lts}
image: ${KURRENTDB_DOCKER_REGISTRY:-docker.kurrent.io/eventstore}/${KURRENTDB_DOCKER_IMAGE:-eventstoredb-ee}:${KURRENTDB_DOCKER_TAG:-lts}
env_file:
- vars.env
environment:
- EVENTSTORE_GOSSIP_SEED=172.30.240.12:2113,172.30.240.13:2113
- EVENTSTORE_INT_IP=172.30.240.11
- EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node1/node.crt
- EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node1/node.key
- EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2111
- EVENTSTORE_REPLICATION_IP=172.30.240.11
- EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node1/node.crt
- EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node1/node.key
- EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2111
ports:
- 2111:2113
networks:
clusternetwork:
ipv4_address: 172.30.240.11
volumes:
- ./certs:/etc/eventstore/certs
- ./certs:/etc/kurrentdb/certs
restart: unless-stopped
depends_on:
- cert-gen

esdb-node2:
<<: *template
env_file:
- vars.env
environment:
- EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.13:2113
- EVENTSTORE_INT_IP=172.30.240.12
- EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node2/node.crt
- EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node2/node.key
- EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2112
- EVENTSTORE_REPLICATION_IP=172.30.240.12
- EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node2/node.crt
- EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node2/node.key
- EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2112
ports:
- 2112:2113
networks:
clusternetwork:
ipv4_address: 172.30.240.12
volumes:
- ./certs:/etc/eventstore/certs
restart: unless-stopped
depends_on:
- cert-gen

esdb-node3:
<<: *template
env_file:
- vars.env
environment:
- EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.12:2113
- EVENTSTORE_INT_IP=172.30.240.13
- EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node3/node.crt
- EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node3/node.key
- EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2113
- EVENTSTORE_REPLICATION_IP=172.30.240.13
- EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node3/node.crt
- EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node3/node.key
- EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2113
ports:
- 2113:2113
networks:
clusternetwork:
ipv4_address: 172.30.240.13
volumes:
- ./certs:/etc/eventstore/certs
restart: unless-stopped
depends_on:
- cert-gen

networks:
clusternetwork:
name: eventstoredb.local
name: kurrentdb.local
driver: bridge
ipam:
driver: default
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/kurrent/dbclient/AppendStreamFailure.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.kurrent.dbclient;

public class AppendStreamFailure {
private final io.kurrentdb.v2.AppendStreamFailure inner;

AppendStreamFailure(io.kurrentdb.v2.AppendStreamFailure inner) {
this.inner = inner;
}

public String getStreamName() {
return this.inner.getStream();
}

public void visit(MultiAppendStreamErrorVisitor visitor) {
if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.WRONG_EXPECTED_REVISION) {
visitor.onWrongExpectedRevision(this.inner.getWrongExpectedRevision().getStreamRevision());
return;
}

if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) {
visitor.onAccessDenied(this.inner.getAccessDenied().getReason());
}

if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) {
visitor.onStreamDeleted();
return;
}

if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) {
visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize());
return;
}

throw new IllegalArgumentException("Append failure does not match any known error type");
}
}
27 changes: 27 additions & 0 deletions src/main/java/io/kurrent/dbclient/AppendStreamRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.kurrent.dbclient;

import java.util.Iterator;

public class AppendStreamRequest {
private final String streamName;
private final Iterator<EventData> events;
private final StreamState expectedState;

public AppendStreamRequest(String streamName, Iterator<EventData> events, StreamState expectedState) {
this.streamName = streamName;
this.events = events;
this.expectedState = expectedState;
}

public String getStreamName() {
return streamName;
}

public Iterator<EventData> getEvents() {
return events;
}

public StreamState getExpectedState() {
return expectedState;
}
}
21 changes: 21 additions & 0 deletions src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.kurrent.dbclient;

public class AppendStreamSuccess {
private final io.kurrentdb.v2.AppendStreamSuccess inner;

AppendStreamSuccess(io.kurrentdb.v2.AppendStreamSuccess inner) {
this.inner = inner;
}

public String getStreamName() {
return this.inner.getStream();
}

public long getStreamRevision() {
return this.inner.getStreamRevision();
}

public long getPosition() {
return this.inner.getPosition();
}
}
1 change: 1 addition & 0 deletions src/main/java/io/kurrent/dbclient/FeatureFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ class FeatureFlags {
public final static int PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM = 8;
public final static int PERSISTENT_SUBSCRIPTION_GET_INFO = 16;
public final static int PERSISTENT_SUBSCRIPTION_TO_ALL = 32;
public final static int MULTI_STREAM_APPEND = 64;
public final static int PERSISTENT_SUBSCRIPTION_MANAGEMENT = PERSISTENT_SUBSCRIPTION_LIST | PERSISTENT_SUBSCRIPTION_REPLAY | PERSISTENT_SUBSCRIPTION_GET_INFO | PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM;
}
2 changes: 1 addition & 1 deletion src/main/java/io/kurrent/dbclient/GossipClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class GossipClient {

public GossipClient(KurrentDBClientSettings settings, ManagedChannel channel) {
_channel = channel;
_stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), (long)settings.getGossipTimeout());
_stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), settings.getGossipTimeout());
}

public void shutdown() {
Expand Down
Loading
Loading