1111 */
1212package com.redhat.devtools.gateway.openshift
1313
14+ import com.intellij.openapi.diagnostic.logger
15+ import com.redhat.devtools.gateway.view.ui.Dialogs
1416import io.kubernetes.client.Exec
1517import io.kubernetes.client.PortForward
1618import io.kubernetes.client.openapi.ApiClient
@@ -21,12 +23,14 @@ import io.kubernetes.client.openapi.models.V1PodList
2123import kotlinx.coroutines.CoroutineScope
2224import kotlinx.coroutines.Dispatchers
2325import kotlinx.coroutines.Job
26+ import kotlinx.coroutines.NonCancellable
27+ import kotlinx.coroutines.SupervisorJob
2428import kotlinx.coroutines.cancel
2529import kotlinx.coroutines.cancelAndJoin
2630import kotlinx.coroutines.coroutineScope
31+ import kotlinx.coroutines.delay
2732import kotlinx.coroutines.ensureActive
2833import kotlinx.coroutines.isActive
29- import kotlinx.coroutines.job
3034import kotlinx.coroutines.launch
3135import kotlinx.coroutines.runBlocking
3236import kotlinx.coroutines.withContext
@@ -35,13 +39,22 @@ import java.io.Closeable
3539import java.io.IOException
3640import java.io.InputStream
3741import java.io.OutputStream
42+ import java.net.BindException
3843import java.net.InetAddress
44+ import java.net.InetSocketAddress
3945import java.net.ServerSocket
4046import java.net.Socket
4147import java.util.concurrent.TimeUnit
4248
4349class Pods (private val client : ApiClient ) {
4450
51+ companion object {
52+ private const val CONNECT_ATTEMPTS = 5
53+ private const val RECONNECT_DELAY : Long = 1000
54+ }
55+
56+ private val logger = logger<Pods >()
57+
4558 // Example:
4659 // https://github.com/kubernetes-client/java/blob/master/examples/examples-release-latest/src/main/java/io/kubernetes/client/examples/ExecExample.java
4760 @Throws(IOException ::class )
@@ -89,47 +102,103 @@ class Pods(private val client: ApiClient) {
89102 }
90103
91104 private fun CoroutineScope.copyStream (input : InputStream , output : ByteArrayOutputStream
92- ): Job = launch(Dispatchers .IO ) {
93- input.copyTo(output)
94- }
105+ ): Job = launch(Dispatchers .IO ) {
106+ input.copyTo(output)
107+ }
95108
96- // Example:
97- // https://github.com/kubernetes-client/java/blob/master/examples/examples-release-latest/src/main/java/io/kubernetes/client/examples/PortForwardExample.java
98109 @Throws(IOException ::class )
99110 fun forward (pod : V1Pod , localPort : Int , remotePort : Int ): Closeable {
100- val portForward = PortForward (client)
101- val forwardResult = portForward.forward(pod, listOf (remotePort))
102111 val serverSocket = ServerSocket (localPort, 50 , InetAddress .getLoopbackAddress())
112+ val scope = CoroutineScope (
113+ // dont cancel if child coroutine fails + use blocking I/O scope
114+ SupervisorJob () + Dispatchers .IO
115+ )
103116
104- val scope = CoroutineScope (Dispatchers .IO )
105- scope.launch {
106- val clientSocket = serverSocket.accept()
107- try {
108- copyStreams(clientSocket, forwardResult, remotePort)
109- } catch (e: IOException ) {
110- if (coroutineContext.isActive) throw e
111- } finally {
112- clientSocket.close()
113- }
114- }
117+ scope.acceptConnections(serverSocket, pod, localPort, remotePort)
115118
116119 return Closeable {
117- try {
118- scope.cancel()
119- runBlocking {
120- scope.coroutineContext.job.join()
120+ runCatching { serverSocket.close() }
121+ scope.cancel()
122+ }
123+ }
124+
125+ private fun CoroutineScope.acceptConnections (
126+ serverSocket : ServerSocket ,
127+ pod : V1Pod ,
128+ localPort : Int ,
129+ remotePort : Int
130+ ) {
131+ launch {
132+ logger.info(" Starting port forward on local port $localPort ..." )
133+
134+ while (isActive) {
135+ val clientSocket = createClientSocket(serverSocket) ? : break
136+
137+ launch {
138+ handleConnection(
139+ clientSocket,
140+ pod,
141+ localPort,
142+ remotePort
143+ )
121144 }
145+ }
146+ }
147+ }
122148
123- serverSocket.close()
149+ private suspend fun createClientSocket (serverSocket : ServerSocket ): Socket ? {
150+ return try {
151+ withContext(NonCancellable ) {
152+ // block until connection is accepted
153+ serverSocket.accept()
154+ }
155+ } catch (_: Exception ) {
156+ logger.info(" Server socket stopped accepting connections." )
157+ null
158+ }
159+ }
160+
161+ private suspend fun CoroutineScope.handleConnection (
162+ clientSocket : Socket ,
163+ pod : V1Pod ,
164+ localPort : Int ,
165+ remotePort : Int
166+ ) {
167+ try {
168+ repeat(CONNECT_ATTEMPTS ) { attempt ->
169+ if (! isActive) return @repeat
170+
171+ var forwardResult: PortForward .PortForwardResult ? = null
124172 try {
125- forwardResult.getInputStream(remotePort).close()
126- forwardResult.getOutboundStream(remotePort).close()
127- } catch (_: Exception ) {
128- // Ignore errors when closing streams
173+ logger.info(" Attempt #${attempt + 1 } : Connecting $localPort -> $remotePort ..." )
174+ val portForward = PortForward (client)
175+ forwardResult = portForward.forward(pod, listOf (remotePort))
176+ copyStreams(clientSocket, forwardResult, remotePort)
177+ return
178+ } catch (e: IOException ) {
179+ logger.info(
180+ " Could not port forward $localPort -> $remotePort : ${e.message} . " +
181+ " Retrying in ${RECONNECT_DELAY } ms..."
182+ )
183+ if (isActive) {
184+ delay(RECONNECT_DELAY )
185+ }
186+ } finally {
187+ runCatching {
188+ forwardResult?.getInputStream(remotePort)?.close()
189+ forwardResult?.getOutboundStream(remotePort)?.close()
190+ }
129191 }
130- } catch (_: Exception ) {
131- // Ignore cleanup errors
132192 }
193+ } catch (e: Exception ) {
194+ logger.info(
195+ " Could not port forward to pod ${pod.metadata?.name} using port $localPort -> $remotePort " ,
196+ e)
197+ Dialogs .error(
198+ " Could not port forward to pod ${pod.metadata?.name} using port $localPort -> $remotePort : ${e.message} " ,
199+ " Port Forward Error" )
200+ } finally {
201+ runCatching { clientSocket.close() }
133202 }
134203 }
135204
@@ -142,18 +211,22 @@ class Pods(private val client: ApiClient) {
142211 coroutineScope {
143212 ensureActive()
144213 launch {
145- copyStreams(forwardResult .getInputStream(remotePort), clientSocket.getOutputStream( ))
214+ clientSocket .getInputStream().copyToAndHandleExceptions(forwardResult.getOutboundStream(remotePort ))
146215 }
147216 launch {
148- copyStreams(clientSocket .getInputStream(), forwardResult.getOutboundStream(remotePort ))
217+ forwardResult .getInputStream(remotePort).copyToAndHandleExceptions(clientSocket.getOutputStream( ))
149218 }
150219 }
151220 }
152221
153- @Throws(IOException ::class )
154- private fun copyStreams (source : InputStream , destination : OutputStream ) {
155- source.copyTo(destination)
156- destination.run { flush() }
222+ private fun InputStream.copyToAndHandleExceptions (destination : OutputStream ) {
223+ try {
224+ this .copyTo(destination)
225+ destination.flush()
226+ } catch (e: IOException ) {
227+ logger.info(" IOException during stream copy: ${e.message} " )
228+ throw e
229+ }
157230 }
158231
159232 @Throws(IOException ::class )
@@ -164,11 +237,11 @@ class Pods(private val client: ApiClient) {
164237 repeat(maxRetries) { attempt ->
165238 try {
166239 val testSocket = ServerSocket ()
167- testSocket.bind(java.net. InetSocketAddress (" 127.0.0.1" , port))
240+ testSocket.bind(InetSocketAddress (" 127.0.0.1" , port))
168241 testSocket.close()
169242 // If we can bind to the port, it means port forwarding is not ready yet
170243 Thread .sleep(retryDelay)
171- } catch (_: java.net. BindException ) {
244+ } catch (_: BindException ) {
172245 // Port is already in use, which means port forwarding is ready
173246 return
174247 } catch (e: Exception ) {
@@ -195,4 +268,4 @@ class Pods(private val client: ApiClient) {
195268 .labelSelector(labelSelector)
196269 .execute();
197270 }
198- }
271+ }
0 commit comments