A minimal, production-ready WebSocket server for Go with room-based routing, session resumption, and pluggable codecs.
Status: v0 — API is being stabilized. Module path: github.com/wspulse/server. Package name: wspulse.
- Thin transport layer: no business logic, no auth, no message history
- Three public concepts: Server, Connection, Frame
- Plug in any HTTP router via http.Handler
- Swappable codecs: JSON (default) or binary (e.g. Protobuf)
- Transparent session resumption with configurable grace window
go get github.com/wspulse/server

import "github.com/wspulse/server" // package name: wspulse
var srv wspulse.Server // declared first so the OnMessage closure can reference it
srv = wspulse.NewServer(
	// ConnectFunc: authenticate and assign room + connection IDs
	func(r *http.Request) (roomID, connectionID string, err error) {
		token := r.URL.Query().Get("token")
		userID, err := myauth.Verify(token)
		if err != nil {
			return "", "", err // → HTTP 401
		}
		return r.URL.Query().Get("room"), userID, nil
	},
	wspulse.WithOnMessage(func(connection wspulse.Connection, f wspulse.Frame) {
		// echo back to the whole room
		srv.Broadcast(connection.RoomID(), f)
	}),
	wspulse.WithOnDisconnect(func(connection wspulse.Connection, err error) {
		log.Printf("disconnected: %s", connection.ID())
	}),
	wspulse.WithHeartbeat(10*time.Second, 30*time.Second),
	wspulse.WithResumeWindow(30*time.Second),
)
// Standard library
http.Handle("/ws", srv)
// Gin
router.GET("/ws", func(c *gin.Context) {
	srv.ServeHTTP(c.Writer, c.Request)
})

// Send to a specific connection
srv.Send(connectionID, wspulse.Frame{Event: "sys", Payload: []byte(`{"event":"welcome"}`)})
// Broadcast to a room
payload, _ := json.Marshal(myMessage)
srv.Broadcast(roomID, wspulse.Frame{Event: "msg", Payload: payload})
// Kick a connection
srv.Kick(connectionID)
// List connections in a room
connections := srv.GetConnections(roomID)

core/router provides Gin-style middleware and per-event handler dispatch. wspulse.Connection satisfies router.Connection via structural subtyping, so no adaptor is needed.
import (
	"github.com/wspulse/server" // package name: wspulse
	"github.com/wspulse/core/router"
)

var srv wspulse.Server // declared before the handlers so their closures can reference it

rtr := router.New()
rtr.Use(router.Recovery()) // recover from panics in handlers
rtr.On("chat.message", func(c *router.Context) {
	srv.Broadcast(c.Connection.RoomID(), c.Frame)
})
rtr.On("chat.join", func(c *router.Context) {
	welcome := wspulse.Frame{Event: "chat.welcome", Payload: []byte(`"hello"`)}
	_ = c.Connection.Send(welcome)
})

srv = wspulse.NewServer(
	connectFunc,
	wspulse.WithOnMessage(func(conn wspulse.Connection, f wspulse.Frame) {
		rtr.Dispatch(conn, f)
	}),
)

See wspulse/core for the full router API.
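To make "Gin-style middleware" concrete, here is a minimal sketch of how such a chain is commonly built: each handler may call Next() to run the rest of the chain and then continue afterwards. The Context type, its fields, and the run helper are hypothetical stand-ins written for this illustration; they are not the router.Context from wspulse/core, whose internals may differ.

```go
package main

import "fmt"

// Context is a hypothetical stand-in for a router context. It is NOT the
// wspulse/core type; it only models the handler chain.
type Context struct {
	handlers []func(*Context)
	index    int
	Trace    []string // records the order in which code runs
}

// Next runs the remaining handlers in the chain, then returns to the caller
// so that middleware can execute code after the downstream handlers finish.
func (c *Context) Next() {
	c.index++
	for c.index < len(c.handlers) {
		c.handlers[c.index](c)
		c.index++
	}
}

// run executes the chain from the first handler and returns the trace.
func run(handlers ...func(*Context)) []string {
	c := &Context{handlers: handlers, index: -1}
	c.Next()
	return c.Trace
}

func main() {
	logging := func(c *Context) {
		c.Trace = append(c.Trace, "before")
		c.Next() // run the event handler
		c.Trace = append(c.Trace, "after")
	}
	handler := func(c *Context) {
		c.Trace = append(c.Trace, "handler")
	}
	fmt.Println(run(logging, handler)) // prints [before handler after]
}
```

The shared index is what lets a middleware that never calls Next() still fall through to the next handler, while one that does call it wraps everything downstream.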
| Symbol | Description |
|---|---|
| Server | Manages sessions, heartbeats, and room routing |
| Connection | A logical WebSocket session (ID, RoomID, Send, Close, Done) |
| Frame | Transport unit (ID, Event, Payload []byte); re-exported from core |
| ConnectFunc | func(*http.Request) (roomID, connectionID string, err error) |
| Codec | Interface: Encode(Frame), Decode([]byte), FrameType(); from core |
| JSONCodec | Default codec (text frames, JSON payload); re-exported from core |
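As a rough illustration of what a binary codec could look like, the sketch below round-trips a frame through a length-prefixed encoding. The table above lists only the method names, so the exact signatures here (including FrameType returning an int opcode) are assumptions, and Frame and Codec are local stand-ins rather than the wspulse/core types. Check the real interface before adapting this.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Frame is a local stand-in with the fields listed in the table above.
type Frame struct {
	ID      string
	Event   string
	Payload []byte
}

// Codec is an ASSUMED shape for the interface; the real signatures may differ.
type Codec interface {
	Encode(f Frame) ([]byte, error)
	Decode(data []byte) (Frame, error)
	FrameType() int
}

const binaryMessage = 2 // WebSocket binary opcode (RFC 6455)

// lengthPrefixedCodec writes ID, Event, and Payload as uvarint-length-prefixed fields.
type lengthPrefixedCodec struct{}

var _ Codec = lengthPrefixedCodec{} // compile-time interface check

func (lengthPrefixedCodec) Encode(f Frame) ([]byte, error) {
	var out []byte
	var lenBuf [binary.MaxVarintLen64]byte
	for _, field := range [][]byte{[]byte(f.ID), []byte(f.Event), f.Payload} {
		n := binary.PutUvarint(lenBuf[:], uint64(len(field)))
		out = append(out, lenBuf[:n]...)
		out = append(out, field...)
	}
	return out, nil
}

func (lengthPrefixedCodec) Decode(data []byte) (Frame, error) {
	var fields [3][]byte
	for i := range fields {
		n, size := binary.Uvarint(data)
		if size <= 0 || uint64(len(data)-size) < n {
			return Frame{}, errors.New("truncated frame")
		}
		// Copy so the decoded frame does not alias the input buffer.
		fields[i] = append([]byte(nil), data[size:size+int(n)]...)
		data = data[size+int(n):]
	}
	return Frame{ID: string(fields[0]), Event: string(fields[1]), Payload: fields[2]}, nil
}

func (lengthPrefixedCodec) FrameType() int { return binaryMessage }

func main() {
	c := lengthPrefixedCodec{}
	data, _ := c.Encode(Frame{ID: "msg-001", Event: "chat.message", Payload: []byte("hello")})
	f, err := c.Decode(data)
	fmt.Println(f.ID, f.Event, string(f.Payload), err)
}
```

If the real interface matches, a codec like this would be passed via WithCodec(c); a Protobuf or MessagePack codec would follow the same three-method shape.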
| Option | Default |
|---|---|
| WithOnConnect(fn) | none |
| WithOnMessage(fn) | none |
| WithOnDisconnect(fn) | none |
| WithOnTransportDrop(fn) | none |
| WithOnTransportRestore(fn) | none |
| WithResumeWindow(d) | 0 (disabled) |
| WithHeartbeat(ping, pong) | 10 s / 30 s |
| WithWriteWait(d) | 10 s |
| WithMaxMessageSize(n) | 512 B |
| WithSendBufferSize(n) | 256 frames |
| WithCodec(c) | JSONCodec |
| WithCheckOrigin(fn) | allow all |
| WithLogger(l) | zap.NewNop() (accepts *zap.Logger) |
| WithMetrics(mc) | NoopCollector (accepts MetricsCollector) |
- Room-based routing: connections are partitioned into rooms; broadcast targets a single room.
- Pluggable auth: ConnectFunc runs during HTTP Upgrade, before any WebSocket frames are exchanged.
- Session resumption: opt-in via WithResumeWindow(d). When a transport drops, the session is suspended for d before firing OnDisconnect. If the client reconnects with the same connectionID within that window, the new WebSocket is swapped in transparently: no OnConnect/OnDisconnect callbacks fire, and buffered frames are replayed in order. Disabled by default.
- Automatic heartbeat: server-side Ping/Pong with configurable intervals (WithHeartbeat).
- Backpressure: bounded per-connection send buffer; oldest frame is dropped on overflow during broadcast.
- Swappable codec: JSON by default; implement the Codec interface to plug in any encoding (binary, Protobuf, MessagePack, etc.).
- Kick: Server.Kick(connectionID) always destroys the session immediately, bypassing the resume window.
- Graceful shutdown: Server.Close() sends close frames to all connected clients, drains in-flight registrations, and fires OnDisconnect for every session.
- Metrics: optional MetricsCollector interface for observability; default is NoopCollector (minimal overhead). See Metrics below.
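The drop-oldest backpressure policy in the list above can be sketched with a buffered channel. This is an illustration of the general technique under stated assumptions, not wspulse's internal implementation: sendBuffer, push, and drain are hypothetical names, frames are modelled as strings, and a single producer is assumed.

```go
package main

import "fmt"

// sendBuffer is a hypothetical bounded FIFO of outgoing frames.
// It is not a wspulse type.
type sendBuffer struct {
	ch chan string
}

func newSendBuffer(capacity int) *sendBuffer {
	return &sendBuffer{ch: make(chan string, capacity)}
}

// push enqueues frame without blocking. If the buffer is full it discards
// the oldest queued frame to make room and reports dropped == true.
func (b *sendBuffer) push(frame string) (dropped bool) {
	for {
		select {
		case b.ch <- frame:
			return dropped
		default:
			// Buffer full: evict the oldest frame, then retry the send.
			select {
			case <-b.ch:
				dropped = true
			default:
				// A concurrent reader emptied a slot first; just retry.
			}
		}
	}
}

// drain returns all queued frames in FIFO order.
func (b *sendBuffer) drain() []string {
	var out []string
	for {
		select {
		case f := <-b.ch:
			out = append(out, f)
		default:
			return out
		}
	}
}

func main() {
	buf := newSendBuffer(2)
	buf.push("a")
	buf.push("b")
	dropped := buf.push("c") // buffer is full, so "a" is evicted
	fmt.Println(dropped, buf.drain()) // prints true [b c]
}
```

Dropping the oldest frame keeps a slow consumer from stalling a room-wide broadcast, at the cost of that consumer missing messages; a FrameDropped-style metric is what makes the loss visible.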
wspulse/server exposes a MetricsCollector interface with typed hooks for connection lifecycle, room state, throughput, backpressure, and heartbeat health. The default is NoopCollector{} (minimal overhead). Embed NoopCollector in custom implementations for forward-compatible additions.
// Use a contrib adapter (e.g. wspulse/metrics-prometheus or wspulse/metrics-otel),
// or implement MetricsCollector yourself.
var collector wspulse.MetricsCollector = myCollector
srv := wspulse.NewServer(connect,
	wspulse.WithMetrics(collector),
)

The interface covers:
| Hook | Description |
|---|---|
| ConnectionOpened / ConnectionClosed | Session lifecycle with duration and disconnect reason |
| RoomCreated / RoomDestroyed | Room allocation and deallocation |
| MessageReceived / MessageSent / MessageBroadcast | Throughput with byte sizes and fan-out |
| FrameDropped / SendBufferUtilization | Backpressure visibility |
| ResumeAttempt | Session resumption tracking |
| PongTimeout | Heartbeat health |
Implement MetricsCollector to plug in any backend (Prometheus, OTel, or custom). See doc/internals.md for goroutine call sites and thread safety requirements.
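The advice to embed NoopCollector can be illustrated with a reduced model of the pattern. The interface below is a hypothetical three-hook stand-in with assumed method signatures; the real MetricsCollector has more hooks and its parameters may differ. The point is only that embedding a no-op base lets a custom collector override the hooks it cares about and keep compiling when new hooks are added.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// MetricsCollector is a reduced, HYPOTHETICAL stand-in for the real interface.
// Hook names come from the table above; the signatures are assumptions.
type MetricsCollector interface {
	ConnectionOpened(roomID, connectionID string)
	FrameDropped(roomID, connectionID string)
	PongTimeout(roomID, connectionID string)
}

// NoopCollector implements every hook as a no-op (local stand-in).
type NoopCollector struct{}

func (NoopCollector) ConnectionOpened(string, string) {}
func (NoopCollector) FrameDropped(string, string)     {}
func (NoopCollector) PongTimeout(string, string)      {}

// dropCounter overrides only FrameDropped; the embedded NoopCollector
// supplies every other hook, including any added to the interface later.
type dropCounter struct {
	NoopCollector
	dropped int64
}

// FrameDropped is called from server goroutines, so the counter is atomic.
func (d *dropCounter) FrameDropped(roomID, connectionID string) {
	atomic.AddInt64(&d.dropped, 1)
}

// Dropped returns the number of dropped frames observed so far.
func (d *dropCounter) Dropped() int64 {
	return atomic.LoadInt64(&d.dropped)
}

func main() {
	dc := &dropCounter{}
	var mc MetricsCollector = dc
	mc.ConnectionOpened("lobby", "u1") // no-op from the embedded type
	mc.FrameDropped("lobby", "u1")
	mc.FrameDropped("lobby", "u1")
	fmt.Println(dc.Dropped()) // prints 2
}
```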
The router from wspulse/core integrates directly with WithOnMessage. It dispatches each incoming frame to its handler based on the "event" field in the JSON message:
{
  "id": "msg-001",
  "event": "chat.message",
  "payload": { "text": "hello" }
}

The value of "event" is frame.Event on the Go side, and is the key used to select the handler.
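Independent of the router package, the mapping from the "event" field to a handler can be sketched with the standard library alone. The envelope type, handler signature, and dispatch function below are hypothetical names for this illustration, and returning an error for an unregistered event is an assumption; the real router may handle that case differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope matches the JSON message shape shown above.
type envelope struct {
	ID      string          `json:"id"`
	Event   string          `json:"event"`
	Payload json.RawMessage `json:"payload"` // left undecoded for the handler
}

// handler is a hypothetical handler signature used only in this sketch.
type handler func(e envelope) string

// dispatch decodes raw and runs the handler registered under its "event" value.
func dispatch(handlers map[string]handler, raw []byte) (string, error) {
	var e envelope
	if err := json.Unmarshal(raw, &e); err != nil {
		return "", fmt.Errorf("decode envelope: %w", err)
	}
	h, ok := handlers[e.Event]
	if !ok {
		return "", fmt.Errorf("no handler for event %q", e.Event)
	}
	return h(e), nil
}

func main() {
	handlers := map[string]handler{
		"chat.message": func(e envelope) string { return "broadcast " + string(e.Payload) },
		"ping":         func(envelope) string { return "pong" },
	}
	msg := []byte(`{"id":"msg-001","event":"chat.message","payload":{"text":"hello"}}`)
	out, err := dispatch(handlers, msg)
	fmt.Println(out, err)
}
```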
import (
	"github.com/wspulse/server" // package name: wspulse
	"github.com/wspulse/core/router"
)

var srv wspulse.Server

r := router.New()
r.Use(router.Recovery())
r.Use(func(c *router.Context) {
	// middleware runs before every handler
	c.Set("roomID", c.Connection.RoomID())
	c.Next()
})

// matches frames where "event" == "chat.message"
r.On("chat.message", func(c *router.Context) {
	srv.Broadcast(c.Connection.RoomID(), wspulse.Frame{
		Event:   "chat.message",
		Payload: c.Frame.Payload,
	})
})

// matches frames where "event" == "ping"
r.On("ping", func(c *router.Context) {
	_ = c.Connection.Send(wspulse.Frame{Event: "pong"})
})

srv = wspulse.NewServer(
	connectFn,
	wspulse.WithOnMessage(func(conn wspulse.Connection, f wspulse.Frame) {
		r.Dispatch(conn, f)
	}),
)

make fmt # auto-format source files (gofmt + goimports)
make check # validate format, lint, test with race detector (pre-commit gate)
make test # go test -race -count=3 ./...
make test-cover # go test with coverage report → coverage.html
make bench # run benchmarks with memory allocation stats
make tidy # go mod tidy (GOWORK=off)
make clean # remove build artifacts and test cache

- wspulse/core - Shared types and codecs
- wspulse/client-go - Go client library with automatic reconnection and session resumption