An event bus built on portable message queues (MQ).
See example/pulse for an example of each supported MQ. Note: you need to start your MQ first and obtain its address.
Example for NATS Streaming:
meta := protocol.NewMetadata()
meta.SetDriver(nats.DriverName)
meta.Properties[nats.NatsURL] = "nats://localhost:4222"
meta.Properties[nats.NatsStreamingClusterID] = "test-cluster"
meta.Properties[nats.SubscriptionType] = "topic"
meta.Properties[nats.ConsumerID] = "app-test-a"
Publishing is asynchronous. The returned result reports whether the event was sent successfully or failed.
t, err := topic.NewTopic(meta, topic.WithMiddlewares(visitor.WithRetry(3)))
if err != nil {
	log.Error(err)
	return
}
res := t.Publish(context.Background(), protocol.NewMessage("test", "", []byte("hello")))
go func() {
	if _, err := res.Get(context.Background()); err != nil {
		log.Error(err)
	}
}()
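To illustrate the asynchronous result pattern described above, here is a minimal, self-contained sketch of a future-style handle. It does not use pulse itself: `PublishResult`, `publishAsync`, `waitResult`, and `errBroker` are hypothetical names introduced only for this example, and the real implementation may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errBroker is a hypothetical error used to simulate a failed send.
var errBroker = errors.New("broker unavailable")

// PublishResult is a hypothetical future-style handle, not the library's type.
type PublishResult struct {
	done chan struct{} // closed once the send has finished
	id   string
	err  error
}

// Get blocks until the send finishes or ctx is cancelled.
func (r *PublishResult) Get(ctx context.Context) (string, error) {
	select {
	case <-r.done:
		return r.id, r.err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// publishAsync starts the send in a goroutine and returns immediately.
func publishAsync(send func() (string, error)) *PublishResult {
	r := &PublishResult{done: make(chan struct{})}
	go func() {
		r.id, r.err = send()
		close(r.done) // the writes above are visible to anyone who observes the close
	}()
	return r
}

// waitResult waits up to one second for the outcome of a publish.
func waitResult(r *PublishResult) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return r.Get(ctx)
}

func main() {
	id, err := waitResult(publishAsync(func() (string, error) { return "msg-1", nil }))
	fmt.Println(id, err)

	_, err = waitResult(publishAsync(func() (string, error) { return "", errBroker }))
	fmt.Println(err)
}
```

Passing a context with a deadline to `Get` keeps the caller from waiting forever if the send never completes.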
Receive is synchronous: it blocks until it returns an error, for example when the context is cancelled (ctx.Done()) or another error occurs.
s, err := subscription.NewSubscription("hello", meta, subscription.WithCount())
if err != nil {
	log.Error(err)
	return
}
err = s.Receive(context.Background(), protocol.NewSubscribeRequest("test", meta), func(ctx context.Context, m *protocol.Message) {
	log.Debug("receive message ", m)
})
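The blocking behaviour of Receive can be pictured with the following sketch. It is a stand-in written for this README, not the library's code: `receive` and `demo` are hypothetical, and a plain channel takes the place of the MQ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// receive is a hypothetical blocking loop: it returns only when ctx is
// cancelled or the message source is closed.
func receive(ctx context.Context, in <-chan string, handle func(string)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case m, ok := <-in:
			if !ok {
				return errors.New("message source closed")
			}
			handle(m)
		}
	}
}

// demo feeds three messages and cancels the context after the third one,
// which is what finally unblocks receive.
func demo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan string, 3)
	for _, m := range []string{"a", "b", "c"} {
		in <- m
	}

	n := 0
	err := receive(ctx, in, func(string) {
		n++
		if n == 3 {
			cancel()
		}
	})
	return n, err
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```

Because the call blocks, a caller that has other work to do would typically run it in its own goroutine and stop it by cancelling the context.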
- Idempotence: each Subscription records processed messages in a map so that a single consumer does not process the same message twice. NATS can provide queueSubscribe.
- Orderliness: messages use OrderingKey to ensure in-order delivery. If an error occurs, sending for that OrderingKey is suspended; messages with an empty key are never suspended.
- Concurrent processing: both Topic and Subscription process messages concurrently. Check whether your middleware touches critical (shared) resources.
- Reliability: ack is implemented independently to ensure that each message is delivered at least once. Planned: support for the three QoS levels 0, 1, and 2.
- Asynchronous: messages are sent asynchronously and may be buffered and sent with a delay.
- Batch handling: the scheduler can buffer messages and handle them in batches if the underlying MQ supports it.
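Two of the properties above, idempotence through a per-subscription map and suspension of a failed OrderingKey, can be sketched as follows. This is an illustration under assumptions, not the library's implementation: `Deduper`, `OrderedSender`, `Resume`, `ErrSuspended`, and `errSendFailed` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Deduper records the message IDs already handled by one subscription.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper { return &Deduper{seen: make(map[string]struct{})} }

// FirstTime reports whether id has not been seen before, and records it.
func (d *Deduper) FirstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[id]; dup {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

var (
	// ErrSuspended is returned for keys paused by an earlier failure.
	ErrSuspended = errors.New("ordering key suspended")
	// errSendFailed simulates a failing transport in this example.
	errSendFailed = errors.New("send failed")
)

// OrderedSender pauses an ordering key after its first failed send.
// The lock is held during send, so sends are serialized in this sketch.
type OrderedSender struct {
	mu        sync.Mutex
	suspended map[string]bool
	send      func(key, body string) error
}

func NewOrderedSender(send func(key, body string) error) *OrderedSender {
	return &OrderedSender{suspended: make(map[string]bool), send: send}
}

// Send refuses suspended keys; the empty key is never suspended.
func (s *OrderedSender) Send(key, body string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if key != "" && s.suspended[key] {
		return ErrSuspended
	}
	err := s.send(key, body)
	if err != nil && key != "" {
		s.suspended[key] = true
	}
	return err
}

// Resume lifts the suspension of key (an assumed recovery step).
func (s *OrderedSender) Resume(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.suspended, key)
}

func main() {
	d := NewDeduper()
	for _, id := range []string{"m1", "m2", "m1"} {
		fmt.Println(id, "first delivery:", d.FirstTime(id))
	}

	s := NewOrderedSender(func(key, body string) error {
		if body == "bad" {
			return errSendFailed
		}
		return nil
	})
	fmt.Println(s.Send("user-1", "bad")) // fails and suspends user-1
	fmt.Println(s.Send("user-1", "ok"))  // rejected while suspended
	fmt.Println(s.Send("", "ok"))        // empty key is unaffected
	s.Resume("user-1")
	fmt.Println(s.Send("user-1", "ok"))
}
```

Suspending the key instead of skipping the failed message keeps later messages with the same key from overtaking it, which is what preserves ordering.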
A Driver is the implementation of a specific protocol, such as NATS or HTTP.
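One common way to make protocols pluggable is a registry of drivers looked up by name, similar in spirit to selecting a driver through the metadata. The sketch below assumes such a design; the `Driver` interface, `Register`, `Open`, and `memDriver` are hypothetical and are not taken from the pulse source.

```go
package main

import (
	"fmt"
	"sync"
)

// Driver is a hypothetical minimal contract; the real interface may differ.
type Driver interface {
	Name() string
	Publish(topic string, payload []byte) error
}

var (
	registryMu sync.RWMutex
	registry   = map[string]func() Driver{}
)

// Register makes a driver factory available under name.
func Register(name string, factory func() Driver) {
	registryMu.Lock()
	defer registryMu.Unlock()
	registry[name] = factory
}

// Open returns a new driver instance for name, or an error if none is registered.
func Open(name string) (Driver, error) {
	registryMu.RLock()
	factory, ok := registry[name]
	registryMu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown driver %q", name)
	}
	return factory(), nil
}

// memDriver is an in-memory stand-in for a real protocol such as NATS or HTTP.
type memDriver struct {
	mu   sync.Mutex
	sent map[string][][]byte
}

func newMemDriver() Driver { return &memDriver{sent: map[string][][]byte{}} }

func (d *memDriver) Name() string { return "mem" }

// Publish stores the payload per topic instead of sending it anywhere.
func (d *memDriver) Publish(topic string, payload []byte) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.sent[topic] = append(d.sent[topic], payload)
	return nil
}

func main() {
	Register("mem", newMemDriver)

	d, err := Open("mem")
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	fmt.Println(d.Name(), d.Publish("test", []byte("hello")))

	_, err = Open("kafka")
	fmt.Println(err)
}
```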
A Message is the object transported from a pulse endpoint. It is formatted as a CloudEvents event.
{
"specversion": "1.x-wip",
"type": "coolevent",
"id": "xxxx-xxxx-xxxx",
"source": "bigco.com",
"data": { ... }
}
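As a rough illustration of handling this format in Go, the sketch below decodes an event with the attributes from the sample and rejects one that lacks a required attribute. The `Event` type and `decodeEvent` are hypothetical and not the library's `protocol.Message`; the example uses specversion "1.0" instead of the work-in-progress value shown above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Event mirrors the attributes of the sample; it is a hypothetical type.
type Event struct {
	SpecVersion string          `json:"specversion"`
	Type        string          `json:"type"`
	ID          string          `json:"id"`
	Source      string          `json:"source"`
	Data        json.RawMessage `json:"data,omitempty"` // payload kept as raw JSON
}

// decodeEvent parses raw JSON and checks that the four context attributes
// CloudEvents requires (specversion, type, id, source) are present.
func decodeEvent(raw []byte) (Event, error) {
	var e Event
	if err := json.Unmarshal(raw, &e); err != nil {
		return Event{}, err
	}
	if e.SpecVersion == "" || e.Type == "" || e.ID == "" || e.Source == "" {
		return Event{}, errors.New("missing required CloudEvents attribute")
	}
	return e, nil
}

func main() {
	raw := []byte(`{"specversion":"1.0","type":"coolevent","id":"xxxx-xxxx-xxxx","source":"bigco.com","data":{"n":1}}`)

	e, err := decodeEvent(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(e.Type, e.Source, string(e.Data))

	// An event without an id is rejected.
	_, err = decodeEvent([]byte(`{"specversion":"1.0","type":"coolevent","source":"bigco.com"}`))
	fmt.Println(err)
}
```

Keeping `data` as `json.RawMessage` defers decoding of the payload to the handler, which knows the concrete type for a given event type.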
- nuid: the nuid package from NATS is currently used to generate unique IDs
- CloudEvents: a CNCF project
- Cloud State: a sidecar project that moves state out of the application
- Google Cloud Pub/Sub: basis for a new driver
- Dapr: sidecar synthesizer
- Saga: pulse usage
- Kong: spirit of extension
- Kafka: log
- Axon: event sourcing, DDD, CQRS
- webhook: establish a webhook to receive the response asynchronously