This repository was archived by the owner on Apr 15, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
308 lines (257 loc) · 8.17 KB
/
main.go
File metadata and controls
308 lines (257 loc) · 8.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
// Package main implements a shared wspulse test server for non-Go client
// integration tests. It exposes two local ports:
//
// - WebSocket port — echo server with query-param-controlled behaviour:
// ?reject=1 → ConnectFunc returns an error (HTTP 401)
// ?room=<id> → assigns connection to room <id> (default: "test")
// ?id=<id> → sets connectionID (default: auto-generated UUID)
// ?ignore_pings=1 → raw echo handler that suppresses Pong replies
// (bypasses wspulse/server; used to test pong-timeout)
//
// - Control port — HTTP API for test orchestration:
// GET /health → 200 OK
// POST /kick → kick a connection by ?id=<connectionID>
// POST /shutdown → close WebSocket server + listener
// POST /restart → restart WebSocket server on the same port
//
// On startup, the server prints "READY:<ws_port>:<control_port>" to stderr.
// Client test harnesses parse this line to discover both ports.
package main
import (
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"github.com/gorilla/websocket"
"go.uber.org/zap"
wspulse "github.com/wspulse/server"
)
// main starts the WebSocket listener and the control listener, announces
// both ports on stderr, and then blocks until SIGINT or SIGTERM is received,
// at which point it closes the WebSocket side and returns.
func main() {
	logger := zap.Must(zap.NewDevelopment())
	ts := newTestServer(logger)

	wsPort, wsErr := ts.startWebSocket()
	if wsErr != nil {
		logger.Fatal("failed to start WebSocket listener", zap.Error(wsErr))
	}

	controlPort, controlErr := ts.startControl()
	if controlErr != nil {
		logger.Fatal("failed to start control listener", zap.Error(controlErr))
	}

	// Client test harnesses parse this exact line to discover both ports.
	fmt.Fprintf(os.Stderr, "READY:%d:%d\n", wsPort, controlPort)

	// Buffered so a signal delivered before the receive below is not dropped.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	<-stop

	logger.Info("shutting down")
	ts.close()
}
// testServer encapsulates the dual-port server state: the logger shared by
// all handlers plus the current WebSocket server, its listener, and the
// bookkeeping needed to shut it down and restart it on the same port.
type testServer struct {
	logger *zap.Logger

	// mu is taken by the methods that read or replace the fields below.
	mu         sync.Mutex
	server     wspulse.Server // current wspulse server; set to nil by close and /shutdown
	wsListener net.Listener   // listener the WebSocket side is served on; set to nil by close and /shutdown
	wsPort     int            // fixed port for restart
	wsServing  chan struct{}  // closed when the goroutine serving wsListener exits
}
// newTestServer returns a testServer that logs through logger. No listener
// is opened until startWebSocket and startControl are called.
func newTestServer(logger *zap.Logger) *testServer {
	ts := new(testServer)
	ts.logger = logger
	return ts
}
// newWSServer builds a wspulse server configured for the integration tests.
//
// The connect callback inspects the request's query parameters:
//   - reject=1 makes it return an error;
//   - room selects the room ID, falling back to "test" when empty;
//   - id is passed through unchanged as the connection ID (may be empty).
//
// Every received frame is sent straight back to the connection it came from;
// a failed send is logged as a warning and otherwise ignored.
func (ts *testServer) newWSServer() wspulse.Server {
	connect := func(r *http.Request) (roomID, connectionID string, err error) {
		query := r.URL.Query()
		if query.Get("reject") == "1" {
			return "", "", fmt.Errorf("rejected by test server")
		}
		roomID = query.Get("room")
		if roomID == "" {
			roomID = "test"
		}
		return roomID, query.Get("id"), nil
	}

	echo := func(connection wspulse.Connection, f wspulse.Frame) {
		sendErr := connection.Send(f)
		if sendErr == nil {
			return
		}
		ts.logger.Warn("echo send failed", zap.Error(sendErr))
	}

	return wspulse.NewServer(
		connect,
		wspulse.WithOnMessage(echo),
		wspulse.WithLogger(ts.logger),
		wspulse.WithMaxMessageSize(1<<20),
	)
}
// startWebSocket binds an ephemeral port on 127.0.0.1, creates the wspulse
// server, and starts serving it in a background goroutine. It returns the
// bound port, which is also remembered in ts.wsPort so that a later restart
// can rebind to it.
//
// The listener is opened before the wspulse server is created so that a
// failed Listen returns without leaving a never-closed server in ts.server.
func (ts *testServer) startWebSocket() (int, error) {
	ts.mu.Lock()
	defer ts.mu.Unlock()

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}

	srv := ts.newWSServer()
	// The goroutine closes its own channel through this local. Reading
	// ts.wsServing at exit time would be an unsynchronised field access and
	// could close the channel installed by a later restart.
	serving := make(chan struct{})

	ts.server = srv
	ts.wsListener = ln
	ts.wsPort = ln.Addr().(*net.TCPAddr).Port
	ts.wsServing = serving

	go func() {
		defer close(serving)
		if err := http.Serve(ln, ts.wrapHandler(srv)); err != nil {
			ts.logger.Debug("ws http.Serve exited", zap.Error(err))
		}
	}()
	return ts.wsPort, nil
}
// startControl registers the control endpoints (health, kick, shutdown,
// restart), binds an ephemeral port on 127.0.0.1, and serves the endpoints
// from a background goroutine. It returns the bound port, or the Listen
// error if the port could not be opened.
func (ts *testServer) startControl() (int, error) {
	routes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"GET /health", ts.handleHealth},
		{"POST /kick", ts.handleKick},
		{"POST /shutdown", ts.handleShutdown},
		{"POST /restart", ts.handleRestart},
	}

	mux := http.NewServeMux()
	for _, route := range routes {
		mux.HandleFunc(route.pattern, route.handler)
	}

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	port := ln.Addr().(*net.TCPAddr).Port

	go func() {
		serveErr := http.Serve(ln, mux)
		if serveErr != nil {
			ts.logger.Debug("control http.Serve exited", zap.Error(serveErr))
		}
	}()
	return port, nil
}
// close shuts down the WebSocket side: it closes the wspulse server if one
// is set, then closes the listener and waits for the serving goroutine to
// exit. Both fields are cleared, so calling close again is a no-op.
func (ts *testServer) close() {
	ts.mu.Lock()
	defer ts.mu.Unlock()

	if srv := ts.server; srv != nil {
		srv.Close()
		ts.server = nil
	}

	ln := ts.wsListener
	if ln == nil {
		return
	}
	_ = ln.Close()
	// Wait until the serve goroutine has signalled its exit.
	<-ts.wsServing
	ts.wsListener = nil
}
// ── control handlers ────────────────────────────────────────────────────────
// handleHealth serves GET /health and always answers 200 with an OK body.
func (ts *testServer) handleHealth(w http.ResponseWriter, _ *http.Request) {
	healthy := response{OK: true}
	writeJSON(w, http.StatusOK, healthy)
}
// handleKick serves POST /kick?id=<connectionID>.
//
// Responses:
//   - 400 when the id query parameter is missing or empty;
//   - 503 when the WebSocket server is currently shut down;
//   - 400 with the error text when Kick itself fails;
//   - 200 otherwise.
func (ts *testServer) handleKick(w http.ResponseWriter, r *http.Request) {
	id := r.URL.Query().Get("id")
	if id == "" {
		writeJSON(w, http.StatusBadRequest, response{OK: false, Error: "missing ?id= parameter"})
		return
	}

	// Snapshot the server under the lock; Kick is called after releasing it.
	ts.mu.Lock()
	current := ts.server
	ts.mu.Unlock()

	if current == nil {
		writeJSON(w, http.StatusServiceUnavailable, response{OK: false, Error: "server is shut down"})
		return
	}

	status, body := http.StatusOK, response{OK: true}
	if kickErr := current.Kick(id); kickErr != nil {
		status, body = http.StatusBadRequest, response{OK: false, Error: kickErr.Error()}
	}
	writeJSON(w, status, body)
}
// handleShutdown serves POST /shutdown. It detaches the current server and
// listener from ts under the lock, then closes them after releasing it and
// waits for the serving goroutine to exit before replying. The reply is
// always 200; when nothing was running the body carries the note
// "already shut down".
func (ts *testServer) handleShutdown(w http.ResponseWriter, _ *http.Request) {
	ts.mu.Lock()
	srv := ts.server
	if srv == nil {
		ts.mu.Unlock()
		writeJSON(w, http.StatusOK, response{OK: true, Error: "already shut down"})
		return
	}
	ln, done := ts.wsListener, ts.wsServing
	ts.server, ts.wsListener = nil, nil
	ts.mu.Unlock()

	// Teardown runs on the detached values, outside the lock.
	srv.Close()
	if ln != nil {
		_ = ln.Close()
		<-done
	}

	writeJSON(w, http.StatusOK, response{OK: true})
}
// handleRestart serves POST /restart. It tears down the current WebSocket
// server and listener (if any), rebinds to the port chosen at startup so
// client URLs remain valid, and starts serving a fresh wspulse server.
//
// Responses:
//   - 500 with the Listen error when the port cannot be rebound; the server
//     is then left in the shut-down state (ts.server and ts.wsListener nil);
//   - 200 once the new server is being served.
func (ts *testServer) handleRestart(w http.ResponseWriter, _ *http.Request) {
	ts.mu.Lock()
	defer ts.mu.Unlock()

	// Clean up previous instance if still running. The fields are cleared so
	// that a failure below cannot leave stale, already-closed values behind.
	if ts.server != nil {
		ts.server.Close()
		ts.server = nil
	}
	if ts.wsListener != nil {
		_ = ts.wsListener.Close()
		<-ts.wsServing
		ts.wsListener = nil
	}

	// Rebind to the same port so client URLs remain valid. Listen first: the
	// new wspulse server is only created once there is a listener to serve
	// it on, so a failed rebind does not leak a server nobody will close.
	ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", ts.wsPort))
	if err != nil {
		writeJSON(w, http.StatusInternalServerError, response{OK: false, Error: err.Error()})
		return
	}

	srv := ts.newWSServer()
	// The goroutine closes its own channel through this local rather than
	// re-reading ts.wsServing, which may have been replaced by then.
	serving := make(chan struct{})

	ts.server = srv
	ts.wsListener = ln
	ts.wsServing = serving

	go func() {
		defer close(serving)
		if err := http.Serve(ln, ts.wrapHandler(srv)); err != nil {
			ts.logger.Debug("ws http.Serve exited (restart)", zap.Error(err))
		}
	}()
	writeJSON(w, http.StatusOK, response{OK: true})
}
// ── ignore_pings handler ────────────────────────────────────────────────────
// wrapHandler returns the HTTP handler for the WebSocket port. Requests
// carrying ?ignore_pings=1 are diverted to handleIgnorePings and never reach
// the wspulse server; every other request is handed to srv.
func (ts *testServer) wrapHandler(srv wspulse.Server) http.Handler {
	dispatch := func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Query().Get("ignore_pings") != "1" {
			srv.ServeHTTP(w, r)
			return
		}
		ts.handleIgnorePings(w, r)
	}
	return http.HandlerFunc(dispatch)
}
// handleIgnorePings upgrades the request to a WebSocket connection and
// echoes every message back with its original message type. The ping
// handler is replaced with a no-op so no Pong is ever sent, which lets
// clients exercise their pong-timeout detection. The connection is closed
// when the first read or write error occurs.
func (ts *testServer) handleIgnorePings(w http.ResponseWriter, r *http.Request) {
	up := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		// Accept any origin.
		CheckOrigin: func(*http.Request) bool { return true },
	}

	ws, err := up.Upgrade(w, r, nil)
	if err != nil {
		ts.logger.Error("ignore_pings upgrade failed", zap.Error(err))
		return
	}
	defer func() { _ = ws.Close() }()

	// Suppress automatic Pong replies.
	ws.SetPingHandler(func(string) error { return nil })

	for {
		kind, payload, err := ws.ReadMessage()
		if err != nil {
			return
		}
		if err := ws.WriteMessage(kind, payload); err != nil {
			return
		}
	}
}
// ── JSON helpers ────────────────────────────────────────────────────────────
// response is the JSON body returned by every control endpoint.
type response struct {
	// OK reports whether the request succeeded.
	OK bool `json:"ok"`
	// Error carries a human-readable message; it is omitted when empty.
	Error string `json:"error,omitempty"`
}

// writeJSON sends v as a JSON body with the given HTTP status code and a
// Content-Type of application/json. The encoder's error is deliberately
// discarded: the status line has already been written by then.
func writeJSON(w http.ResponseWriter, status int, v response) {
	header := w.Header()
	header.Set("Content-Type", "application/json")
	w.WriteHeader(status)

	enc := json.NewEncoder(w)
	_ = enc.Encode(v)
}