@@ -357,59 +357,77 @@ extension AsyncIO {
357357 )
358358 var readLength : Int = 0
359359 let signalStream = self . registerFileDescriptor ( fileDescriptor, for: . read)
360- /// Outer loop: every iteration signals we are ready to read more data
361- for try await _ in signalStream {
362- /// Inner loop: repeatedly call `.read()` and read more data until:
363- /// 1. We reached EOF (read length is 0), in which case return the result
364- /// 2. We read `maxLength` bytes, in which case return the result
365- /// 3. `read()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
366- /// this case we `break` out of the inner loop and wait `.read()` to be
367- /// ready by `await`ing the next signal in the outer loop.
368- while true {
369- let bytesRead = resultBuffer. withUnsafeMutableBufferPointer { bufferPointer in
370- // Get a pointer to the memory at the specified offset
371- let targetCount = bufferPointer. count - readLength
372-
373- let offsetAddress = bufferPointer. baseAddress!. advanced ( by: readLength)
374-
375- // Read directly into the buffer at the offset
376- return _subprocess_read ( fileDescriptor. rawValue, offsetAddress, targetCount)
377- }
378- if bytesRead > 0 {
379- // Read some data
380- readLength += bytesRead
381- if maxLength == . max {
382- // Grow resultBuffer if needed
383- guard Double ( readLength) > 0.8 * Double( resultBuffer. count) else {
384- continue
360+
361+ do {
362+ /// Outer loop: every iteration signals we are ready to read more data
363+ for try await _ in signalStream {
364+ /// Inner loop: repeatedly call `.read()` and read more data until:
365+ /// 1. We reached EOF (read length is 0), in which case return the result
366+ /// 2. We read `maxLength` bytes, in which case return the result
367+ /// 3. `read()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
368+ /// this case we `break` out of the inner loop and wait `.read()` to be
369+ /// ready by `await`ing the next signal in the outer loop.
370+ while true {
371+ let bytesRead = resultBuffer. withUnsafeMutableBufferPointer { bufferPointer in
372+ // Get a pointer to the memory at the specified offset
373+ let targetCount = bufferPointer. count - readLength
374+
375+ let offsetAddress = bufferPointer. baseAddress!. advanced ( by: readLength)
376+
377+ // Read directly into the buffer at the offset
378+ return _subprocess_read ( fileDescriptor. rawValue, offsetAddress, targetCount)
379+ }
380+ let capturedErrno = errno
381+ if bytesRead > 0 {
382+ // Read some data
383+ readLength += bytesRead
384+ if maxLength == . max {
385+ // Grow resultBuffer if needed
386+ guard Double ( readLength) > 0.8 * Double( resultBuffer. count) else {
387+ continue
388+ }
389+ resultBuffer. append (
390+ contentsOf: Array ( repeating: 0 , count: resultBuffer. count)
391+ )
392+ } else if readLength >= maxLength {
393+ // When we reached maxLength, return!
394+ try self . removeRegistration ( for: fileDescriptor)
395+ return resultBuffer
385396 }
386- resultBuffer. append (
387- contentsOf: Array ( repeating: 0 , count: resultBuffer. count)
388- )
389- } else if readLength >= maxLength {
390- // When we reached maxLength, return!
397+ } else if bytesRead == 0 {
398+ // We reached EOF. Return whatever's left
391399 try self . removeRegistration ( for: fileDescriptor)
400+ guard readLength > 0 else {
401+ return nil
402+ }
403+ resultBuffer. removeLast ( resultBuffer. count - readLength)
392404 return resultBuffer
393- }
394- } else if bytesRead == 0 {
395- // We reached EOF. Return whatever's left
396- try self . removeRegistration ( for: fileDescriptor)
397- guard readLength > 0 else {
398- return nil
399- }
400- resultBuffer. removeLast ( resultBuffer. count - readLength)
401- return resultBuffer
402- } else {
403- if self . shouldWaitForNextSignal ( with: errno) {
404- // No more data for now wait for the next signal
405- break
406405 } else {
407- // Throw all other errors
408- try self . removeRegistration ( for: fileDescriptor)
409- throw SubprocessError . UnderlyingError ( rawValue: errno)
406+ if self . shouldWaitForNextSignal ( with: capturedErrno) {
407+ // No more data for now wait for the next signal
408+ break
409+ } else {
410+ // Throw all other errors
411+ try self . removeRegistration ( for: fileDescriptor)
412+ perror ( " Throw error directly " )
413+ throw SubprocessError (
414+ code: . init( . failedToReadFromSubprocess) ,
415+ underlyingError: . init( rawValue: capturedErrno)
416+ )
417+ }
410418 }
411419 }
412420 }
421+ } catch {
422+ // Reset error code to .failedToRead to match other platforms
423+ perror ( " Re threw the signal error: \( error) " )
424+ guard let originalError = error as? SubprocessError else {
425+ throw error
426+ }
427+ throw SubprocessError (
428+ code: . init( . failedToReadFromSubprocess) ,
429+ underlyingError: originalError. underlyingError
430+ )
413431 }
414432 resultBuffer. removeLast ( resultBuffer. count - readLength)
415433 return resultBuffer
@@ -432,37 +450,52 @@ extension AsyncIO {
432450 let fileDescriptor = diskIO. channel
433451 let signalStream = self . registerFileDescriptor ( fileDescriptor, for: . write)
434452 var writtenLength : Int = 0
435- /// Outer loop: every iteration signals we are ready to read more data
436- for try await _ in signalStream {
437- /// Inner loop: repeatedly call `.write()` and write more data until:
438- /// 1. We've written bytes.count bytes.
439- /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
440- /// this case we `break` out of the inner loop and wait `.write()` to be
441- /// ready by `await`ing the next signal in the outer loop.
442- while true {
443- let written = bytes. withUnsafeBytes { ptr in
444- let remainingLength = ptr. count - writtenLength
445- let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
446- return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
447- }
448- if written > 0 {
449- writtenLength += written
450- if writtenLength >= bytes. count {
451- // Wrote all data
452- try self . removeRegistration ( for: fileDescriptor)
453- return writtenLength
453+ do {
454+ /// Outer loop: every iteration signals we are ready to read more data
455+ for try await _ in signalStream {
456+ /// Inner loop: repeatedly call `.write()` and write more data until:
457+ /// 1. We've written bytes.count bytes.
458+ /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
459+ /// this case we `break` out of the inner loop and wait `.write()` to be
460+ /// ready by `await`ing the next signal in the outer loop.
461+ while true {
462+ let written = bytes. withUnsafeBytes { ptr in
463+ let remainingLength = ptr. count - writtenLength
464+ let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
465+ return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
454466 }
455- } else {
456- if self . shouldWaitForNextSignal ( with: errno) {
457- // No more data for now wait for the next signal
458- break
467+ let capturedErrno = errno
468+ if written > 0 {
469+ writtenLength += written
470+ if writtenLength >= bytes. count {
471+ // Wrote all data
472+ try self . removeRegistration ( for: fileDescriptor)
473+ return writtenLength
474+ }
459475 } else {
460- // Throw all other errors
461- try self . removeRegistration ( for: fileDescriptor)
462- throw SubprocessError . UnderlyingError ( rawValue: errno)
476+ if self . shouldWaitForNextSignal ( with: capturedErrno) {
477+ // No more data for now wait for the next signal
478+ break
479+ } else {
480+ // Throw all other errors
481+ try self . removeRegistration ( for: fileDescriptor)
482+ throw SubprocessError (
483+ code: . init( . failedToWriteToSubprocess) ,
484+ underlyingError: . init( rawValue: capturedErrno)
485+ )
486+ }
463487 }
464488 }
465489 }
490+ } catch {
491+ // Reset error code to .failedToWrite to match other platforms
492+ guard let originalError = error as? SubprocessError else {
493+ throw error
494+ }
495+ throw SubprocessError (
496+ code: . init( . failedToWriteToSubprocess) ,
497+ underlyingError: originalError. underlyingError
498+ )
466499 }
467500 return 0
468501 }
@@ -478,37 +511,52 @@ extension AsyncIO {
478511 let fileDescriptor = diskIO. channel
479512 let signalStream = self . registerFileDescriptor ( fileDescriptor, for: . write)
480513 var writtenLength : Int = 0
481- /// Outer loop: every iteration signals we are ready to read more data
482- for try await _ in signalStream {
483- /// Inner loop: repeatedly call `.write()` and write more data until:
484- /// 1. We've written bytes.count bytes.
485- /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
486- /// this case we `break` out of the inner loop and wait `.write()` to be
487- /// ready by `await`ing the next signal in the outer loop.
488- while true {
489- let written = span. withUnsafeBytes { ptr in
490- let remainingLength = ptr. count - writtenLength
491- let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
492- return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
493- }
494- if written > 0 {
495- writtenLength += written
496- if writtenLength >= span. byteCount {
497- // Wrote all data
498- try self . removeRegistration ( for: fileDescriptor)
499- return writtenLength
514+ do {
515+ /// Outer loop: every iteration signals we are ready to read more data
516+ for try await _ in signalStream {
517+ /// Inner loop: repeatedly call `.write()` and write more data until:
518+ /// 1. We've written bytes.count bytes.
519+ /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
520+ /// this case we `break` out of the inner loop and wait `.write()` to be
521+ /// ready by `await`ing the next signal in the outer loop.
522+ while true {
523+ let written = span. withUnsafeBytes { ptr in
524+ let remainingLength = ptr. count - writtenLength
525+ let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
526+ return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
500527 }
501- } else {
502- if self . shouldWaitForNextSignal ( with: errno) {
503- // No more data for now wait for the next signal
504- break
528+ let capturedErrno = errno
529+ if written > 0 {
530+ writtenLength += written
531+ if writtenLength >= span. byteCount {
532+ // Wrote all data
533+ try self . removeRegistration ( for: fileDescriptor)
534+ return writtenLength
535+ }
505536 } else {
506- // Throw all other errors
507- try self . removeRegistration ( for: fileDescriptor)
508- throw SubprocessError . UnderlyingError ( rawValue: errno)
537+ if self . shouldWaitForNextSignal ( with: capturedErrno) {
538+ // No more data for now wait for the next signal
539+ break
540+ } else {
541+ // Throw all other errors
542+ try self . removeRegistration ( for: fileDescriptor)
543+ throw SubprocessError (
544+ code: . init( . failedToWriteToSubprocess) ,
545+ underlyingError: . init( rawValue: capturedErrno)
546+ )
547+ }
509548 }
510549 }
511550 }
551+ } catch {
552+ // Reset error code to .failedToWrite to match other platforms
553+ guard let originalError = error as? SubprocessError else {
554+ throw error
555+ }
556+ throw SubprocessError (
557+ code: . init( . failedToWriteToSubprocess) ,
558+ underlyingError: originalError. underlyingError
559+ )
512560 }
513561 return 0
514562 }
0 commit comments