/* Example: serving files over HTTP with uWebSockets.js,
 * both fully-cached (small files) and streamed (big files). */

const uWS = require('../dist/uws.js');
const fs = require('fs');

const port = 9001;

/* Small file: cheap enough to keep fully buffered in memory */
const smallFileType = 'application/json';
const smallFileName = 'absolutPathTo/smallFile.json';
const smallFileCachedBuffer = fs.readFileSync(smallFileName);
console.log(`Small file size is: ${smallFileCachedBuffer.length} bytes`);

/* Big file: only its size is read up front; content is streamed on demand */
const bigFileType = 'video/mpeg';
const bigFileName = 'absolutPathTo/bigFile.mp3';
const bigFileSize = fs.statSync(bigFileName).size;
console.log(`Big file size is: ${bigFileSize} bytes`);

/* Bookkeeping for logging which streams are open */
let lastStreamIndex = 0;
let openStreams = 0;
/* Stream a readable file stream into a uWS response, honoring client backpressure */
/** @param {import('node:Stream').Readable} readStream */
const streamData = (res, readStream, totalSize, onSucceed) => {
  let pendingChunk; /* Chunk currently being delivered */
  let ackedBytes = 0; /* Bytes the client has fully acknowledged */

  /* Attempt to push the pending chunk; returns whether the write was accepted */
  const flushPending = () => {
    const [ok, done] = res.tryEnd(pendingChunk, totalSize);
    if (done) {
      /* Entire file delivered — tear down the read side */
      readStream.destroy();
      onSucceed();
      return ok;
    }
    if (ok) {
      /* Chunk accepted in full — account for it and keep the stream flowing */
      ackedBytes += pendingChunk.length;
      readStream.resume();
      return ok;
    }
    /* Client backpressure: onWritable will fire once the client can take more.
     * Pause the read stream meanwhile so we don't pile up chunks. */
    readStream.pause();
    return ok;
  };

  /* Drain callback: uWS tells us how far the client actually got */
  res.onWritable((offset) => {
    const alreadySent = offset - ackedBytes;
    if (alreadySent) {
      /* The front of the chunk made it through — resend only the remainder */
      pendingChunk = pendingChunk.subarray(alreadySent);
      ackedBytes = offset;
    }
    /* Report whether the retry was accepted */
    return flushPending();
  });

  readStream.on('error', (err) => {
    console.log(`Error reading file: ${err}`);
    /* res.close invokes the caller's onAborted handler */
    res.cork(() => res.close());
  });
  readStream.on('data', (chunk) => {
    pendingChunk = chunk;
    /* Always cork before writing to the response */
    res.cork(flushPending);
  });
};
67 | 72 |
|
68 | | -let lastStreamIndex = 0; |
69 | | -let openStreams = 0; |
70 | | - |
71 | 73 | const app = uWS./*SSL*/App({ |
72 | 74 | key_file_name: 'misc/key.pem', |
73 | 75 | cert_file_name: 'misc/cert.pem', |
74 | 76 | passphrase: '1234' |
| 77 | +}).get('/smallFile', (res, req) => { |
| 78 | + /* !! Use this only for small files !! |
| 79 | + * May cause server backpressure and bad performance |
| 80 | + * For bigger files you have to use streaming method */ |
| 81 | + res.writeHeader('Content-Type', smallFileType).end(smallFileCachedBuffer); |
75 | 82 | }).get('/bigFile', (res, req) => { |
76 | 83 | const streamIndex = ++ lastStreamIndex; |
77 | 84 | console.log('Stream ('+ streamIndex +') was opened, openStreams: '+ (++ openStreams)); |
78 | | - res.writeHeader('Content-Type', 'video/mpeg'); |
79 | | - // Create read stream with fs.createReadStream |
80 | | - streamData(res, fs.createReadStream(bigFileName), bigFileSize, () => { |
81 | | - // On streaming finished (success/error/onAborted) |
82 | | - console.log('Stream ('+ streamIndex +') was closed, openStreams: '+ (-- openStreams)); |
| 85 | + const readStream = fs.createReadStream(bigFileName); |
| 86 | + /* Attach onAborted handler because streaming is async */ |
| 87 | + res.onAborted(() => { |
| 88 | + readStream.destroy(); |
| 89 | + console.log('Stream ('+ streamIndex +') failed, openStreams: '+ (-- openStreams)); |
| 90 | + }); |
| 91 | + res.writeHeader('Content-Type', bigFileType); |
| 92 | + streamData(res, readStream, bigFileSize, () => { |
| 93 | + console.log('Stream ('+ streamIndex +') succeed, openStreams: '+ (-- openStreams)); |
83 | 94 | }); |
84 | | - |
85 | | -}).get('/smallFile', (res, req) => { |
86 | | - // !! Use this only for small files !! |
87 | | - // May cause server backpressure and bad performance |
88 | | - // For bigger files you have to use streaming |
89 | | - try { |
90 | | - const fileBuffer = fs.readFileSync('absolutPathTo/smallData.json'); |
91 | | - res.writeHeader('Content-Type', 'application/json').end(fileBuffer); |
92 | | - } catch (err) { |
93 | | - console.log('Error reading file: '+ err); |
94 | | - res.writeStatus('500 Internal Server Error').end(); |
95 | | - } |
96 | 95 | }).listen(port, (token) => { |
97 | 96 | if (token) { |
98 | 97 | console.log('Listening to port ' + port); |
|
0 commit comments