Skip to content

Commit cb3ce0d

Browse files
DanieleFedeliDaniele Fedeli
andauthored
Add #injectWS (#276)
* Add #injectWS Now is possible to invoke an websocket handler without listening * Up-to-date documentation Added testing paragraph * Removed unused dependency * remove ws.terminate() in the plugin The user must manually close the ws in the test * add upgradeContext parameter in injectWS It allows to enhance the request made for upgrading the socket * Rejects if websocket upgrade failed * remove useless line in docs * rejects with 'Unexpected server response: <statusCode>' Implementation as close as possibile to ws connectiong error * Fix test * Fix types --------- Signed-off-by: Daniele Fedeli <danieligno10@gmail.com> Co-authored-by: Daniele Fedeli <danielefedeli@MacBook-Air.station>
1 parent a5ef710 commit cb3ce0d

File tree

5 files changed

+270
-6
lines changed

5 files changed

+270
-6
lines changed

README.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,78 @@ fastify.register(require('@fastify/websocket'), {
256256
})
257257
```
258258

259+
### Testing
260+
261+
Testing the ws handler can be quite tricky, luckily `fastify-websocket` decorates fastify instance with `injectWS`.
262+
It allows to test easily a websocket endpoint.
263+
264+
The signature of injectWS is the following: `([path], [upgradeContext])`.
265+
266+
#### App.js
267+
268+
```js
269+
'use strict'
270+
271+
const Fastify = require('fastify')
272+
const FastifyWebSocket = require('@fastify/websocket')
273+
274+
const App = Fastify()
275+
276+
App.register(FastifyWebSocket);
277+
278+
App.register(async function(fastify) {
279+
fastify.addHook('preValidation', async (request, reply) => {
280+
if (request.headers['api-key'] !== 'some-random-key') {
281+
return reply.code(401).send()
282+
}
283+
})
284+
285+
fastify.get('/', { websocket: true }, (connection) => {
286+
connection.socket.on('message', message => {
287+
connection.socket.send('hi from server')
288+
})
289+
})
290+
})
291+
292+
module.exports = App
293+
```
294+
295+
#### App.test.js
296+
297+
```js
298+
'use strict'
299+
300+
const { test } = require('tap')
301+
const Fastify = require('fastify')
302+
const App = require('./app.js')
303+
304+
test('connect to /', async (t) => {
305+
t.plan(1)
306+
307+
const fastify = Fastify()
308+
fastify.register(App)
309+
t.teardown(fastify.close.bind(fastify))
310+
311+
const ws = await fastify.injectWS('/', {headers: { "api-key" : "some-random-key" }})
312+
let resolve;
313+
const promise = new Promise(r => { resolve = r })
314+
315+
ws.on('message', (data) => {
316+
resolve(data.toString());
317+
})
318+
ws.send('hi from client')
319+
320+
t.assert(await promise, 'hi from server')
321+
// Remember to close the ws at the end
322+
ws.terminate()
323+
})
324+
```
325+
326+
#### Things to know
327+
- Websocket need to be closed manually at the end of each test.
328+
- `fastify.ready()` needs to be awaited to ensure that fastify has been decorated.
329+
- You need to register the event listener before sending the message if you need to process server response.
330+
259331
## Options
260332

261333
`@fastify/websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) :

index.js

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
'use strict'
22

33
const { ServerResponse } = require('node:http')
4+
const { PassThrough } = require('node:stream')
5+
const { randomBytes } = require('node:crypto')
46
const fp = require('fastify-plugin')
57
const WebSocket = require('ws')
8+
const Duplexify = require('duplexify')
69

710
const kWs = Symbol('ws-socket')
811
const kWsHead = Symbol('ws-head')
@@ -47,6 +50,60 @@ function fastifyWebsocket (fastify, opts, next) {
4750
const wss = new WebSocket.Server(wssOptions)
4851
fastify.decorate('websocketServer', wss)
4952

53+
async function injectWS (path = '/', upgradeContext = {}) {
54+
const server2Client = new PassThrough()
55+
const client2Server = new PassThrough()
56+
57+
const serverStream = new Duplexify(server2Client, client2Server)
58+
const clientStream = new Duplexify(client2Server, server2Client)
59+
60+
const ws = new WebSocket(null, undefined, { isServer: false })
61+
const head = Buffer.from([])
62+
63+
let resolve, reject
64+
const promise = new Promise((_resolve, _reject) => { resolve = _resolve; reject = _reject })
65+
66+
ws.on('open', () => {
67+
clientStream.removeListener('data', onData)
68+
resolve(ws)
69+
})
70+
71+
const onData = (chunk) => {
72+
if (chunk.toString().includes('HTTP/1.1 101 Switching Protocols')) {
73+
ws._isServer = false
74+
ws.setSocket(clientStream, head, { maxPayload: 0 })
75+
} else {
76+
clientStream.removeListener('data', onData)
77+
const statusCode = Number(chunk.toString().match(/HTTP\/1.1 (\d+)/)[1])
78+
reject(new Error('Unexpected server response: ' + statusCode))
79+
}
80+
}
81+
82+
clientStream.on('data', onData)
83+
84+
const req = {
85+
...upgradeContext,
86+
method: 'GET',
87+
headers: {
88+
...upgradeContext.headers,
89+
connection: 'upgrade',
90+
upgrade: 'websocket',
91+
'sec-websocket-version': 13,
92+
'sec-websocket-key': randomBytes(16).toString('base64')
93+
},
94+
httpVersion: '1.1',
95+
url: path,
96+
[kWs]: serverStream,
97+
[kWsHead]: head
98+
}
99+
100+
websocketListenServer.emit('upgrade', req, req[kWs], req[kWsHead])
101+
102+
return promise
103+
}
104+
105+
fastify.decorate('injectWS', injectWS)
106+
50107
function onUpgrade (rawRequest, socket, head) {
51108
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
52109
rawRequest[kWs] = socket
@@ -164,6 +221,7 @@ function fastifyWebsocket (fastify, opts, next) {
164221
client.close()
165222
}
166223
}
224+
167225
fastify.server.removeListener('upgrade', onUpgrade)
168226

169227
server.close(done)
@@ -181,7 +239,7 @@ function fastifyWebsocket (fastify, opts, next) {
181239
// Since we already handled the error, adding this listener prevents the ws
182240
// library from emitting the error and causing an uncaughtException
183241
// Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35
184-
conn.on('error', _ => {})
242+
conn.on('error', _ => { })
185243
request.log.error(error)
186244
conn.destroy(error)
187245
}

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"tsd": "^0.30.1"
3939
},
4040
"dependencies": {
41+
"duplexify": "^4.1.2",
4142
"fastify-plugin": "^4.0.0",
4243
"ws": "^8.0.0"
4344
},

test/inject.test.js

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
'use strict'
2+
3+
const { test } = require('tap')
4+
const Fastify = require('fastify')
5+
const fastifyWebsocket = require('..')
6+
7+
function buildFastify (t) {
8+
const fastify = Fastify()
9+
t.teardown(() => { fastify.close() })
10+
fastify.register(fastifyWebsocket)
11+
return fastify
12+
}
13+
14+
test('routes correctly the message', async (t) => {
15+
const fastify = buildFastify(t)
16+
const message = 'hi from client'
17+
18+
let _resolve
19+
const promise = new Promise((resolve) => { _resolve = resolve })
20+
21+
fastify.register(
22+
async function (instance) {
23+
instance.get('/ws', { websocket: true }, function (conn) {
24+
conn.once('data', chunk => {
25+
_resolve(chunk.toString())
26+
})
27+
})
28+
})
29+
30+
await fastify.ready()
31+
const ws = await fastify.injectWS('/ws')
32+
ws.send(message)
33+
t.same(await promise, message)
34+
ws.terminate()
35+
})
36+
37+
test('redirect on / if no path specified', async (t) => {
38+
const fastify = buildFastify(t)
39+
const message = 'hi from client'
40+
41+
let _resolve
42+
const promise = new Promise((resolve) => { _resolve = resolve })
43+
44+
fastify.register(
45+
async function (instance) {
46+
instance.get('/', { websocket: true }, function (conn) {
47+
conn.once('data', chunk => {
48+
_resolve(chunk.toString())
49+
})
50+
})
51+
})
52+
53+
await fastify.ready()
54+
const ws = await fastify.injectWS()
55+
ws.send(message)
56+
t.same(await promise, message)
57+
ws.terminate()
58+
})
59+
60+
test('routes correctly the message between two routes', async (t) => {
61+
const fastify = buildFastify(t)
62+
const message = 'hi from client'
63+
64+
let _resolve
65+
let _reject
66+
const promise = new Promise((resolve, reject) => { _resolve = resolve; _reject = reject })
67+
68+
fastify.register(
69+
async function (instance) {
70+
instance.get('/ws', { websocket: true }, function (conn) {
71+
conn.once('data', () => {
72+
_reject('wrong-route')
73+
})
74+
})
75+
76+
instance.get('/ws-2', { websocket: true }, function (conn) {
77+
conn.once('data', chunk => {
78+
_resolve(chunk.toString())
79+
})
80+
})
81+
})
82+
83+
await fastify.ready()
84+
const ws = await fastify.injectWS('/ws-2')
85+
ws.send(message)
86+
t.same(await promise, message)
87+
ws.terminate()
88+
})
89+
90+
test('use the upgrade context to upgrade if there is some hook', async (t) => {
91+
const fastify = buildFastify(t)
92+
const message = 'hi from client'
93+
94+
let _resolve
95+
const promise = new Promise((resolve) => { _resolve = resolve })
96+
97+
fastify.register(
98+
async function (instance) {
99+
instance.addHook('preValidation', async (request, reply) => {
100+
if (request.headers['api-key'] !== 'some-random-key') {
101+
return reply.code(401).send()
102+
}
103+
})
104+
105+
instance.get('/', { websocket: true }, function (conn) {
106+
conn.once('data', chunk => {
107+
_resolve(chunk.toString())
108+
})
109+
})
110+
})
111+
112+
await fastify.ready()
113+
const ws = await fastify.injectWS('/', { headers: { 'api-key': 'some-random-key' } })
114+
ws.send(message)
115+
t.same(await promise, message)
116+
ws.terminate()
117+
})
118+
119+
test('rejects if the websocket is not upgraded', async (t) => {
120+
const fastify = buildFastify(t)
121+
122+
fastify.register(
123+
async function (instance) {
124+
instance.addHook('preValidation', async (request, reply) => {
125+
return reply.code(401).send()
126+
})
127+
128+
instance.get('/', { websocket: true }, function (conn) {
129+
})
130+
})
131+
132+
await fastify.ready()
133+
t.rejects(fastify.injectWS('/'), 'Unexpected server response: 401')
134+
})

types/index.d.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ declare module 'fastify' {
2727
websocket?: boolean;
2828
}
2929

30+
type InjectWSFn<RawRequest> =
31+
((path?: string, upgradeContext?: Partial<RawRequest>) => Promise<WebSocket>)
32+
3033
interface FastifyInstance<RawServer, RawRequest, RawReply, Logger, TypeProvider> {
31-
get: RouteShorthandMethod<RawServer, RawRequest, RawReply, TypeProvider, Logger>,
3234
websocketServer: WebSocket.Server,
35+
injectWS: InjectWSFn<RawRequest>
3336
}
3437

3538
interface FastifyRequest {
@@ -67,7 +70,6 @@ type FastifyWebsocket = FastifyPluginCallback<fastifyWebsocket.WebsocketPluginOp
6770
declare namespace fastifyWebsocket {
6871

6972
interface WebSocketServerOptions extends Omit<WebSocket.ServerOptions, "path"> { }
70-
7173
export type WebsocketHandler<
7274
RawServer extends RawServerBase = RawServerDefault,
7375
RawRequest extends RawRequestDefaultExpression<RawServer> = RawRequestDefaultExpression<RawServer>,
@@ -81,18 +83,15 @@ declare namespace fastifyWebsocket {
8183
connection: SocketStream,
8284
request: FastifyRequest<RequestGeneric, RawServer, RawRequest, SchemaCompiler, TypeProvider, ContextConfig, Logger>
8385
) => void | Promise<any>;
84-
8586
export interface SocketStream extends Duplex {
8687
socket: WebSocket;
8788
}
88-
8989
export interface WebsocketPluginOptions {
9090
errorHandler?: (this: FastifyInstance, error: Error, connection: SocketStream, request: FastifyRequest, reply: FastifyReply) => void;
9191
options?: WebSocketServerOptions;
9292
connectionOptions?: DuplexOptions;
9393
preClose?: preCloseHookHandler | preCloseAsyncHookHandler;
9494
}
95-
9695
export interface RouteOptions<
9796
RawServer extends RawServerBase = RawServerDefault,
9897
RawRequest extends RawRequestDefaultExpression<RawServer> = RawRequestDefaultExpression<RawServer>,

0 commit comments

Comments
 (0)