Skip to content

bingo-project/websocket

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

16 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

WebSocket

Go Reference Go Report Card CI

A production-ready WebSocket framework for Go using JSON-RPC 2.0 protocol with middleware support, grouped routing, and connection management. Part of the Bingo ecosystem.

📖 Documentation: bingoctl.dev/en/advanced/websocket

中文文档

Features

  • JSON-RPC 2.0 Protocol - Industry-standard message format (used by MCP, Ethereum, VSCode LSP)
  • Middleware Pattern - Familiar programming model like Gin/Echo
  • Grouped Routing - Support public/private groups with different middleware chains
  • Connection Management - Hub for client registration, authentication, and topic subscriptions
  • Built-in Handlers - Heartbeat, subscribe/unsubscribe out of the box
  • Rate Limiting - Token bucket algorithm with per-method configuration
  • Single Device Login - Automatic session kick when same user logs in from another device
  • Prometheus Metrics - Built-in observability with connection, message, and error metrics
  • Connection Limits - Configurable total and per-user connection limits
  • Graceful Shutdown - Clean shutdown with client notification

Installation

go get github.com/bingo-project/websocket

Quick Start

package main

import (
    "context"
    "log"
    "net/http"

    "github.com/bingo-project/websocket"
    "github.com/bingo-project/websocket/jsonrpc"
    "github.com/bingo-project/websocket/middleware"
    gorillaWS "github.com/gorilla/websocket"
)

func main() {
    // Create hub and router
    hub := websocket.NewHub()
    router := websocket.NewRouter()

    // Add global middleware
    router.Use(
        middleware.Recovery,
        middleware.RequestID,
        middleware.Logger,
    )

    // Public methods (no auth required)
    public := router.Group()
    public.Handle("heartbeat", websocket.HeartbeatHandler)
    public.Handle("echo", func(c *websocket.Context) *jsonrpc.Response {
        return c.JSON(c.Request.Params)
    })

    // Private methods (require auth)
    private := router.Group(middleware.Auth)
    private.Handle("subscribe", websocket.SubscribeHandler)

    // Start hub
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go hub.Run(ctx)

    // WebSocket upgrader
    upgrader := gorillaWS.Upgrader{
        CheckOrigin: func(r *http.Request) bool { return true },
    }

    // HTTP handler
    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            return
        }

        client := websocket.NewClient(hub, conn, context.Background(),
            websocket.WithRouter(router),
        )
        hub.Register <- client

        go client.WritePump()
        go client.ReadPump()
    })

    log.Println("Server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Message Format

Request

{
    "jsonrpc": "2.0",
    "method": "auth.login",
    "params": {"username": "test", "password": "123456"},
    "id": 1
}

Success Response

{
    "jsonrpc": "2.0",
    "result": {"token": "xxx", "expiresAt": 1234567890},
    "id": 1
}

Error Response

{
    "jsonrpc": "2.0",
    "error": {
        "code": -32001,
        "reason": "Unauthorized",
        "message": "Login required"
    },
    "id": 1
}

Server Push (Notification)

{
    "jsonrpc": "2.0",
    "method": "session.kicked",
    "params": {"reason": "Account logged in elsewhere"}
}

Middleware

Built-in Middleware

Middleware Description
Recovery / RecoveryWithLogger Catch panics, return 500 error
RequestID Inject request-id into context
Logger / LoggerWithLogger Log requests and latency
Auth Verify user is authenticated
RateLimit / RateLimitWithStore Token bucket rate limiting
LoginStateUpdater Update client state after login

Custom Middleware

func MyMiddleware(next websocket.Handler) websocket.Handler {
    return func(c *websocket.Context) *jsonrpc.Response {
        // Before handler
        log.Printf("Method: %s", c.Method)

        resp := next(c)

        // After handler
        return resp
    }
}

router.Use(MyMiddleware)

Handler

func Login(c *websocket.Context) *jsonrpc.Response {
    var req LoginRequest
    if err := c.BindValidate(&req); err != nil {
        return c.Error(errors.New(400, "InvalidParams", err.Error()))
    }

    // Business logic...
    token := authenticate(req.Username, req.Password)

    // Update client login state
    c.Client.NotifyLogin(userID, req.Platform, tokenExpiresAt)

    return c.JSON(map[string]any{
        "token":     token,
        "expiresAt": tokenExpiresAt,
    })
}

Connection Management

Hub API

// Get client by ID
client := hub.GetClient("client-id")

// Get all clients for a user
clients := hub.GetClientsByUser("user-123")

// Kick client
hub.KickClient("client-id", "reason")

// Kick all sessions of a user
hub.KickUser("user-123", "account suspended")

// Get statistics
stats := hub.Stats()

Push Messages

// Push to specific user on specific platform
hub.PushToUser("ios", "user-123", "order.created", data)

// Push to user on all platforms
hub.PushToUserAllPlatforms("user-123", "security.alert", data)

// Push to topic subscribers
hub.PushToTopic("group:123", "message.new", data)

// Broadcast to all authenticated clients
hub.Broadcast <- message

Topic Subscription

// Client subscribes to topics
hub.Subscribe <- &websocket.SubscribeEvent{
    Client: client,
    Topics: []string{"group:123", "room:lobby"},
    Result: resultChan,
}

// Client unsubscribes
hub.Unsubscribe <- &websocket.UnsubscribeEvent{
    Client: client,
    Topics: []string{"group:123"},
}

Rate Limiting

store := middleware.NewRateLimiterStore()

router.Use(middleware.RateLimitWithStore(&middleware.RateLimitConfig{
    Default: 10, // 10 requests/second
    Methods: map[string]float64{
        "heartbeat": 0,  // No limit
        "subscribe": 5,  // 5 requests/second
    },
}, store))

// Clean up when client disconnects
hub := websocket.NewHub(
    websocket.WithClientDisconnectCallback(store.Remove),
)

Configuration

cfg := &websocket.HubConfig{
    AnonymousTimeout: 10 * time.Second,  // Disconnect if not logged in within 10s
    AnonymousCleanup: 2 * time.Second,   // Cleanup interval
    HeartbeatTimeout: 60 * time.Second,  // No heartbeat for 60s -> disconnect
    HeartbeatCleanup: 30 * time.Second,
    PingPeriod:       54 * time.Second,  // WebSocket ping interval
    PongWait:         60 * time.Second,
    MaxMessageSize:   4096,
    WriteWait:        10 * time.Second,
    MaxConnections:   10000,             // Max total connections (0 = unlimited)
    MaxConnsPerUser:  5,                 // Max connections per user (0 = unlimited)
}

// Validate config before use
if err := cfg.Validate(); err != nil {
    log.Fatal(err)
}

hub := websocket.NewHubWithConfig(cfg)

Prometheus Metrics

import "github.com/prometheus/client_golang/prometheus"

// Create and register metrics
metrics := websocket.NewMetrics("myapp", "websocket")
metrics.MustRegister(prometheus.DefaultRegisterer)

// Attach metrics to hub
hub := websocket.NewHub(websocket.WithMetrics(metrics))

// Available metrics:
// - myapp_websocket_connections_total
// - myapp_websocket_connections_current
// - myapp_websocket_connections_authenticated
// - myapp_websocket_connections_anonymous
// - myapp_websocket_messages_sent_total
// - myapp_websocket_broadcasts_total
// - myapp_websocket_errors_total{type="connection_limit|user_limit|..."}
// - myapp_websocket_topics_current
// - myapp_websocket_subscriptions_total

Connection Limits

// Check before accepting connection (optional, for early rejection)
if !hub.CanAcceptConnection() {
    http.Error(w, "Too many connections", http.StatusServiceUnavailable)
    return
}

// Check before login (optional)
if !hub.CanUserConnect(userID) {
    return c.Error(errors.New(429, "TooManyConnections", "Max connections reached"))
}

// Limits are also enforced automatically in hub

Graceful Shutdown

ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)

// Handle shutdown signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh

// Cancel context to trigger graceful shutdown
// Hub will notify all clients before closing
cancel()

Examples

See examples/basic for a complete working example with:

  • Hub configuration and validation
  • Prometheus metrics integration
  • Rate limiting middleware
  • Public and private route groups
  • Connection limits
  • Graceful shutdown

Error Code Mapping

HTTP Status JSON-RPC Code Description
400 -32602 Invalid params
401 -32001 Unauthorized
403 -32003 Permission denied
404 -32004 Not found
429 -32029 Too many requests
500 -32603 Internal error

Performance

Benchmark Results

Tested on Apple M1 Pro:

Operation Latency Allocations
Broadcast (1000 clients) ~1.7μs 0 allocs
Subscribe ~1.3μs 10 allocs
PushToTopic (100 clients) ~6.3μs 7 allocs
Register/Unregister ~2.8μs 9 allocs

Run benchmarks:

go test -bench=. -benchmem ./...

Capacity Estimation

Based on gorilla/websocket and Go runtime characteristics, single-server capacity is primarily memory-bound:

Server Config Estimated Connections Notes
4 cores, 8GB 10,000 - 30,000 Dev/test
8 cores, 16GB 50,000 - 100,000 Production entry
16 cores, 32GB 100,000 - 200,000 Mid-size production
32 cores, 64GB 200,000 - 500,000 Large-scale production

Memory estimate: ~20-30KB per connection (2 goroutines, read/write buffers, application data)

Use Cases

Recommended:

  • Instant messaging (IM)
  • Real-time notifications
  • Online collaboration (docs, whiteboard)
  • Live data display (stocks, monitoring)
  • Game state synchronization

⚠️ Requires additional optimization:

  • Ultra-large scale (1M+ connections): Consider async I/O libraries like gnet or nbio
  • Ultra-high frequency (100K+ msg/s): Consider message batching and compression

Production Tuning

Linux Kernel Parameters

# /etc/sysctl.conf

# Increase file descriptor limit
fs.file-max = 1000000

# TCP connection optimization
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
net.core.netdev_max_backlog = 65535

# Memory optimization
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216

Process Limits

# /etc/security/limits.conf
* soft nofile 1000000
* hard nofile 1000000

Related

License

Apache License 2.0

About

Production-ready WebSocket framework for Go with JSON-RPC 2.0, middleware, grouped routing, and connection management

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages