-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
240 additions
and
119 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file modified
BIN
+9.17 KB
(120%)
docs/architecture/img/Danube_architecture_non_persistent.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
# Danube client library | ||
|
||
Currently, the supported clients are the [Rust Client](https://github.com/danrusei/danube/tree/main/danube-client) and [Go Client](https://github.com/danrusei/danube-go) clients. However, the community is encouraged to contribute by developing clients in other programming languages. | ||
|
||
## Patterns | ||
|
||
The Danube permits multiple topics and subcriber to the same topic. The [Subscription Types](../architecture/Queuing_PubSub_messaging.md) can be combined to obtain message queueing or fan-out pub-sub messaging patterns. | ||
|
||
![Producers Consumers](../architecture/img/producers_consumers.png "Producers Consumers") | ||
|
||
## Rust client | ||
|
||
The Rust [danube-client](https://crates.io/crates/danube-client) is an asynchronous Rust client library. To start using the `danube-client` library in your Rust project, you need to add it as a dependency. You can do this by running the following command: | ||
|
||
```bash | ||
cargo add danube-client | ||
``` | ||
|
||
This command will add danube-client to your `Cargo.toml` file. Once added, you can import and use the library in your Rust code to interact with the Danube Pub/Sub messaging platform. | ||
|
||
## Go client | ||
|
||
To start using the [danube-go](https://pkg.go.dev/github.com/danrusei/danube-go) library in your Go project, you need to add it as a dependency. You can do this by running the following command: | ||
|
||
```bash | ||
go get github.com/danrusei/danube-go | ||
``` | ||
|
||
This command will fetch the `danube-go` library and add it to your `go.mod` file. Once added, you can import and use the library in your Go code to interact with the Danube Pub/Sub messaging platform. | ||
|
||
## Community Danube clients | ||
|
||
TBD |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
# Consumer | ||
|
||
A consumer is a process that attaches to a topic via a subscription and then receives messages. | ||
|
||
**Subscription Types** - describe the way the consumers receive the messages from topics | ||
|
||
* **Exclusive** - Only one consumer can subscribe, guaranteeing message order. | ||
* **Shared** - Multiple consumers can subscribe, messages are delivered round-robin, offering good scalability but no order guarantee. | ||
* **Failover** - Similar to shared subscriptions, but multiple consumers can subscribe, and one actively receives messages. | ||
|
||
## Example | ||
|
||
=== "Rust" | ||
|
||
```rust | ||
let topic = "/default/test_topic".to_string(); | ||
|
||
let mut consumer = client | ||
.new_consumer() | ||
.with_topic(topic.clone()) | ||
.with_consumer_name("test_consumer") | ||
.with_subscription("test_subscription") | ||
.with_subscription_type(SubType::Exclusive) | ||
.build(); | ||
|
||
// Subscribe to the topic | ||
let consumer_id = consumer.subscribe().await?; | ||
println!("The Consumer with ID: {:?} was created", consumer_id); | ||
|
||
let _schema = client.get_schema(topic).await.unwrap(); | ||
|
||
// Start receiving messages | ||
let mut message_stream = consumer.receive().await?; | ||
|
||
while let Some(message) = message_stream.next().await { | ||
//process the message and ack for receive | ||
} | ||
``` | ||
|
||
=== "Go" | ||
|
||
```go | ||
ctx := context.Background() | ||
topic := "/default/test_topic" | ||
subType := danube.Exclusive | ||
|
||
consumer, err := client.NewConsumer(ctx). | ||
WithConsumerName("test_consumer"). | ||
WithTopic(topic). | ||
WithSubscription("test_subscription"). | ||
WithSubscriptionType(subType). | ||
Build() | ||
if err != nil { | ||
log.Fatalf("Failed to initialize the consumer: %v", err) | ||
} | ||
|
||
consumerID, err := consumer.Subscribe(ctx) | ||
if err != nil { | ||
log.Fatalf("Failed to subscribe: %v", err) | ||
} | ||
log.Printf("The Consumer with ID: %v was created", consumerID) | ||
|
||
// Receiving messages | ||
streamClient, err := consumer.Receive(ctx) | ||
if err != nil { | ||
log.Fatalf("Failed to receive messages: %v", err) | ||
} | ||
|
||
for { | ||
msg, err := streamClient.Recv() | ||
//process the message and ack for receive | ||
|
||
} | ||
``` | ||
|
||
## Complete example | ||
|
||
For a complete example implementation of the above code using producers and consumers, check the examples: | ||
|
||
* [Rust Examples](https://github.com/danrusei/danube/tree/main/danube-client/examples) | ||
* [Go Examples](https://github.com/danrusei/danube-go/tree/main/examples) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
# Producer | ||
|
||
A producer is a process that attaches to a topic and publishes messages to a Danube broker. The Danube broker processes the messages. | ||
|
||
**Access Mode** is a mechanism to determin the permissions of producers on topics. | ||
|
||
* **Shared** - Multiple producers can publish on a topic. | ||
* **Exclusive** - If there is already a producer connected, other producers trying to publish on this topic get errors immediately. | ||
|
||
## Example | ||
|
||
=== "Rust" | ||
|
||
```rust | ||
let topic = "/default/test_topic".to_string(); | ||
|
||
let json_schema = r#"{"type": "object", "properties": {"field1": {"type": "string"}, "field2": {"type": "integer"}}}"#.to_string(); | ||
|
||
let mut producer = client | ||
.new_producer() | ||
.with_topic(topic) | ||
.with_name("test_producer1") | ||
.with_schema("my_app".into(), SchemaType::Json(json_schema)) | ||
.build(); | ||
|
||
let prod_id = producer.create().await?; | ||
info!("The Producer was created with ID: {:?}", prod_id); | ||
|
||
let data = json!({ | ||
"field1": format!{"value{}", i}, | ||
"field2": 2020+i, | ||
}); | ||
|
||
// Convert to string and encode to bytes | ||
let json_string = serde_json::to_string(&data).unwrap(); | ||
let encoded_data = json_string.as_bytes().to_vec(); | ||
|
||
// let json_message = r#"{"field1": "value", "field2": 123}"#.as_bytes().to_vec(); | ||
let message_id = producer.send(encoded_data).await?; | ||
println!("The Message with id {} was sent", message_id); | ||
``` | ||
|
||
=== "Go" | ||
|
||
```go | ||
ctx := context.Background() | ||
topic := "/default/test_topic" | ||
jsonSchema := `{"type": "object", "properties": {"field1": {"type": "string"}, "field2": {"type": "integer"}}}` | ||
|
||
producer, err := client.NewProducer(ctx). | ||
WithName("test_producer"). | ||
WithTopic(topic). | ||
WithSchema("test_schema", danube.SchemaType_JSON, jsonSchema). | ||
Build() | ||
if err != nil { | ||
log.Fatalf("unable to initialize the producer: %v", err) | ||
} | ||
|
||
producerID, err := producer.Create(context.Background()) | ||
if err != nil { | ||
log.Fatalf("Failed to create producer: %v", err) | ||
} | ||
log.Printf("The Producer was created with ID: %v", producerID) | ||
|
||
data := map[string]interface{}{ | ||
"field1": fmt.Sprintf("value%d", 24), | ||
"field2": 2024, | ||
} | ||
|
||
jsonData, err := json.Marshal(data) | ||
if err != nil { | ||
log.Fatalf("Failed to marshal data: %v", err) | ||
} | ||
|
||
messageID, err := producer.Send(context.Background(), jsonData) | ||
if err != nil { | ||
log.Fatalf("Failed to send message: %v", err) | ||
} | ||
log.Printf("The Message with id %v was sent", messageID) | ||
``` | ||
|
||
## Complete example | ||
|
||
For a complete example implementation of the above code using producers and consumers, check the examples: | ||
|
||
* [Rust Examples](https://github.com/danrusei/danube/tree/main/danube-client/examples) | ||
* [Go Examples](https://github.com/danrusei/danube-go/tree/main/examples) |
50 changes: 26 additions & 24 deletions
50
docs/client_libraries/Rust_client/setup.md → docs/client_libraries/setup.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.