Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/gofiber/websocket/v2 v2.2.1
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/stretchr/testify v1.9.0
)

require (
Expand All @@ -17,6 +18,7 @@ require (
github.com/bytedance/sonic/loader v0.2.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fasthttp/websocket v1.5.3 // indirect
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
Expand All @@ -35,6 +37,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/bytedance/sonic v1.12.6 h1:/isNmCUF2x3Sh8RAp/4mh4ZGkcFAX/hLrzrK3AvpRzk=
Expand Down Expand Up @@ -35,8 +33,6 @@ github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL
github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM=
github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/gofiber/fiber/v2 v2.52.5 h1:tWoP1MJQjGEe4GB5TUGOi7P2E0ZMMRx5ZTG4rT+yGMo=
github.com/gofiber/fiber/v2 v2.52.5/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ=
github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw=
github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w=
Expand All @@ -50,8 +46,6 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
Expand All @@ -69,8 +63,6 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
65 changes: 55 additions & 10 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"slices"
"strings"
"sync/atomic"
"time"

"github.com/doquangtan/socketio/v4/client"
Expand Down Expand Up @@ -47,9 +48,29 @@ type Io struct {
onAuthentication func(params map[string]string) bool
onConnection connectionEvent
close chan interface{}
path string
}

func New() *Io {
type (
option struct {
path string
}
optionFn func(*option)
)

func SetPath(path string) optionFn { return func(opt *option) { opt.path = path } }

func New(fns ...optionFn) *Io {
// Load sensible default value.
opt := option{
path: "/socket.io/",
}

// Overload them with user's provided ones.
for _, fn := range fns {
fn(&opt)
}

pingInterval := time.Duration(25000 * time.Millisecond)
pingTimeout := time.Duration(25000 * time.Millisecond)
maxPayload := 1000000
Expand All @@ -68,6 +89,7 @@ func New() *Io {
pingInterval: pingInterval,
pingTimeout: pingTimeout,
maxPayload: maxPayload,
path: opt.path,
}
ctx, cancelFunc := context.WithCancel(context.Background())
go io.read(ctx)
Expand All @@ -84,9 +106,9 @@ var upgrader = gWebsocket.Upgrader{}
func (s *Io) ServeHTTP(w http.ResponseWriter, r *http.Request) {
header := r.Header
if slices.Contains(header["Connection"], "Upgrade") && header.Get("Upgrade") == "websocket" {

upgrader.CheckOrigin = func(r *http.Request) bool { return true }
c, err := upgrader.Upgrade(w, r, nil)

if err != nil {
log.Print("Upgrade:", err)
return
Expand All @@ -99,16 +121,22 @@ func (s *Io) ServeHTTP(w http.ResponseWriter, r *http.Request) {

socket := Socket{
Id: s.randomUUID(),
Nps: "/",
Nsp: "/",
Conn: &Conn{
http: c,
http: c,
reqHeaders: r.Header.Clone(),
reqQuery: cloneValues(r.URL.Query()),
data: &atomic.Pointer[any]{},
},

listeners: listeners{
list: make(map[string][]eventCallback),
},
pingTime: s.pingInterval,
}

defer socket.disconnect()

socket.dispose = append(socket.dispose, func() {
s.sockets.delete(socket.Id)
})
Expand All @@ -123,21 +151,26 @@ func (s *Io) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}.ToJson())

for {

messageType, message, err := c.ReadMessage()
if err != nil {
log.Printf("[socket.io] input error: %v\n", err)

break
}

if messageType == websocket.TextMessage {
err := s.handlerMessage(&socket, string(message))
if err != nil {
log.Printf("[socket.io] message handling error: %v\n", err)

return
}
}
}
} else if strings.HasPrefix(r.URL.Path, "/socket.io/") {
} else if strings.HasPrefix(r.URL.Path, s.path) {
clientDistFs, _ := fs.Sub(staticFS, "client-dist")
fs := http.StripPrefix("/socket.io/", http.FileServer(http.FS(clientDistFs)))
fs := http.StripPrefix(s.path, http.FileServer(http.FS(clientDistFs)))
fs.ServeHTTP(w, r)
} else {
http.NotFound(w, r)
Expand Down Expand Up @@ -282,7 +315,7 @@ func (s *Io) new() func(ctx *fiber.Ctx) error {

socket := Socket{
Id: s.randomUUID(),
Nps: "/",
Nsp: "/",
Conn: &Conn{
fasthttp: c,
},
Expand Down Expand Up @@ -324,9 +357,13 @@ func (s *Io) new() func(ctx *fiber.Ctx) error {
}

func (s *Io) handlerMessage(socket *Socket, message string) error {
log.Printf("[socket.io] recv packet sid=%s ns=%s data=%s\n",
socket.Id, socket.Nsp, message)

enginePacketType := string(message[0:1])
switch enginePacketType {
case engineio.MESSAGE.String():

mess := string(message)
packetType := string(message[1:2])
rawpayload := string(message[2:])
Expand All @@ -340,13 +377,16 @@ func (s *Io) handlerMessage(socket *Socket, message string) error {
special3 := -1
nextMess := message

tot := 0

for {
nextSpecial3 := strings.Index(string(nextMess), ",")
if nextSpecial3 == -1 || (special1 != -1 && nextSpecial3 > special1) || (special2 != -1 && nextSpecial3 > special2) {
if nextSpecial3 == -1 || (special1 != -1 && (tot+nextSpecial3) > special1) || (special2 != -1 && (tot+nextSpecial3) > special2) {
break
}
nextMess = nextMess[nextSpecial3+1:]
special3 = nextSpecial3
tot += nextSpecial3
}

if special3 != -1 {
Expand Down Expand Up @@ -395,7 +435,7 @@ func (s *Io) handlerMessage(socket *Socket, message string) error {
if namespace != "/" {
socketWithNamespace := Socket{
Id: socket.Id,
Nps: namespace,
Nsp: namespace,
Conn: socket.Conn,
listeners: listeners{
list: make(map[string][]eventCallback),
Expand Down Expand Up @@ -456,6 +496,7 @@ func (s *Io) handlerMessage(socket *Socket, message string) error {
for _, callback := range s.Of(namespace).onConnection.get("connection") {
callback(socket_nps)
}

case socket_protocol.EVENT.String():
socket_nps, err := s.Of(namespace).sockets.get(socket.Id)
if err != nil {
Expand Down Expand Up @@ -484,7 +525,11 @@ func (s *Io) handlerMessage(socket *Socket, message string) error {
// case socket_protocol.BINARY_ACK.String():
}
case engineio.PONG.String():
// println("Client pong")
// log.Println("[socket.io] received pong")

default:
log.Printf("[socket.io] un-handled packet type: %q\n", enginePacketType)
}

return nil
}
35 changes: 35 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package socketio

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestHandlerMessage(t *testing.T) {
s := Io{
namespaces: namespaces{
list: map[string]*Namespace{
"/": {Name: ""},
},
},
}

// NOTE: That one already passed before eef17c02d9676f5463453a5f167205cd4c4bbba1
err := s.handlerMessage(&Socket{
Id: "test", Nsp: "/",
}, `42/ssh,["resize",{"cols":123,"rows":41}]`)

t.Logf("%T %v", err, err)

require.ErrorIs(t, err, ErrorInvalidConnection)
// NOTE: That one would panic prior to eef17c02d9676f5463453a5f167205cd4c4bbba1
require.NotPanics(t, func() {
err := s.handlerMessage(&Socket{
Id: "test", Nsp: "/",
}, `42/logview,["resize",{"cols":124,"rows":34}]`)
require.ErrorIs(t, err, ErrorInvalidConnection,
"parsed content shouldn't cause an error",
)
}, "input content shouldn't cause a panics")
}
34 changes: 29 additions & 5 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package socketio
import (
"errors"
"io"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/doquangtan/socketio/v4/engineio"
Expand All @@ -13,10 +16,29 @@ import (
)

type Conn struct {
fasthttp *websocket.Conn
http *gWebsocket.Conn
fasthttp *websocket.Conn
http *gWebsocket.Conn
reqHeaders http.Header
reqQuery url.Values
data *atomic.Pointer[any]
}

func (c *Conn) RequestHeaders() http.Header { return c.reqHeaders }
func (c *Conn) RequestQuery() url.Values { return c.reqQuery }

func cloneValues(v url.Values) url.Values {
out := make(url.Values, len(v))
for k, vals := range v {
cp := make([]string, len(vals))
copy(cp, vals)
out[k] = cp
}
return out
}

func (c *Conn) SetData(data any) { c.data.Store(&data) }
func (c *Conn) GetData() any { return c.data.Load() }

func (c *Conn) nextWriter(messageType int) (io.WriteCloser, error) {
if c.http != nil {
return c.http.NextWriter(messageType)
Expand Down Expand Up @@ -49,8 +71,9 @@ func (c *Conn) close() error {

type Socket struct {
sync.RWMutex

Id string
Nps string
Nsp string
Conn *Conn
rooms roomNames
listeners listeners
Expand Down Expand Up @@ -139,9 +162,10 @@ func (s *Socket) writer(t socket_protocol.PacketType, arg ...interface{}) error
return err
}
nps := ""
if s.Nps != "/" {
nps = s.Nps + ","
if s.Nsp != "/" {
nps = s.Nsp + ","
}

if t == socket_protocol.ACK {
agrs := append([]interface{}{}, arg[0].([]interface{})[1:])
socket_protocol.WriteToWithAck(w, t, nps, arg[0].([]interface{})[0].(string), agrs...)
Expand Down