From 4f7fd60291aa372e3035e5a903791b09228db31f Mon Sep 17 00:00:00 2001 From: marko asplund Date: Mon, 21 Oct 2019 22:58:32 +0300 Subject: [PATCH 1/8] wip --- build.sbt | 6 +++--- src/main/scala/spinoco/fs2/http/HttpClient.scala | 8 +++++--- src/main/scala/spinoco/fs2/http/HttpServer.scala | 7 +++++-- src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala | 4 +++- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/build.sbt b/build.sbt index a1ed552..f6be80a 100644 --- a/build.sbt +++ b/build.sbt @@ -31,9 +31,9 @@ lazy val commonSettings = Seq( , "org.scodec" %% "scodec-core" % "1.10.3" , "com.spinoco" %% "protocol-http" % "0.3.15" , "com.spinoco" %% "protocol-websocket" % "0.3.15" - , "co.fs2" %% "fs2-core" % "1.0.0" - , "co.fs2" %% "fs2-io" % "1.0.0" - , "com.spinoco" %% "fs2-crypto" % "0.4.0" + , "co.fs2" %% "fs2-core" % "2.0.1" + , "co.fs2" %% "fs2-io" % "2.0.1" + , "com.spinoco" %% "fs2-crypto" % "0.5.0-SNAPSHOT" , "org.scalacheck" %% "scalacheck" % "1.13.4" % "test" ), scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "git@github.com:Spinoco/fs2-http.git")), diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 7b75ad9..9edb572 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -128,7 +128,9 @@ trait HttpClient[F[_]] { , timeout: Duration ): Stream[F, HttpResponse[F]] = { Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address => - Stream.resource(io.tcp.client[F](address)) + val b: cats.effect.Blocker = ??? + + Stream.resource(new io.tcp.SocketGroup(AG, b).client[F](address)) .evalMap { socket => if (!request.isSecure) Applicative[F].pure(socket) else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host) @@ -174,7 +176,7 @@ trait HttpClient[F[_]] { timeout match { case fin: FiniteDuration => eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { start => - HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal => eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { sent => val remains = fin - (sent - start).millis @@ -186,7 +188,7 @@ trait HttpClient[F[_]] { }}}} case _ => - HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec) } } diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index 11d9b27..d1801a7 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -3,7 +3,7 @@ package spinoco.fs2.http import java.net.InetSocketAddress import java.nio.channels.AsynchronousChannelGroup -import cats.effect.{ConcurrentEffect, Sync, Timer} +import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer} import cats.syntax.all._ import fs2._ import fs2.concurrent.SignallingRef @@ -57,8 +57,11 @@ object HttpServer { case fin: FiniteDuration => (true, fin) case _ => (false, 0.millis) } + val b: cats.effect.Blocker = ??? + implicit val cs: ContextShift[F] = ??? - io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => + + new io.tcp.SocketGroup(AG, b).server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => eval(SignallingRef(initial)).flatMap { timeoutSignal => readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize) diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index 38dfaec..cdcadd3 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -95,7 +95,9 @@ object WebSocket { import spinoco.fs2.http.internal._ import Stream._ eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address => - Stream.resource(io.tcp.client[F](address, receiveBufferSize = receiveBufferSize)) + val b: cats.effect.Blocker = ??? + + Stream.resource(new io.tcp.SocketGroup(AG, b).client[F](address, receiveBufferSize = receiveBufferSize)) .evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) } .flatMap { socket => val (header, fingerprint) = impl.createRequestHeaders(request.header) From b5aa915dd4dec273378561b18a3f0862a43632eb Mon Sep 17 00:00:00 2001 From: marko asplund Date: Tue, 22 Oct 2019 08:37:17 +0300 Subject: [PATCH 2/8] properly pass ContextShift --- src/main/scala/spinoco/fs2/http/HttpServer.scala | 4 +--- src/main/scala/spinoco/fs2/http/http.scala | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index d1801a7..e44130a 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -36,7 +36,7 @@ object HttpServer { * Request is not suplied if failure happened before request was constructed. * */ - def apply[F[_] : ConcurrentEffect : Timer]( + def apply[F[_] : ConcurrentEffect : ContextShift : Timer]( maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 , maxHeaderSize: Int = 10 *1024 @@ -58,8 +58,6 @@ object HttpServer { case _ => (false, 0.millis) } val b: cats.effect.Blocker = ??? - implicit val cs: ContextShift[F] = ??? - new io.tcp.SocketGroup(AG, b).server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index 94624ec..728d8b1 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -30,7 +30,7 @@ package object http { * Request will fail, if the header won't be read within this timeout. * @param service Pipe that defines handling of each incoming request and produces a response */ - def server[F[_] : ConcurrentEffect : Timer]( + def server[F[_] : ConcurrentEffect : ContextShift : Timer]( bindTo: InetSocketAddress , maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 From 0a75caf7fb13bf6cf55ee253e97462e5f7771799 Mon Sep 17 00:00:00 2001 From: marko asplund Date: Tue, 22 Oct 2019 20:04:57 +0300 Subject: [PATCH 3/8] pass cats.effect.Blocker --- src/main/scala/spinoco/fs2/http/HttpClient.scala | 6 +++--- src/main/scala/spinoco/fs2/http/HttpServer.scala | 8 ++++---- src/main/scala/spinoco/fs2/http/http.scala | 12 +++++++++--- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 9edb572..04123b2 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -110,12 +110,14 @@ trait HttpClient[F[_]] { * @param responseCodec Codec used to encode response header * @param sslExecutionContext Strategy used when communication with SSL (https or wss) * @param sslContext SSL Context to use with SSL Client (https, wss) + * @param blocker An execution context for blocking operations */ def apply[F[_] : ConcurrentEffect : ContextShift : Timer]( requestCodec : Codec[HttpRequestHeader] , responseCodec : Codec[HttpResponseHeader] , sslExecutionContext: => ExecutionContext , sslContext : => SSLContext + , blocker : Blocker )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay { lazy val sslCtx = sslContext lazy val sslS = sslExecutionContext @@ -128,9 +130,7 @@ trait HttpClient[F[_]] { , timeout: Duration ): Stream[F, HttpResponse[F]] = { Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address => - val b: cats.effect.Blocker = ??? - - Stream.resource(new io.tcp.SocketGroup(AG, b).client[F](address)) + Stream.resource(new io.tcp.SocketGroup(AG, blocker).client[F](address)) .evalMap { socket => if (!request.isSecure) Applicative[F].pure(socket) else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host) diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index e44130a..d6aa96e 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -3,7 +3,7 @@ package spinoco.fs2.http import java.net.InetSocketAddress import java.nio.channels.AsynchronousChannelGroup -import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer} +import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Sync, Timer} import cats.syntax.all._ import fs2._ import fs2.concurrent.SignallingRef @@ -34,6 +34,7 @@ object HttpServer { * This is also evaluated when the server failed to process the request itself (i.e. `service` did not handle the failure ) * @param sendFailure A function to be evaluated on failure to process the the response. * Request is not suplied if failure happened before request was constructed. + * @param blocker An execution context for blocking operations * */ def apply[F[_] : ConcurrentEffect : ContextShift : Timer]( @@ -47,6 +48,7 @@ object HttpServer { , service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] , requestFailure : Throwable => Stream[F, HttpResponse[F]] , sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing] + , blocker: Blocker )( implicit AG: AsynchronousChannelGroup @@ -57,9 +59,7 @@ object HttpServer { case fin: FiniteDuration => (true, fin) case _ => (false, 0.millis) } - val b: cats.effect.Blocker = ??? - - new io.tcp.SocketGroup(AG, b).server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => + new io.tcp.SocketGroup(AG, blocker).server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => eval(SignallingRef(initial)).flatMap { timeoutSignal => readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize) diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index 728d8b1..3113620 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -5,7 +5,7 @@ import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.Executors import javax.net.ssl.SSLContext -import cats.effect.{ConcurrentEffect, ContextShift, Timer} +import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Timer} import fs2._ import scodec.Codec import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader} @@ -29,6 +29,7 @@ package object http { * @param requestHeaderReceiveTimeout A timeout to await request header to be fully received. * Request will fail, if the header won't be read within this timeout. * @param service Pipe that defines handling of each incoming request and produces a response + * @param blockingExecutionContext ExecutionContext for blocking operations */ def server[F[_] : ConcurrentEffect : ContextShift : Timer]( bindTo: InetSocketAddress @@ -38,6 +39,7 @@ package object http { , requestHeaderReceiveTimeout: Duration = 5.seconds , requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec + , blockingExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) )( service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] )(implicit AG: AsynchronousChannelGroup):Stream[F,Unit] = HttpServer( @@ -51,6 +53,7 @@ package object http { , service = service , requestFailure = HttpServer.handleRequestParseError[F] _ , sendFailure = HttpServer.handleSendFailure[F] _ + , blocker = Blocker.liftExecutionContext(blockingExecutionContext) ) @@ -60,13 +63,16 @@ package object http { * @param requestCodec Codec used to decode request header * @param responseCodec Codec used to encode response header * @param sslStrategy Strategy used to perform blocking SSL operations + * @param blockingExecutionContext ExecutionContext for blocking operations */ def client[F[_]: ConcurrentEffect : ContextShift : Timer]( requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec , sslStrategy: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-ssl", daemon = true))) , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = - HttpClient(requestCodec, responseCodec, sslStrategy, sslContext) + , blockingExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) + )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = { + HttpClient(requestCodec, responseCodec, sslStrategy, sslContext, Blocker.liftExecutionContext(blockingExecutionContext)) + } } From a58ebc25efe6ffe661f84a92453bbce842163a92 Mon Sep 17 00:00:00 2001 From: marko asplund Date: Tue, 22 Oct 2019 20:09:33 +0300 Subject: [PATCH 4/8] test fixes --- src/test/scala/spinoco/fs2/http/HttpServerSpec.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala index 0880d70..5e575f3 100644 --- a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala @@ -1,8 +1,9 @@ package spinoco.fs2.http import java.net.InetSocketAddress +import java.util.concurrent.Executors -import cats.effect.IO +import cats.effect.{Blocker, IO} import fs2._ import org.scalacheck.Properties import org.scalacheck.Prop._ @@ -12,12 +13,14 @@ import spinoco.protocol.http.header.{`Content-Length`, `Content-Type`} import spinoco.protocol.mime.{ContentType, MediaType} import spinoco.protocol.http.{HttpRequestHeader, HttpStatusCode, Uri} +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ object HttpServerSpec extends Properties("HttpServer"){ import Resources._ val MaxConcurrency: Int = 10 + val blocker = Blocker.liftExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))) def echoService(request: HttpRequestHeader, body: Stream[IO,Byte]): Stream[IO,HttpResponse[IO]] = { if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO] @@ -117,6 +120,7 @@ object HttpServerSpec extends Properties("HttpServer"){ , service = failRouteService , requestFailure = _ => { Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] } , sendFailure = HttpServer.handleSendFailure[IO] _ + , blocker = blocker ).drain ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) .take(count) @@ -150,6 +154,7 @@ object HttpServerSpec extends Properties("HttpServer"){ , service = failingResponse , requestFailure = HttpServer.handleRequestParseError[IO] _ , sendFailure = (_, _, _) => Stream.empty + , blocker = blocker ).drain ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) .take(count) From ac49305596846e44d6bded836e88ce67bdec2df7 Mon Sep 17 00:00:00 2001 From: marko asplund Date: Tue, 22 Oct 2019 20:36:28 +0300 Subject: [PATCH 5/8] pass Blocker also for websocket --- src/main/scala/spinoco/fs2/http/HttpClient.scala | 2 +- src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 04123b2..c899783 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -145,7 +145,7 @@ trait HttpClient[F[_]] { , chunkSize: Int , maxFrameSize: Int ): Stream[F, Option[HttpResponseHeader]] = - WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx) + WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx, blocker) def sse[A : SSEDecoder](rq: HttpRequest[F], maxResponseHeaderSize: Int, chunkSize: Int): Stream[F, A] = diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index cdcadd3..fd7b958 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -6,7 +6,7 @@ import java.util.concurrent.Executors import cats.Applicative import javax.net.ssl.SSLContext -import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer} +import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Timer} import fs2.Chunk.ByteVectorChunk import fs2._ import fs2.concurrent.Queue @@ -79,6 +79,7 @@ object WebSocket { * supplied value, websocket will fail. * @param requestCodec Codec to encode HttpRequests Header * @param responseCodec Codec to decode HttpResponse Header + * @param blocker An execution context for blocking operations * */ def client[F[_] : ConcurrentEffect : ContextShift : Timer, I : Decoder, O : Encoder]( @@ -91,13 +92,12 @@ object WebSocket { , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec , sslES: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(spinoco.fs2.http.util.mkThreadFactory("fs2-http-ssl", daemon = true))) , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } + , blocker: Blocker = Blocker.liftExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))) )(implicit AG: AsynchronousChannelGroup): Stream[F, Option[HttpResponseHeader]] = { import spinoco.fs2.http.internal._ import Stream._ eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address => - val b: cats.effect.Blocker = ??? - - Stream.resource(new io.tcp.SocketGroup(AG, b).client[F](address, receiveBufferSize = receiveBufferSize)) + Stream.resource(new io.tcp.SocketGroup(AG, blocker).client[F](address, receiveBufferSize = receiveBufferSize)) .evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) } .flatMap { socket => val (header, fingerprint) = impl.createRequestHeaders(request.header) From ddf92869a4a34b27a2c57ccc36c497fe59bf9076 Mon Sep 17 00:00:00 2001 From: marko asplund Date: Tue, 22 Oct 2019 20:47:13 +0300 Subject: [PATCH 6/8] tweaking --- src/main/scala/spinoco/fs2/http/http.scala | 12 ++++++------ src/main/scala/spinoco/fs2/http/util/util.scala | 6 ++++++ src/test/scala/spinoco/fs2/http/HttpServerSpec.scala | 9 +++------ 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index 3113620..b062fca 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -29,7 +29,7 @@ package object http { * @param requestHeaderReceiveTimeout A timeout to await request header to be fully received. * Request will fail, if the header won't be read within this timeout. * @param service Pipe that defines handling of each incoming request and produces a response - * @param blockingExecutionContext ExecutionContext for blocking operations + * @param blocker An execution context for blocking operations */ def server[F[_] : ConcurrentEffect : ContextShift : Timer]( bindTo: InetSocketAddress @@ -39,7 +39,7 @@ package object http { , requestHeaderReceiveTimeout: Duration = 5.seconds , requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec - , blockingExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) + , blocker: Blocker = util.mkBlocker(2) )( service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] )(implicit AG: AsynchronousChannelGroup):Stream[F,Unit] = HttpServer( @@ -53,7 +53,7 @@ package object http { , service = service , requestFailure = HttpServer.handleRequestParseError[F] _ , sendFailure = HttpServer.handleSendFailure[F] _ - , blocker = Blocker.liftExecutionContext(blockingExecutionContext) + , blocker = blocker ) @@ -63,16 +63,16 @@ package object http { * @param requestCodec Codec used to decode request header * @param responseCodec Codec used to encode response header * @param sslStrategy Strategy used to perform blocking SSL operations - * @param blockingExecutionContext ExecutionContext for blocking operations + * @param blocker An execution context for blocking operations */ def client[F[_]: ConcurrentEffect : ContextShift : Timer]( requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec , sslStrategy: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-ssl", daemon = true))) , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - , blockingExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) + , blocker: Blocker = util.mkBlocker(2) )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = { - HttpClient(requestCodec, responseCodec, sslStrategy, sslContext, Blocker.liftExecutionContext(blockingExecutionContext)) + HttpClient(requestCodec, responseCodec, sslStrategy, sslContext, blocker) } } diff --git a/src/main/scala/spinoco/fs2/http/util/util.scala b/src/main/scala/spinoco/fs2/http/util/util.scala index 3718fbd..bd51db0 100644 --- a/src/main/scala/spinoco/fs2/http/util/util.scala +++ b/src/main/scala/spinoco/fs2/http/util/util.scala @@ -4,6 +4,7 @@ import java.lang.Thread.UncaughtExceptionHandler import java.util.concurrent.{Executors, ThreadFactory} import java.util.concurrent.atomic.AtomicInteger +import cats.effect.Blocker import fs2.Chunk.ByteVectorChunk import fs2._ import scodec.bits.{BitVector, ByteVector} @@ -155,6 +156,11 @@ package object util { } } + def mkFixedExecutionContext(nThreads: Int) = + ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads)) + + def mkBlocker(nThreads: Int) = Blocker.liftExecutionContext(mkFixedExecutionContext(nThreads)) + def getCharset(ct: ContentType): Option[MIMECharset] = { ct match { case ContentType.TextContent(_, maybeCharset) => maybeCharset diff --git a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala index 5e575f3..d5713d6 100644 --- a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala @@ -1,9 +1,8 @@ package spinoco.fs2.http import java.net.InetSocketAddress -import java.util.concurrent.Executors -import cats.effect.{Blocker, IO} +import cats.effect.IO import fs2._ import org.scalacheck.Properties import org.scalacheck.Prop._ @@ -13,14 +12,12 @@ import spinoco.protocol.http.header.{`Content-Length`, `Content-Type`} import spinoco.protocol.mime.{ContentType, MediaType} import spinoco.protocol.http.{HttpRequestHeader, HttpStatusCode, Uri} -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ object HttpServerSpec extends Properties("HttpServer"){ import Resources._ val MaxConcurrency: Int = 10 - val blocker = Blocker.liftExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))) def echoService(request: HttpRequestHeader, body: Stream[IO,Byte]): Stream[IO,HttpResponse[IO]] = { if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO] @@ -120,7 +117,7 @@ object HttpServerSpec extends Properties("HttpServer"){ , service = failRouteService , requestFailure = _ => { Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] } , sendFailure = HttpServer.handleSendFailure[IO] _ - , blocker = blocker + , blocker = util.mkBlocker(2) ).drain ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) .take(count) @@ -154,7 +151,7 @@ object HttpServerSpec extends Properties("HttpServer"){ , service = failingResponse , requestFailure = HttpServer.handleRequestParseError[IO] _ , sendFailure = (_, _, _) => Stream.empty - , blocker = blocker + , blocker = util.mkBlocker(2) ).drain ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) .take(count) From 8e44cb01c9ecdf63b3a52166b45be1b7fd83caf3 Mon Sep 17 00:00:00 2001 From: marko asplund Date: Tue, 22 Oct 2019 20:51:25 +0300 Subject: [PATCH 7/8] use util.mkBlocker in websocket --- src/main/scala/spinoco/fs2/http/HttpServer.scala | 1 + src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index d6aa96e..3ba9e51 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -59,6 +59,7 @@ object HttpServer { case fin: FiniteDuration => (true, fin) case _ => (false, 0.millis) } + new io.tcp.SocketGroup(AG, blocker).server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => eval(SignallingRef(initial)).flatMap { timeoutSignal => diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index fd7b958..330203b 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -92,7 +92,7 @@ object WebSocket { , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec , sslES: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(spinoco.fs2.http.util.mkThreadFactory("fs2-http-ssl", daemon = true))) , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - , blocker: Blocker = Blocker.liftExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))) + , blocker: Blocker = spinoco.fs2.http.util.mkBlocker(2) )(implicit AG: AsynchronousChannelGroup): Stream[F, Option[HttpResponseHeader]] = { import spinoco.fs2.http.internal._ import Stream._ From 2392ffd8f53e33fc645cca20fa383813b2f4d29e Mon Sep 17 00:00:00 2001 From: marko asplund Date: Sun, 3 Nov 2019 23:07:08 +0200 Subject: [PATCH 8/8] try to fix travis build by explicitly specifying build container linux distribution version --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 2d6873e..d828197 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,6 @@ +dist: trusty + language : scala scala: