diff --git a/.travis.yml b/.travis.yml index b92c9b8..ba1066f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,8 +2,8 @@ language : scala scala: - - 2.11.12 - - 2.12.6 + - 2.12.11 + - 2.13.0 dist: trusty diff --git a/build.sbt b/build.sbt index 2c03bcc..ceb0dfd 100644 --- a/build.sbt +++ b/build.sbt @@ -9,8 +9,8 @@ lazy val contributors = Seq( lazy val commonSettings = Seq( organization := "com.spinoco", - scalaVersion := "2.12.6", - crossScalaVersions := Seq("2.11.12", "2.12.6"), + scalaVersion := "2.12.11", + crossScalaVersions := Seq("2.12.11", "2.13.0"), scalacOptions ++= Seq( "-feature", "-deprecation", @@ -19,23 +19,37 @@ lazy val commonSettings = Seq( "-language:existentials", "-language:postfixOps", "-Xfatal-warnings", - "-Yno-adapted-args", - "-Ywarn-value-discard", - "-Ywarn-unused-import" + "-Ywarn-value-discard" ), + scalacOptions ++= { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, v)) if v >= 13 => + Seq("-Ymacro-annotations", "-Ywarn-unused:imports") + case _ => + Seq("-Yno-adapted-args", "-Ywarn-unused-import") + } + }, scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)}, scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value, libraryDependencies ++= Seq( - compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full) - , "org.scodec" %% "scodec-bits" % "1.1.4" - , "org.scodec" %% "scodec-core" % "1.10.3" - , "com.spinoco" %% "protocol-http" % "0.3.15" - , "com.spinoco" %% "protocol-websocket" % "0.3.15" - , "co.fs2" %% "fs2-core" % "1.0.0" - , "co.fs2" %% "fs2-io" % "1.0.0" - , "com.spinoco" %% "fs2-crypto" % "0.4.0" - , "org.scalacheck" %% "scalacheck" % "1.13.4" % "test" + "org.scalacheck" %% "scalacheck" % "1.14.3" % "test" + , "com.spinoco" %% "protocol-http" % "0.4.0-M1" + , "com.spinoco" %% "protocol-websocket" % "0.4.0-M1" + , "co.fs2" %% "fs2-core" % "2.3.0" + , "co.fs2" %% "fs2-io" % "2.3.0" ), + libraryDependencies ++= { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, v)) if v <= 12 => + Seq( + compilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full) + ) + case _ => + // if scala 2.13.0-M4 or later, macro annotations merged into scala-reflect + // https://github.com/scala/scala/pull/6606 + Nil + } + }, scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "git@github.com:Spinoco/fs2-http.git")), homepage := None, licenses += ("MIT", url("http://opensource.org/licenses/MIT")), diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 7b75ad9..c288288 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -1,14 +1,15 @@ package spinoco.fs2.http -import java.nio.channels.AsynchronousChannelGroup + +import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer} import java.util.concurrent.TimeUnit import cats.Applicative -import javax.net.ssl.SSLContext import cats.effect._ import fs2._ import fs2.concurrent.SignallingRef -import fs2.io.tcp.Socket +import fs2.io.tcp.{Socket, SocketGroup} +import fs2.io.tls.TLSContext import scodec.{Codec, Decoder, Encoder} import spinoco.fs2.http.internal.{addressForRequest, clientLiftToSecure, readWithTimeout} import spinoco.fs2.http.sse.{SSEDecoder, SSEEncoding} @@ -17,7 +18,6 @@ import spinoco.protocol.http.header._ import spinoco.protocol.mime.MediaType import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader} -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ @@ -101,25 +101,24 @@ trait HttpClient[F[_]] { } - object HttpClient { +object HttpClient { + @inline def apply[F[_]](implicit instance: HttpClient[F]): HttpClient[F] = instance - /** - * Creates an Http Client - * @param requestCodec Codec used to decode request header - * @param responseCodec Codec used to encode response header - * @param sslExecutionContext Strategy used when communication with SSL (https or wss) - * @param sslContext SSL Context to use with SSL Client (https, wss) - */ - def apply[F[_] : ConcurrentEffect : ContextShift : Timer]( + /** + * Creates an Http Client + * @param requestCodec Codec used to decode request header + * @param responseCodec Codec used to encode response header + * @param socketGroup Group of sockets from which to create the client for http request. + * @param tlsContext The TLS context used for elevating the http socket to https. + */ + def mk[F[_]: ConcurrentEffect: ContextShift: Timer]( requestCodec : Codec[HttpRequestHeader] , responseCodec : Codec[HttpResponseHeader] - , sslExecutionContext: => ExecutionContext - , sslContext : => SSLContext - )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay { - lazy val sslCtx = sslContext - lazy val sslS = sslExecutionContext - + )( + socketGroup: SocketGroup + , tlsContext: TLSContext + ):F[HttpClient[F]] = Applicative[F].pure { new HttpClient[F] { def request( request: HttpRequest[F] @@ -128,10 +127,10 @@ trait HttpClient[F[_]] { , timeout: Duration ): Stream[F, HttpResponse[F]] = { Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address => - Stream.resource(io.tcp.client[F](address)) - .evalMap { socket => - if (!request.isSecure) Applicative[F].pure(socket) - else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host) + Stream.resource(socketGroup.client(address)) + .flatMap { socket => + if (!request.isSecure) Stream.emit(socket) + else Stream.resource(clientLiftToSecure[F](tlsContext)(socket, request.host)) } .flatMap { impl.request[F](request, chunkSize, maxResponseHeaderSize, timeout, requestCodec, responseCodec ) }} } @@ -143,7 +142,7 @@ trait HttpClient[F[_]] { , chunkSize: Int , maxFrameSize: Int ): Stream[F, Option[HttpResponseHeader]] = - WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx) + WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec)(socketGroup, tlsContext) def sse[A : SSEDecoder](rq: HttpRequest[F], maxResponseHeaderSize: Int, chunkSize: Int): Stream[F, A] = @@ -174,7 +173,7 @@ trait HttpClient[F[_]] { timeout match { case fin: FiniteDuration => eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { start => - HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal => eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { sent => val remains = fin - (sent - start).millis @@ -186,14 +185,11 @@ trait HttpClient[F[_]] { }}}} case _ => - HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec) } } } - } - - } diff --git a/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala b/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala index 11066c4..122738e 100644 --- a/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala +++ b/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala @@ -22,7 +22,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** yields to true, if body of this request shall be chunked **/ lazy val bodyIsChunked : Boolean = - withHeaders(internal.bodyIsChunked) + withHeaders(spinoco.fs2.http.internal.bodyIsChunked) /** allows to stream arbitrary sized stream of `A` to remote party (i.e. upload) **/ def withStreamBody[A](body: Stream[F, A])(implicit E: StreamBodyEncoder[F, A]): Self = { @@ -37,7 +37,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** sets body size to supplied value **/ def withBodySize(sz: Long): Self = - updateHeaders(withHeaders(internal.swapHeader(`Content-Length`(sz)))) + updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Length`(sz)))) /** gets body size, if one specified **/ def bodySize: Option[Long] = @@ -46,6 +46,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => protected def body: Stream[F, Byte] /** encodes body `A` given BodyEncoder exists **/ + def withBody[A](a: A)(implicit W: BodyEncoder[A], ev: RaiseThrowable[F]): Self = { W.encode(a) match { case Failure(err) => updateBody(body = Stream.raiseError(new Throwable(s"failed to encode $a: $err"))) @@ -70,7 +71,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => withHeaders { _.collectFirst { case `Content-Type`(ct) => ct } match { case None => F.pure(Attempt.failure(Err("Content type is not known"))) case Some(ct) => - F.map(self.body.chunks.map(util.chunk2ByteVector).compile.toVector) { bs => + F.map(self.body.chunks.map(_.toByteVector).compile.toVector) { bs => if (bs.isEmpty) Attempt.failure(Err("Body is empty")) else D.decode(bs.reduce(_ ++ _), ct) } @@ -79,7 +80,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** gets body as stream of byteVectors **/ def bodyAsByteVectorStream:Stream[F,ByteVector] = - self.body.chunks.map(util.chunk2ByteVector) + self.body.chunks.map(_.toByteVector) /** decodes body as string with encoding supplied in ContentType **/ def bodyAsString(implicit F: Sync[F]): F[Attempt[String]] = @@ -87,7 +88,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** updates content type to one specified **/ def withContentType(ct: ContentType): Self = - updateHeaders(withHeaders(internal.swapHeader(`Content-Type`(ct)))) + updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Type`(ct)))) /** gets ContentType, if one specififed **/ def contentType: Option[ContentType] = @@ -96,7 +97,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** configures encoding as chunked **/ def chunkedEncoding: Self = - updateHeaders(withHeaders(internal.swapHeader(`Transfer-Encoding`(List("chunked"))))) + updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Transfer-Encoding`(List("chunked"))))) def withHeaders[A](f: List[HttpHeader] => A): A = self match { case HttpRequest(_,_,header,_) => f(header.headers) @@ -190,10 +191,10 @@ object HttpRequest { ) , body = Stream.empty) - def post[F[_] : RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] = + def post[F[_]: RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] = get(uri).withMethod(HttpMethod.POST).withBody(a) - def put[F[_] : RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] = + def put[F[_]: RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] = get(uri).withMethod(HttpMethod.PUT).withBody(a) def delete[F[_]](uri: Uri): HttpRequest[F] = @@ -210,11 +211,11 @@ object HttpRequest { * @tparam F * @return */ - def fromStream[F[_] : RaiseThrowable]( + def fromStream[F[_]: RaiseThrowable]( maxHeaderSize: Int , headerCodec: Codec[HttpRequestHeader] ): Pipe[F, Byte, (HttpRequestHeader, Stream[F, Byte])] = { - import internal._ + import spinoco.fs2.http.internal._ _ through httpHeaderAndBody(maxHeaderSize) flatMap { case (header, bodyRaw) => headerCodec.decodeValue(header.bits) match { case Failure(err) => Stream.raiseError(new Throwable(s"Decoding of the request header failed: $err")) @@ -240,11 +241,11 @@ object HttpRequest { * @param request request to convert to stream * @param headerCodec Codec to convert the header to bytes */ - def toStream[F[_] : RaiseThrowable]( + def toStream[F[_]: RaiseThrowable]( request: HttpRequest[F] , headerCodec: Codec[HttpRequestHeader] ): Stream[F, Byte] = Stream.suspend { - import internal._ + import spinoco.fs2.http.internal._ headerCodec.encode(request.header) match { case Failure(err) => Stream.raiseError(new Throwable(s"Encoding of the header failed: $err")) @@ -283,7 +284,7 @@ final case class HttpResponse[F[_]]( def sseBody[A](in: Stream[F, A])(implicit E: SSEEncoder[A], ev: RaiseThrowable[F]): Self = self .updateBody(in through SSEEncoding.encodeA[F, A]) - .updateHeaders(withHeaders(internal.swapHeader(`Content-Type`(ContentType.TextContent(MediaType.`text/event-stream`, None))))) + .updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Type`(ContentType.TextContent(MediaType.`text/event-stream`, None))))) } @@ -301,11 +302,11 @@ object HttpResponse { /** * Decodes stream of bytes as HttpResponse. */ - def fromStream[F[_] : RaiseThrowable]( + def fromStream[F[_]: RaiseThrowable]( maxHeaderSize: Int , responseCodec: Codec[HttpResponseHeader] ): Pipe[F,Byte, HttpResponse[F]] = { - import internal._ + import spinoco.fs2.http.internal._ _ through httpHeaderAndBody(maxHeaderSize) flatMap { case (header, bodyRaw) => responseCodec.decodeValue(header.bits) match { @@ -325,11 +326,11 @@ object HttpResponse { /** Encodes response to stream of bytes **/ - def toStream[F[_] : RaiseThrowable]( + def toStream[F[_]: RaiseThrowable]( response: HttpResponse[F] , headerCodec: Codec[HttpResponseHeader] ): Stream[F, Byte] = Stream.suspend { - import internal._ + import spinoco.fs2.http.internal._ headerCodec.encode(response.header) match { case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode http response : $response :$err ")) diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index 11d9b27..b6c1e90 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -1,12 +1,12 @@ package spinoco.fs2.http import java.net.InetSocketAddress -import java.nio.channels.AsynchronousChannelGroup -import cats.effect.{ConcurrentEffect, Sync, Timer} +import cats.effect.{ConcurrentEffect, ContextShift, Sync} import cats.syntax.all._ import fs2._ import fs2.concurrent.SignallingRef +import fs2.io.tcp.SocketGroup import scodec.Codec import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec} import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader, HttpStatusCode} @@ -34,9 +34,9 @@ object HttpServer { * This is also evaluated when the server failed to process the request itself (i.e. `service` did not handle the failure ) * @param sendFailure A function to be evaluated on failure to process the the response. * Request is not suplied if failure happened before request was constructed. - * + * @param socketGroup Group of sockets from which to create the server socket. */ - def apply[F[_] : ConcurrentEffect : Timer]( + def mk[F[_]: ConcurrentEffect: ContextShift]( maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 , maxHeaderSize: Int = 10 *1024 @@ -48,19 +48,18 @@ object HttpServer { , requestFailure : Throwable => Stream[F, HttpResponse[F]] , sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing] )( - implicit - AG: AsynchronousChannelGroup + socketGroup: SocketGroup ): Stream[F, Unit] = { import Stream._ - import internal._ + import spinoco.fs2.http.internal._ val (initial, readDuration) = requestHeaderReceiveTimeout match { case fin: FiniteDuration => (true, fin) case _ => (false, 0.millis) } - io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => + socketGroup.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => - eval(SignallingRef(initial)).flatMap { timeoutSignal => + eval(SignallingRef[F, Boolean](initial)).flatMap { timeoutSignal => readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize) .through(HttpRequest.fromStream(maxHeaderSize, requestCodec)) .flatMap { case (request, body) => diff --git a/src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala b/src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala index 97916bd..7efe247 100644 --- a/src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala +++ b/src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala @@ -48,7 +48,7 @@ object StreamBodyEncoder { StreamBodyEncoder.instance(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) { _.flatMap { bv => Stream.chunk(ByteVectorChunk(bv)) } } /** encoder that encodes utf8 string, with `text/plain` utf8 content type **/ - def utf8StringEncoder[F[_]](implicit F: MonadError[F, Throwable]) : StreamBodyEncoder[F, String] = + def utf8StringEncoder[F[_]: RaiseThrowable](implicit F: MonadError[F, Throwable]) : StreamBodyEncoder[F, String] = byteVectorEncoder mapInF[String] { s => ByteVector.encodeUtf8(s) match { case Right(bv) => F.pure(bv) @@ -57,7 +57,7 @@ object StreamBodyEncoder { } withContentType ContentType.TextContent(MediaType.`text/plain`, Some(MIMECharset.`UTF-8`)) /** a convenience wrapper to convert body encoder to StreamBodyEncoder **/ - def fromBodyEncoder[F[_] : RaiseThrowable, A](implicit E: BodyEncoder[A]):StreamBodyEncoder[F, A] = + def fromBodyEncoder[F[_]: RaiseThrowable, A](implicit E: BodyEncoder[A]):StreamBodyEncoder[F, A] = StreamBodyEncoder.instance(E.contentType) { _.flatMap { a => E.encode(a) match { case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode: $err ($a)")) diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index 94624ec..a078a98 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -1,17 +1,14 @@ package spinoco.fs2 import java.net.InetSocketAddress -import java.nio.channels.AsynchronousChannelGroup -import java.util.concurrent.Executors - -import javax.net.ssl.SSLContext import cats.effect.{ConcurrentEffect, ContextShift, Timer} import fs2._ +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import scodec.Codec import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader} import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec} -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ @@ -29,8 +26,9 @@ package object http { * @param requestHeaderReceiveTimeout A timeout to await request header to be fully received. * Request will fail, if the header won't be read within this timeout. * @param service Pipe that defines handling of each incoming request and produces a response + * @param socketGroup Group of sockets from which to create the server socket. */ - def server[F[_] : ConcurrentEffect : Timer]( + def server[F[_] : ConcurrentEffect: ContextShift]( bindTo: InetSocketAddress , maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 @@ -40,7 +38,7 @@ package object http { , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec )( service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] - )(implicit AG: AsynchronousChannelGroup):Stream[F,Unit] = HttpServer( + )(socketGroup: SocketGroup):Stream[F,Unit] = HttpServer.mk( maxConcurrent = maxConcurrent , receiveBufferSize = receiveBufferSize , maxHeaderSize = maxHeaderSize @@ -51,7 +49,7 @@ package object http { , service = service , requestFailure = HttpServer.handleRequestParseError[F] _ , sendFailure = HttpServer.handleSendFailure[F] _ - ) + )(socketGroup) /** @@ -59,14 +57,17 @@ package object http { * * @param requestCodec Codec used to decode request header * @param responseCodec Codec used to encode response header - * @param sslStrategy Strategy used to perform blocking SSL operations + * @param socketGroup Group of sockets from which to create the client for http request. + * @param tlsContext The TLS context used for elevating the http socket to https. */ - def client[F[_]: ConcurrentEffect : ContextShift : Timer]( - requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec - , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec - , sslStrategy: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-ssl", daemon = true))) - , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = - HttpClient(requestCodec, responseCodec, sslStrategy, sslContext) + def client[F[_]: ConcurrentEffect :Timer: ContextShift]( + requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec + , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec + )( + socketGroup: SocketGroup + , tlsContext: TLSContext + ):F[HttpClient[F]] = { + HttpClient.mk(requestCodec, responseCodec)(socketGroup, tlsContext) + } } diff --git a/src/main/scala/spinoco/fs2/http/internal/ChunkedEncoding.scala b/src/main/scala/spinoco/fs2/http/internal/ChunkedEncoding.scala index 102c31e..bed2e6c 100644 --- a/src/main/scala/spinoco/fs2/http/internal/ChunkedEncoding.scala +++ b/src/main/scala/spinoco/fs2/http/internal/ChunkedEncoding.scala @@ -4,7 +4,6 @@ import fs2.Chunk.ByteVectorChunk import fs2._ import scodec.bits.ByteVector -import spinoco.fs2.http.util.chunk2ByteVector /** * Created by pach on 20/01/17. @@ -15,14 +14,14 @@ object ChunkedEncoding { /** decodes from the HTTP chunked encoding. After last chunk this terminates. Allows to specify max header size, after which this terminates * Please see https://en.wikipedia.org/wiki/Chunked_transfer_encoding for details */ - def decode[F[_] : RaiseThrowable](maxChunkHeaderSize:Int): Pipe[F, Byte, Byte] = { + def decode[F[_]: RaiseThrowable](maxChunkHeaderSize: Int): Pipe[F, Byte, Byte] = { // on left reading the header of chunk (acting as buffer) // on right reading the chunk itself, and storing remaining bytes of the chunk def go(expect:Either[ByteVector,Long], in: Stream[F, Byte]): Pull[F, Byte, Unit] = { in.pull.uncons.flatMap { case None => Pull.done case Some((h, tl)) => - val bv = chunk2ByteVector(h) + val bv = h.toByteVector expect match { case Left(header) => val nh = header ++ bv @@ -65,14 +64,14 @@ object ChunkedEncoding { if (bv.isEmpty) Chunk.empty else ByteVectorChunk(ByteVector.view(bv.size.toHexString.toUpperCase.getBytes) ++ `\r\n` ++ bv ++ `\r\n` ) } - _.mapChunks { ch => encodeChunk(chunk2ByteVector(ch)) } ++ Stream.chunk(lastChunk) + _.mapChunks { ch => encodeChunk(ch.toByteVector) } ++ Stream.chunk(lastChunk) } /** yields to size of header in case the chunked header was succesfully parsed, else yields to None **/ private def readChunkedHeader(hdr:ByteVector):Option[Long] = { - hdr.decodeUtf8.right.toOption.flatMap { s => + hdr.decodeUtf8.toOption.flatMap { s => val parts = s.split(';') // lets ignore any extensions if (parts.isEmpty) None else { @@ -82,5 +81,4 @@ object ChunkedEncoding { } } - } diff --git a/src/main/scala/spinoco/fs2/http/internal/internal.scala b/src/main/scala/spinoco/fs2/http/internal/internal.scala index 76a31f7..238ea27 100644 --- a/src/main/scala/spinoco/fs2/http/internal/internal.scala +++ b/src/main/scala/spinoco/fs2/http/internal/internal.scala @@ -3,20 +3,17 @@ package spinoco.fs2.http import java.net.InetSocketAddress import java.util.concurrent.TimeoutException -import javax.net.ssl.SSLContext -import cats.effect.{Concurrent, Sync, Timer} -import javax.net.ssl.{SNIHostName, SNIServerName, SSLContext} -import cats.syntax.all._ +import javax.net.ssl.{SNIHostName, SNIServerName} +import cats.effect.{Concurrent, ContextShift, Resource, Sync} import fs2.Chunk.ByteVectorChunk import fs2.Stream._ import fs2.io.tcp.Socket +import fs2.io.tls.{TLSContext, TLSParameters} import fs2.{Stream, _} import scodec.bits.ByteVector -import spinoco.fs2.crypto.io.tcp.TLSSocket import spinoco.protocol.http.{HostPort, HttpScheme, Scheme} import spinoco.protocol.http.header.{HttpHeader, `Transfer-Encoding`} -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.reflect.ClassTag @@ -46,13 +43,13 @@ package object internal { /** * From the stream of bytes this extracts Http Header and body part. */ - def httpHeaderAndBody[F[_] : RaiseThrowable](maxHeaderSize: Int): Pipe[F, Byte, (ByteVector, Stream[F, Byte])] = { + def httpHeaderAndBody[F[_]: RaiseThrowable](maxHeaderSize: Int): Pipe[F, Byte, (ByteVector, Stream[F, Byte])] = { def go(buff: ByteVector, in: Stream[F, Byte]): Pull[F, (ByteVector, Stream[F, Byte]), Unit] = { in.pull.uncons flatMap { case None => Pull.raiseError(new Throwable(s"Incomplete Header received (sz = ${buff.size}): ${buff.decodeUtf8}")) case Some((chunk, tl)) => - val bv = spinoco.fs2.http.util.chunk2ByteVector(chunk) + val bv = chunk.toByteVector val all = buff ++ bv val idx = all.indexOfSlice(`\r\n\r\n`) if (idx < 0) { @@ -97,7 +94,7 @@ package object internal { * @param shallTimeout If true, timeout will be applied, if false timeout won't be applied. * @param chunkSize Size of chunk to read up to */ - def readWithTimeout[F[_] : Sync]( + def readWithTimeout[F[_]: Sync]( socket: Socket[F] , timeout: FiniteDuration , shallTimeout: F[Boolean] @@ -107,7 +104,7 @@ package object internal { eval(shallTimeout).flatMap { shallTimeout => if (!shallTimeout) socket.reads(chunkSize, None) else { - if (remains <= 0.millis) Stream.raiseError(new TimeoutException()) + if (remains <= 0.millis) Stream.raiseError[F](new TimeoutException()) else { eval(Sync[F].delay(System.currentTimeMillis())).flatMap { start => eval(socket.read(chunkSize, Some(remains))).flatMap { read => @@ -124,19 +121,12 @@ package object internal { } /** creates a function that lifts supplied socket to secure socket **/ - def clientLiftToSecure[F[_] : Concurrent : Timer](sslES: => ExecutionContext, sslContext: => SSLContext)(socket: Socket[F], server: HostPort): F[Socket[F]] = { - import collection.JavaConverters._ - Sync[F].delay { - val engine = sslContext.createSSLEngine(server.host, server.port.getOrElse(443)) - engine.setUseClientMode(true) - val sslParams = engine.getSSLParameters - sslParams.setServerNames(List[SNIServerName](new SNIHostName(server.host)).asJava) - engine.setSSLParameters(sslParams) - engine - } flatMap { - TLSSocket.instance(socket, _, sslES) - .map(identity) //This is here just to make scala understand types properly - } + def clientLiftToSecure[F[_] : Concurrent : ContextShift](tlsContext: TLSContext)(socket: Socket[F], server: HostPort): Resource[F, Socket[F]] = { + + tlsContext.client( + socket + , TLSParameters(serverNames = Some(List[SNIServerName](new SNIHostName(server.host)))) + ) } } diff --git a/src/main/scala/spinoco/fs2/http/routing/routing.scala b/src/main/scala/spinoco/fs2/http/routing/routing.scala index 7303717..1520806 100644 --- a/src/main/scala/spinoco/fs2/http/routing/routing.scala +++ b/src/main/scala/spinoco/fs2/http/routing/routing.scala @@ -12,7 +12,6 @@ import spinoco.fs2.http.routing.MatchResult._ import spinoco.fs2.http.routing.Matcher.{Eval, Match} import spinoco.protocol.http.header._ import spinoco.protocol.http.{HttpMethod, HttpRequestHeader, HttpStatusCode, Uri} -import spinoco.fs2.http.util.chunk2ByteVector import spinoco.fs2.http.websocket.{Frame, WebSocket} import scala.concurrent.duration._ @@ -188,7 +187,7 @@ package object routing { F.map(s.chunks.compile.toVector) { chunks => val bytes = if (chunks.isEmpty) ByteVector.empty - else chunks.map(chunk2ByteVector).reduce(_ ++ _) + else chunks.map(_.toByteVector).reduce(_ ++ _) D.decode(bytes, ct.value) } }}.flatMap { diff --git a/src/main/scala/spinoco/fs2/http/sse/SSEEncoding.scala b/src/main/scala/spinoco/fs2/http/sse/SSEEncoding.scala index 608cf35..d086c5c 100644 --- a/src/main/scala/spinoco/fs2/http/sse/SSEEncoding.scala +++ b/src/main/scala/spinoco/fs2/http/sse/SSEEncoding.scala @@ -5,7 +5,6 @@ import fs2._ import scodec.Attempt import scodec.bits.ByteVector -import spinoco.fs2.http.util.chunk2ByteVector import scala.util.Try import scala.concurrent.duration._ @@ -35,7 +34,7 @@ object SSEEncoding { } /** encodes stream of `A` as SSE Stream **/ - def encodeA[F[_] : RaiseThrowable, A](implicit E: SSEEncoder[A]): Pipe[F, A, Byte] = { + def encodeA[F[_]: RaiseThrowable, A](implicit E: SSEEncoder[A]): Pipe[F, A, Byte] = { _ flatMap { a => E.encode(a) match { case Attempt.Successful(msg) => Stream.emit(msg) case Attempt.Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode $a : $err")) @@ -47,14 +46,14 @@ object SSEEncoding { /** * Decodes stream of bytes to SSE Messages */ - def decode[F[_] : RaiseThrowable]: Pipe[F, Byte, SSEMessage] = { + def decode[F[_]: RaiseThrowable]: Pipe[F, Byte, SSEMessage] = { // drops initial Byte Order Mark, if present def dropInitial(buff:ByteVector): Pipe[F, Byte, Byte] = { _.pull.uncons.flatMap { case None => Pull.raiseError(new Throwable("SSE Socket did not contain any data")) case Some((chunk, next)) => - val all = buff ++ chunk2ByteVector(chunk) + val all = buff ++ chunk.toByteVector if (all.size < 2) (next through dropInitial(all)).pull.echo else { if (all.startsWith(StartBom)) Pull.output(ByteVectorChunk(all.drop(2))) >> next.pull.echo @@ -132,7 +131,7 @@ object SSEEncoding { } /** decodes stream of sse messages to `A`, given supplied decoder **/ - def decodeA[F[_] : RaiseThrowable, A](implicit D: SSEDecoder[A]): Pipe[F, Byte, A] = { + def decodeA[F[_]: RaiseThrowable, A](implicit D: SSEDecoder[A]): Pipe[F, Byte, A] = { _ through decode flatMap { msg => D.decode(msg) match { case Attempt.Successful(a) => Stream.emit(a) diff --git a/src/main/scala/spinoco/fs2/http/util/util.scala b/src/main/scala/spinoco/fs2/http/util/util.scala index 3718fbd..9e57bcf 100644 --- a/src/main/scala/spinoco/fs2/http/util/util.scala +++ b/src/main/scala/spinoco/fs2/http/util/util.scala @@ -4,132 +4,12 @@ import java.lang.Thread.UncaughtExceptionHandler import java.util.concurrent.{Executors, ThreadFactory} import java.util.concurrent.atomic.AtomicInteger -import fs2.Chunk.ByteVectorChunk -import fs2._ -import scodec.bits.{BitVector, ByteVector} -import scodec.bits.Bases.{Alphabets, Base64Alphabet} - import spinoco.protocol.mime.{ContentType, MIMECharset} import scala.concurrent.ExecutionContext import scala.util.control.NonFatal package object util { - - /** - * Encodes bytes to base64 encoded bytes [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]] - * Encoding is done lazily to support very large Base64 bodies i.e. email, attachments..) - * @param alphabet Alphabet to use - * @return - */ - def encodeBase64Raw[F[_]](alphabet:Base64Alphabet): Pipe[F, Byte, Byte] = { - def go(rem:ByteVector): Stream[F,Byte] => Pull[F, Byte, Unit] = { - _.pull.uncons flatMap { - case None => - if (rem.size == 0) Pull.done - else Pull.output(ByteVectorChunk(ByteVector.view(rem.toBase64(alphabet).getBytes))) - - case Some((chunk, tl)) => - val n = rem ++ chunk2ByteVector(chunk) - if (n.size/3 > 0) { - val pad = n.size % 3 - val enc = n.dropRight(pad) - val out = Array.ofDim[Byte]((enc.size.toInt / 3) * 4) - var pos = 0 - enc.toBitVector.grouped(6) foreach { group => - val idx = group.padTo(8).shiftRight(2, signExtension = false).toByteVector.head - out(pos) = alphabet.toChar(idx).toByte - pos = pos + 1 - } - Pull.output(ByteVectorChunk(ByteVector.view(out))) >> go(n.takeRight(pad))(tl) - } else { - go(n)(tl) - } - - } - - } - src => go(ByteVector.empty)(src).stream - } - - /** encodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]]. Whitespaces are ignored **/ - def encodeBase64Url[F[_]]:Pipe[F, Byte, Byte] = - encodeBase64Raw(Alphabets.Base64Url) - - /** encodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-4 RF4648 section 4]] **/ - def encodeBase64[F[_]]:Pipe[F, Byte, Byte] = - encodeBase64Raw[F](Alphabets.Base64) - - - /** - * Decodes base64 encoded stream with supplied alphabet. Whitespaces are ignored. - * Decoding is lazy to support very large Base64 bodies (i.e. email) - */ - def decodeBase64Raw[F[_] : RaiseThrowable](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = { - val Pad = alphabet.pad - def go(remAcc:BitVector): Stream[F, Byte] => Pull[F, Byte, Unit] = { - _.pull.uncons flatMap { - case None => Pull.done - - case Some((chunk,tl)) => - val bv = chunk2ByteVector(chunk) - var acc = remAcc - var idx = 0 - var term = false - try { - bv.foreach { b => - b.toChar match { - case c if alphabet.ignore(c) => // ignore no-op - case Pad => term = true - case c => - if (!term) acc = acc ++ BitVector(alphabet.toIndex(c)).drop(2) - else { - throw new IllegalArgumentException(s"Unexpected character '$c' at index $idx after padding character; only '=' and whitespace characters allowed after first padding character") - } - } - idx = idx + 1 - } - val aligned = (acc.size / 8) * 8 - if (aligned <= 0 && !term) go(acc)(tl) - else { - val (out, rem) = acc.splitAt(aligned) - if (term) Pull.output(ByteVectorChunk(out.toByteVector)) - else Pull.output(ByteVectorChunk(out.toByteVector)) >> go(rem)(tl) - } - - } catch { - case e: IllegalArgumentException => - Pull.raiseError(new Throwable(s"Invalid base 64 encoding at index $idx", e)) - } - } - } - src => go(BitVector.empty)(src).stream - - } - - /** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]]. Whitespaces are ignored **/ - def decodeBase64Url[F[_] : RaiseThrowable]:Pipe[F, Byte, Byte] = - decodeBase64Raw(Alphabets.Base64Url) - - /** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-4 RF4648 section 4]] **/ - def decodeBase64[F[_] : RaiseThrowable]:Pipe[F, Byte, Byte] = - decodeBase64Raw(Alphabets.Base64) - - /** converts chunk of bytes to ByteVector **/ - def chunk2ByteVector(chunk: Chunk[Byte]):ByteVector = { - chunk match { - case bv: ByteVectorChunk => bv.toByteVector - case other => - val bs = other.toBytes - ByteVector(bs.values, bs.offset, bs.size) - } - } - - /** converts ByteVector to chunk **/ - def byteVector2Chunk(bv: ByteVector): Chunk[Byte] = { - ByteVectorChunk(bv) - } - /** helper to create named daemon thread factories **/ def mkThreadFactory(name: String, daemon: Boolean, exitJvmOnFatalError: Boolean = true): ThreadFactory = { new ThreadFactory { diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index 38dfaec..e780da0 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -1,15 +1,12 @@ package spinoco.fs2.http.websocket -import java.nio.channels.AsynchronousChannelGroup -import java.util.concurrent.Executors - -import cats.Applicative -import javax.net.ssl.SSLContext import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer} import fs2.Chunk.ByteVectorChunk import fs2._ import fs2.concurrent.Queue +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import scodec.Attempt.{Failure, Successful} import scodec.bits.ByteVector import scodec.{Codec, Decoder, Encoder} @@ -21,9 +18,6 @@ import spinoco.protocol.http.header.value.ProductDescription import spinoco.protocol.mime.{ContentType, MIMECharset, MediaType} import spinoco.protocol.websocket.{OpCode, WebSocketFrame} import spinoco.protocol.websocket.codec.WebSocketFrameCodec -import spinoco.fs2.http.util.chunk2ByteVector - -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.Random @@ -52,7 +46,7 @@ object WebSocket { , maxFrameSize: Int = 1024*1024 )(header: HttpRequestHeader, input:Stream[F,Byte]): Stream[F,HttpResponse[F]] = { Stream.emit( - impl.verifyHeaderRequest[F](header).right.map { key => + impl.verifyHeaderRequest[F](header).map { key => val respHeader = impl.computeHandshakeResponse(header, key) HttpResponse(respHeader, input through impl.webSocketOf(pipe, pingInterval, maxFrameSize, client2Server = false)) }.merge @@ -81,7 +75,7 @@ object WebSocket { * @param responseCodec Codec to decode HttpResponse Header * */ - def client[F[_] : ConcurrentEffect : ContextShift : Timer, I : Decoder, O : Encoder]( + def client[F[_]: ConcurrentEffect: ContextShift: Timer, I: Decoder, O: Encoder]( request: WebSocketRequest , pipe: Pipe[F, Frame[I], Frame[O]] , maxHeaderSize: Int = 4096 @@ -89,23 +83,24 @@ object WebSocket { , maxFrameSize: Int = 1024*1024 , requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec - , sslES: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(spinoco.fs2.http.util.mkThreadFactory("fs2-http-ssl", daemon = true))) - , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - )(implicit AG: AsynchronousChannelGroup): Stream[F, Option[HttpResponseHeader]] = { + )( + socketGroup: SocketGroup + , tlsContext: TLSContext + ): Stream[F, Option[HttpResponseHeader]] = { import spinoco.fs2.http.internal._ import Stream._ eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address => - Stream.resource(io.tcp.client[F](address, receiveBufferSize = receiveBufferSize)) - .evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) } + Stream.resource(socketGroup.client[F](address, receiveBufferSize = receiveBufferSize)) + .flatMap { socket => if (request.secure) Stream.resource(clientLiftToSecure(tlsContext)(socket, request.hostPort)) else Stream.emit(socket) } .flatMap { socket => val (header, fingerprint) = impl.createRequestHeaders(request.header) requestCodec.encode(header) match { - case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode websocket request: $err")) + case Failure(err) => Stream.raiseError[F](new Throwable(s"Failed to encode websocket request: $err")) case Successful(headerBits) => eval(socket.write(ByteVectorChunk(headerBits.bytes ++ `\r\n\r\n`))).flatMap { _ => socket.reads(receiveBufferSize) through httpHeaderAndBody(maxHeaderSize) flatMap { case (respHeaderBytes, body) => responseCodec.decodeValue(respHeaderBytes.bits) match { - case Failure(err) => raiseError(new Throwable(s"Failed to decode websocket response: $err")) + case Failure(err) => Stream.raiseError[F](new Throwable(s"Failed to decode websocket response: $err")) case Successful(responseHeader) => impl.validateResponse[F](header, responseHeader, fingerprint).flatMap { case Some(resp) => emit(Some(resp)) @@ -169,11 +164,11 @@ object WebSocket { }.getOrElse(Left(badRequest("Missing Sec-WebSocket-Key header"))) for { - _ <- version.right - _ <- host.right - _ <- upgrade.right - _ <- connection.right - key <- webSocketKey.right + _ <- version + _ <- host + _ <- upgrade + _ <- connection + key <- webSocketKey } yield key } @@ -269,7 +264,7 @@ object WebSocket { * * @param maxFrameSize Maximum size of the frame, including its header. */ - def decodeWebSocketFrame[F[_] : RaiseThrowable](maxFrameSize: Int , flag: Boolean): Pipe[F, Byte, WebSocketFrame] = { + def decodeWebSocketFrame[F[_]: RaiseThrowable](maxFrameSize: Int , flag: Boolean): Pipe[F, Byte, WebSocketFrame] = { // Returns list of raw frames and tail of // the buffer. Tail of the buffer cant be empty // (or non-empty if last one frame isn't finalized). @@ -286,7 +281,7 @@ object WebSocket { case None => Pull.done // todo: is ok to silently ignore buffer remainder ? case Some((chunk, tl)) => - val data = buff ++ chunk2ByteVector(chunk) + val data = buff ++ chunk.toByteVector cutFrames(data) match { case (rawFrames, _) if rawFrames.isEmpty => go(data)(tl) case (rawFrames, dataTail) => @@ -316,7 +311,7 @@ object WebSocket { * * @param pongQ Queue to notify about ping/pong frames. */ - def webSocketFrame2Frame[F[_] : RaiseThrowable, A](pongQ: Queue[F, PingPong])(implicit R: Decoder[A]): Pipe[F, WebSocketFrame, Frame[A]] = { + def webSocketFrame2Frame[F[_]: RaiseThrowable, A](pongQ: Queue[F, PingPong])(implicit R: Decoder[A]): Pipe[F, WebSocketFrame, Frame[A]] = { def decode(from: Vector[WebSocketFrame]):Pull[F, Frame[A], A] = { val bs = from.map(_.payload).reduce(_ ++ _) R.decodeValue(bs.bits) match { @@ -347,7 +342,7 @@ object WebSocket { * Encodes received frome to WebSocketFrame. * @param maskKey A funtion that allows to generate random masking key. Masking is applied at client -> server direction only. */ - def frame2WebSocketFrame[F[_] : RaiseThrowable, A](maskKey: => Option[Int])(implicit W: Encoder[A]): Pipe[F, Frame[A], WebSocketFrame] = { + def frame2WebSocketFrame[F[_]: RaiseThrowable, A](maskKey: => Option[Int])(implicit W: Encoder[A]): Pipe[F, Frame[A], WebSocketFrame] = { _.flatMap { frame => W.encode(frame.a) match { case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode frame: $err (frame: $frame)")) @@ -368,7 +363,7 @@ object WebSocket { * @tparam F * @return */ - def encodeWebSocketFrame[F[_] : RaiseThrowable](flag: Boolean): Pipe[F, WebSocketFrame, Byte] = { + def encodeWebSocketFrame[F[_]: RaiseThrowable](flag: Boolean): Pipe[F, WebSocketFrame, Byte] = { _.append(Stream.emit(closeFrame)).flatMap { wsf => WebSocketFrameCodec.codec.encode(wsf) match { case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode websocket frame: $err (frame: $wsf)")) @@ -454,7 +449,7 @@ object WebSocket { * @param expectFingerPrint expected fingerprint in header * @return */ - def validateResponse[F[_] : RaiseThrowable]( + def validateResponse[F[_]: RaiseThrowable]( request: HttpRequestHeader , response: HttpResponseHeader , expectFingerPrint: ByteVector diff --git a/src/test/scala/spinoco/fs2/http/HttpRequestSpec.scala b/src/test/scala/spinoco/fs2/http/HttpRequestSpec.scala index 0732d92..c428d1b 100644 --- a/src/test/scala/spinoco/fs2/http/HttpRequestSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpRequestSpec.scala @@ -11,7 +11,6 @@ import spinoco.protocol.mime.{ContentType, MIMECharset, MediaType} object HttpRequestSpec extends Properties("HttpRequest") { - import spinoco.fs2.http.util.chunk2ByteVector property("encode") = secure { @@ -22,7 +21,7 @@ object HttpRequestSpec extends Properties("HttpRequest") { HttpRequest.toStream(request, HttpRequestHeaderCodec.defaultCodec) - .chunks.compile.toVector.map { _.map(chunk2ByteVector).reduce { _ ++ _ }.decodeUtf8 } + .chunks.compile.toVector.map { _.map(_.toByteVector).reduce { _ ++ _ }.decodeUtf8 } .unsafeRunSync() ?= Right(Seq( "GET /hello-world.html HTTP/1.1" @@ -49,7 +48,7 @@ object HttpRequestSpec extends Properties("HttpRequest") { .covary[IO] .through(HttpRequest.fromStream[IO](4096,HttpRequestHeaderCodec.defaultCodec)) .flatMap { case (header, body) => - Stream.eval(body.chunks.compile.toVector.map(_.map(chunk2ByteVector).reduce(_ ++ _).decodeUtf8)).map { bodyString => + Stream.eval(body.chunks.compile.toVector.map(_.map(_.toByteVector).reduce(_ ++ _).decodeUtf8)).map { bodyString => header -> bodyString } }.compile.toVector.unsafeRunSync() ?= Vector( diff --git a/src/test/scala/spinoco/fs2/http/HttpResponseSpec.scala b/src/test/scala/spinoco/fs2/http/HttpResponseSpec.scala index 6a278b1..2bdbfc2 100644 --- a/src/test/scala/spinoco/fs2/http/HttpResponseSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpResponseSpec.scala @@ -8,7 +8,6 @@ import scodec.Attempt import spinoco.protocol.http.header._ import spinoco.protocol.http.codec.HttpResponseHeaderCodec import spinoco.protocol.http.{HttpResponseHeader, HttpStatusCode} -import spinoco.fs2.http.util.chunk2ByteVector import spinoco.protocol.mime.{ContentType, MIMECharset, MediaType} @@ -21,7 +20,7 @@ object HttpResponseSpec extends Properties("HttpResponse") { .withUtf8Body("Hello World") HttpResponse.toStream(response, HttpResponseHeaderCodec.defaultCodec) - .chunks.compile.toVector.map { _.map(chunk2ByteVector).reduce { _ ++ _ }.decodeUtf8 } + .chunks.compile.toVector.map { _.map(_.toByteVector).reduce { _ ++ _ }.decodeUtf8 } .unsafeRunSync() ?= Right(Seq( "HTTP/1.1 200 OK" diff --git a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala index 0880d70..b6606c2 100644 --- a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala @@ -4,6 +4,8 @@ import java.net.InetSocketAddress import cats.effect.IO import fs2._ +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import org.scalacheck.Properties import org.scalacheck.Prop._ import spinoco.fs2.http @@ -23,7 +25,7 @@ object HttpServerSpec extends Properties("HttpServer"){ if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO] else { val ct = request.headers.collectFirst { case `Content-Type`(ct0) => ct0 }.getOrElse(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) - val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0l) + val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0L) val ok = HttpResponse(HttpStatusCode.Ok).chunkedEncoding.withContentType(ct).withBodySize(size) Stream.emit(ok.copy(body = body.take(size))) @@ -43,23 +45,23 @@ object HttpServerSpec extends Properties("HttpServer"){ // run up to count parallel requests and then make sure all of them pass within timeout val count = 100 - def clients : Stream[IO, Stream[IO, (Int, Boolean)]] = { + def clients(socketGroup: SocketGroup, tls: TLSContext) : Stream[IO, Stream[IO, (Int, Boolean)]] = { val request = HttpRequest.get[IO](Uri.parse("http://127.0.0.1:9090/echo").getOrElse(throw new Throwable("Invalid uri"))) - Stream.eval(client[IO]()).flatMap { httpClient => + Stream.eval(client[IO]()(socketGroup, tls)).flatMap { httpClient => Stream.range(0,count).unchunk.map { idx => httpClient.request(request).map(resp => idx -> (resp.header.status == HttpStatusCode.Ok)) }} } - (Stream( - http.server[IO](new InetSocketAddress("127.0.0.1", 9090))(echoService).drain - ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients) - .parJoin(MaxConcurrency) - .take(count) - .filter { case (idx, success) => success } - .compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) - + Stream.resource(httpResources).flatMap { case (group, tls) => + (Stream( + http.server[IO](new InetSocketAddress("127.0.0.1", 9090))(echoService)(group).drain + ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients(group, tls)) + .parJoin[IO, (Int, Boolean)](MaxConcurrency) + .take(count) + .filter { case (_, success) => success } + }.compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) } @@ -68,12 +70,12 @@ object HttpServerSpec extends Properties("HttpServer"){ // run up to count parallel requests with body, and then make sure all of them pass within timeout with body echoed back val count = 100 - def clients : Stream[IO, Stream[IO, (Int, Boolean)]] = { + def clients(socketGroup: SocketGroup, tls: TLSContext): Stream[IO, Stream[IO, (Int, Boolean)]] = { val request = HttpRequest.get[IO](Uri.parse("http://127.0.0.1:9090/echo").getOrElse(throw new Throwable("Invalid uri"))) .withBody("Hello")(BodyEncoder.utf8String, RaiseThrowable.fromApplicativeError[IO]) - Stream.eval(client[IO]()).flatMap { httpClient => + Stream.eval(client[IO]()(socketGroup, tls)).flatMap { httpClient => Stream.range(0,count).unchunk.map { idx => httpClient.request(request).flatMap { resp => Stream.eval(resp.bodyAsString).map { attempt => @@ -84,13 +86,14 @@ object HttpServerSpec extends Properties("HttpServer"){ }} } - ( Stream.sleep_[IO](3.second) ++ - (Stream( - http.server[IO](new InetSocketAddress("127.0.0.1", 9090))(echoService).drain - ).covary[IO] ++ Stream.sleep_[IO](3.second) ++ clients).parJoin(MaxConcurrency)) - .take(count) - .filter { case (idx, success) => success } - .compile.toVector.unsafeRunTimed(60.seconds).map { _.size } ?= Some(count) + Stream.resource(httpResources).flatMap { case (group, tls) => + ( Stream.sleep_[IO](3.second) ++ + (Stream( + http.server[IO](new InetSocketAddress("127.0.0.1", 9090))(echoService)(group).drain + ).covary[IO] ++ Stream.sleep_[IO](3.second) ++ clients(group, tls)).parJoin[IO, (Int, Boolean)](MaxConcurrency)) + .take(count) + .filter { case (_, success) => success } + }.compile.toVector.unsafeRunTimed(60.seconds).map { _.size } ?= Some(count) } @@ -99,29 +102,30 @@ object HttpServerSpec extends Properties("HttpServer"){ // run up to count parallel requests with body, server shall fail each, nevertheless response shall be delivered. val count = 1 - def clients : Stream[IO, Stream[IO, (Int, Boolean)]] = { + def clients(socketGroup: SocketGroup, tls: TLSContext): Stream[IO, Stream[IO, (Int, Boolean)]] = { val request = HttpRequest.get[IO](Uri.parse("http://127.0.0.1:9090/echo").getOrElse(throw new Throwable("Invalid uri"))) - Stream.eval(client[IO]()).flatMap { httpClient => + Stream.eval(client[IO]()(socketGroup, tls)).flatMap { httpClient => Stream.range(0,count).unchunk.map { idx => httpClient.request(request).map { resp => idx -> (resp.header.status == HttpStatusCode.BadRequest) }}} } - (Stream.sleep_[IO](3.second) ++ - (Stream( - HttpServer[IO]( - bindTo = new InetSocketAddress("127.0.0.1", 9090) - , service = failRouteService - , requestFailure = _ => { Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] } - , sendFailure = HttpServer.handleSendFailure[IO] _ - ).drain - ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) - .take(count) - .filter { case (idx, success) => success } - .compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) + Stream.resource(httpResources).flatMap { case (group, tls) => + (Stream.sleep_[IO](3.second) ++ + (Stream( + HttpServer.mk[IO]( + bindTo = new InetSocketAddress("127.0.0.1", 9090) + , service = failRouteService + , requestFailure = _ => { Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] } + , sendFailure = HttpServer.handleSendFailure[IO] _ + )(group).drain + ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients(group, tls)).parJoin[IO, (Int, Boolean)](MaxConcurrency)) + .take(count) + .filter { case (_, success) => success } + }.compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) } @@ -130,11 +134,11 @@ object HttpServerSpec extends Properties("HttpServer"){ // run up to count parallel requests with body, server shall fail each (when sending body), nevertheless response shall be delivered. val count = 100 - def clients : Stream[IO, Stream[IO, (Int, Boolean)]] = { + def clients(socketGroup: SocketGroup, tls: TLSContext): Stream[IO, Stream[IO, (Int, Boolean)]] = { val request = HttpRequest.get[IO](Uri.parse("http://127.0.0.1:9090/echo").getOrElse(throw new Throwable("Invalid uri"))) - Stream.eval(client[IO]()).flatMap { httpClient => + Stream.eval(client[IO]()(socketGroup, tls)).flatMap { httpClient => Stream.range(0,count).unchunk.map { idx => httpClient.request(request).map { resp => idx -> (resp.header.status == HttpStatusCode.Ok) // body won't be consumed, and request was succesfully sent @@ -143,18 +147,19 @@ object HttpServerSpec extends Properties("HttpServer"){ } } - (Stream.sleep_[IO](3.second) ++ - (Stream( - HttpServer[IO]( - bindTo = new InetSocketAddress("127.0.0.1", 9090) - , service = failingResponse - , requestFailure = HttpServer.handleRequestParseError[IO] _ - , sendFailure = (_, _, _) => Stream.empty - ).drain - ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) - .take(count) - .filter { case (idx, success) => success } - .compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) + Stream.resource(httpResources).flatMap { case (group, tls) => + (Stream.sleep_[IO](3.second) ++ + (Stream( + HttpServer.mk[IO]( + bindTo = new InetSocketAddress("127.0.0.1", 9090) + , service = failingResponse + , requestFailure = HttpServer.handleRequestParseError[IO] _ + , sendFailure = (_, _, _) => Stream.empty + )(group).drain + ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients(group, tls)).parJoin[IO, (Int, Boolean)](MaxConcurrency)) + .take(count) + .filter { case (_, success) => success } + }.compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) } diff --git a/src/test/scala/spinoco/fs2/http/Resources.scala b/src/test/scala/spinoco/fs2/http/Resources.scala index 744f677..83b6e04 100644 --- a/src/test/scala/spinoco/fs2/http/Resources.scala +++ b/src/test/scala/spinoco/fs2/http/Resources.scala @@ -3,17 +3,26 @@ package spinoco.fs2.http import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.Executors -import cats.effect.{Concurrent, ContextShift, IO, Timer} +import cats.effect.{Blocker, Concurrent, ContextShift, IO, Resource, Timer} +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import scala.concurrent.ExecutionContext object Resources { - implicit val _cxs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - implicit val _timer: Timer[IO] = IO.timer(ExecutionContext.Implicits.global) - implicit val _concurrent: Concurrent[IO] = IO.ioConcurrentEffect(_cxs) + implicit val _timer: Timer[IO] =IO.timer(ExecutionContext.Implicits.global) + implicit val _contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) + implicit val _concurrent: Concurrent[IO] = IO.ioConcurrentEffect(_contextShift) implicit val AG: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-spec-AG", daemon = true))) + val httpResources: Resource[IO, (SocketGroup, TLSContext)] = { + Blocker[IO].flatMap { blocker => + SocketGroup(blocker).evalMap { group => + TLSContext.system(blocker).map(group -> _) + } + } + } } diff --git a/src/test/scala/spinoco/fs2/http/internal/ChunkedEncodingSpec.scala b/src/test/scala/spinoco/fs2/http/internal/ChunkedEncodingSpec.scala index 3ca1457..8ae2b03 100644 --- a/src/test/scala/spinoco/fs2/http/internal/ChunkedEncodingSpec.scala +++ b/src/test/scala/spinoco/fs2/http/internal/ChunkedEncodingSpec.scala @@ -5,7 +5,6 @@ import fs2._ import org.scalacheck.Properties import org.scalacheck.Prop._ import scodec.bits.ByteVector -import spinoco.fs2.http.util.chunk2ByteVector object ChunkedEncodingSpec extends Properties("ChunkedEncoding") { @@ -16,7 +15,7 @@ object ChunkedEncodingSpec extends Properties("ChunkedEncoding") { (in through ChunkedEncoding.encode through ChunkedEncoding.decode(1024)) .chunks .compile.toVector - .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ chunk2ByteVector(n) }) + .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ n.toByteVector }) .map(_.decodeUtf8) .unsafeRunSync() ?= Right( strings.mkString @@ -42,7 +41,7 @@ object ChunkedEncodingSpec extends Properties("ChunkedEncoding") { .covary[IO] .chunks .compile.toVector - .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ chunk2ByteVector(n) }) + .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ n.toByteVector }) .map(_.decodeUtf8) .unsafeRunSync() ?= Right( "Wikipedia in\r\n\r\nchunks." @@ -62,7 +61,7 @@ object ChunkedEncodingSpec extends Properties("ChunkedEncoding") { (chunks through ChunkedEncoding.encode) .chunks .compile.toVector - .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ chunk2ByteVector(n) }) + .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ n.toByteVector }) .map(_.decodeUtf8) .unsafeRunSync() ?= Right(wikiExample) } diff --git a/src/test/scala/spinoco/fs2/http/internal/HttpClientApp.scala b/src/test/scala/spinoco/fs2/http/internal/HttpClientApp.scala index 5170f75..dcbdd19 100644 --- a/src/test/scala/spinoco/fs2/http/internal/HttpClientApp.scala +++ b/src/test/scala/spinoco/fs2/http/internal/HttpClientApp.scala @@ -13,7 +13,8 @@ object HttpClientApp extends App { - http.client[IO]().flatMap { httpClient => + httpResources.use { case (group, tls) => + http.client[IO]()(group, tls).flatMap { httpClient => httpClient.request(HttpRequest.get(Uri.https("www.google.cz", "/"))).flatMap { resp => Stream.eval(resp.bodyAsString) @@ -21,5 +22,5 @@ object HttpClientApp extends App { println } - }.unsafeRunSync() + }}.unsafeRunSync() } diff --git a/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala b/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala index 1c9399a..566a38f 100644 --- a/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala +++ b/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala @@ -19,13 +19,15 @@ object HttpServerApp extends App { if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO] else { val ct = request.headers.collectFirst { case `Content-Type`(ct) => ct }.getOrElse(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) - val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0l) + val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0L) val ok = HttpResponse(HttpStatusCode.Ok).chunkedEncoding.withContentType(ct).withBodySize(size) Stream.emit(ok.copy(body = body.take(size))) } } - http.server(new InetSocketAddress("127.0.0.1", 9090))(service).compile.drain.unsafeRunSync() + Stream.resource(httpResources).flatMap { case (group, _) => + http.server(new InetSocketAddress("127.0.0.1", 9090))(service)(group) + }.compile.drain.unsafeRunSync() } diff --git a/src/test/scala/spinoco/fs2/http/sse/SSEEncodingSpec.scala b/src/test/scala/spinoco/fs2/http/sse/SSEEncodingSpec.scala index 2a42c6a..fe0de28 100644 --- a/src/test/scala/spinoco/fs2/http/sse/SSEEncodingSpec.scala +++ b/src/test/scala/spinoco/fs2/http/sse/SSEEncodingSpec.scala @@ -8,7 +8,6 @@ import org.scalacheck.Prop._ import scodec.bits.ByteVector import spinoco.fs2.http.sse.SSEMessage.SSEData -import spinoco.fs2.http.util.chunk2ByteVector object SSEEncodingSpec extends Properties("SSEEncoding") { @@ -20,7 +19,7 @@ object SSEEncodingSpec extends Properties("SSEEncoding") { , SSEMessage.SSEData(Seq("data4"), Some("event1"), None) , SSEMessage.SSEData(Seq("data5"), None, Some("id1")) , SSEMessage.SSEData(Seq("data6"), Some("event2"), Some("id2")) - ).covary[IO].through(SSEEncoding.encode[IO]).chunks.compile.toVector.map { _ map chunk2ByteVector reduce (_ ++ _) decodeUtf8 }.unsafeRunSync() ?= + ).covary[IO].through(SSEEncoding.encode[IO]).chunks.compile.toVector.map { _ map(_.toByteVector) reduce (_ ++ _) decodeUtf8 }.unsafeRunSync() ?= Right( "data: data1\n\ndata: data2\ndata: data3\n\nevent: event1\ndata: data4\n\ndata: data5\nid: id1\n\nevent: event2\ndata: data6\nid: id2\n\n" ) diff --git a/src/test/scala/spinoco/fs2/http/util/UtilSpec.scala b/src/test/scala/spinoco/fs2/http/util/UtilSpec.scala deleted file mode 100644 index c4cdb00..0000000 --- a/src/test/scala/spinoco/fs2/http/util/UtilSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -package spinoco.fs2.http.util - -import cats.effect.IO -import fs2._ -import org.scalacheck.Prop._ -import org.scalacheck.{Arbitrary, Gen, Properties} -import scodec.bits.Bases.{Alphabets, Base64Alphabet} -import scodec.bits.ByteVector -import shapeless.the -import spinoco.fs2.http.util - - - -object UtilSpec extends Properties("util"){ - - case class EncodingSample(chunkSize:Int, text:String, alphabet: Base64Alphabet) - - implicit val encodingTestInstance : Arbitrary[EncodingSample] = Arbitrary { - for { - s <- the[Arbitrary[String]].arbitrary - chunkSize <- Gen.choose(1,s.length max 1) - alphabet <- Gen.oneOf(Seq(Alphabets.Base64Url, Alphabets.Base64)) - } yield EncodingSample(chunkSize, s, alphabet) - } - - property("encodes.base64") = forAll { sample: EncodingSample => - Stream.chunk[IO, Byte](Chunk.bytes(sample.text.getBytes)).chunkLimit(sample.chunkSize).flatMap(Stream.chunk[IO, Byte]) - .through(util.encodeBase64Raw(sample.alphabet)) - .chunks - .fold(ByteVector.empty){ case (acc, n) => acc ++ chunk2ByteVector(n)} - .map(_.decodeUtf8) - .compile.toVector.unsafeRunSync() ?= Vector( - Right(ByteVector.view(sample.text.getBytes).toBase64(sample.alphabet)) - ) - } - - - property("decodes.base64") = forAll { sample: EncodingSample => - val encoded = ByteVector.view(sample.text.getBytes).toBase64(sample.alphabet) - Stream.chunk[IO, Byte](Chunk.bytes(encoded.getBytes)) - .chunkLimit(sample.chunkSize).flatMap(Stream.chunk[IO, Byte]) - .through(util.decodeBase64Raw(sample.alphabet)) - .chunks - .fold(ByteVector.empty){ case (acc, n) => acc ++ chunk2ByteVector(n)} - .map(_.decodeUtf8) - .compile.toVector.unsafeRunSync() ?= Vector( - Right(sample.text) - ) - } - - property("encodes.decodes.base64") = forAll { sample: EncodingSample => - val r = - Stream.chunk[IO, Byte](Chunk.bytes(sample.text.getBytes)).covary[IO].chunkLimit(sample.chunkSize).flatMap(Stream.chunk[IO, Byte]) - .through(util.encodeBase64Raw(sample.alphabet)) - .through(util.decodeBase64Raw(sample.alphabet)) - .chunks - .fold(ByteVector.empty){ case (acc, n) => acc ++ chunk2ByteVector(n)} - .compile.toVector.unsafeRunSync() - - - - r ?= Vector(ByteVector.view(sample.text.getBytes)) - - } - - - -} diff --git a/src/test/scala/spinoco/fs2/http/websocket/WebSocketClientApp.scala b/src/test/scala/spinoco/fs2/http/websocket/WebSocketClientApp.scala index ffbea25..43aba9a 100644 --- a/src/test/scala/spinoco/fs2/http/websocket/WebSocketClientApp.scala +++ b/src/test/scala/spinoco/fs2/http/websocket/WebSocketClientApp.scala @@ -22,11 +22,13 @@ object WebSocketClientApp extends App { implicit val codecString: Codec[String] = utf8 - WebSocket.client( - WebSocketRequest.ws("echo.websocket.org", "/", QueryParameter.single("encoding", "text")) - , wspipe - ).map { x => - println(("RESULT OF WS", x)) + Stream.resource(httpResources).flatMap { case (group, tls) => + WebSocket.client( + WebSocketRequest.ws("echo.websocket.org", "/", QueryParameter.single("encoding", "text")) + , wspipe + )(group, tls).map { x => + println(("RESULT OF WS", x)) + } }.compile.drain.unsafeRunSync() } diff --git a/src/test/scala/spinoco/fs2/http/websocket/WebSocketSpec.scala b/src/test/scala/spinoco/fs2/http/websocket/WebSocketSpec.scala index bd542b1..25572be 100644 --- a/src/test/scala/spinoco/fs2/http/websocket/WebSocketSpec.scala +++ b/src/test/scala/spinoco/fs2/http/websocket/WebSocketSpec.scala @@ -4,6 +4,8 @@ import java.net.InetSocketAddress import cats.effect.IO import fs2._ +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import org.scalacheck.{Gen, Prop, Properties} import org.scalacheck.Prop._ import scodec.Codec @@ -43,24 +45,27 @@ object WebSocketSpec extends Properties("WebSocket") { output merge inbound.take(5).evalMap { in => IO { received = received :+ in }}.drain } - val serverStream = + def serverStream(group: SocketGroup) = http.server[IO](new InetSocketAddress("127.0.0.1", 9090))( server ( pipe = serverEcho , pingInterval = 500.millis , handshakeTimeout = 10.seconds ) - ) + )(group) - val clientStream = + def clientStream(group: SocketGroup, tls: TLSContext) = Stream.sleep_[IO](3.seconds) ++ WebSocket.client( WebSocketRequest.ws("127.0.0.1", 9090, "/") , clientData - ) + )(group, tls) - val resultClient = - (serverStream.drain mergeHaltBoth clientStream).compile.toVector.unsafeRunTimed(20.seconds) + val resultClient = { + Stream.resource(httpResources).flatMap { case (group, tls) => + (serverStream(group).drain mergeHaltBoth clientStream(group, tls)) + }.compile.toVector.unsafeRunTimed(20.seconds) + } (resultClient ?= Some(Vector(None))) && (received.size ?= 5)