@@ -105,6 +105,7 @@ describe("RunSubscription", () => {
105105 runShapeStream : createTestShapeStream ( shapes ) ,
106106 streamFactory : new TestStreamSubscriptionFactory ( ) ,
107107 closeOnComplete : true ,
108+ abortController : new AbortController ( ) ,
108109 } ) ;
109110
110111 const results = await convertAsyncIterableToArray ( subscription ) ;
@@ -144,6 +145,7 @@ describe("RunSubscription", () => {
144145 runShapeStream : createTestShapeStream ( shapes ) ,
145146 streamFactory : new TestStreamSubscriptionFactory ( ) ,
146147 closeOnComplete : true ,
148+ abortController : new AbortController ( ) ,
147149 } ) ;
148150
149151 const results = await convertAsyncIterableToArray ( subscription ) ;
@@ -196,6 +198,7 @@ describe("RunSubscription", () => {
196198 runShapeStream : createDelayedTestShapeStream ( shapes ) ,
197199 streamFactory : new TestStreamSubscriptionFactory ( ) ,
198200 closeOnComplete : false ,
201+ abortController : new AbortController ( ) ,
199202 } ) ;
200203
201204 // Collect 2 results
@@ -247,6 +250,7 @@ describe("RunSubscription", () => {
247250 const subscription = new RunSubscription ( {
248251 runShapeStream : createTestShapeStream ( shapes ) ,
249252 streamFactory,
253+ abortController : new AbortController ( ) ,
250254 } ) ;
251255
252256 const results = await collectNResults (
@@ -336,6 +340,7 @@ describe("RunSubscription", () => {
336340 const subscription = new RunSubscription ( {
337341 runShapeStream : createTestShapeStream ( shapes ) ,
338342 streamFactory,
343+ abortController : new AbortController ( ) ,
339344 } ) ;
340345
341346 const results = await collectNResults (
@@ -415,6 +420,7 @@ describe("RunSubscription", () => {
415420 const subscription = new RunSubscription ( {
416421 runShapeStream : createTestShapeStream ( shapes ) ,
417422 streamFactory,
423+ abortController : new AbortController ( ) ,
418424 } ) ;
419425
420426 const results = await collectNResults (
@@ -459,110 +465,6 @@ describe("RunSubscription", () => {
459465 run : { id : "run_123" } ,
460466 } ) ;
461467 } ) ;
462-
463- it ( "should handle streams that appear in different run updates" , async ( ) => {
464- const streamFactory = new TestStreamSubscriptionFactory ( ) ;
465-
466- // Set up test chunks for two different streams
467- streamFactory . setStreamChunks ( "run_123" , "openai" , [
468- { id : "openai1" , content : "Hello" } ,
469- { id : "openai2" , content : "World" } ,
470- ] ) ;
471- streamFactory . setStreamChunks ( "run_123" , "anthropic" , [
472- { id : "claude1" , message : "Hi" } ,
473- { id : "claude2" , message : "There" } ,
474- ] ) ;
475-
476- const shapes = [
477- // First run update - only has openai stream
478- {
479- id : "123" ,
480- friendlyId : "run_123" ,
481- taskIdentifier : "multi-streaming" ,
482- status : "EXECUTING" ,
483- createdAt : new Date ( ) ,
484- updatedAt : new Date ( ) ,
485- number : 1 ,
486- usageDurationMs : 100 ,
487- costInCents : 0 ,
488- baseCostInCents : 0 ,
489- isTest : false ,
490- runTags : [ ] ,
491- metadata : JSON . stringify ( {
492- $$streams : [ "openai" ] ,
493- } ) ,
494- metadataType : "application/json" ,
495- } ,
496- // Second run update - adds anthropic stream
497- {
498- id : "123" ,
499- friendlyId : "run_123" ,
500- taskIdentifier : "multi-streaming" ,
501- status : "EXECUTING" ,
502- createdAt : new Date ( ) ,
503- updatedAt : new Date ( ) ,
504- number : 1 ,
505- usageDurationMs : 200 ,
506- costInCents : 0 ,
507- baseCostInCents : 0 ,
508- isTest : false ,
509- runTags : [ ] ,
510- metadata : JSON . stringify ( {
511- $$streams : [ "openai" , "anthropic" ] ,
512- } ) ,
513- metadataType : "application/json" ,
514- } ,
515- // Final run update - marks as complete
516- {
517- id : "123" ,
518- friendlyId : "run_123" ,
519- taskIdentifier : "multi-streaming" ,
520- status : "COMPLETED_SUCCESSFULLY" ,
521- createdAt : new Date ( ) ,
522- updatedAt : new Date ( ) ,
523- completedAt : new Date ( ) ,
524- number : 1 ,
525- usageDurationMs : 300 ,
526- costInCents : 0 ,
527- baseCostInCents : 0 ,
528- isTest : false ,
529- runTags : [ ] ,
530- metadata : JSON . stringify ( {
531- $$streams : [ "openai" , "anthropic" ] ,
532- } ) ,
533- metadataType : "application/json" ,
534- } ,
535- ] ;
536-
537- const subscription = new RunSubscription ( {
538- runShapeStream : createTestShapeStream ( shapes ) ,
539- streamFactory,
540- closeOnComplete : true ,
541- } ) ;
542-
543- const results = await collectNResults (
544- subscription . withStreams < {
545- openai : { id : string ; content : string } ;
546- anthropic : { id : string ; message : string } ;
547- } > ( ) ,
548- 7 // 3 runs + 2 openai chunks + 2 anthropic chunks
549- ) ;
550-
551- expect ( results ) . toHaveLength ( 7 ) ;
552-
553- // Verify run updates
554- const runUpdates = results . filter ( ( r ) => r . type === "run" ) ;
555- expect ( runUpdates ) . toHaveLength ( 3 ) ;
556- expect ( runUpdates [ 2 ] ! . run . status ) . toBe ( "COMPLETED" ) ;
557-
558- // Verify openai chunks
559- const openaiChunks = results . filter ( ( r ) => r . type === "openai" ) ;
560- expect ( openaiChunks ) . toHaveLength ( 2 ) ;
561-
562- // Verify anthropic chunks
563- const anthropicChunks = results . filter ( ( r ) => r . type === "anthropic" ) ;
564- expect ( anthropicChunks ) . toHaveLength ( 2 ) ;
565- } ) ;
566468} ) ;
567469
568470export async function convertAsyncIterableToArray < T > ( iterable : AsyncIterable < T > ) : Promise < T [ ] > {
@@ -595,7 +497,12 @@ async function collectNResults<T>(
595497 promise ,
596498 new Promise < T [ ] > ( ( _ , reject ) =>
597499 setTimeout (
598- ( ) => reject ( new Error ( `Timeout waiting for ${ count } results after ${ timeoutMs } ms` ) ) ,
500+ ( ) =>
501+ reject (
502+ new Error (
503+ `Timeout waiting for ${ count } results after ${ timeoutMs } ms, but only had ${ results . length } `
504+ )
505+ ) ,
599506 timeoutMs
600507 )
601508 ) ,
0 commit comments