From 6c11369d65c2918c7b76c78524836d3de4ffb3fc Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Wed, 1 Feb 2023 22:52:39 -0500 Subject: [PATCH 1/2] Revert #50 --- .../http4s/servlet/AsyncHttp4sServlet.scala | 2 +- .../servlet/BlockingHttp4sServlet.scala | 2 +- .../org/http4s/servlet/Http4sServlet.scala | 2 +- .../scala/org/http4s/servlet/ServletIo.scala | 154 ++---------------- 4 files changed, 13 insertions(+), 147 deletions(-) diff --git a/servlet/src/main/scala/org/http4s/servlet/AsyncHttp4sServlet.scala b/servlet/src/main/scala/org/http4s/servlet/AsyncHttp4sServlet.scala index baa78880..a726b135 100644 --- a/servlet/src/main/scala/org/http4s/servlet/AsyncHttp4sServlet.scala +++ b/servlet/src/main/scala/org/http4s/servlet/AsyncHttp4sServlet.scala @@ -61,7 +61,7 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0. val ctx = servletRequest.startAsync() ctx.setTimeout(asyncTimeoutMillis) // Must be done on the container thread for Tomcat's sake when using async I/O. - val bodyWriter = servletIo.bodyWriter(servletResponse, dispatcher) _ + val bodyWriter = servletIo.initWriter(servletResponse) val result = F .attempt( toRequest(servletRequest).fold( diff --git a/servlet/src/main/scala/org/http4s/servlet/BlockingHttp4sServlet.scala b/servlet/src/main/scala/org/http4s/servlet/BlockingHttp4sServlet.scala index 266d4ee0..3685fcae 100644 --- a/servlet/src/main/scala/org/http4s/servlet/BlockingHttp4sServlet.scala +++ b/servlet/src/main/scala/org/http4s/servlet/BlockingHttp4sServlet.scala @@ -58,7 +58,7 @@ class BlockingHttp4sServlet[F[_]] private ( ): Unit = { val result = F .defer { - val bodyWriter = servletIo.bodyWriter(servletResponse, dispatcher) _ + val bodyWriter = servletIo.initWriter(servletResponse) val render = toRequest(servletRequest).fold( onParseFailure(_, servletResponse, bodyWriter), diff --git a/servlet/src/main/scala/org/http4s/servlet/Http4sServlet.scala b/servlet/src/main/scala/org/http4s/servlet/Http4sServlet.scala index c463c63c..c6893266 100644 --- a/servlet/src/main/scala/org/http4s/servlet/Http4sServlet.scala +++ b/servlet/src/main/scala/org/http4s/servlet/Http4sServlet.scala @@ -161,7 +161,7 @@ abstract class Http4sServlet[F[_]]( uri = uri, httpVersion = version, headers = toHeaders(req), - body = servletIo.requestBody(req, dispatcher), + body = servletIo.reader(req), attributes = attributes, ) diff --git a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala index 6d179e35..8d88431c 100644 --- a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala +++ b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala @@ -19,19 +19,16 @@ package servlet import cats.effect._ import cats.effect.std.Dispatcher -import cats.effect.std.Queue import cats.syntax.all._ import fs2._ import org.http4s.internal.bug import org.log4s.getLogger -import java.util.Arrays import java.util.concurrent.atomic.AtomicReference import javax.servlet.ReadListener import javax.servlet.WriteListener import javax.servlet.http.HttpServletRequest import javax.servlet.http.HttpServletResponse -import scala.annotation.nowarn import scala.annotation.tailrec /** Determines the mode of I/O used for reading request bodies and writing response bodies. @@ -39,10 +36,13 @@ import scala.annotation.tailrec sealed abstract class ServletIo[F[_]: Async] { protected[servlet] val F: Async[F] = Async[F] - @deprecated("Prefer requestBody, which has access to a Dispatcher", "0.23.12") protected[servlet] def reader(servletRequest: HttpServletRequest): EntityBody[F] - @nowarn("cat=deprecation") + /** An alias for [[reader]]. In the future, this will be optimized with + * the dispatcher. + * + * @dispatcher currently ignored + */ def requestBody( servletRequest: HttpServletRequest, dispatcher: Dispatcher[F], @@ -52,10 +52,13 @@ sealed abstract class ServletIo[F[_]: Async] { } /** May install a listener on the servlet response. */ - @deprecated("Prefer bodyWriter, which has access to a Dispatcher", "0.23.12") protected[servlet] def initWriter(servletResponse: HttpServletResponse): BodyWriter[F] - @nowarn("cat=deprecation") + /** An alias for [[initWriter]]. In the future, this will be + * optimized with the dispatcher. + * + * @dispatcher currently ignored + */ def bodyWriter(servletResponse: HttpServletResponse, dispatcher: Dispatcher[F])( response: Response[F] ): F[Unit] = { @@ -206,73 +209,6 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl } } - /* The queue implementation is influenced by ideas in jetty4s - * https://github.com/IndiscriminateCoding/jetty4s/blob/0.0.10/server/src/main/scala/jetty4s/server/HttpResourceHandler.scala - */ - override def requestBody( - servletRequest: HttpServletRequest, - dispatcher: Dispatcher[F], - ): Stream[F, Byte] = { - sealed trait Read - final case class Bytes(chunk: Chunk[Byte]) extends Read - case object End extends Read - final case class Error(t: Throwable) extends Read - - Stream.eval(F.delay(servletRequest.getInputStream)).flatMap { in => - Stream.eval(Queue.bounded[F, Read](4)).flatMap { q => - val readBody = Stream.exec(F.delay(in.setReadListener(new ReadListener { - var buf: Array[Byte] = _ - unsafeReplaceBuffer() - - def unsafeReplaceBuffer() = - buf = new Array[Byte](chunkSize) - - def onDataAvailable(): Unit = { - def loopIfReady = - F.delay(in.isReady()).flatMap { - case true => go - case false => F.unit - } - - def go: F[Unit] = - F.delay(in.read(buf)).flatMap { - case len if len == chunkSize => - // We used the whole buffer. Replace it new before next read. - q.offer(Bytes(Chunk.array(buf))) >> F.delay(unsafeReplaceBuffer()) >> loopIfReady - case len if len >= 0 => - // Got a partial chunk. Copy it, and reuse the current buffer. - q.offer(Bytes(Chunk.array(Arrays.copyOf(buf, len)))) >> loopIfReady - case _ => - F.unit - } - - unsafeRunAndForget(go) - } - - def onAllDataRead(): Unit = - unsafeRunAndForget(q.offer(End)) - - def onError(t: Throwable): Unit = - unsafeRunAndForget(q.offer(Error(t))) - - def unsafeRunAndForget[A](fa: F[A]): Unit = - dispatcher.unsafeRunAndForget( - fa.onError { case t => F.delay(logger.error(t)("Error in servlet read listener")) } - ) - }))) - - def pullBody: Pull[F, Byte, Unit] = - Pull.eval(q.take).flatMap { - case Bytes(chunk) => Pull.output(chunk) >> pullBody - case End => Pull.done - case Error(t) => Pull.raiseError[F](t) - } - - pullBody.stream.concurrently(readBody) - } - } - } - override protected[servlet] def initWriter( servletResponse: HttpServletResponse ): BodyWriter[F] = { @@ -367,74 +303,4 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl .drain } } - - /* The queue implementation is influenced by ideas in jetty4s - * https://github.com/IndiscriminateCoding/jetty4s/blob/0.0.10/server/src/main/scala/jetty4s/server/HttpResourceHandler.scala - */ - override def bodyWriter( - servletResponse: HttpServletResponse, - dispatcher: Dispatcher[F], - )(response: Response[F]): F[Unit] = { - sealed trait Write - final case class Bytes(chunk: Chunk[Byte]) extends Write - case object End extends Write - case object Init extends Write - - val autoFlush = response.isChunked - - F.delay(servletResponse.getOutputStream).flatMap { out => - Queue.bounded[F, Write](4).flatMap { q => - Deferred[F, Either[Throwable, Unit]].flatMap { done => - val writeBody = F.delay(out.setWriteListener(new WriteListener { - def onWritePossible(): Unit = { - def loopIfReady = F.delay(out.isReady()).flatMap { - case true => go - case false => F.unit - } - - def flush = - if (autoFlush) { - F.delay(out.isReady()).flatMap { - case true => F.delay(out.flush()) >> loopIfReady - case false => F.unit - } - } else - loopIfReady - - def go: F[Unit] = - q.take.flatMap { - case Bytes(slice: Chunk.ArraySlice[_]) => - F.delay( - out.write(slice.values.asInstanceOf[Array[Byte]], slice.offset, slice.length) - ) >> flush - case Bytes(chunk) => - F.delay(out.write(chunk.toArray)) >> flush - case End => - F.delay(out.flush()) >> done.complete(Either.unit).attempt.void - case Init => - if (autoFlush) flush else go - } - - unsafeRunAndForget(go) - } - def onError(t: Throwable): Unit = - unsafeRunAndForget(done.complete(Left(t))) - - def unsafeRunAndForget[A](fa: F[A]): Unit = - dispatcher.unsafeRunAndForget( - fa.onError { case t => F.delay(logger.error(t)("Error in servlet write listener")) } - ) - })) - - val writes = Stream.emit(Init) ++ response.body.chunks.map(Bytes(_)) ++ Stream.emit(End) - - Stream - .eval(writeBody >> done.get.rethrow) - .mergeHaltL(writes.foreach(q.offer)) - .compile - .drain - } - } - } - } } From 71ff55074f085ef85fb99b64150cf632a8eab6b8 Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 23:34:21 -0400 Subject: [PATCH 2/2] Colonel Mustard in the Library with the Invalid Scaladoc --- servlet/src/main/scala/org/http4s/servlet/ServletIo.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala index 8d88431c..0ed72287 100644 --- a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala +++ b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala @@ -41,7 +41,7 @@ sealed abstract class ServletIo[F[_]: Async] { /** An alias for [[reader]]. In the future, this will be optimized with * the dispatcher. * - * @dispatcher currently ignored + * @param dispatcher currently ignored */ def requestBody( servletRequest: HttpServletRequest, @@ -57,7 +57,7 @@ sealed abstract class ServletIo[F[_]: Async] { /** An alias for [[initWriter]]. In the future, this will be * optimized with the dispatcher. * - * @dispatcher currently ignored + * @param dispatcher currently ignored */ def bodyWriter(servletResponse: HttpServletResponse, dispatcher: Dispatcher[F])( response: Response[F]