-
It would be helpful if you could provide more context on your use case (and maybe more code) – what are you trying to achieve? There should be a solution which doesn't imply a direct

Apologies for the late response.
-
Ooh okay. What I'm trying to accomplish is to have a server send an SSE event to a specific client. The server code with the subscription can be found here: https://github.com/hiveot/hub/blob/main/runtime/transports/httpstransport/sseserver/GoSseServer.go In case you'd like some more context:
This project has a runtime that accepts requests from clients over HTTPS connections and uses SSE as a return channel. The clients can be IoT devices, web users or other services. When a client establishes an SSE connection, the server is notified through the go-sse onSession callback (line 12). It obtains the runtime session for this client (line 15) and requests a channel from this session (line 36). A go-sse subscription is created so that messages can be sent to the client (line 28). Messages received on the session channel (line 41) are pushed to the client (line 53).
The go-sse examples show a broadcast solution, and I didn't see an example of sending an SSE event to a specific client, hence the use of a subscription. If there is a better way to send messages to an individual SSE connection, I'd be happy to change this to follow the prescribed approach.
-
A simple way to achieve this with the current design of the library is to assign each client to its own topic. For each IoT device/user/service you should find a way to generate a unique ID and use that as the topic of the event. For example, you could generate a new UUID for every new request:

    s := sse.Server{
        OnSession: func(s *sse.Session) (sse.Subscription, bool) {
            id := uuid.New() // some UUID library of choice
            // or fetch ID from a database using the s.Req data, or other method
            return sse.Subscription{
                Client:      s,
                LastEventID: s.LastEventID,
                Topics:      []string{id.String()},
            }, true
        },
    }

If you want a client to be subscribed to multiple topics, you can easily emulate that:

    requestedTopics := getTopics(s.Req) // get the topics from the request in some way
    topics := make([]string, 0, len(requestedTopics))
    for _, rt := range requestedTopics {
        // generate a unique topic name from the client's ID and each topic it wishes to subscribe to
        topics = append(topics, id.String()+">>"+rt)
    }
    return sse.Subscription{
        // ...
        Topics: topics,
    }, true

In order to find these IDs later, you should keep a thread-safe registry of them somewhere or have a deterministic way of regenerating them. If you have a database with all the clients, for example, you could retrieve the ID by some unique device name which you obtain through other means. How these IDs are generated varies on a case-by-case basis – you should work out how to obtain them in your product.

If you also want to send a message to all clients subscribed to a specific topic, you can additionally hold a mapping between topics and the IDs of all the clients subscribed to each topic. This should also be easy if you already have a database with the users/devices or have a reliable deterministic way of generating the IDs.

This is how I'd approach the issue. It should be the least amount of work because it enables you to leverage both the already existing

Please let me know if this helps you find a way forward with your problem! Given that this was an issue beforehand and the thread has some other unnecessary comments, please mark as answer if it does help, for discoverability.
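As a rough illustration of the thread-safe registry and the topic-to-clients mapping mentioned in this reply, here is a minimal sketch using only the Go standard library. The `Registry` type, its method names, and the sample client names and IDs are all hypothetical and not part of go-sse; the `">>"` separator simply mirrors the naming scheme from the snippet in this reply.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Registry is a hypothetical, concurrency-safe store (not part of go-sse).
// It maps a stable client name to the unique ID used as that client's topic,
// and remembers which client IDs subscribed to which logical topic.
type Registry struct {
	mu   sync.RWMutex
	ids  map[string]string              // client name -> unique client ID
	subs map[string]map[string]struct{} // logical topic -> set of client IDs
}

func NewRegistry() *Registry {
	return &Registry{
		ids:  make(map[string]string),
		subs: make(map[string]map[string]struct{}),
	}
}

// Register records a client's ID and the logical topics it asked for.
func (r *Registry) Register(name, id string, topics ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ids[name] = id
	for _, t := range topics {
		if r.subs[t] == nil {
			r.subs[t] = make(map[string]struct{})
		}
		r.subs[t][id] = struct{}{}
	}
}

// ID returns the unique ID (the per-client topic) for a client name.
func (r *Registry) ID(name string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	id, ok := r.ids[name]
	return id, ok
}

// ScopedTopics returns the per-client topic names ("<id>>><topic>") of every
// client subscribed to the given logical topic, in sorted order.
func (r *Registry) ScopedTopics(topic string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.subs[topic]))
	for id := range r.subs[topic] {
		out = append(out, id+">>"+topic)
	}
	sort.Strings(out)
	return out
}

func main() {
	reg := NewRegistry()
	// In a real server the IDs would come from a UUID library or a database.
	reg.Register("thermostat-1", "id-a", "temperature")
	reg.Register("dashboard", "id-b", "temperature", "alerts")

	if id, ok := reg.ID("dashboard"); ok {
		fmt.Println("topic for a single client:", id)
	}
	fmt.Println("topics for a fan-out:", reg.ScopedTopics("temperature"))
}
```

The names returned by `ID` or `ScopedTopics` would then be the topics passed to whatever publish call the server exposes. A real implementation would probably also remove entries when a client disconnects.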
Beta Was this translation helpful? Give feedback.
-
Thank you, I'll give it a try.
-
Edit from maintainer: this was originally an issue with the title "Race condition in server using sub.Client.Send()", which was subsequently deemed to be a confusion on how to use the library. The discussion from the issue is left here for reference. The library primitives referenced in the issue discussion are part of v0.8.0 of go-sse – they may be subject to change or removal in the future. The ideas and approaches presented here should still remain relevant even after future updates.

There seems to be a race condition when sending messages from the server to the client.
In sse.Server OnSession a subscription is created as follows:
In a separate goroutine, messages are sent from the server to this client:
This works most of the time, but running it with go test -race exposes a race condition:
The documentation of the Send function states that the interface does not have to be thread-safe:
(server.go:26)
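To illustrate what "does not have to be thread-safe" means in practice, here is a small sketch that is independent of go-sse. The `writer` interface, `bufferWriter`, and `lockedWriter` are hypothetical stand-ins, not library types. Calling `Send` on the bare `bufferWriter` from several goroutines would be flagged by `go test -race`; wrapping it in a mutex serializes the calls. The replies in this thread suggest per-client topics rather than calling `Send` directly, so this is only meant to show where the race comes from.

```go
package main

import (
	"fmt"
	"sync"
)

// writer is a hypothetical stand-in for a per-client message sink whose
// methods are not required to be safe for concurrent use.
type writer interface {
	Send(msg string) error
}

// bufferWriter appends to a slice without any locking, so calling Send on it
// from two goroutines at the same time is a data race.
type bufferWriter struct {
	sent []string
}

func (b *bufferWriter) Send(msg string) error {
	b.sent = append(b.sent, msg)
	return nil
}

// lockedWriter serializes all calls to the wrapped writer with a mutex.
type lockedWriter struct {
	mu sync.Mutex
	w  writer
}

func (l *lockedWriter) Send(msg string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.w.Send(msg)
}

// sendConcurrently calls Send from n goroutines and waits for all of them.
func sendConcurrently(w writer, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = w.Send(fmt.Sprintf("msg-%d", i))
		}(i)
	}
	wg.Wait()
}

func main() {
	buf := &bufferWriter{}
	// Passing buf directly instead of the wrapper would race.
	sendConcurrently(&lockedWriter{w: buf}, 100)
	fmt.Println("messages delivered:", len(buf.sent))
}
```

A wrapper like this only helps if every caller goes through it, including any library code that holds a reference to the underlying writer.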