diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 947dc27..af38e8e 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -28,7 +28,7 @@ jobs: - name: Deploy to GitHub Pages uses: peaceiris/actions-gh-pages@v3 with: - deploy_key: ${{ secrets.DEPLOY_KEY }} + deploy_key: ${{ secrets.GITHUB_TOKEN }} publish_dir: ./site user_name: 'github-actions' user_email: 'github-actions@github.com' \ No newline at end of file diff --git a/docs/architecture/img/Danube_architecture_non_persistent.png b/docs/architecture/img/Danube_architecture_non_persistent.png index 52ef5da..46fca79 100644 Binary files a/docs/architecture/img/Danube_architecture_non_persistent.png and b/docs/architecture/img/Danube_architecture_non_persistent.png differ diff --git a/docs/architecture/img/Danube_architecture_persistent.png b/docs/architecture/img/Danube_architecture_persistent.png index 62d5292..e95c353 100644 Binary files a/docs/architecture/img/Danube_architecture_persistent.png and b/docs/architecture/img/Danube_architecture_persistent.png differ diff --git a/docs/architecture/img/partitioned_topics.png b/docs/architecture/img/partitioned_topics.png index 78cc34c..7d7892d 100644 Binary files a/docs/architecture/img/partitioned_topics.png and b/docs/architecture/img/partitioned_topics.png differ diff --git a/docs/architecture/img/producers_consumers.png b/docs/architecture/img/producers_consumers.png index 91d341b..bd01a6c 100644 Binary files a/docs/architecture/img/producers_consumers.png and b/docs/architecture/img/producers_consumers.png differ diff --git a/docs/client_libraries/Go_client/setup.md b/docs/client_libraries/Go_client/setup.md deleted file mode 100644 index a89ab2b..0000000 --- a/docs/client_libraries/Go_client/setup.md +++ /dev/null @@ -1,3 +0,0 @@ -# Danube Pub/Sub Go client - -The client is still in development diff --git a/docs/client_libraries/Rust_client/usage.md b/docs/client_libraries/Rust_client/usage.md deleted file mode 100644 index 365409d..0000000 --- a/docs/client_libraries/Rust_client/usage.md +++ /dev/null @@ -1,86 +0,0 @@ -# Use Danube Rust cient - -The Danube permits multiple topics and subcriber to the same topic. The [Subscription Types](../../architecture/Queuing_PubSub_messaging.md) can be combined to obtain message queueing or fan-out pub-sub messaging patterns. - -![Producers Consumers](../../architecture/img/producers_consumers.png "Producers Consumers") - -## Producer - -A producer is a process that attaches to a topic and publishes messages to a Danube broker. The Danube broker processes the messages. - -**Access Mode** is a mechanism to determin the permissions of producers on topics. - -* **Shared** - Multiple producers can publish on a topic. -* **Exclusive** - If there is already a producer connected, other producers trying to publish on this topic get errors immediately. - -### Example - -```rust -let topic = "/default/test_topic".to_string(); - -let json_schema = r#"{"type": "object", "properties": {"field1": {"type": "string"}, "field2": {"type": "integer"}}}"#.to_string(); - -let mut producer = client - .new_producer() - .with_topic(topic) - .with_name("test_producer1") - .with_schema("my_app".into(), SchemaType::Json(json_schema)) - .build(); - -let prod_id = producer.create().await?; -info!("The Producer was created with ID: {:?}", prod_id); - -let data = json!({ - "field1": format!{"value{}", i}, - "field2": 2020+i, - }); - -// Convert to string and encode to bytes -let json_string = serde_json::to_string(&data).unwrap(); -let encoded_data = json_string.as_bytes().to_vec(); - -// let json_message = r#"{"field1": "value", "field2": 123}"#.as_bytes().to_vec(); -let message_id = producer.send(encoded_data).await?; -println!("The Message with id {} was sent", message_id); -``` - -## Consumer - -A consumer is a process that attaches to a topic via a subscription and then receives messages. - -**Subscription Types** - describe the way the consumers receive the messages from topics - -* **Exclusive** - Only one consumer can subscribe, guaranteeing message order. -* **Shared** - Multiple consumers can subscribe, messages are delivered round-robin, offering good scalability but no order guarantee. -* **Failover** - Similar to shared subscriptions, but multiple consumers can subscribe, and one actively receives messages. - -### Example - -```rust -let topic = "/default/test_topic".to_string(); - - let mut consumer = client - .new_consumer() - .with_topic(topic.clone()) - .with_consumer_name("test_consumer") - .with_subscription("test_subscription") - .with_subscription_type(SubType::Exclusive) - .build(); - - // Subscribe to the topic - let consumer_id = consumer.subscribe().await?; - println!("The Consumer with ID: {:?} was created", consumer_id); - - let _schema = client.get_schema(topic).await.unwrap(); - - // Start receiving messages - let mut message_stream = consumer.receive().await?; - - while let Some(message) = message_stream.next().await { - //process the message and ack for receive - } -``` - -## Complete example - -For a complete example implementation of the above code using producers and consumers, check the [Examples folder](https://github.com/danrusei/danube/tree/main/danube-client/examples). diff --git a/docs/client_libraries/clients.md b/docs/client_libraries/clients.md new file mode 100644 index 0000000..a9627be --- /dev/null +++ b/docs/client_libraries/clients.md @@ -0,0 +1,33 @@ +# Danube client library + +Currently, the supported clients are the [Rust Client](https://github.com/danrusei/danube/tree/main/danube-client) and [Go Client](https://github.com/danrusei/danube-go) clients. However, the community is encouraged to contribute by developing clients in other programming languages. + +## Patterns + +The Danube permits multiple topics and subcriber to the same topic. The [Subscription Types](../architecture/Queuing_PubSub_messaging.md) can be combined to obtain message queueing or fan-out pub-sub messaging patterns. + +![Producers Consumers](../architecture/img/producers_consumers.png "Producers Consumers") + +## Rust client + +The Rust [danube-client](https://crates.io/crates/danube-client) is an asynchronous Rust client library. To start using the `danube-client` library in your Rust project, you need to add it as a dependency. You can do this by running the following command: + +```bash +cargo add danube-client +``` + +This command will add danube-client to your `Cargo.toml` file. Once added, you can import and use the library in your Rust code to interact with the Danube Pub/Sub messaging platform. + +## Go client + +To start using the [danube-go](https://pkg.go.dev/github.com/danrusei/danube-go) library in your Go project, you need to add it as a dependency. You can do this by running the following command: + +```bash +go get github.com/danrusei/danube-go +``` + +This command will fetch the `danube-go` library and add it to your `go.mod` file. Once added, you can import and use the library in your Go code to interact with the Danube Pub/Sub messaging platform. + +## Community Danube clients + +TBD diff --git a/docs/client_libraries/consumer.md b/docs/client_libraries/consumer.md new file mode 100644 index 0000000..a6a2406 --- /dev/null +++ b/docs/client_libraries/consumer.md @@ -0,0 +1,84 @@ +# Consumer + +A consumer is a process that attaches to a topic via a subscription and then receives messages. + +**Subscription Types** - describe the way the consumers receive the messages from topics + +* **Exclusive** - Only one consumer can subscribe, guaranteeing message order. +* **Shared** - Multiple consumers can subscribe, messages are delivered round-robin, offering good scalability but no order guarantee. +* **Failover** - Similar to shared subscriptions, but multiple consumers can subscribe, and one actively receives messages. + +## Example + +=== "Rust" + + ```rust + let topic = "/default/test_topic".to_string(); + + let mut consumer = client + .new_consumer() + .with_topic(topic.clone()) + .with_consumer_name("test_consumer") + .with_subscription("test_subscription") + .with_subscription_type(SubType::Exclusive) + .build(); + + // Subscribe to the topic + let consumer_id = consumer.subscribe().await?; + println!("The Consumer with ID: {:?} was created", consumer_id); + + let _schema = client.get_schema(topic).await.unwrap(); + + // Start receiving messages + let mut message_stream = consumer.receive().await?; + + while let Some(message) = message_stream.next().await { + + //process the message and ack for receive + + } + ``` + +=== "Go" + + ```go + ctx := context.Background() + topic := "/default/test_topic" + subType := danube.Exclusive + + consumer, err := client.NewConsumer(ctx). + WithConsumerName("test_consumer"). + WithTopic(topic). + WithSubscription("test_subscription"). + WithSubscriptionType(subType). + Build() + if err != nil { + log.Fatalf("Failed to initialize the consumer: %v", err) + } + + consumerID, err := consumer.Subscribe(ctx) + if err != nil { + log.Fatalf("Failed to subscribe: %v", err) + } + log.Printf("The Consumer with ID: %v was created", consumerID) + + // Receiving messages + streamClient, err := consumer.Receive(ctx) + if err != nil { + log.Fatalf("Failed to receive messages: %v", err) + } + + for { + msg, err := streamClient.Recv() + + //process the message and ack for receive + + } + ``` + +## Complete example + +For a complete example implementation of the above code using producers and consumers, check the examples: + +* [Rust Examples](https://github.com/danrusei/danube/tree/main/danube-client/examples) +* [Go Examples](https://github.com/danrusei/danube-go/tree/main/examples) diff --git a/docs/client_libraries/producer.md b/docs/client_libraries/producer.md new file mode 100644 index 0000000..c03a6ba --- /dev/null +++ b/docs/client_libraries/producer.md @@ -0,0 +1,87 @@ +# Producer + +A producer is a process that attaches to a topic and publishes messages to a Danube broker. The Danube broker processes the messages. + +**Access Mode** is a mechanism to determin the permissions of producers on topics. + +* **Shared** - Multiple producers can publish on a topic. +* **Exclusive** - If there is already a producer connected, other producers trying to publish on this topic get errors immediately. + +## Example + +=== "Rust" + + ```rust + let topic = "/default/test_topic".to_string(); + + let json_schema = r#"{"type": "object", "properties": {"field1": {"type": "string"}, "field2": {"type": "integer"}}}"#.to_string(); + + let mut producer = client + .new_producer() + .with_topic(topic) + .with_name("test_producer1") + .with_schema("my_app".into(), SchemaType::Json(json_schema)) + .build(); + + let prod_id = producer.create().await?; + info!("The Producer was created with ID: {:?}", prod_id); + + let data = json!({ + "field1": format!{"value{}", i}, + "field2": 2020+i, + }); + + // Convert to string and encode to bytes + let json_string = serde_json::to_string(&data).unwrap(); + let encoded_data = json_string.as_bytes().to_vec(); + + // let json_message = r#"{"field1": "value", "field2": 123}"#.as_bytes().to_vec(); + let message_id = producer.send(encoded_data).await?; + println!("The Message with id {} was sent", message_id); + ``` + +=== "Go" + + ```go + ctx := context.Background() + topic := "/default/test_topic" + jsonSchema := `{"type": "object", "properties": {"field1": {"type": "string"}, "field2": {"type": "integer"}}}` + + producer, err := client.NewProducer(ctx). + WithName("test_producer"). + WithTopic(topic). + WithSchema("test_schema", danube.SchemaType_JSON, jsonSchema). + Build() + if err != nil { + log.Fatalf("unable to initialize the producer: %v", err) + } + + producerID, err := producer.Create(context.Background()) + if err != nil { + log.Fatalf("Failed to create producer: %v", err) + } + log.Printf("The Producer was created with ID: %v", producerID) + + data := map[string]interface{}{ + "field1": fmt.Sprintf("value%d", 24), + "field2": 2024, + } + + jsonData, err := json.Marshal(data) + if err != nil { + log.Fatalf("Failed to marshal data: %v", err) + } + + messageID, err := producer.Send(context.Background(), jsonData) + if err != nil { + log.Fatalf("Failed to send message: %v", err) + } + log.Printf("The Message with id %v was sent", messageID) + ``` + +## Complete example + +For a complete example implementation of the above code using producers and consumers, check the examples: + +* [Rust Examples](https://github.com/danrusei/danube/tree/main/danube-client/examples) +* [Go Examples](https://github.com/danrusei/danube-go/tree/main/examples) diff --git a/docs/client_libraries/Rust_client/setup.md b/docs/client_libraries/setup.md similarity index 54% rename from docs/client_libraries/Rust_client/setup.md rename to docs/client_libraries/setup.md index 443ee45..57d5d2b 100644 --- a/docs/client_libraries/Rust_client/setup.md +++ b/docs/client_libraries/setup.md @@ -1,37 +1,39 @@ -# Getting Started with `danube-client` +# Client Setup -The `danube-client` is an asynchronous Rust client library designed for interacting with the Danube Pub/Sub messaging platform. This guide will walk you through the steps to add the `danube-client` library to your Rust project. +Before an application creates a producer/consumer, the client library needs to initiate a setup phase including two steps: -## Adding `danube-client` to Your Project +* The client attempts to determine the owner of the topic by sending a Lookup request to Broker. +* Once the client library has the broker address, it creates a RPC connection (or reuses an existing connection from the pool) and (in later stage authenticates it ). +* Within this connection, the clients (producer, consumer) and brokers exchange RPC commands. At this point, the client sends a command to create producer/consumer to the broker, which will comply after doing some validation checks. -Use the `cargo add` command to add `danube-client` to your `Cargo.toml` file. +=== "Rust" -``` bash -cargo add danube-client -``` + ```rust + use danube_client::DanubeClient; -## Client Setup + #[tokio::main] + async fn main() -> Result<()> { + // Setup tracing + tracing_subscriber::fmt::init(); -Before an application creates a producer/consumer, the client library needs to initiate a setup phase including two steps: + let client = DanubeClient::builder() + .service_url("http://[::1]:6650") + .build() + .unwrap(); + } + ``` -* The client attempts to determine the owner of the topic by sending a Lookup request to Broker. -* Once the client library has the broker address, it creates a RPC connection (or reuses an existing connection from the pool) and (in later stage authenticates it ). -* Within this connection, the clients (producer, consumer) and brokers exchange RPC commands. At this point, the client sends a command to create producer/consumer to the broker, which will comply after doing some validation checks. +=== "Go" + + ```go + import "github.com/danrusei/danube-go" -``` rust -use danube_client::DanubeClient; + func main() { -#[tokio::main] -async fn main() -> Result<()> { - // Setup tracing - tracing_subscriber::fmt::init(); + client := danube.NewClient().ServiceURL("127.0.0.1:6650").Build() - let client = DanubeClient::builder() - .service_url("http://[::1]:6650") - .build() - .unwrap(); -} -``` + } + ``` ## Refer to the Documentation diff --git a/mkdocs.yml b/mkdocs.yml index e2aca30..92de3f0 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -10,11 +10,11 @@ nav: - Pub-Sub messaging: architecture/Queuing_PubSub_messaging.md - PubSub vs Streaming: architecture/PubSub_messaging_vs_Streaming.md - Client Libraries: - - Rust client: - - Setup: client_libraries/Rust_client/setup.md - - Usage: client_libraries/Rust_client/usage.md - - Go client: - - Setup: client_libraries/Go_client/setup.md + - Danube Clients: client_libraries/clients.md + - Setup: client_libraries/setup.md + - Create Producer: client_libraries/producer.md + - Create Consumer: client_libraries/consumer.md + - Admin API: - Danube CLI: - Brokers: admin_API/danube_admin/brokers.md @@ -25,6 +25,8 @@ nav: - Metadata Resources: development/internal_resources.md theme: name: material + features: + - content.tabs.link markdown_extensions: - pymdownx.highlight: anchor_linenums: true @@ -33,3 +35,5 @@ markdown_extensions: - pymdownx.inlinehilite - pymdownx.snippets - pymdownx.superfences + - pymdownx.tabbed: + alternate_style: true