From fee180ae6897e59b4ee2c99896f67cd9ddde05d1 Mon Sep 17 00:00:00 2001 From: wedens Date: Tue, 4 Jul 2017 21:23:23 +0700 Subject: [PATCH] Use `maxUnanswered` parameter in ws pings --- .../scala/spinoco/fs2/http/websocket/WebSocket.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index 28da3cf..9b663b1 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -229,7 +229,7 @@ object WebSocket { case fin: FiniteDuration => time.awakeEvery[F](fin).map { _ => () } case inf => Stream.empty } - val control = controlStream(pingPongQ.dequeue, metronome, maxUnanswered = 3, flag = client2Server) + val control = controlStream(pingPongQ.dequeue, metronome, maxUnanswered = 3) source .through(decodeWebSocketFrame[F](maxFrameSize, client2Server)) @@ -402,7 +402,6 @@ object WebSocket { pingPongs: Stream[F, PingPong] , metronome: Stream[F, Unit] , maxUnanswered: Int - , flag: Boolean )(implicit F: Async[F]): Stream[F, WebSocketFrame] = { (pingPongs either metronome) .mapAccumulate(0) { case (pingsSent, in) => in match { @@ -410,9 +409,9 @@ object WebSocket { case Left(PingPong.Ping) => (pingsSent, Stream.emit(pongFrame)) case Right(_) => (pingsSent + 1, Stream.emit(pingFrame)) }} - .flatMap { case (unconfirmed, out) => - if (unconfirmed < 3) out - else Stream.fail(new Throwable(s"Maximum number of unconfirmed pings exceeded: $unconfirmed")) + .flatMap { case (unanswered, out) => + if (unanswered < maxUnanswered) out + else Stream.fail(new Throwable(s"Maximum number of unanswered pings exceeded: $unanswered")) } }