@@ -357,59 +357,75 @@ extension AsyncIO {
357357 )
358358 var readLength : Int = 0
359359 let signalStream = self . registerFileDescriptor ( fileDescriptor, for: . read)
360- /// Outer loop: every iteration signals we are ready to read more data
361- for try await _ in signalStream {
362- /// Inner loop: repeatedly call `.read()` and read more data until:
363- /// 1. We reached EOF (read length is 0), in which case return the result
364- /// 2. We read `maxLength` bytes, in which case return the result
365- /// 3. `read()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
366- /// this case we `break` out of the inner loop and wait `.read()` to be
367- /// ready by `await`ing the next signal in the outer loop.
368- while true {
369- let bytesRead = resultBuffer. withUnsafeMutableBufferPointer { bufferPointer in
370- // Get a pointer to the memory at the specified offset
371- let targetCount = bufferPointer. count - readLength
372-
373- let offsetAddress = bufferPointer. baseAddress!. advanced ( by: readLength)
374-
375- // Read directly into the buffer at the offset
376- return _subprocess_read ( fileDescriptor. rawValue, offsetAddress, targetCount)
377- }
378- if bytesRead > 0 {
379- // Read some data
380- readLength += bytesRead
381- if maxLength == . max {
382- // Grow resultBuffer if needed
383- guard Double ( readLength) > 0.8 * Double( resultBuffer. count) else {
384- continue
360+
361+ do {
362+ /// Outer loop: every iteration signals we are ready to read more data
363+ for try await _ in signalStream {
364+ /// Inner loop: repeatedly call `.read()` and read more data until:
365+ /// 1. We reached EOF (read length is 0), in which case return the result
366+ /// 2. We read `maxLength` bytes, in which case return the result
367+ /// 3. `read()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
368+ /// this case we `break` out of the inner loop and wait `.read()` to be
369+ /// ready by `await`ing the next signal in the outer loop.
370+ while true {
371+ let bytesRead = resultBuffer. withUnsafeMutableBufferPointer { bufferPointer in
372+ // Get a pointer to the memory at the specified offset
373+ let targetCount = bufferPointer. count - readLength
374+
375+ let offsetAddress = bufferPointer. baseAddress!. advanced ( by: readLength)
376+
377+ // Read directly into the buffer at the offset
378+ return _subprocess_read ( fileDescriptor. rawValue, offsetAddress, targetCount)
379+ }
380+ let capturedErrno = errno
381+ if bytesRead > 0 {
382+ // Read some data
383+ readLength += bytesRead
384+ if maxLength == . max {
385+ // Grow resultBuffer if needed
386+ guard Double ( readLength) > 0.8 * Double( resultBuffer. count) else {
387+ continue
388+ }
389+ resultBuffer. append (
390+ contentsOf: Array ( repeating: 0 , count: resultBuffer. count)
391+ )
392+ } else if readLength >= maxLength {
393+ // When we reached maxLength, return!
394+ try self . removeRegistration ( for: fileDescriptor)
395+ return resultBuffer
385396 }
386- resultBuffer. append (
387- contentsOf: Array ( repeating: 0 , count: resultBuffer. count)
388- )
389- } else if readLength >= maxLength {
390- // When we reached maxLength, return!
397+ } else if bytesRead == 0 {
398+ // We reached EOF. Return whatever's left
391399 try self . removeRegistration ( for: fileDescriptor)
400+ guard readLength > 0 else {
401+ return nil
402+ }
403+ resultBuffer. removeLast ( resultBuffer. count - readLength)
392404 return resultBuffer
393- }
394- } else if bytesRead == 0 {
395- // We reached EOF. Return whatever's left
396- try self . removeRegistration ( for: fileDescriptor)
397- guard readLength > 0 else {
398- return nil
399- }
400- resultBuffer. removeLast ( resultBuffer. count - readLength)
401- return resultBuffer
402- } else {
403- if self . shouldWaitForNextSignal ( with: errno) {
404- // No more data for now wait for the next signal
405- break
406405 } else {
407- // Throw all other errors
408- try self . removeRegistration ( for: fileDescriptor)
409- throw SubprocessError . UnderlyingError ( rawValue: errno)
406+ if self . shouldWaitForNextSignal ( with: capturedErrno) {
407+ // No more data for now wait for the next signal
408+ break
409+ } else {
410+ // Throw all other errors
411+ try self . removeRegistration ( for: fileDescriptor)
412+ throw SubprocessError (
413+ code: . init( . failedToReadFromSubprocess) ,
414+ underlyingError: . init( rawValue: capturedErrno)
415+ )
416+ }
410417 }
411418 }
412419 }
420+ } catch {
421+ // Reset error code to .failedToRead to match other platforms
422+ guard let originalError = error as? SubprocessError else {
423+ throw error
424+ }
425+ throw SubprocessError (
426+ code: . init( . failedToReadFromSubprocess) ,
427+ underlyingError: originalError. underlyingError
428+ )
413429 }
414430 resultBuffer. removeLast ( resultBuffer. count - readLength)
415431 return resultBuffer
@@ -432,37 +448,52 @@ extension AsyncIO {
432448 let fileDescriptor = diskIO. channel
433449 let signalStream = self . registerFileDescriptor ( fileDescriptor, for: . write)
434450 var writtenLength : Int = 0
435- /// Outer loop: every iteration signals we are ready to read more data
436- for try await _ in signalStream {
437- /// Inner loop: repeatedly call `.write()` and write more data until:
438- /// 1. We've written bytes.count bytes.
439- /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
440- /// this case we `break` out of the inner loop and wait `.write()` to be
441- /// ready by `await`ing the next signal in the outer loop.
442- while true {
443- let written = bytes. withUnsafeBytes { ptr in
444- let remainingLength = ptr. count - writtenLength
445- let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
446- return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
447- }
448- if written > 0 {
449- writtenLength += written
450- if writtenLength >= bytes. count {
451- // Wrote all data
452- try self . removeRegistration ( for: fileDescriptor)
453- return writtenLength
451+ do {
452+ /// Outer loop: every iteration signals we are ready to read more data
453+ for try await _ in signalStream {
454+ /// Inner loop: repeatedly call `.write()` and write more data until:
455+ /// 1. We've written bytes.count bytes.
456+ /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
457+ /// this case we `break` out of the inner loop and wait `.write()` to be
458+ /// ready by `await`ing the next signal in the outer loop.
459+ while true {
460+ let written = bytes. withUnsafeBytes { ptr in
461+ let remainingLength = ptr. count - writtenLength
462+ let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
463+ return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
454464 }
455- } else {
456- if self . shouldWaitForNextSignal ( with: errno) {
457- // No more data for now wait for the next signal
458- break
465+ let capturedErrno = errno
466+ if written > 0 {
467+ writtenLength += written
468+ if writtenLength >= bytes. count {
469+ // Wrote all data
470+ try self . removeRegistration ( for: fileDescriptor)
471+ return writtenLength
472+ }
459473 } else {
460- // Throw all other errors
461- try self . removeRegistration ( for: fileDescriptor)
462- throw SubprocessError . UnderlyingError ( rawValue: errno)
474+ if self . shouldWaitForNextSignal ( with: capturedErrno) {
475+ // No more data for now wait for the next signal
476+ break
477+ } else {
478+ // Throw all other errors
479+ try self . removeRegistration ( for: fileDescriptor)
480+ throw SubprocessError (
481+ code: . init( . failedToWriteToSubprocess) ,
482+ underlyingError: . init( rawValue: capturedErrno)
483+ )
484+ }
463485 }
464486 }
465487 }
488+ } catch {
489+ // Reset error code to .failedToWrite to match other platforms
490+ guard let originalError = error as? SubprocessError else {
491+ throw error
492+ }
493+ throw SubprocessError (
494+ code: . init( . failedToWriteToSubprocess) ,
495+ underlyingError: originalError. underlyingError
496+ )
466497 }
467498 return 0
468499 }
@@ -478,37 +509,52 @@ extension AsyncIO {
478509 let fileDescriptor = diskIO. channel
479510 let signalStream = self . registerFileDescriptor ( fileDescriptor, for: . write)
480511 var writtenLength : Int = 0
481- /// Outer loop: every iteration signals we are ready to read more data
482- for try await _ in signalStream {
483- /// Inner loop: repeatedly call `.write()` and write more data until:
484- /// 1. We've written bytes.count bytes.
485- /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
486- /// this case we `break` out of the inner loop and wait `.write()` to be
487- /// ready by `await`ing the next signal in the outer loop.
488- while true {
489- let written = span. withUnsafeBytes { ptr in
490- let remainingLength = ptr. count - writtenLength
491- let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
492- return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
493- }
494- if written > 0 {
495- writtenLength += written
496- if writtenLength >= span. byteCount {
497- // Wrote all data
498- try self . removeRegistration ( for: fileDescriptor)
499- return writtenLength
512+ do {
513+ /// Outer loop: every iteration signals we are ready to read more data
514+ for try await _ in signalStream {
515+ /// Inner loop: repeatedly call `.write()` and write more data until:
516+ /// 1. We've written bytes.count bytes.
517+ /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In
518+ /// this case we `break` out of the inner loop and wait `.write()` to be
519+ /// ready by `await`ing the next signal in the outer loop.
520+ while true {
521+ let written = span. withUnsafeBytes { ptr in
522+ let remainingLength = ptr. count - writtenLength
523+ let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
524+ return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
500525 }
501- } else {
502- if self . shouldWaitForNextSignal ( with: errno) {
503- // No more data for now wait for the next signal
504- break
526+ let capturedErrno = errno
527+ if written > 0 {
528+ writtenLength += written
529+ if writtenLength >= span. byteCount {
530+ // Wrote all data
531+ try self . removeRegistration ( for: fileDescriptor)
532+ return writtenLength
533+ }
505534 } else {
506- // Throw all other errors
507- try self . removeRegistration ( for: fileDescriptor)
508- throw SubprocessError . UnderlyingError ( rawValue: errno)
535+ if self . shouldWaitForNextSignal ( with: capturedErrno) {
536+ // No more data for now wait for the next signal
537+ break
538+ } else {
539+ // Throw all other errors
540+ try self . removeRegistration ( for: fileDescriptor)
541+ throw SubprocessError (
542+ code: . init( . failedToWriteToSubprocess) ,
543+ underlyingError: . init( rawValue: capturedErrno)
544+ )
545+ }
509546 }
510547 }
511548 }
549+ } catch {
550+ // Reset error code to .failedToWrite to match other platforms
551+ guard let originalError = error as? SubprocessError else {
552+ throw error
553+ }
554+ throw SubprocessError (
555+ code: . init( . failedToWriteToSubprocess) ,
556+ underlyingError: originalError. underlyingError
557+ )
512558 }
513559 return 0
514560 }
0 commit comments