Skip to content

Commit 53e7269

Browse files
committed
fixup! Add stall detection to recover from frozen uploads
1 parent 1636aa7 commit 53e7269

File tree

2 files changed

+54
-125
lines changed

2 files changed

+54
-125
lines changed

lib/StallDetector.ts

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ export class StallDetector {
1010
private intervalId: ReturnType<typeof setInterval> | null = null
1111
private lastProgressTime = 0
1212
private isActive = false
13-
private isPaused = false
1413

1514
constructor(
1615
options: StallDetectionOptions,
@@ -47,7 +46,7 @@ export class StallDetector {
4746

4847
// Setup periodic check
4948
this.intervalId = setInterval(() => {
50-
if (!this.isActive || this.isPaused) {
49+
if (!this.isActive) {
5150
return
5251
}
5352

@@ -76,26 +75,6 @@ export class StallDetector {
7675
this.lastProgressTime = Date.now()
7776
}
7877

79-
/**
80-
* Pause stall detection temporarily (e.g., during onBeforeRequest callback)
81-
*/
82-
pause(): void {
83-
this.isActive = false
84-
this.isPaused = true
85-
}
86-
87-
/**
88-
* Resume stall detection after pause
89-
*/
90-
resume(): void {
91-
if (this.isPaused) {
92-
this.isPaused = false
93-
this.isActive = true
94-
// Reset the last progress time to avoid false positives
95-
this.lastProgressTime = Date.now()
96-
}
97-
}
98-
9978
/**
10079
* Detect if current HttpStack supports progress events
10180
*/

lib/upload.ts

Lines changed: 53 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ import {
1919
} from './options.js'
2020
import { uuid } from './uuid.js'
2121

22-
interface ExtendedHttpRequest extends HttpRequest {
23-
_upload?: BaseUpload
24-
}
25-
2622
export const defaultOptions = {
2723
endpoint: undefined,
2824

@@ -120,9 +116,6 @@ export class BaseUpload {
120116
// upload options or HEAD response)
121117
private _uploadLengthDeferred: boolean
122118

123-
// Stall detector instance
124-
private _stallDetector?: StallDetector
125-
126119
constructor(file: UploadInput, options: UploadOptions) {
127120
// Warn about removed options from previous versions
128121
if ('resume' in options) {
@@ -229,9 +222,6 @@ export class BaseUpload {
229222
}
230223
}
231224

232-
// Setup stall detection before starting the upload
233-
this._setupStallDetection()
234-
235225
// Note: `start` does not return a Promise or await the preparation on purpose.
236226
// Its supposed to return immediately and start the upload in the background.
237227
this._prepareAndStartUpload().catch((err) => {
@@ -361,11 +351,6 @@ export class BaseUpload {
361351
throw new Error('tus: Expected totalSize to be set')
362352
}
363353

364-
// Update progress timestamp for the parallel upload to track stalls
365-
if (upload._stallDetector) {
366-
upload._stallDetector.updateProgress()
367-
}
368-
369354
this._emitProgress(totalProgress, totalSize)
370355
},
371356
// Wait until every partial upload has an upload URL, so we can add
@@ -486,11 +471,6 @@ export class BaseUpload {
486471
// Set the aborted flag before any `await`s, so no new requests are started.
487472
this._aborted = true
488473

489-
// Clear any stall detection
490-
if (this._stallDetector) {
491-
this._stallDetector.stop()
492-
}
493-
494474
// Stop any parallel partial uploads, that have been started in _startParallelUploads.
495475
if (this._parallelUploads != null) {
496476
for (const upload of this._parallelUploads) {
@@ -585,11 +565,6 @@ export class BaseUpload {
585565
* @api private
586566
*/
587567
private _emitProgress(bytesSent: number, bytesTotal: number | null): void {
588-
// Update stall detection state if progress has been made
589-
if (this._stallDetector) {
590-
this._stallDetector.updateProgress()
591-
}
592-
593568
if (typeof this.options.onProgress === 'function') {
594569
this.options.onProgress(bytesSent, bytesTotal)
595570
}
@@ -868,7 +843,31 @@ export class BaseUpload {
868843
const start = this._offset
869844
let end = this._offset + this.options.chunkSize
870845

846+
// Create stall detector for this request if stall detection is enabled and supported
847+
let stallDetector: StallDetector | undefined
848+
if (this.options.stallDetection?.enabled) {
849+
// Only enable stall detection if the HTTP stack supports progress events
850+
if (this.options.httpStack.supportsProgressEvents?.()) {
851+
stallDetector = new StallDetector(
852+
this.options.stallDetection,
853+
this.options.httpStack,
854+
(reason: string) => {
855+
// Handle stall by aborting the current request and triggering retry
856+
if (this._req) {
857+
this._req.abort()
858+
}
859+
this._retryOrEmitError(new Error(`Upload stalled: ${reason}`))
860+
},
861+
)
862+
stallDetector.start()
863+
}
864+
}
865+
871866
req.setProgressHandler((bytesSent) => {
867+
// Update per-request stall detector if active
868+
if (stallDetector) {
869+
stallDetector.updateProgress()
870+
}
872871
this._emitProgress(start + bytesSent, this._size)
873872
})
874873

@@ -911,22 +910,42 @@ export class BaseUpload {
911910
// See https://community.transloadit.com/t/how-to-abort-hanging-companion-uploads/16488/13
912911
const newSize = this._offset + sizeOfValue
913912
if (!this._uploadLengthDeferred && done && newSize !== this._size) {
913+
// Clean up stall detector before throwing error
914+
if (stallDetector) {
915+
stallDetector.stop()
916+
}
914917
throw new Error(
915918
`upload was configured with a size of ${this._size} bytes, but the source is done after ${newSize} bytes`,
916919
)
917920
}
918921

919-
if (value == null) {
920-
return await this._sendRequest(req)
921-
}
922+
try {
923+
let response: HttpResponse
924+
if (value == null) {
925+
response = await this._sendRequest(req)
926+
} else {
927+
if (
928+
this.options.protocol === PROTOCOL_IETF_DRAFT_03 ||
929+
this.options.protocol === PROTOCOL_IETF_DRAFT_05
930+
) {
931+
req.setHeader('Upload-Complete', done ? '?1' : '?0')
932+
}
933+
response = await this._sendRequest(req, value)
934+
}
922935

923-
if (
924-
this.options.protocol === PROTOCOL_IETF_DRAFT_03 ||
925-
this.options.protocol === PROTOCOL_IETF_DRAFT_05
926-
) {
927-
req.setHeader('Upload-Complete', done ? '?1' : '?0')
936+
// Clean up stall detector on successful completion
937+
if (stallDetector) {
938+
stallDetector.stop()
939+
}
940+
941+
return response
942+
} catch (error) {
943+
// Clean up stall detector on error
944+
if (stallDetector) {
945+
stallDetector.stop()
946+
}
947+
throw error
928948
}
929-
return await this._sendRequest(req, value)
930949
}
931950

932951
/**
@@ -965,8 +984,6 @@ export class BaseUpload {
965984
private _openRequest(method: string, url: string): HttpRequest {
966985
const req = openRequest(method, url, this.options)
967986
this._req = req
968-
// Store reference to this upload instance for stall detection management
969-
;(req as ExtendedHttpRequest)._upload = this
970987
return req
971988
}
972989

@@ -1029,62 +1046,6 @@ export class BaseUpload {
10291046
_sendRequest(req: HttpRequest, body?: SliceType): Promise<HttpResponse> {
10301047
return sendRequest(req, body, this.options)
10311048
}
1032-
1033-
/**
1034-
* Setup stall detection monitoring
1035-
*/
1036-
private _setupStallDetection(): void {
1037-
const stallDetection = this.options.stallDetection
1038-
1039-
// Early return if disabled or undefined
1040-
if (!stallDetection || !stallDetection.enabled) {
1041-
return
1042-
}
1043-
1044-
// Create stall detector instance
1045-
this._stallDetector = new StallDetector(
1046-
stallDetection,
1047-
this.options.httpStack,
1048-
(reason: string) => this._handleStall(reason),
1049-
)
1050-
1051-
// Start monitoring
1052-
this._stallDetector.start()
1053-
}
1054-
1055-
/**
1056-
* Handle a detected stall by forcing a retry
1057-
*/
1058-
private _handleStall(reason: string): void {
1059-
// Just abort the current request, not the entire upload
1060-
// Each parallel upload instance has its own stall detection
1061-
if (this._req) {
1062-
this._req.abort()
1063-
}
1064-
1065-
// Force a retry via the error mechanism
1066-
this._retryOrEmitError(new Error(`Upload stalled: ${reason}`))
1067-
}
1068-
1069-
/**
1070-
* Pause stall detection temporarily
1071-
* @api private
1072-
*/
1073-
_pauseStallDetection(): void {
1074-
if (this._stallDetector) {
1075-
this._stallDetector.pause()
1076-
}
1077-
}
1078-
1079-
/**
1080-
* Resume stall detection
1081-
* @api private
1082-
*/
1083-
_resumeStallDetection(): void {
1084-
if (this._stallDetector) {
1085-
this._stallDetector.resume()
1086-
}
1087-
}
10881049
}
10891050

10901051
function encodeMetadata(metadata: Record<string, string>): string {
@@ -1145,21 +1106,10 @@ async function sendRequest(
11451106
body: SliceType | undefined,
11461107
options: UploadOptions,
11471108
): Promise<HttpResponse> {
1148-
// Pause stall detection during onBeforeRequest callback to avoid false positives
1149-
const upload = (req as ExtendedHttpRequest)._upload
1150-
if (upload) {
1151-
upload._pauseStallDetection()
1152-
}
1153-
11541109
if (typeof options.onBeforeRequest === 'function') {
11551110
await options.onBeforeRequest(req)
11561111
}
11571112

1158-
// Resume stall detection after callback
1159-
if (upload) {
1160-
upload._resumeStallDetection()
1161-
}
1162-
11631113
const res = await req.send(body)
11641114

11651115
if (typeof options.onAfterResponse === 'function') {

0 commit comments

Comments
 (0)