A production-ready WebSocket framework for Go built on the JSON-RPC 2.0 protocol, with middleware support, grouped routing, and connection management. Part of the Bingo ecosystem.
📖 Documentation: bingoctl.dev/en/advanced/websocket
- JSON-RPC 2.0 Protocol - Industry-standard message format (used by MCP, Ethereum, VSCode LSP)
- Middleware Pattern - Familiar programming model like Gin/Echo
- Grouped Routing - Support public/private groups with different middleware chains
- Connection Management - Hub for client registration, authentication, and topic subscriptions
- Built-in Handlers - Heartbeat, subscribe/unsubscribe out of the box
- Rate Limiting - Token bucket algorithm with per-method configuration
- Single Device Login - Automatic session kick when same user logs in from another device
- Prometheus Metrics - Built-in observability with connection, message, and error metrics
- Connection Limits - Configurable total and per-user connection limits
- Graceful Shutdown - Clean shutdown with client notification
go get github.com/bingo-project/websocket

package main
import (
"context"
"log"
"net/http"
"github.com/bingo-project/websocket"
"github.com/bingo-project/websocket/jsonrpc"
"github.com/bingo-project/websocket/middleware"
gorillaWS "github.com/gorilla/websocket"
)
func main() {
// Create hub and router
hub := websocket.NewHub()
router := websocket.NewRouter()
// Add global middleware
router.Use(
middleware.Recovery,
middleware.RequestID,
middleware.Logger,
)
// Public methods (no auth required)
public := router.Group()
public.Handle("heartbeat", websocket.HeartbeatHandler)
public.Handle("echo", func(c *websocket.Context) *jsonrpc.Response {
return c.JSON(c.Request.Params)
})
// Private methods (require auth)
private := router.Group(middleware.Auth)
private.Handle("subscribe", websocket.SubscribeHandler)
// Start hub
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go hub.Run(ctx)
// WebSocket upgrader
upgrader := gorillaWS.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
// HTTP handler
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
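if err != nil {
// Upgrade has already written an HTTP error response; just log and stop
log.Printf("websocket upgrade failed: %v", err)
return
}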
client := websocket.NewClient(hub, conn, context.Background(),
websocket.WithRouter(router),
)
hub.Register <- client
go client.WritePump()
go client.ReadPump()
})
log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}

{
"jsonrpc": "2.0",
"method": "auth.login",
"params": {"username": "test", "password": "123456"},
"id": 1
}

{
"jsonrpc": "2.0",
"result": {"token": "xxx", "expiresAt": 1234567890},
"id": 1
}

{
"jsonrpc": "2.0",
"error": {
"code": -32001,
"reason": "Unauthorized",
"message": "Login required"
},
"id": 1
}

{
"jsonrpc": "2.0",
"method": "session.kicked",
"params": {"reason": "Account logged in elsewhere"}
}

| Middleware | Description |
|---|---|
| Recovery / RecoveryWithLogger | Catch panics, return 500 error |
| RequestID | Inject request-id into context |
| Logger / LoggerWithLogger | Log requests and latency |
| Auth | Verify user is authenticated |
| RateLimit / RateLimitWithStore | Token bucket rate limiting |
| LoginStateUpdater | Update client state after login |
func MyMiddleware(next websocket.Handler) websocket.Handler {
return func(c *websocket.Context) *jsonrpc.Response {
// Before handler
log.Printf("Method: %s", c.Method)
resp := next(c)
// After handler
return resp
}
}
router.Use(MyMiddleware)

func Login(c *websocket.Context) *jsonrpc.Response {
var req LoginRequest
if err := c.BindValidate(&req); err != nil {
return c.Error(errors.New(400, "InvalidParams", err.Error()))
}
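// Business logic: authenticate is an application-defined helper (not part of this package),
// assumed here to return the user ID, a token, and the token's expiry.
userID, token, tokenExpiresAt := authenticate(req.Username, req.Password)
// Update client login state
c.Client.NotifyLogin(userID, req.Platform, tokenExpiresAt)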
return c.JSON(map[string]any{
"token": token,
"expiresAt": tokenExpiresAt,
})
}

// Get client by ID
client := hub.GetClient("client-id")
// Get all clients for a user
clients := hub.GetClientsByUser("user-123")
// Kick client
hub.KickClient("client-id", "reason")
// Kick all sessions of a user
hub.KickUser("user-123", "account suspended")
// Get statistics
stats := hub.Stats()

// Push to specific user on specific platform
hub.PushToUser("ios", "user-123", "order.created", data)
// Push to user on all platforms
hub.PushToUserAllPlatforms("user-123", "security.alert", data)
// Push to topic subscribers
hub.PushToTopic("group:123", "message.new", data)
// Broadcast to all authenticated clients
hub.Broadcast <- message

// Client subscribes to topics
hub.Subscribe <- &websocket.SubscribeEvent{
Client: client,
Topics: []string{"group:123", "room:lobby"},
Result: resultChan,
}
// Client unsubscribes
hub.Unsubscribe <- &websocket.UnsubscribeEvent{
Client: client,
Topics: []string{"group:123"},
}

store := middleware.NewRateLimiterStore()
router.Use(middleware.RateLimitWithStore(&middleware.RateLimitConfig{
Default: 10, // 10 requests/second
Methods: map[string]float64{
"heartbeat": 0, // No limit
"subscribe": 5, // 5 requests/second
},
}, store))
// Clean up when client disconnects
hub := websocket.NewHub(
websocket.WithClientDisconnectCallback(store.Remove),
)

cfg := &websocket.HubConfig{
AnonymousTimeout: 10 * time.Second, // Disconnect if not logged in within 10s
AnonymousCleanup: 2 * time.Second, // Cleanup interval
HeartbeatTimeout: 60 * time.Second, // No heartbeat for 60s -> disconnect
HeartbeatCleanup: 30 * time.Second,
PingPeriod: 54 * time.Second, // WebSocket ping interval
PongWait: 60 * time.Second,
MaxMessageSize: 4096,
WriteWait: 10 * time.Second,
MaxConnections: 10000, // Max total connections (0 = unlimited)
MaxConnsPerUser: 5, // Max connections per user (0 = unlimited)
}
// Validate config before use
if err := cfg.Validate(); err != nil {
log.Fatal(err)
}
hub := websocket.NewHubWithConfig(cfg)

import "github.com/prometheus/client_golang/prometheus"
// Create and register metrics
metrics := websocket.NewMetrics("myapp", "websocket")
metrics.MustRegister(prometheus.DefaultRegisterer)
// Attach metrics to hub
hub := websocket.NewHub(websocket.WithMetrics(metrics))
// Available metrics:
// - myapp_websocket_connections_total
// - myapp_websocket_connections_current
// - myapp_websocket_connections_authenticated
// - myapp_websocket_connections_anonymous
// - myapp_websocket_messages_sent_total
// - myapp_websocket_broadcasts_total
// - myapp_websocket_errors_total{type="connection_limit|user_limit|..."}
// - myapp_websocket_topics_current
// - myapp_websocket_subscriptions_total

// Check before accepting connection (optional, for early rejection)
if !hub.CanAcceptConnection() {
http.Error(w, "Too many connections", http.StatusServiceUnavailable)
return
}
// Check before login (optional)
if !hub.CanUserConnect(userID) {
return c.Error(errors.New(429, "TooManyConnections", "Max connections reached"))
}
// Limits are also enforced automatically in the hub

ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)
// Handle shutdown signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
// Cancel context to trigger graceful shutdown
// Hub will notify all clients before closing
cancel()

See examples/basic for a complete working example with:
- Hub configuration and validation
- Prometheus metrics integration
- Rate limiting middleware
- Public and private route groups
- Connection limits
- Graceful shutdown
| HTTP Status | JSON-RPC Code | Description |
|---|---|---|
| 400 | -32602 | Invalid params |
| 401 | -32001 | Unauthorized |
| 403 | -32003 | Permission denied |
| 404 | -32004 | Not found |
| 429 | -32029 | Too many requests |
| 500 | -32603 | Internal error |
Tested on Apple M1 Pro:
| Operation | Latency | Allocations |
|---|---|---|
| Broadcast (1000 clients) | ~1.7μs | 0 allocs |
| Subscribe | ~1.3μs | 10 allocs |
| PushToTopic (100 clients) | ~6.3μs | 7 allocs |
| Register/Unregister | ~2.8μs | 9 allocs |
Run benchmarks:
go test -bench=. -benchmem ./...

Based on gorilla/websocket and Go runtime characteristics, single-server capacity is primarily memory-bound:
| Server Config | Estimated Connections | Notes |
|---|---|---|
| 4 cores, 8GB | 10,000 - 30,000 | Dev/test |
| 8 cores, 16GB | 50,000 - 100,000 | Production entry |
| 16 cores, 32GB | 100,000 - 200,000 | Mid-size production |
| 32 cores, 64GB | 200,000 - 500,000 | Large-scale production |
Memory estimate: ~20-30KB per connection (2 goroutines, read/write buffers, application data)
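
As a rough back-of-the-envelope check of the per-connection figure, the sketch below multiplies a connection count by the 20-30KB estimate. `EstimateMemoryGiB` is a hypothetical helper; it only covers the per-connection estimate and says nothing about application, OS, or GC overhead.

```go
package main

import "fmt"

// EstimateMemoryGiB returns the approximate memory, in GiB, needed for
// conns connections at perConnKB kilobytes each (1 GiB = 1024*1024 KB).
func EstimateMemoryGiB(conns int, perConnKB float64) float64 {
	return float64(conns) * perConnKB / (1024 * 1024)
}

func main() {
	for _, conns := range []int{10_000, 100_000, 500_000} {
		low := EstimateMemoryGiB(conns, 20)
		high := EstimateMemoryGiB(conns, 30)
		fmt.Printf("%7d connections: %.2f to %.2f GiB\n", conns, low, high)
	}
}
```

By this estimate, 100,000 connections need roughly 2 to 3 GiB for connection state alone, so the server sizes in the table leave substantial headroom for everything else.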
✅ Recommended:
- Instant messaging (IM)
- Real-time notifications
- Online collaboration (docs, whiteboard)
- Live data display (stocks, monitoring)
- Game state synchronization

Consider alternatives for:
- Ultra-large scale (1M+ connections): Consider async I/O libraries like gnet or nbio
- Ultra-high frequency (100K+ msg/s): Consider message batching and compression
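
As one possible shape for the message batching mentioned above, here is a size-based batcher sketch using only the standard library. `Batcher` is hypothetical and not part of this library; it groups messages into a single JSON array per frame, and whether clients accept array frames is an assumption that would need to be agreed on in your protocol. A production version would also flush on a timer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Batcher is a hypothetical helper that buffers messages and emits them
// as one JSON array once maxSize messages are pending.
type Batcher struct {
	maxSize int
	pending []json.RawMessage
	send    func(frame []byte) // called once per encoded batch
}

func NewBatcher(maxSize int, send func(frame []byte)) *Batcher {
	return &Batcher{maxSize: maxSize, send: send}
}

// Add encodes msg, queues it, and flushes when the batch is full.
func (b *Batcher) Add(msg any) error {
	raw, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	b.pending = append(b.pending, raw)
	if len(b.pending) >= b.maxSize {
		return b.Flush()
	}
	return nil
}

// Flush sends any pending messages as a single frame.
func (b *Batcher) Flush() error {
	if len(b.pending) == 0 {
		return nil
	}
	frame, err := json.Marshal(b.pending)
	if err != nil {
		return err
	}
	b.pending = b.pending[:0]
	b.send(frame)
	return nil
}

func main() {
	b := NewBatcher(2, func(frame []byte) { fmt.Println(string(frame)) })
	for i := 0; i < 5; i++ {
		if err := b.Add(map[string]int{"n": i}); err != nil {
			panic(err)
		}
	}
	// Send the final partial batch
	if err := b.Flush(); err != nil {
		panic(err)
	}
}
```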
# /etc/sysctl.conf
# Increase file descriptor limit
fs.file-max = 1000000
# TCP connection optimization
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
net.core.netdev_max_backlog = 65535
# Memory optimization
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216

# /etc/security/limits.conf
* soft nofile 1000000
* hard nofile 1000000

- Bingo Protocol Layer - Using WebSocket as a pluggable protocol in Bingo
Apache License 2.0