@@ -40,7 +40,7 @@ final class AsyncIO: @unchecked Sendable {
4040 ) throws -> ResultType ) rethrows -> ResultType
4141 }
4242
43- private final class MonitorThreadContext {
43+ private struct MonitorThreadContext : @ unchecked Sendable {
4444 let ioCompletionPort : HANDLE
4545
4646 init ( ioCompletionPort: HANDLE ) {
@@ -57,9 +57,9 @@ final class AsyncIO: @unchecked Sendable {
5757 internal init ( ) {
5858 var maybeSetupError : SubprocessError ? = nil
5959 // Create the the completion port
60- guard let port = CreateIoCompletionPort (
60+ guard let ioCompletionPort = CreateIoCompletionPort (
6161 INVALID_HANDLE_VALUE, nil , 0 , 0
62- ) , port != INVALID_HANDLE_VALUE else {
62+ ) , ioCompletionPort != INVALID_HANDLE_VALUE else {
6363 let error = SubprocessError (
6464 code: . init( . asyncIOFailed( " CreateIoCompletionPort failed " ) ) ,
6565 underlyingError: . init( rawValue: GetLastError ( ) )
@@ -68,89 +68,81 @@ final class AsyncIO: @unchecked Sendable {
6868 self . monitorThread = . failure( error)
6969 return
7070 }
71- self . ioCompletionPort = . success( port )
71+ self . ioCompletionPort = . success( ioCompletionPort )
7272 // Create monitor thread
73- let threadContext = MonitorThreadContext ( ioCompletionPort: port)
74- let threadContextPtr = Unmanaged . passRetained ( threadContext)
75- /// Microsoft documentation for `CreateThread` states:
76- /// > A thread in an executable that calls the C run-time library (CRT)
77- /// > should use the _beginthreadex and _endthreadex functions for
78- /// > thread management rather than CreateThread and ExitThread
79- let threadHandleValue = _beginthreadex ( nil , 0 , { args in
80- func reportError( _ error: SubprocessError ) {
81- let continuations = _registration. withLock { store in
82- return store. values
83- }
84- for continuation in continuations {
85- continuation. finish ( throwing: error)
73+ let context = MonitorThreadContext ( ioCompletionPort: ioCompletionPort)
74+ let threadHandle : HANDLE
75+ do {
76+ threadHandle = try begin_thread_x {
77+ func reportError( _ error: SubprocessError ) {
78+ let continuations = _registration. withLock { store in
79+ return store. values
80+ }
81+ for continuation in continuations {
82+ continuation. finish ( throwing: error)
83+ }
8684 }
87- }
8885
89- let unmanaged = Unmanaged< MonitorThreadContext> . fromOpaque( args!)
90- let context = unmanaged. takeRetainedValue ( )
91-
92- // Monitor loop
93- while true {
94- var bytesTransferred : DWORD = 0
95- var targetFileDescriptor : UInt64 = 0
96- var overlapped : LPOVERLAPPED ? = nil
97-
98- let monitorResult = GetQueuedCompletionStatus (
99- context. ioCompletionPort,
100- & bytesTransferred,
101- & targetFileDescriptor,
102- & overlapped,
103- INFINITE
104- )
105- if !monitorResult {
106- let lastError = GetLastError ( )
107- if lastError == ERROR_BROKEN_PIPE {
108- // We finished reading the handle. Signal EOF by
109- // finishing the stream.
110- // NOTE: here we deliberately leave now unused continuation
111- // in the store. Windows does not offer an API to remove a
112- // HANDLE from an IOCP port, therefore we leave the registration
113- // to signify the HANDLE has already been resisted.
114- let continuation = _registration. withLock { store -> SignalStream . Continuation ? in
115- if let continuation = store [ targetFileDescriptor] {
116- return continuation
86+ // Monitor loop
87+ while true {
88+ var bytesTransferred : DWORD = 0
89+ var targetFileDescriptor : UInt64 = 0
90+ var overlapped : LPOVERLAPPED ? = nil
91+
92+ let monitorResult = GetQueuedCompletionStatus (
93+ context. ioCompletionPort,
94+ & bytesTransferred,
95+ & targetFileDescriptor,
96+ & overlapped,
97+ INFINITE
98+ )
99+ if !monitorResult {
100+ let lastError = GetLastError ( )
101+ if lastError == ERROR_BROKEN_PIPE {
102+ // We finished reading the handle. Signal EOF by
103+ // finishing the stream.
104+ // NOTE: here we deliberately leave now unused continuation
105+ // in the store. Windows does not offer an API to remove a
106+ // HANDLE from an IOCP port, therefore we leave the registration
107+ // to signify the HANDLE has already been resisted.
108+ let continuation = _registration. withLock { store -> SignalStream . Continuation ? in
109+ if let continuation = store [ targetFileDescriptor] {
110+ return continuation
111+ }
112+ return nil
117113 }
118- return nil
114+ continuation? . finish ( )
115+ continue
116+ } else {
117+ let error = SubprocessError (
118+ code: . init( . asyncIOFailed( " GetQueuedCompletionStatus failed " ) ) ,
119+ underlyingError: . init( rawValue: lastError)
120+ )
121+ reportError ( error)
122+ break
119123 }
120- continuation? . finish ( )
121- continue
122- } else {
123- let error = SubprocessError (
124- code: . init( . asyncIOFailed( " GetQueuedCompletionStatus failed " ) ) ,
125- underlyingError: . init( rawValue: lastError)
126- )
127- reportError ( error)
128- break
129124 }
130- }
131125
132- // Breakout the monitor loop if we received shutdown from the shutdownFD
133- if targetFileDescriptor == shutdownPort {
134- break
135- }
136- // Notify the continuations
137- let continuation = _registration. withLock { store -> SignalStream . Continuation ? in
138- if let continuation = store [ targetFileDescriptor] {
139- return continuation
126+ // Breakout the monitor loop if we received shutdown from the shutdownFD
127+ if targetFileDescriptor == shutdownPort {
128+ break
140129 }
141- return nil
130+ // Notify the continuations
131+ let continuation = _registration. withLock { store -> SignalStream . Continuation ? in
132+ if let continuation = store [ targetFileDescriptor] {
133+ return continuation
134+ }
135+ return nil
136+ }
137+ continuation? . yield ( bytesTransferred)
142138 }
143- continuation? . yield ( bytesTransferred)
139+
140+ return 0
144141 }
145- return 0
146- } , threadContextPtr. toOpaque ( ) , 0 , nil )
147- guard threadHandleValue > 0 ,
148- let threadHandle = HANDLE ( bitPattern: threadHandleValue) else {
149- // _beginthreadex uses errno instead of GetLastError()
150- let capturedError = _subprocess_windows_get_errno ( )
142+ } catch let underlyingError {
151143 let error = SubprocessError (
152- code: . init( . asyncIOFailed( " _beginthreadex failed " ) ) ,
153- underlyingError: . init ( rawValue : capturedError )
144+ code: . init( . asyncIOFailed( " Failed to create monitor thread " ) ) ,
145+ underlyingError: underlyingError
154146 )
155147 self . monitorThread = . failure( error)
156148 return
0 commit comments