Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void bindingExample() throws Exception {
Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080));
Http.get(system).newServerAt("localhost", 8080).connectionSource();

CompletionStage<ServerBinding> serverBindingFuture =
serverSource
Expand All @@ -73,7 +73,7 @@ public static void bindingFailureExample() throws Exception {
Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 80));
Http.get(system).newServerAt("localhost", 80).connectionSource();

CompletionStage<ServerBinding> serverBindingFuture =
serverSource
Expand Down Expand Up @@ -101,7 +101,7 @@ public static void connectionSourceFailureExample() throws Exception {
Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080));
Http.get(system).newServerAt("localhost", 8080).connectionSource();

Flow<IncomingConnection, IncomingConnection, NotUsed> failureDetection =
Flow.of(IncomingConnection.class)
Expand Down Expand Up @@ -137,7 +137,7 @@ public static void connectionStreamFailureExample() throws Exception {
Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080));
Http.get(system).newServerAt("localhost", 8080).connectionSource();

Flow<HttpRequest, HttpRequest, NotUsed> failureDetection =
Flow.of(HttpRequest.class)
Expand Down Expand Up @@ -186,7 +186,7 @@ public static void fullServerExample() throws Exception {
final Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080));
Http.get(system).newServerAt("localhost", 8080).connectionSource();

// #request-handler
final Function<HttpRequest, HttpResponse> requestHandler =
Expand Down Expand Up @@ -379,10 +379,9 @@ public static void gracefulTerminationExample() throws Exception {

CompletionStage<ServerBinding> binding =
Http.get(system)
.bindAndHandle(
Directives.complete("Hello world!").flow(system, materializer),
ConnectHttp.toHost("localhost", 8080),
materializer);
.newServerAt("localhost", 8080)
.bindFlow(
Directives.complete("Hello world!").flow(system, materializer));

ServerBinding serverBinding = binding.toCompletableFuture().get(3, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,8 @@
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.stream.Materializer;

@SuppressWarnings("deprecation")
public class PekkoHttp1020MigrationExample {
public static void main(String[] args) {
{
// #old-binding
// only worked with classic actor system
org.apache.pekko.actor.ActorSystem system =
org.apache.pekko.actor.ActorSystem.create("TheSystem");
Materializer mat = Materializer.createMaterializer(system);
Route route = get(() -> complete("Hello World!"));
Http.get(system)
.bindAndHandle(route.flow(system), ConnectHttp.toHost("localhost", 8080), mat);
// #old-binding
}

{
// #new-binding
// works with classic or typed actor system
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,10 @@ package docs.http.scaladsl.server
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Route
import scala.annotation.nowarn

@nowarn("msg=is deprecated")
class PekkoHttp1020MigrationSpec {
import org.apache.pekko.http.scaladsl.server.Directives._

{
// #old-binding
// only worked with classic actor system
implicit val system = org.apache.pekko.actor.ActorSystem("TheSystem")
val route: Route =
get {
complete("Hello world")
}
Http().bindAndHandle(route, "localhost", 8080)
// #old-binding
}

{
// #new-binding
// works with both classic and typed ActorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import pekko.Done

import javax.net.ssl.SSLEngine
import scala.annotation.nowarn

Check warning on line 53 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.12, 17)

Unused import

Check warning on line 53 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 17)

Unused import

Check warning on line 53 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala

View workflow job for this annotation

GitHub Actions / validate-links

Unused import
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -224,7 +224,7 @@
engine.setUseClientMode(false)
Http2AlpnSupport.enableForServer(engine, setChosenProtocol)
}
@nowarn("msg=deprecated") // TODO find an alternative way to do this
// TODO find an alternative way to do this
val tls = TLS(() => createEngine(), _ => Success(()), IgnoreComplete)

ProtocolSwitch(_ => getChosenProtocol(), http1, http2).join(
Expand All @@ -234,7 +234,7 @@
def outgoingConnection(host: String, port: Int, connectionContext: HttpsConnectionContext,
clientConnectionSettings: ClientConnectionSettings, log: LoggingAdapter)
: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
@nowarn("msg=deprecated") // TODO find an alternative way to do this
// TODO find an alternative way to do this
def createEngine(): SSLEngine = {
val engine = connectionContext.engineCreator(Some((host, port)))
engine.setUseClientMode(true)
Expand Down
140 changes: 1 addition & 139 deletions http-core/src/main/scala/org/apache/pekko/http/javadsl/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import pekko.japi.Pair
import pekko.stream.TLSProtocol._
import pekko.stream.Materializer
import pekko.stream.javadsl.{ BidiFlow, Flow, Source }
import pekko.stream.javadsl.{ BidiFlow, Flow }
import pekko.stream.scaladsl.Keep
import pekko.util.FutureConverters._

Expand Down Expand Up @@ -100,144 +100,6 @@
*/
def newServerAt(interface: String, port: Int): ServerBuilder = ServerBuilder(interface, port, system)

/**
* Creates a [[pekko.stream.javadsl.Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*
* @deprecated since Akka HTTP 10.2.0: Use Http.get(system).newServerAt(interface, port).connectionSource() instead
*/
@Deprecated
@deprecated("Use newServerAt instead", since = "Akka HTTP 10.2.0")
def bind(connect: ConnectHttp): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, connectionContext)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).asJava))
}

/**
* Creates a [[pekko.stream.javadsl.Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*
* @deprecated since Akka HTTP 10.2.0: Use Http.get(system).newServerAt(interface, port).withSettings(settings).connectionSource() instead
*/
@Deprecated
@deprecated("Use newServerAt instead", since = "Akka HTTP 10.2.0")
def bind(
connect: ConnectHttp,
settings: ServerSettings): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, settings = settings.asScala,
connectionContext = connectionContext)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).asJava))
}

/**
* Creates a [[pekko.stream.javadsl.Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*
* @deprecated since Akka HTTP 10.2.0: Use Http.get(system).newServerAt(interface, port).withSettings(settings).logTo(log).connectionSource() instead
*/
@Deprecated
@deprecated("Use newServerAt instead", since = "Akka HTTP 10.2.0")
def bind(
connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, connectionContext, settings.asScala, log)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).asJava))
}

/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[pekko.stream.javadsl.Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `pekko.http.server.max-connections` setting. Please see the documentation in the reference.conf for more
* information about what kind of guarantees to expect.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*
* @deprecated since Akka HTTP 10.2.0: Use Http.get(system).newServerAt(interface, port).bindFlow(handler) instead.
*/
@Deprecated
@deprecated("Use newServerAt instead", since = "Akka HTTP 10.2.0")
def bindAndHandle(
handler: Flow[HttpRequest, HttpResponse, _],
connect: ConnectHttp,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandle(
handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
connect.host, connect.port, connectionContext)(materializer)
.map(new ServerBinding(_))(ec).asJava
}

/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[pekko.stream.javadsl.Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `pekko.http.server.max-connections` setting. Please see the documentation in the reference.conf for more
* information about what kind of guarantees to expect.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*
* @deprecated since Akka HTTP 10.2.0: Use Http.get(system).newServerAt(interface, port).withSettings(settings).logTo(log).bindFlow(handler) instead.
*/
@Deprecated
@deprecated("Use newServerAt instead", since = "Akka HTTP 10.2.0")
def bindAndHandle(
handler: Flow[HttpRequest, HttpResponse, _],
connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandle(
handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
connect.host, connect.port, connectionContext, settings.asScala, log)(materializer)
.map(new ServerBinding(_))(ec).asJava
}

/**
* Constructs a client layer stage using the configured default [[pekko.http.javadsl.settings.ClientConnectionSettings]].
*/
Expand Down Expand Up @@ -369,7 +231,7 @@
case https: HttpsConnectionContext =>
delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)
case _ =>

Check warning on line 234 in http-core/src/main/scala/org/apache/pekko/http/javadsl/Http.scala

View workflow job for this annotation

GitHub Actions / Compile and test (3.3, 17)

Unreachable case except for null (if this is intentional, consider writing case null => instead).
delegate.newHostConnectionPool[T](to.host, to.port, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)
}
Expand Down
58 changes: 13 additions & 45 deletions http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,10 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
* To configure additional settings for a server started using this method,
* use the `pekko.http.server` config section or pass in a [[pekko.http.scaladsl.settings.ServerSettings]] explicitly.
*/
@deprecated(
"Use Http().newServerAt(...)...connectionSource() to create a source that can be materialized to a binding.",
since = "Akka HTTP 10.2.0")
def bind(interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext = defaultServerHttpContext,
settings: ServerSettings = ServerSettings(system),
log: LoggingAdapter = system.log): Source[Http.IncomingConnection, Future[ServerBinding]] = {
private[http] def bindImpl(interface: String, port: Int,
connectionContext: ConnectionContext,
settings: ServerSettings,
log: LoggingAdapter): Source[Http.IncomingConnection, Future[ServerBinding]] = {
if (settings.previewServerSettings.enableHttp2)
log.warning(
s"Binding with a connection source not supported with HTTP/2. Falling back to HTTP/1.1 for port [$port]")
Expand All @@ -212,14 +209,6 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
}
}

// forwarder to allow internal code to call deprecated method without warning
@nowarn("msg=deprecated")
private[http] def bindImpl(interface: String, port: Int,
connectionContext: ConnectionContext,
settings: ServerSettings,
log: LoggingAdapter): Source[Http.IncomingConnection, Future[ServerBinding]] =
bind(interface, port, connectionContext, settings, log)

/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[pekko.stream.scaladsl.Flow]] for processing all incoming connections.
Expand All @@ -231,13 +220,12 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
* To configure additional settings for a server started using this method,
* use the `pekko.http.server` config section or pass in a [[pekko.http.scaladsl.settings.ServerSettings]] explicitly.
*/
@deprecated("Use Http().newServerAt(...)...bindFlow() to create server bindings.", since = "Akka HTTP 10.2.0")
def bindAndHandle(
private[http] def bindAndHandleImpl(
handler: Flow[HttpRequest, HttpResponse, Any],
interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext = defaultServerHttpContext,
settings: ServerSettings = ServerSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer = systemMaterializer): Future[ServerBinding] = {
interface: String, port: Int,
connectionContext: ConnectionContext,
settings: ServerSettings,
log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] = {
if (settings.previewServerSettings.enableHttp2)
log.warning(
s"Binding with a connection source not supported with HTTP/2. Falling back to HTTP/1.1 for port [$port].")
Expand Down Expand Up @@ -292,19 +280,9 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
}

// forwarder to allow internal code to call deprecated method without warning
@nowarn("msg=deprecated")
private[http] def bindAndHandleImpl(
handler: Flow[HttpRequest, HttpResponse, Any],
interface: String, port: Int,
connectionContext: ConnectionContext,
settings: ServerSettings,
log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] =
bindAndHandle(handler, interface, port, connectionContext, settings, log)(fm)

private def bindAndHandleAsync(
private[http] def bindAndHandleAsyncImpl(
handler: HttpRequest => Future[HttpResponse],
interface: String,
port: Int,
interface: String, port: Int,
connectionContext: ConnectionContext,
settings: ServerSettings,
parallelism: Int,
Expand All @@ -327,16 +305,6 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
}
}

// forwarder to allow internal code to call deprecated method without warning
private[http] def bindAndHandleAsyncImpl(
handler: HttpRequest => Future[HttpResponse],
interface: String, port: Int,
connectionContext: ConnectionContext,
settings: ServerSettings,
parallelism: Int,
log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] =
bindAndHandleAsync(handler, interface, port, connectionContext, settings, parallelism, log)(fm)

type ServerLayer = Http.ServerLayer

/**
Expand Down Expand Up @@ -794,8 +762,8 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme

private[http] def sslTlsServerStage(connectionContext: ConnectionContext) =
sslTlsStage(connectionContext, Server, None)

@nowarn("msg=deprecated") // TODO find an alternative way to do this
// TODO find an alternative way to do this
private def sslTlsStage(connectionContext: ConnectionContext, role: TLSRole, hostInfo: Option[(String, Int)]) =
connectionContext match {
case hctx: HttpsConnectionContext =>
Expand Down
Loading
Loading