The Java client library for interacting with the Danube Messaging Broker platform.
Danube is an open-source distributed messaging platform written in Rust. Consult the documentation for the supported concepts and the platform architecture.
- Basic Messaging - Send messages with byte payloads and optional key-value attributes
- Partitioned Topics - Distribute messages across multiple partitions for horizontal scaling
- Reliable Dispatch - Guaranteed message delivery with persistence (WAL + cloud storage)
- Schema Integration - Type-safe messaging with automatic validation (Bytes, String, Number, Avro, JSON Schema, Protobuf)
- Flexible Subscriptions - Three subscription types for different use cases:
  - Exclusive - Single active consumer, guaranteed ordering
  - Shared - Load balancing across multiple consumers, parallel processing
  - Failover - High availability with automatic standby promotion
- Message Acknowledgment - Reliable message processing with at-least-once delivery
- Partitioned Consumption - Automatic handling of messages from all partitions
- Message Attributes - Access metadata and custom headers
- Schema Management - Register, version, and retrieve schemas (JSON Schema, Avro, Protobuf)
- Compatibility Checking - Validate schema evolution (Backward, Forward, Full, None modes)
- Type Safety - Automatic validation against registered schemas
- Schema Evolution - Safe schema updates with compatibility enforcement
- Virtual Threads - Built on Project Loom for efficient I/O without blocking platform threads
- Reactive API - `Flow.Publisher<StreamMessage>` receive API (Java standard)
- Connection Pooling - Shared gRPC channel management across producers/consumers
- TLS / mTLS - Secure connections with custom CA and client certificates
- JWT Authentication - API-key based token exchange with automatic renewal
- Topic Namespaces - Organize topics with a namespace structure (`/namespace/topic-name`)
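To make the difference between the subscription types concrete, here is a broker-free sketch in plain Java (no Danube APIs, purely illustrative): an exclusive/failover subscription delivers every message, in order, to the single active consumer, while a shared subscription load-balances messages across consumers (round-robin here; the broker's actual balancing policy may differ).

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch only — no Danube APIs involved.
public class SubscriptionSketch {

    // Exclusive / Failover: the single active consumer receives everything, in order.
    static List<String> exclusiveDispatch(List<String> messages) {
        return new ArrayList<>(messages);
    }

    // Shared: messages are load-balanced across consumers
    // (round-robin here; the broker's real policy may differ).
    static List<List<String>> sharedDispatch(List<String> messages, int consumers) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            buckets.get(i % consumers).add(messages.get(i));
        }
        return buckets;
    }
}
```

Note that with a shared subscription each consumer sees only its slice of the stream, which is why shared subscriptions trade per-key ordering for parallelism.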
```xml
<dependency>
    <groupId>com.danube-messaging</groupId>
    <artifactId>danube-client</artifactId>
    <version>0.2.0</version>
</dependency>
```

Or with Gradle:

```groovy
implementation 'com.danube-messaging:danube-client:0.2.0'
```

Requirements: Java 21 or later.
Check out the example files.
Use the instructions from the documentation to run the Danube broker/cluster.
```java
import com.danubemessaging.client.DanubeClient;
import com.danubemessaging.client.Producer;

import java.util.Map;

DanubeClient client = DanubeClient.builder()
        .serviceUrl("http://127.0.0.1:6650")
        .build();

String topic = "/default/test_topic";
String producerName = "test_producer";

Producer producer = client.newProducer()
        .withTopic(topic)
        .withName(producerName)
        .build();
producer.create();
System.out.printf("The Producer %s was created%n", producerName);

byte[] payload = "Hello Danube".getBytes();
long messageId = producer.send(payload, Map.of());
System.out.printf("The Message with id %d was sent%n", messageId);

client.close();
```

Reliable dispatch can be enabled when creating the producer; the broker will stream messages to consumers from the WAL and cloud storage.
```java
import com.danubemessaging.client.DispatchStrategy;

Producer producer = client.newProducer()
        .withTopic(topic)
        .withName(producerName)
        .withDispatchStrategy(DispatchStrategy.RELIABLE)
        .build();
```

```java
import com.danubemessaging.client.Consumer;
import com.danubemessaging.client.DanubeClient;
import com.danubemessaging.client.SubType;
import com.danubemessaging.client.model.StreamMessage;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;

DanubeClient client = DanubeClient.builder()
        .serviceUrl("http://127.0.0.1:6650")
        .build();

String topic = "/default/test_topic";
String consumerName = "test_consumer";
String subscriptionName = "test_subscription";

Consumer consumer = client.newConsumer()
        .withTopic(topic)
        .withConsumerName(consumerName)
        .withSubscription(subscriptionName)
        .withSubscriptionType(SubType.EXCLUSIVE)
        .build();

// Subscribe to the topic
consumer.subscribe();
System.out.printf("The Consumer %s was created%n", consumerName);

CountDownLatch shutdown = new CountDownLatch(1);

// Start receiving messages via Flow.Publisher
consumer.receive().subscribe(new Flow.Subscriber<>() {
    @Override
    public void onSubscribe(Flow.Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(StreamMessage msg) {
        System.out.printf("Received message: %s%n", new String(msg.payload()));
        // Acknowledge the message
        consumer.ack(msg);
    }

    @Override public void onError(Throwable t) { shutdown.countDown(); }
    @Override public void onComplete() { shutdown.countDown(); }
});

shutdown.await();
client.close();
```

```java
import com.danubemessaging.client.DanubeClient;
import com.danubemessaging.client.Producer;
import com.danubemessaging.client.SchemaRegistryClient;
import com.danubemessaging.client.schema.SchemaType;

import java.util.Map;

DanubeClient client = DanubeClient.builder()
        .serviceUrl("http://127.0.0.1:6650")
        .build();

SchemaRegistryClient schemaClient = client.newSchemaRegistry();

String jsonSchema = """
        {
          "type": "object",
          "properties": {
            "field1": {"type": "string"},
            "field2": {"type": "integer"}
          }
        }""";

// Register a JSON schema
schemaClient.registerSchema(
        schemaClient.newRegistration()
                .withSubject("my-app-events")
                .withSchemaType(SchemaType.JSON_SCHEMA)
                .withSchemaDefinition(jsonSchema.getBytes()));

// Create producer with schema reference
Producer producer = client.newProducer()
        .withTopic("/default/test_topic")
        .withName("schema_producer")
        .withSchemaLatest("my-app-events")
        .build();
producer.create();
```

Browse the examples directory for complete working code.
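The consumer's `receive()` above returns a standard `java.util.concurrent.Flow.Publisher`, so the same subscriber pattern works against any JDK-compliant publisher. As a broker-free sketch (JDK only, no Danube APIs), the identical `Flow.Subscriber` wiring can be exercised with `SubmissionPublisher`:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Broker-free sketch: the same Flow.Subscriber contract the consumer example uses.
public class FlowContractDemo {

    static List<String> collect(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    // Request unbounded demand, as in the consumer example
                    s.request(Long.MAX_VALUE);
                }
                @Override public void onNext(String item) { received.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            items.forEach(publisher::submit);
        } // close() delivers onComplete after pending items are drained
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

Because the API is the standard `Flow` type, subscribers can also be adapted to other reactive libraries that bridge `java.util.concurrent.Flow`.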
We are working on improving the client and adding new features. Please feel free to contribute or report any issues you encounter.
Before submitting a PR, start the test cluster and run the integration tests:
```shell
# 1. Start the cluster
cd docker/
docker compose up -d

# 2. Wait for the broker to be healthy
docker compose ps

# 3. Run the integration tests from the repository root
cd ..
mvn -pl danube-client -Pintegration-tests verify

# 4. Stop the cluster when done
cd docker/
docker compose down -v
```

Project modules:

- `danube-client-proto` - generated protobuf + gRPC Java stubs
- `danube-client` - handwritten Java client API and internals
Make sure the proto files are the latest from the Danube project.
Copy all `.proto` files into `danube-client-proto/src/main/proto/`:

- `DanubeApi.proto`
- `SchemaRegistry.proto`
No manual protoc install is required; Maven downloads the compiler and plugin automatically. Regenerate from the repo root:

```shell
mvn -pl danube-client-proto -am generate-sources
```

Or via the helper script:

```shell
bash scripts/generate-stubs.sh
```

Generated sources will be in:

- `danube-client-proto/target/generated-sources/protobuf/java`
- `danube-client-proto/target/generated-sources/protobuf/grpc-java`
```shell
mvn clean verify
```