From 2b3eaf12647593730c10cb646fd5ca7714f81c25 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Fri, 31 Oct 2025 13:59:25 +0100 Subject: [PATCH 01/10] try to override p-map till react router esolves this issue --- .../react-router-7-framework-node-20-18/package.json | 5 +++++ .../react-router-7-framework-spa-node-20-18/package.json | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/dev-packages/e2e-tests/test-applications/react-router-7-framework-node-20-18/package.json b/dev-packages/e2e-tests/test-applications/react-router-7-framework-node-20-18/package.json index e9f1f0d51504..ceea35b5f885 100644 --- a/dev-packages/e2e-tests/test-applications/react-router-7-framework-node-20-18/package.json +++ b/dev-packages/e2e-tests/test-applications/react-router-7-framework-node-20-18/package.json @@ -55,5 +55,10 @@ "volta": { "extends": "../../package.json", "node": "20.18.2" + }, + "pnpm": { + "overrides": { + "p-map": "^6.0.0" + } } } diff --git a/dev-packages/e2e-tests/test-applications/react-router-7-framework-spa-node-20-18/package.json b/dev-packages/e2e-tests/test-applications/react-router-7-framework-spa-node-20-18/package.json index 4ad6fae68416..cd3b3bfcb332 100644 --- a/dev-packages/e2e-tests/test-applications/react-router-7-framework-spa-node-20-18/package.json +++ b/dev-packages/e2e-tests/test-applications/react-router-7-framework-spa-node-20-18/package.json @@ -53,5 +53,10 @@ "volta": { "extends": "../../package.json", "node": "20.18.2" + }, + "pnpm": { + "overrides": { + "p-map": "^6.0.0" + } } } From 0842114f499296f206aa43d853ff0021c7c63209 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Tue, 4 Nov 2025 10:39:54 +0100 Subject: [PATCH 02/10] fix(cloudflare): Keep root span alive until streaming responses are consumed --- packages/cloudflare/src/request.ts | 92 +++++++++++++++---- packages/cloudflare/src/utils/streaming.ts | 72 +++++++++++++++ .../cloudflare/test/durableobject.test.ts | 20 +++- packages/cloudflare/test/handler.test.ts | 6 +- packages/cloudflare/test/pages-plugin.test.ts | 4 +- packages/cloudflare/test/request.test.ts | 34 ++++++- 6 files changed, 201 insertions(+), 27 deletions(-) create mode 100644 packages/cloudflare/src/utils/streaming.ts diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index 5c97562d9fde..4b3943f0f14d 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -8,13 +8,14 @@ import { parseStringToURLObject, SEMANTIC_ATTRIBUTE_SENTRY_OP, setHttpStatus, - startSpan, + startSpanManual, winterCGHeadersToDict, withIsolationScope, } from '@sentry/core'; import type { CloudflareOptions } from './client'; import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils'; import { init } from './sdk'; +import { classifyResponseStreaming } from './utils/streaming'; interface RequestHandlerWrapperOptions { options: CloudflareOptions; @@ -98,26 +99,79 @@ export function wrapRequestHandler( // Note: This span will not have a duration unless I/O happens in the handler. This is // because of how the cloudflare workers runtime works. // See: https://developers.cloudflare.com/workers/runtime-apis/performance/ - return startSpan( - { - name, - attributes, - }, - async span => { - try { - const res = await handler(); - setHttpStatus(span, res.status); - return res; - } catch (e) { - if (captureErrors) { - captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } }); - } - throw e; - } finally { + + // Use startSpanManual to control when span ends (needed for streaming responses) + return startSpanManual({ name, attributes }, async span => { + let res: Response; + + try { + res = await handler(); + setHttpStatus(span, res.status); + } catch (e) { + span.end(); // End span on error + if (captureErrors) { + captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } }); + } + waitUntil?.(flush(2000)); + throw e; + } + + // Classify response to detect actual streaming + const classification = await classifyResponseStreaming(res); + + if (classification.isStreaming) { + // Streaming response detected - monitor consumption to keep span alive + if (!classification.response.body) { + // Shouldn't happen since isStreaming requires body, but handle gracefully + span.end(); waitUntil?.(flush(2000)); + return classification.response; } - }, - ); + + const [clientStream, monitorStream] = classification.response.body.tee(); + + // Monitor stream consumption and end span when complete + const streamMonitor = (async () => { + const reader = monitorStream.getReader(); + + // Safety timeout to prevent infinite loops if stream hangs + const timeout = setTimeout(() => { + span.end(); + reader.cancel().catch(() => {}); + }, 30000); // 30 second max + + try { + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + } + clearTimeout(timeout); + span.end(); + } catch (err) { + clearTimeout(timeout); + span.end(); + } finally { + reader.releaseLock(); + } + })(); + + // Use waitUntil to keep context alive and flush after span ends + waitUntil?.(streamMonitor.then(() => flush(2000))); + + // Return response with client stream + return new Response(clientStream, { + status: classification.response.status, + statusText: classification.response.statusText, + headers: classification.response.headers, + }); + } + + // Non-streaming response - end span immediately and return original + span.end(); + waitUntil?.(flush(2000)); + return classification.response; + }); }, ); }); diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts new file mode 100644 index 000000000000..d4753a5430c8 --- /dev/null +++ b/packages/cloudflare/src/utils/streaming.ts @@ -0,0 +1,72 @@ +export type StreamingGuess = { + response: Response; + isStreaming: boolean; +}; + +/** + * + */ +export async function classifyResponseStreaming( + res: Response, + opts: { timeoutMs?: number } = {}, +): Promise { + const timeoutMs = opts.timeoutMs ?? 25; + + if (!res.body) { + return { response: res, isStreaming: false }; + } + + const ct = res.headers.get('content-type') ?? ''; + const cl = res.headers.get('content-length'); + + // Definitive streaming indicators + if (/^text\/event-stream\b/i.test(ct)) { + return { response: res, isStreaming: true }; + } + + // Definitive non-streaming indicators + if (cl && /^\d+$/.test(cl)) { + return { response: res, isStreaming: false }; + } + + // Probe the stream to detect streaming behavior + // NOTE: This tees the stream and returns a new Response object + const [probe, pass] = res.body.tee(); + const reader = probe.getReader(); + + const firstChunkPromise = (async () => { + try { + const { value, done } = await reader.read(); + reader.releaseLock(); + if (done) return { arrivedBytes: 0, done: true }; + const bytes = + value && typeof value === 'object' && 'byteLength' in value ? (value as { byteLength: number }).byteLength : 0; + return { arrivedBytes: bytes, done: false }; + } catch { + return { arrivedBytes: 0, done: false }; + } + })(); + + const timeout = new Promise<{ arrivedBytes: number; done: boolean }>(r => + setTimeout(() => r({ arrivedBytes: 0, done: false }), timeoutMs), + ); + + const peek = await Promise.race([firstChunkPromise, timeout]); + + // We must return the teed response since original is now locked + const preserved = new Response(pass, res); + + let isStreaming = false; + if (peek.done) { + // Stream completed immediately + isStreaming = false; + } else if (peek.arrivedBytes === 0) { + // Timeout waiting for first chunk - definitely streaming + isStreaming = true; + } else { + // Got first chunk - streaming if no Content-Length + isStreaming = cl == null; + } + + return { response: preserved, isStreaming }; +} diff --git a/packages/cloudflare/test/durableobject.test.ts b/packages/cloudflare/test/durableobject.test.ts index 4d9e2a20fe97..42a9ef45b735 100644 --- a/packages/cloudflare/test/durableobject.test.ts +++ b/packages/cloudflare/test/durableobject.test.ts @@ -133,11 +133,25 @@ describe('instrumentDurableObjectWithSentry', () => { waitUntil, } as unknown as ExecutionContext; const dObject: any = Reflect.construct(instrumented, [context, {} as any]); - expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow(); - expect(flush).not.toBeCalled(); - expect(waitUntil).toHaveBeenCalledOnce(); + + // Call fetch (don't await yet) + const responsePromise = dObject.fetch(new Request('https://example.com')); + + // Advance past classification timeout and get response + vi.advanceTimersByTime(30); + const response = await responsePromise; + + // Consume response (triggers span end for buffered responses) + await response.text(); + + // The flush should now be queued in waitUntil + expect(waitUntil).toHaveBeenCalled(); + + // Advance to trigger the setTimeout in the handler's waitUntil vi.advanceTimersToNextTimer(); await Promise.all(waitUntil.mock.calls.map(([p]) => p)); + + // Now flush should have been called expect(flush).toBeCalled(); }); diff --git a/packages/cloudflare/test/handler.test.ts b/packages/cloudflare/test/handler.test.ts index 7768689ffc48..15fa3effcd7f 100644 --- a/packages/cloudflare/test/handler.test.ts +++ b/packages/cloudflare/test/handler.test.ts @@ -72,7 +72,11 @@ describe('withSentry', () => { createMockExecutionContext(), ); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content + expect(result?.status).toBe(response.status); + if (result) { + expect(await result.text()).toBe('test'); + } }); test('merges options from env and callback', async () => { diff --git a/packages/cloudflare/test/pages-plugin.test.ts b/packages/cloudflare/test/pages-plugin.test.ts index 5cfbd1f4bb5e..7f70ac7de098 100644 --- a/packages/cloudflare/test/pages-plugin.test.ts +++ b/packages/cloudflare/test/pages-plugin.test.ts @@ -52,6 +52,8 @@ describe('sentryPagesPlugin', () => { pluginArgs: MOCK_OPTIONS, }); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content + expect(result.status).toBe(response.status); + expect(await result.text()).toBe('test'); }); }); diff --git a/packages/cloudflare/test/request.test.ts b/packages/cloudflare/test/request.test.ts index d6d0de5824a1..68f7dcdea7f2 100644 --- a/packages/cloudflare/test/request.test.ts +++ b/packages/cloudflare/test/request.test.ts @@ -33,7 +33,9 @@ describe('withSentry', () => { { options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() }, () => response, ); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content matches + expect(result.status).toBe(response.status); + expect(await result.text()).toBe('test'); }); test('flushes the event after the handler is done using the cloudflare context.waitUntil', async () => { @@ -48,6 +50,25 @@ describe('withSentry', () => { expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise)); }); + test('handles streaming responses correctly', async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')); + controller.enqueue(new TextEncoder().encode('chunk2')); + controller.close(); + }, + }); + const streamingResponse = new Response(stream); + + const result = await wrapRequestHandler( + { options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() }, + () => streamingResponse, + ); + + const text = await result.text(); + expect(text).toBe('chunk1chunk2'); + }); + test("doesn't error if context is undefined", () => { expect(() => wrapRequestHandler( @@ -284,7 +305,7 @@ describe('withSentry', () => { mockRequest.headers.set('content-length', '10'); let sentryEvent: Event = {}; - await wrapRequestHandler( + const result = await wrapRequestHandler( { options: { ...MOCK_OPTIONS, @@ -299,10 +320,17 @@ describe('withSentry', () => { }, () => { SentryCore.captureMessage('sentry-trace'); - return new Response('test'); + const response = new Response('test'); + return response; }, ); + // Consume response to trigger span end for non-streaming responses + await result.text(); + + // Wait for async span end and transaction capture + await new Promise(resolve => setTimeout(resolve, 50)); + expect(sentryEvent.transaction).toEqual('GET /'); expect(sentryEvent.spans).toHaveLength(0); expect(sentryEvent.contexts?.trace).toEqual({ From eb7a5c130210d81150c9411140c1082733ff160d Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Tue, 4 Nov 2025 13:23:07 +0100 Subject: [PATCH 03/10] quick refactor --- packages/cloudflare/src/request.ts | 33 +++----- packages/cloudflare/src/utils/streaming.ts | 93 +++++++++++++--------- packages/cloudflare/test/request.test.ts | 22 ++--- 3 files changed, 77 insertions(+), 71 deletions(-) diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index 4b3943f0f14d..c9fc15831ed8 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -108,7 +108,7 @@ export function wrapRequestHandler( res = await handler(); setHttpStatus(span, res.status); } catch (e) { - span.end(); // End span on error + span.end(); if (captureErrors) { captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } }); } @@ -119,26 +119,16 @@ export function wrapRequestHandler( // Classify response to detect actual streaming const classification = await classifyResponseStreaming(res); - if (classification.isStreaming) { + if (classification.isStreaming && classification.response.body) { // Streaming response detected - monitor consumption to keep span alive - if (!classification.response.body) { - // Shouldn't happen since isStreaming requires body, but handle gracefully - span.end(); - waitUntil?.(flush(2000)); - return classification.response; - } - const [clientStream, monitorStream] = classification.response.body.tee(); // Monitor stream consumption and end span when complete const streamMonitor = (async () => { const reader = monitorStream.getReader(); - // Safety timeout to prevent infinite loops if stream hangs - const timeout = setTimeout(() => { - span.end(); - reader.cancel().catch(() => {}); - }, 30000); // 30 second max + // Safety timeout - abort reading and end span after 5s even if stream hasn't finished + const timeout = setTimeout(() => reader.cancel(), 5000); try { let done = false; @@ -146,18 +136,19 @@ export function wrapRequestHandler( const result = await reader.read(); done = result.done; } - clearTimeout(timeout); - span.end(); - } catch (err) { - clearTimeout(timeout); - span.end(); + } catch { + // Stream error or cancellation - will end span in finally } finally { + clearTimeout(timeout); reader.releaseLock(); + span.end(); + waitUntil?.(flush(2000)); } })(); - // Use waitUntil to keep context alive and flush after span ends - waitUntil?.(streamMonitor.then(() => flush(2000))); + if (waitUntil) { + waitUntil(streamMonitor); + } // Return response with client stream return new Response(clientStream, { diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index d4753a5430c8..6225b1112e01 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -4,7 +4,18 @@ export type StreamingGuess = { }; /** + * Classifies a Response as streaming or non-streaming. * + * Uses multiple heuristics: + * - Content-Type: text/event-stream → streaming + * - Content-Length header present → not streaming + * - Otherwise: probes stream with timeout to detect behavior + * + * Note: Probing will tee() the stream and return a new Response object. + * + * @param res - The Response to classify + * @param opts.timeoutMs - Probe timeout in ms (default: 25) + * @returns Classification result with safe-to-return Response */ export async function classifyResponseStreaming( res: Response, @@ -16,57 +27,61 @@ export async function classifyResponseStreaming( return { response: res, isStreaming: false }; } - const ct = res.headers.get('content-type') ?? ''; - const cl = res.headers.get('content-length'); + const contentType = res.headers.get('content-type') ?? ''; + const contentLength = res.headers.get('content-length'); - // Definitive streaming indicators - if (/^text\/event-stream\b/i.test(ct)) { + // Fast path: Server-Sent Events + if (/^text\/event-stream\b/i.test(contentType)) { return { response: res, isStreaming: true }; } - // Definitive non-streaming indicators - if (cl && /^\d+$/.test(cl)) { + // Fast path: Content-Length indicates buffered response + if (contentLength && /^\d+$/.test(contentLength)) { return { response: res, isStreaming: false }; } - // Probe the stream to detect streaming behavior - // NOTE: This tees the stream and returns a new Response object - const [probe, pass] = res.body.tee(); - const reader = probe.getReader(); + // Uncertain - probe the stream to determine behavior + // After tee(), must use the teed stream (original is locked) + const [probeStream, passStream] = res.body.tee(); + const reader = probeStream.getReader(); - const firstChunkPromise = (async () => { - try { - const { value, done } = await reader.read(); - reader.releaseLock(); - if (done) return { arrivedBytes: 0, done: true }; - const bytes = - value && typeof value === 'object' && 'byteLength' in value ? (value as { byteLength: number }).byteLength : 0; - return { arrivedBytes: bytes, done: false }; - } catch { - return { arrivedBytes: 0, done: false }; - } - })(); + const probeResult = await Promise.race([ + // Try to read first chunk + (async () => { + try { + const { value, done } = await reader.read(); + reader.releaseLock(); - const timeout = new Promise<{ arrivedBytes: number; done: boolean }>(r => - setTimeout(() => r({ arrivedBytes: 0, done: false }), timeoutMs), - ); + if (done) { + return { arrivedBytes: 0, done: true }; + } - const peek = await Promise.race([firstChunkPromise, timeout]); + const bytes = + value && typeof value === 'object' && 'byteLength' in value + ? (value as { byteLength: number }).byteLength + : 0; + return { arrivedBytes: bytes, done: false }; + } catch { + return { arrivedBytes: 0, done: false }; + } + })(), + // Timeout if first chunk takes too long + new Promise<{ arrivedBytes: number; done: boolean }>(resolve => + setTimeout(() => resolve({ arrivedBytes: 0, done: false }), timeoutMs), + ), + ]); - // We must return the teed response since original is now locked - const preserved = new Response(pass, res); + const teededResponse = new Response(passStream, res); - let isStreaming = false; - if (peek.done) { - // Stream completed immediately - isStreaming = false; - } else if (peek.arrivedBytes === 0) { - // Timeout waiting for first chunk - definitely streaming - isStreaming = true; + // Determine if streaming based on probe result + if (probeResult.done) { + // Stream completed immediately - buffered + return { response: teededResponse, isStreaming: false }; + } else if (probeResult.arrivedBytes === 0) { + // Timeout waiting - definitely streaming + return { response: teededResponse, isStreaming: true }; } else { - // Got first chunk - streaming if no Content-Length - isStreaming = cl == null; + // Got chunk quickly - streaming if no Content-Length + return { response: teededResponse, isStreaming: contentLength == null }; } - - return { response: preserved, isStreaming }; } diff --git a/packages/cloudflare/test/request.test.ts b/packages/cloudflare/test/request.test.ts index 68f7dcdea7f2..0ef32618062f 100644 --- a/packages/cloudflare/test/request.test.ts +++ b/packages/cloudflare/test/request.test.ts @@ -90,7 +90,7 @@ describe('withSentry', () => { }); test('flush must be called when all waitUntil are done', async () => { - const flush = vi.spyOn(SentryCore.Client.prototype, 'flush'); + const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush'); vi.useFakeTimers(); onTestFinished(() => { vi.useRealTimers(); @@ -104,13 +104,17 @@ describe('withSentry', () => { await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => { addDelayedWaitUntil(context); - return new Response('test'); + const response = new Response('test'); + // Add Content-Length to skip probing + response.headers.set('content-length', '4'); + return response; }); - expect(flush).not.toBeCalled(); + expect(flushSpy).not.toBeCalled(); expect(waitUntil).toBeCalled(); - vi.advanceTimersToNextTimerAsync().then(() => vi.runAllTimers()); + await vi.advanceTimersToNextTimerAsync(); + vi.runAllTimers(); await Promise.all(waits); - expect(flush).toHaveBeenCalledOnce(); + expect(flushSpy).toHaveBeenCalledOnce(); }); describe('scope instrumentation', () => { @@ -305,7 +309,7 @@ describe('withSentry', () => { mockRequest.headers.set('content-length', '10'); let sentryEvent: Event = {}; - const result = await wrapRequestHandler( + await wrapRequestHandler( { options: { ...MOCK_OPTIONS, @@ -320,14 +324,10 @@ describe('withSentry', () => { }, () => { SentryCore.captureMessage('sentry-trace'); - const response = new Response('test'); - return response; + return new Response('test'); }, ); - // Consume response to trigger span end for non-streaming responses - await result.text(); - // Wait for async span end and transaction capture await new Promise(resolve => setTimeout(resolve, 50)); From ae8e345a162e031d8a3af26a7537e449450ba57e Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Tue, 4 Nov 2025 15:53:26 +0100 Subject: [PATCH 04/10] test buffered without timeout --- packages/cloudflare/src/utils/streaming.ts | 57 ++++++---------------- 1 file changed, 15 insertions(+), 42 deletions(-) diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index 6225b1112e01..76593a22d169 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -14,15 +14,9 @@ export type StreamingGuess = { * Note: Probing will tee() the stream and return a new Response object. * * @param res - The Response to classify - * @param opts.timeoutMs - Probe timeout in ms (default: 25) * @returns Classification result with safe-to-return Response */ -export async function classifyResponseStreaming( - res: Response, - opts: { timeoutMs?: number } = {}, -): Promise { - const timeoutMs = opts.timeoutMs ?? 25; - +export async function classifyResponseStreaming(res: Response): Promise { if (!res.body) { return { response: res, isStreaming: false }; } @@ -40,48 +34,27 @@ export async function classifyResponseStreaming( return { response: res, isStreaming: false }; } - // Uncertain - probe the stream to determine behavior + // Probe the stream by trying to read first chunk immediately // After tee(), must use the teed stream (original is locked) const [probeStream, passStream] = res.body.tee(); const reader = probeStream.getReader(); - const probeResult = await Promise.race([ - // Try to read first chunk - (async () => { - try { - const { value, done } = await reader.read(); - reader.releaseLock(); - - if (done) { - return { arrivedBytes: 0, done: true }; - } + try { + const { done } = await reader.read(); + reader.releaseLock(); - const bytes = - value && typeof value === 'object' && 'byteLength' in value - ? (value as { byteLength: number }).byteLength - : 0; - return { arrivedBytes: bytes, done: false }; - } catch { - return { arrivedBytes: 0, done: false }; - } - })(), - // Timeout if first chunk takes too long - new Promise<{ arrivedBytes: number; done: boolean }>(resolve => - setTimeout(() => resolve({ arrivedBytes: 0, done: false }), timeoutMs), - ), - ]); + const teededResponse = new Response(passStream, res); - const teededResponse = new Response(passStream, res); + if (done) { + // Stream completed immediately - buffered (empty body) + return { response: teededResponse, isStreaming: false }; + } - // Determine if streaming based on probe result - if (probeResult.done) { - // Stream completed immediately - buffered - return { response: teededResponse, isStreaming: false }; - } else if (probeResult.arrivedBytes === 0) { - // Timeout waiting - definitely streaming - return { response: teededResponse, isStreaming: true }; - } else { - // Got chunk quickly - streaming if no Content-Length + // Got data - treat as streaming if no Content-Length header return { response: teededResponse, isStreaming: contentLength == null }; + } catch { + reader.releaseLock(); + // Error reading - treat as non-streaming to be safe + return { response: new Response(passStream, res), isStreaming: false }; } } From 62a1b2c2ddb2ad91125d1938b92694f5de63783e Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Tue, 4 Nov 2025 16:03:51 +0100 Subject: [PATCH 05/10] remove timeout --- packages/cloudflare/src/request.ts | 8 +------- packages/cloudflare/src/utils/streaming.ts | 6 +++++- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index c9fc15831ed8..7ac6b924c876 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -127,9 +127,6 @@ export function wrapRequestHandler( const streamMonitor = (async () => { const reader = monitorStream.getReader(); - // Safety timeout - abort reading and end span after 5s even if stream hasn't finished - const timeout = setTimeout(() => reader.cancel(), 5000); - try { let done = false; while (!done) { @@ -139,16 +136,13 @@ export function wrapRequestHandler( } catch { // Stream error or cancellation - will end span in finally } finally { - clearTimeout(timeout); reader.releaseLock(); span.end(); waitUntil?.(flush(2000)); } })(); - if (waitUntil) { - waitUntil(streamMonitor); - } + waitUntil?.(streamMonitor); // Return response with client stream return new Response(clientStream, { diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index 76593a22d169..2184cac48a3d 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -7,9 +7,13 @@ export type StreamingGuess = { * Classifies a Response as streaming or non-streaming. * * Uses multiple heuristics: + * - No body → not streaming * - Content-Type: text/event-stream → streaming * - Content-Length header present → not streaming - * - Otherwise: probes stream with timeout to detect behavior + * - Otherwise: attempts immediate read to detect behavior + * - Stream empty (done) → not streaming + * - Got data without Content-Length → streaming + * - Got data with Content-Length → not streaming * * Note: Probing will tee() the stream and return a new Response object. * From ca4bafaa9533ce89759fcab5006649f4dbb7e2a6 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Mon, 17 Nov 2025 11:01:37 +0100 Subject: [PATCH 06/10] give this a shot --- packages/cloudflare/src/utils/streaming.ts | 34 ++++++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index 2184cac48a3d..79d0f0c5e342 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -10,11 +10,15 @@ export type StreamingGuess = { * - No body → not streaming * - Content-Type: text/event-stream → streaming * - Content-Length header present → not streaming - * - Otherwise: attempts immediate read to detect behavior + * - Otherwise: attempts immediate read with timeout to detect behavior + * - Timeout (no data ready) → not streaming (typical SSR/buffered response) * - Stream empty (done) → not streaming - * - Got data without Content-Length → streaming + * - Got data without Content-Length → streaming (e.g., Vercel AI SDK) * - Got data with Content-Length → not streaming * + * The timeout prevents blocking on responses that are being generated (like SSR), + * while still detecting true streaming responses that produce data immediately. + * * Note: Probing will tee() the stream and return a new Response object. * * @param res - The Response to classify @@ -38,23 +42,41 @@ export async function classifyResponseStreaming(res: Response): Promise(resolve => { + setTimeout(() => resolve({ done: false, value: undefined, timedOut: true }), PROBE_TIMEOUT_MS); + }); + + const readPromise = reader.read().then(result => ({ ...result, timedOut: false as const })); + const result = await Promise.race([readPromise, timeoutPromise]); + reader.releaseLock(); const teededResponse = new Response(passStream, res); - if (done) { + if (result.timedOut) { + // Timeout means data isn't immediately available - likely a buffered response + // being generated (like SSR). Treat as non-streaming. + return { response: teededResponse, isStreaming: false }; + } + + if (result.done) { // Stream completed immediately - buffered (empty body) return { response: teededResponse, isStreaming: false }; } - // Got data - treat as streaming if no Content-Length header + // Got data immediately without Content-Length - likely streaming + // Got data immediately with Content-Length - buffered return { response: teededResponse, isStreaming: contentLength == null }; } catch { reader.releaseLock(); From 31ecf585cfa5a6787b4f98ed700cc7adad89713f Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Thu, 20 Nov 2025 12:22:29 +0100 Subject: [PATCH 07/10] revert and fix tests --- packages/cloudflare/src/request.ts | 2 +- packages/cloudflare/src/utils/streaming.ts | 84 ++++++---------------- packages/cloudflare/test/request.test.ts | 20 ++++-- 3 files changed, 36 insertions(+), 70 deletions(-) diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index 7ac6b924c876..aa267a6bf000 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -117,7 +117,7 @@ export function wrapRequestHandler( } // Classify response to detect actual streaming - const classification = await classifyResponseStreaming(res); + const classification = classifyResponseStreaming(res); if (classification.isStreaming && classification.response.body) { // Streaming response detected - monitor consumption to keep span alive diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index 79d0f0c5e342..996f5f31542d 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -6,25 +6,16 @@ export type StreamingGuess = { /** * Classifies a Response as streaming or non-streaming. * - * Uses multiple heuristics: + * Heuristics: * - No body → not streaming - * - Content-Type: text/event-stream → streaming - * - Content-Length header present → not streaming - * - Otherwise: attempts immediate read with timeout to detect behavior - * - Timeout (no data ready) → not streaming (typical SSR/buffered response) - * - Stream empty (done) → not streaming - * - Got data without Content-Length → streaming (e.g., Vercel AI SDK) - * - Got data with Content-Length → not streaming + * - Known streaming Content-Types → streaming (SSE, NDJSON, JSON streaming) + * - text/plain without Content-Length → streaming (some AI APIs) + * - Otherwise → not streaming (conservative default, including HTML/SSR) * - * The timeout prevents blocking on responses that are being generated (like SSR), - * while still detecting true streaming responses that produce data immediately. - * - * Note: Probing will tee() the stream and return a new Response object. - * - * @param res - The Response to classify - * @returns Classification result with safe-to-return Response + * We avoid probing the stream to prevent blocking on transform streams (like injectTraceMetaTags) + * or SSR streams that may not have data ready immediately. */ -export async function classifyResponseStreaming(res: Response): Promise { +export function classifyResponseStreaming(res: Response): StreamingGuess { if (!res.body) { return { response: res, isStreaming: false }; } @@ -32,55 +23,20 @@ export async function classifyResponseStreaming(res: Response): Promise(resolve => { - setTimeout(() => resolve({ done: false, value: undefined, timedOut: true }), PROBE_TIMEOUT_MS); - }); - - const readPromise = reader.read().then(result => ({ ...result, timedOut: false as const })); - const result = await Promise.race([readPromise, timeoutPromise]); - - reader.releaseLock(); - - const teededResponse = new Response(passStream, res); - - if (result.timedOut) { - // Timeout means data isn't immediately available - likely a buffered response - // being generated (like SSR). Treat as non-streaming. - return { response: teededResponse, isStreaming: false }; - } - - if (result.done) { - // Stream completed immediately - buffered (empty body) - return { response: teededResponse, isStreaming: false }; - } - - // Got data immediately without Content-Length - likely streaming - // Got data immediately with Content-Length - buffered - return { response: teededResponse, isStreaming: contentLength == null }; - } catch { - reader.releaseLock(); - // Error reading - treat as non-streaming to be safe - return { response: new Response(passStream, res), isStreaming: false }; - } + // Default: treat as non-streaming + return { response: res, isStreaming: false }; } diff --git a/packages/cloudflare/test/request.test.ts b/packages/cloudflare/test/request.test.ts index 0ef32618062f..94b5d89e4ae0 100644 --- a/packages/cloudflare/test/request.test.ts +++ b/packages/cloudflare/test/request.test.ts @@ -90,11 +90,18 @@ describe('withSentry', () => { }); test('flush must be called when all waitUntil are done', async () => { - const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush'); + // Spy on Client.prototype.flush and mock it to resolve immediately to avoid timeout issues with fake timers + const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); vi.useFakeTimers(); onTestFinished(() => { vi.useRealTimers(); }); + + // Measure delta instead of absolute call count to avoid interference from parallel tests. + // Since we spy on the prototype, other tests running in parallel may also call flush. + // By measuring before/after, we only verify that THIS test triggered exactly one flush call. + const before = flushSpy.mock.calls.length; + const waits: Promise[] = []; const waitUntil = vi.fn(promise => waits.push(promise)); @@ -109,12 +116,15 @@ describe('withSentry', () => { response.headers.set('content-length', '4'); return response; }); - expect(flushSpy).not.toBeCalled(); expect(waitUntil).toBeCalled(); - await vi.advanceTimersToNextTimerAsync(); - vi.runAllTimers(); + vi.advanceTimersToNextTimer().runAllTimers(); await Promise.all(waits); - expect(flushSpy).toHaveBeenCalledOnce(); + + const after = flushSpy.mock.calls.length; + const delta = after - before; + + // Verify that exactly one flush call was made during this test + expect(delta).toBe(1); }); describe('scope instrumentation', () => { From 17a03a79f13d96f4e3759397dcc45e1453419bc6 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Fri, 21 Nov 2025 13:24:17 +0100 Subject: [PATCH 08/10] fix test --- packages/cloudflare/test/durableobject.test.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/cloudflare/test/durableobject.test.ts b/packages/cloudflare/test/durableobject.test.ts index 42a9ef45b735..d665abf95c86 100644 --- a/packages/cloudflare/test/durableobject.test.ts +++ b/packages/cloudflare/test/durableobject.test.ts @@ -116,11 +116,18 @@ describe('instrumentDurableObjectWithSentry', () => { }); it('flush performs after all waitUntil promises are finished', async () => { + // Spy on Client.prototype.flush and mock it to resolve immediately to avoid timeout issues with fake timers + const flush = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); vi.useFakeTimers(); onTestFinished(() => { vi.useRealTimers(); }); - const flush = vi.spyOn(SentryCore.Client.prototype, 'flush'); + + // Measure delta instead of absolute call count to avoid interference from parallel tests. + // Since we spy on the prototype, other tests running in parallel may also call flush. + // By measuring before/after, we only verify that THIS test triggered exactly one flush call. + const before = flush.mock.calls.length; + const waitUntil = vi.fn(); const testClass = vi.fn(context => ({ fetch: () => { @@ -151,8 +158,11 @@ describe('instrumentDurableObjectWithSentry', () => { vi.advanceTimersToNextTimer(); await Promise.all(waitUntil.mock.calls.map(([p]) => p)); - // Now flush should have been called - expect(flush).toBeCalled(); + const after = flush.mock.calls.length; + const delta = after - before; + + // Verify that exactly one flush call was made during this test + expect(delta).toBe(1); }); describe('instrumentPrototypeMethods option', () => { From 1f75f5c5a6f66804b86013897edff95b130e3c05 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Fri, 28 Nov 2025 14:41:17 +0100 Subject: [PATCH 09/10] remove res from classify fn, quick refactor --- packages/cloudflare/src/request.ts | 68 ++++++++++++---------- packages/cloudflare/src/utils/streaming.ts | 7 +-- 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index aa267a6bf000..ab30f6618b10 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -119,43 +119,51 @@ export function wrapRequestHandler( // Classify response to detect actual streaming const classification = classifyResponseStreaming(res); - if (classification.isStreaming && classification.response.body) { + if (classification.isStreaming && res.body) { // Streaming response detected - monitor consumption to keep span alive - const [clientStream, monitorStream] = classification.response.body.tee(); - - // Monitor stream consumption and end span when complete - const streamMonitor = (async () => { - const reader = monitorStream.getReader(); - - try { - let done = false; - while (!done) { - const result = await reader.read(); - done = result.done; + try { + const [clientStream, monitorStream] = res.body.tee(); + + // Monitor stream consumption and end span when complete + const streamMonitor = (async () => { + const reader = monitorStream.getReader(); + + try { + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + } + } catch { + // Stream error or cancellation - will end span in finally + } finally { + reader.releaseLock(); + span.end(); + waitUntil?.(flush(2000)); } - } catch { - // Stream error or cancellation - will end span in finally - } finally { - reader.releaseLock(); - span.end(); - waitUntil?.(flush(2000)); - } - })(); - - waitUntil?.(streamMonitor); - - // Return response with client stream - return new Response(clientStream, { - status: classification.response.status, - statusText: classification.response.statusText, - headers: classification.response.headers, - }); + })(); + + // Keep worker alive until stream monitoring completes (otherwise span won't end) + waitUntil?.(streamMonitor); + + // Return response with client stream + return new Response(clientStream, { + status: res.status, + statusText: res.statusText, + headers: res.headers, + }); + } catch (e) { + // tee() failed (e.g stream already locked) - fall back to non-streaming handling + span.end(); + waitUntil?.(flush(2000)); + return res; + } } // Non-streaming response - end span immediately and return original span.end(); waitUntil?.(flush(2000)); - return classification.response; + return res; }); }, ); diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index 996f5f31542d..fee67cbb9f2a 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -1,5 +1,4 @@ export type StreamingGuess = { - response: Response; isStreaming: boolean; }; @@ -17,7 +16,7 @@ export type StreamingGuess = { */ export function classifyResponseStreaming(res: Response): StreamingGuess { if (!res.body) { - return { response: res, isStreaming: false }; + return { isStreaming: false }; } const contentType = res.headers.get('content-type') ?? ''; @@ -34,9 +33,9 @@ export function classifyResponseStreaming(res: Response): StreamingGuess { /^application\/stream\+json\b/i.test(contentType) || (/^text\/plain\b/i.test(contentType) && !contentLength) ) { - return { response: res, isStreaming: true }; + return { isStreaming: true }; } // Default: treat as non-streaming - return { response: res, isStreaming: false }; + return { isStreaming: false }; } From a5e65f3fe7935a2bec63da7af2b820d5c1d961ae Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Fri, 28 Nov 2025 15:43:11 +0100 Subject: [PATCH 10/10] lint --- packages/cloudflare/src/request.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index f6f2e75e9ca3..20706e8b9146 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -106,7 +106,7 @@ export function wrapRequestHandler( try { res = await handler(); setHttpStatus(span, res.status); - + // After the handler runs, the span name might have been updated by nested instrumentation // (e.g., Remix parameterizing routes). The span should already have the correct name // from that instrumentation, so we don't need to do anything here.