Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(websocket)!: rename the connector module #129

Merged
merged 1 commit into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/reugn/go-streams/nats v0.0.0
github.com/reugn/go-streams/pulsar v0.0.0
github.com/reugn/go-streams/redis v0.0.0
github.com/reugn/go-streams/ws v0.0.0
github.com/reugn/go-streams/websocket v0.0.0
)

require (
Expand Down Expand Up @@ -94,5 +94,5 @@ replace (
github.com/reugn/go-streams/nats => ../nats
github.com/reugn/go-streams/pulsar => ../pulsar
github.com/reugn/go-streams/redis => ../redis
github.com/reugn/go-streams/ws => ../ws
github.com/reugn/go-streams/websocket => ../websocket
)
1 change: 1 addition & 0 deletions examples/websocket/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
websocket
30 changes: 10 additions & 20 deletions examples/ws/main.go → examples/websocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/gorilla/websocket"
"github.com/reugn/go-streams/flow"
ext "github.com/reugn/go-streams/ws"
ws "github.com/reugn/go-streams/websocket"
)

type wsServer struct {
Expand Down Expand Up @@ -43,7 +40,7 @@ func (server *wsServer) init() {
go func() {
<-time.After(time.Second)
payload := []byte("foo")
server.broadcast <- ext.Message{
server.broadcast <- ws.Message{
MsgType: websocket.TextMessage,
Payload: payload,
}
Expand Down Expand Up @@ -72,7 +69,7 @@ func (server *wsServer) handleConnections(w http.ResponseWriter, r *http.Request
break
}

wsMessage := ext.Message{
wsMessage := ws.Message{
MsgType: messageType,
Payload: payload,
}
Expand All @@ -88,7 +85,7 @@ func (server *wsServer) handleMessages() {
for {
msg := <-server.broadcast
for client := range server.clients {
m := msg.(ext.Message)
m := msg.(ws.Message)
err := client.WriteMessage(m.MsgType, m.Payload)
if err != nil {
log.Printf("Error in WriteMessage: %s", err)
Expand All @@ -104,34 +101,27 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

go term()
startWsServer()
time.Sleep(500 * time.Millisecond)

url := "ws://127.0.0.1:8080/ws"
source, err := ext.NewWebSocketSource(ctx, url)
source, err := ws.NewSource(ctx, url)
if err != nil {
log.Fatal(err)
}

addAsteriskMapFlow := flow.NewMap(addAsterisk, 1)
sink, err := ext.NewWebSocketSink(ctx, url)
mapFlow := flow.NewMap(addAsterisk, 1)

sink, err := ws.NewSink(url)
if err != nil {
log.Fatal(err)
}

source.
Via(addAsteriskMapFlow).
Via(mapFlow).
To(sink)
}

var addAsterisk = func(msg ext.Message) string {
var addAsterisk = func(msg ws.Message) string {
return string(msg.Payload) + "*"
}

func term() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
<-sigchan
os.Exit(1)
}
1 change: 0 additions & 1 deletion examples/ws/.gitignore

This file was deleted.

2 changes: 2 additions & 0 deletions websocket/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package websocket implements the WebSocket connector.
package websocket
2 changes: 1 addition & 1 deletion ws/go.mod → websocket/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/reugn/go-streams/ws
module github.com/reugn/go-streams/websocket

go 1.18

Expand Down
File renamed without changes.
151 changes: 151 additions & 0 deletions websocket/web_socket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package websocket

import (
"context"
"log"

ws "github.com/gorilla/websocket"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// Message represents a WebSocket message container.
// Message types are defined in [RFC 6455], section 11.8.
//
// [RFC 6455]: https://www.rfc-editor.org/rfc/rfc6455.html#section-11.8
type Message struct {
MsgType int
Payload []byte
}

// Source represents a WebSocket source connector.
type Source struct {
connection *ws.Conn
out chan any
}

var _ streams.Source = (*Source)(nil)

// NewSource creates and returns a new Source using the default dialer.
func NewSource(ctx context.Context, url string) (*Source, error) {
return NewSourceWithDialer(ctx, url, ws.DefaultDialer)
}

// NewSourceWithDialer returns a new Source using the specified dialer.
func NewSourceWithDialer(ctx context.Context, url string,
dialer *ws.Dialer) (*Source, error) {
// create a new client connection
conn, _, err := dialer.Dial(url, nil)
if err != nil {
return nil, err
}

source := &Source{
connection: conn,
out: make(chan any),
}
go source.init(ctx)

return source, nil
}

func (wsock *Source) init(ctx context.Context) {
loop:
for {
select {
case <-ctx.Done():
break loop
default:
messageType, payload, err := wsock.connection.ReadMessage()
if err != nil {
log.Printf("Error in ReadMessage: %s", err)
} else {
// exit loop on CloseMessage
if messageType == ws.CloseMessage {
break loop
}
wsock.out <- Message{
MsgType: messageType,
Payload: payload,
}
}
}
}
log.Print("Closing WebSocket source connector")
close(wsock.out)
if err := wsock.connection.Close(); err != nil {
log.Printf("Error in Close: %s", err)
}
}

// Via streams data to a specified operator and returns it.
func (wsock *Source) Via(operator streams.Flow) streams.Flow {
flow.DoStream(wsock, operator)
return operator
}

// Out returns the output channel of the Source connector.
func (wsock *Source) Out() <-chan any {
return wsock.out
}

// Sink represents a WebSocket sink connector.
type Sink struct {
connection *ws.Conn
in chan any
}

var _ streams.Sink = (*Sink)(nil)

// NewSink creates and returns a new Sink using the default dialer.
func NewSink(url string) (*Sink, error) {
return NewSinkWithDialer(url, ws.DefaultDialer)
}

// NewSinkWithDialer returns a new Sink using the specified dialer.
func NewSinkWithDialer(url string, dialer *ws.Dialer) (*Sink, error) {
// create a new client connection
conn, _, err := dialer.Dial(url, nil)
if err != nil {
return nil, err
}

sink := &Sink{
connection: conn,
in: make(chan any),
}
go sink.init()

return sink, nil
}

func (wsock *Sink) init() {
for msg := range wsock.in {
var err error
switch m := msg.(type) {
case Message:
err = wsock.connection.WriteMessage(m.MsgType, m.Payload)
case *Message:
err = wsock.connection.WriteMessage(m.MsgType, m.Payload)
case string:
err = wsock.connection.WriteMessage(ws.TextMessage, []byte(m))
case []byte:
err = wsock.connection.WriteMessage(ws.BinaryMessage, m)
default:
log.Printf("Unsupported message type %v", m)
}

if err != nil {
log.Printf("Error processing WebSocket message: %s", err)
}
}
log.Print("Closing WebSocket sink connector")
if err := wsock.connection.Close(); err != nil {
log.Printf("Error in Close: %s", err)
}
}

// In returns the input channel of the Sink connector.
func (wsock *Sink) In() chan<- any {
return wsock.in
}
2 changes: 0 additions & 2 deletions ws/doc.go

This file was deleted.

Loading
Loading