7 changes: 6 additions & 1 deletion Makefile
@@ -5,6 +5,7 @@ INTEG_API_INVOKE := RestApiUrl HttpApiUrl
INTEG_EXTENSIONS := extension-fn extension-trait logs-trait
# Using musl to run extensions on both AL1 and AL2
INTEG_ARCH := x86_64-unknown-linux-musl
RIE_MAX_CONCURRENCY ?= 4

define uppercase
$(shell sed -r 's/(^|-)(\w)/\U\2/g' <<< $(1))
@@ -111,4 +112,8 @@ fmt:
cargo +nightly fmt --all

test-rie:
./scripts/test-rie.sh $(EXAMPLE)
./scripts/test-rie.sh $(EXAMPLE)

# Run RIE in Lambda Managed Instance (LMI) mode with concurrent polling.
test-rie-lmi:
Collaborator: Minor: is this makefile target + configurable variable worth mentioning in the README?

Contributor Author: Good point, thank you. I added a short note in the README under RIE testing showing `make test-rie-lmi EXAMPLE=basic-lambda-concurrent`.

RIE_MAX_CONCURRENCY=$(RIE_MAX_CONCURRENCY) ./scripts/test-rie.sh $(EXAMPLE)
6 changes: 6 additions & 0 deletions README.md
@@ -405,6 +405,12 @@ make test-rie EXAMPLE=basic-sqs
make test-rie EXAMPLE=http-basic-lambda
```

To test Lambda Managed Instances (concurrent polling), use:

```bash
make test-rie-lmi EXAMPLE=basic-lambda-concurrent
```

This command will:
1. Build a Docker image with Rust toolchain and RIE
2. Compile the specified example inside the Linux container
9 changes: 9 additions & 0 deletions examples/basic-lambda-concurrent/Cargo.toml
@@ -0,0 +1,9 @@
[package]
name = "basic-lambda-concurrent"
version = "0.1.0"
edition = "2021"

[dependencies]
lambda_runtime = { path = "../../lambda-runtime" }
serde = "1.0.219"
tokio = { version = "1", features = ["macros"] }
74 changes: 74 additions & 0 deletions examples/basic-lambda-concurrent/src/main.rs
@@ -0,0 +1,74 @@
// This example requires the following input to succeed:
// { "command": "do something" }

use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use serde::{Deserialize, Serialize};

/// This is also a made-up example. Requests come into the runtime as unicode
/// strings in JSON format, which can map to any structure that implements
/// `serde::Deserialize`. The runtime pays no attention to the contents of the request payload.
#[derive(Deserialize)]
struct Request {
command: String,
}

/// This is a made-up example of what a response structure may look like.
/// There is no restriction on what it can be. The runtime requires responses
/// to be serialized into json. The runtime pays no attention
/// to the contents of the response payload.
#[derive(Serialize)]
struct Response {
req_id: String,
msg: String,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
// required to enable CloudWatch error logging by the runtime
tracing::init_default_subscriber();

let func = service_fn(my_handler);
if let Err(err) = lambda_runtime::run_concurrent(func).await {
tracing::error!(error = %err, "run error");
return Err(err);
}
Ok(())
}

pub(crate) async fn my_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
// extract some useful info from the request
let command = event.payload.command;

// prepare the response
let resp = Response {
req_id: event.context.request_id,
msg: format!("Command {command} executed."),
};

// return `Response` (it will be serialized to JSON automatically by the runtime)
Ok(resp)
}

#[cfg(test)]
mod tests {
use crate::{my_handler, Request};
use lambda_runtime::{Context, LambdaEvent};

#[tokio::test]
async fn response_is_good_for_simple_input() {
let id = "ID";

let mut context = Context::default();
context.request_id = id.to_string();

let payload = Request {
command: "X".to_string(),
};
let event = LambdaEvent { payload, context };

let result = my_handler(event).await.unwrap();

assert_eq!(result.msg, "Command X executed.");
assert_eq!(result.req_id, id.to_string());
}
}
5 changes: 4 additions & 1 deletion examples/basic-lambda/src/main.rs
@@ -28,7 +28,10 @@ async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();

let func = service_fn(my_handler);
lambda_runtime::run(func).await?;
if let Err(err) = lambda_runtime::run(func).await {
eprintln!("run error: {:?}", err);
return Err(err);
}
Ok(())
}

55 changes: 51 additions & 4 deletions lambda-http/src/lib.rs
@@ -102,7 +102,7 @@ use std::{
};

mod streaming;
pub use streaming::{run_with_streaming_response, StreamAdapter};
pub use streaming::{run_with_streaming_response, run_with_streaming_response_concurrent, StreamAdapter};

/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
pub type Request = http::Request<Body>;
@@ -151,6 +151,18 @@ pub struct Adapter<'a, R, S> {
_phantom_data: PhantomData<&'a R>,
}

impl<'a, R, S> Clone for Adapter<'a, R, S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
_phantom_data: PhantomData,
}
}
}
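A note on the manual `Clone` impl above: a derived `Clone` would also demand `R: Clone`, even though `R` only appears inside `PhantomData`, while the hand-written impl bounds only `S: Clone`. A self-contained sketch of the same pattern (the names mirror the PR for illustration; this is not the crate's actual code):

```rust
use std::marker::PhantomData;

// `NotClone` stands in for a response type that does not implement Clone.
struct NotClone;

// Reduced mirror of the PR's Adapter, keeping only the Clone question.
struct Adapter<'a, R, S> {
    service: S,
    _phantom_data: PhantomData<&'a R>,
}

// Manual impl: only the inner service must be Clone; `R` need not be,
// because `PhantomData<&'a R>` is Clone for any `R`.
impl<'a, R, S: Clone> Clone for Adapter<'a, R, S> {
    fn clone(&self) -> Self {
        Self {
            service: self.service.clone(),
            _phantom_data: PhantomData,
        }
    }
}

fn main() {
    let a: Adapter<'_, NotClone, String> = Adapter {
        service: "svc".to_string(),
        _phantom_data: PhantomData,
    };
    let b = a.clone(); // compiles even though NotClone is not Clone
    assert_eq!(b.service, "svc");
}
```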

impl<'a, R, S, E> From<S> for Adapter<'a, R, S>
where
S: Service<Request, Response = R, Error = E>,
@@ -180,9 +192,11 @@
}

fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {
let request_origin = req.payload.request_origin();
let event: Request = req.payload.into();
let fut = Box::pin(self.service.call(event.with_lambda_context(req.context)));
let LambdaEvent { payload, context } = req;
let request_origin = payload.request_origin();
let mut event: Request = payload.into();
update_xray_trace_id_header_from_context(event.headers_mut(), &context);
let fut = Box::pin(self.service.call(event.with_lambda_context(context)));

TransformResponse::Request(request_origin, fut)
}
@@ -193,6 +207,12 @@
///
/// This takes care of transforming the LambdaEvent into a [`Request`] and then
/// converting the result into a `LambdaResponse`.
///
/// # Managed concurrency
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
/// prefer [`run_concurrent`], which honors managed concurrency and falls back to
/// sequential behavior when unset.
pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = R, Error = E>,
@@ -203,6 +223,33 @@
lambda_runtime::run(Adapter::from(handler)).await
}

/// Starts the Lambda Rust runtime in a mode that is compatible with
/// Lambda Managed Instances (concurrent invocations).
///
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
/// `/next` polling loop. When the environment variable is unset or `<= 1`,
/// it falls back to the same sequential behavior as [`run`], so the same
/// handler can run on both classic Lambda and Lambda Managed Instances.
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
Collaborator: (Question, not sure): If the fallback when AWS_LAMBDA_MAX_CONCURRENCY is unset is just lambda_runtime::run(), should this be our new suggested entrypoint, with fn run() marked as deprecated? I guess the main issue is that the Clone bounds are more restrictive, so maybe deprecation is a bit aggressive, but we could note that in the run() doc comments.

The regular run() should definitely also add a warning that it WON'T respect AWS_LAMBDA_MAX_CONCURRENCY.

Context: My suspicion is that most consumers don't mind the Clone bound, and the 'principle of least surprise' would be for it to respect the ENV. So we should probably be pointing people to prefer this method where possible.

I would suggest at minimum the following:

  • This should probably be named run_maybe_concurrent? Or maybe run_extended, to avoid a combinatorial explosion of new entrypoint names needing Clone as the featureset grows?
  • We should add a guard clause in lambda_runtime::run() that either crashes the lambda runtime (preferred) or emits error logs (more conservative) if AWS_LAMBDA_MAX_CONCURRENCY is set, since that is clearly a misconfiguration that would prove confusing to debug.

Contributor Author: Thanks! I added a doc warning on run() and a fail-fast guard when AWS_LAMBDA_MAX_CONCURRENCY is set. I kept the *_concurrent naming for now to limit code churn, and also because _extended seems a little vague to me, but I'm open to changing it if there's consensus on renaming. I have also added a note to the run() docstring to make it clearer that run_concurrent is preferred if the handler can satisfy Clone.

where
S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
R: IntoResponse + Send + Sync + 'static,
E: std::fmt::Debug + Into<Diagnostic> + Send + 'static,
{
lambda_runtime::run_concurrent(Adapter::from(handler)).await
}
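The fallback rule described in the doc comment (spawn N polling workers when the variable parses to N > 1, otherwise run the single sequential loop) can be sketched as a small helper. This is an illustration of the documented behavior, not the runtime's actual code:

```rust
// Illustrative helper: in the runtime the input would come from
// env::var("AWS_LAMBDA_MAX_CONCURRENCY").ok(); taking it as a parameter
// keeps the sketch deterministic.
fn effective_concurrency(raw: Option<&str>) -> usize {
    raw.and_then(|v| v.parse::<usize>().ok())
        .filter(|&n| n > 1) // values <= 1 mean "sequential"
        .unwrap_or(1)
}

fn main() {
    assert_eq!(effective_concurrency(None), 1); // unset: sequential, same as run()
    assert_eq!(effective_concurrency(Some("4")), 4); // four concurrent /next pollers
    assert_eq!(effective_concurrency(Some("1")), 1); // <= 1: sequential
    assert_eq!(effective_concurrency(Some("oops")), 1); // unparsable: sequential
}
```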

// Replaces update_xray_trace_id_header (env var), now set from Context
fn update_xray_trace_id_header_from_context(headers: &mut http::HeaderMap, context: &Context) {
if let Some(trace_id) = context.xray_trace_id.as_deref() {
if let Ok(header_value) = http::HeaderValue::from_str(trace_id) {
Collaborator: Minor: What are the cases where http::HeaderValue::from_str() would fail? Should we have a warn- or error-level log line if it does?

(I guess this is an existing behavior and this is more of a lift and shift?)

Contributor Author: Correct, this was the existing behavior. It could fail if the xray_trace_id contains invalid header characters, but it should always be valid, so I didn't add logging to avoid noise. We could add a debug-level log though. Let me know.

headers.insert(http::header::HeaderName::from_static("x-amzn-trace-id"), header_value);
}
}
}
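On the review question about when `HeaderValue::from_str` can fail: as I understand the `http` crate's validation, tab is allowed and all other control bytes (including DEL) are rejected. A standalone sketch of that rule, with an illustrative trace-ID value (not taken from this PR):

```rust
// Sketch of the byte-validity rule http's HeaderValue::from_str applies:
// tab (0x09) is allowed; other control bytes, including DEL (127), are not.
fn is_valid_header_byte(b: u8) -> bool {
    b == b'\t' || (b >= 32 && b != 127)
}

fn main() {
    // A typical X-Ray trace header value is plain visible ASCII, so it
    // passes validation, which is why the code above skips logging.
    let trace_id = "Root=1-5759e988-bd862e3fe1be46a994272793;Sampled=1";
    assert!(trace_id.bytes().all(is_valid_header_byte));

    // Embedded control characters are the failure case.
    assert!(!"bad\nvalue".bytes().all(is_valid_header_byte));
}
```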

#[cfg(test)]
mod test_adapter {
use std::task::{Context, Poll};
13 changes: 0 additions & 13 deletions lambda-http/src/request.rs
@@ -146,7 +146,6 @@ fn into_api_gateway_v2_request(ag: ApiGatewayV2httpRequest) -> http::Request<Bod
.extension(RequestContext::ApiGatewayV2(ag.request_context));

let mut headers = ag.headers;
update_xray_trace_id_header(&mut headers);
if let Some(cookies) = ag.cookies {
if let Ok(header_value) = HeaderValue::from_str(&cookies.join(";")) {
headers.insert(http::header::COOKIE, header_value);
@@ -170,13 +169,6 @@
req
}

fn update_xray_trace_id_header(headers: &mut HeaderMap) {
if let Ok(xray_trace_id) = env::var("_X_AMZN_TRACE_ID") {
if let Ok(header_value) = HeaderValue::from_str(&xray_trace_id) {
headers.insert(HeaderName::from_static("x-amzn-trace-id"), header_value);
}
}
}
#[cfg(feature = "apigw_rest")]
fn into_proxy_request(ag: ApiGatewayProxyRequest) -> http::Request<Body> {
let http_method = ag.http_method;
@@ -214,7 +206,6 @@ fn into_proxy_request(ag: ApiGatewayProxyRequest) -> http::Request<Body> {
// multi_value_headers is our canonical source of request headers
let mut headers = ag.multi_value_headers;
headers.extend(ag.headers);
update_xray_trace_id_header(&mut headers);

let base64 = ag.is_base64_encoded;
let mut req = builder
@@ -265,7 +256,6 @@ fn into_alb_request(alb: AlbTargetGroupRequest) -> http::Request<Body> {
// multi_value_headers is our canonical source of request headers
let mut headers = alb.multi_value_headers;
headers.extend(alb.headers);
update_xray_trace_id_header(&mut headers);

let base64 = alb.is_base64_encoded;

@@ -330,7 +320,6 @@ fn into_websocket_request(ag: ApiGatewayWebsocketProxyRequest) -> http::Request<
// multi_value_headers is our canonical source of request headers
let mut headers = ag.multi_value_headers;
headers.extend(ag.headers);
update_xray_trace_id_header(&mut headers);

let base64 = ag.is_base64_encoded;
let mut req = builder
@@ -355,8 +344,6 @@ fn into_pass_through_request(data: String) -> http::Request<Body> {
let headers = builder.headers_mut().unwrap();
headers.insert("Content-Type", "application/json".parse().unwrap());

update_xray_trace_id_header(headers);

let raw_path = "/events";

builder
79 changes: 72 additions & 7 deletions lambda-http/src/streaming.rs
@@ -1,4 +1,6 @@
use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestExt};
use crate::{
Collaborator: We probably want a ClonableStreamAdapter too, right? To have Clone bounds to support building custom pre-lambda-event-conversion tower stacks on concurrent handlers?

This relates to: #1013

Contributor Author: Yes, that makes sense. I added Clone for StreamAdapter when the inner service is Clone, so custom stacks can still work with concurrent handlers.

http::header::SET_COOKIE, request::LambdaRequest, update_xray_trace_id_header_from_context, Request, RequestExt,
};
use bytes::Bytes;
use core::{
fmt::Debug,
@@ -27,6 +29,18 @@ pub struct StreamAdapter<'a, S, B> {
_phantom_data: PhantomData<&'a B>,
}

impl<'a, S, B> Clone for StreamAdapter<'a, S, B>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
_phantom_data: PhantomData,
}
}
}

impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
where
S: Service<Request, Response = Response<B>, Error = E>,
@@ -60,10 +74,12 @@
}

fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {
let event: Request = req.payload.into();
let LambdaEvent { payload, context } = req;
let mut event: Request = payload.into();
update_xray_trace_id_header_from_context(event.headers_mut(), &context);
Box::pin(
self.service
.call(event.with_lambda_context(req.context))
.call(event.with_lambda_context(context))
.map_ok(into_stream_response),
)
}
@@ -93,10 +109,29 @@
B::Error: Into<Error> + Send + Debug,
{
ServiceBuilder::new()
.map_request(|req: LambdaEvent<LambdaRequest>| {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
})
.map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request)
.service(handler)
.map_response(into_stream_response)
}

/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
type EventToRequest = fn(LambdaEvent<LambdaRequest>) -> Request;

#[allow(clippy::type_complexity)]
fn into_stream_service_cloneable<S, B, E>(
handler: S,
) -> MapResponse<MapRequest<S, EventToRequest>, impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone>
where
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
E: Debug + Into<Diagnostic> + Send + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
ServiceBuilder::new()
.map_request(event_to_request as EventToRequest)
.service(handler)
.map_response(into_stream_response)
}
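The `as EventToRequest` cast above is what keeps the composed service cloneable with a nameable type: every closure has its own anonymous type, whereas a function item coerced to a `fn` pointer has a nameable type that is `Copy + Clone`. A standalone sketch of the idea (the `MapRequest` struct here is a simplified stand-in, not tower's actual type):

```rust
// Stand-in for tower's MapRequest, reduced to the Clone question.
#[derive(Clone)]
struct MapRequest<S, F> {
    inner: S,
    f: F,
}

// A nameable fn-pointer type; a closure's type could not be written out here.
type EventToRequest = fn(i32) -> String;

fn event_to_request(n: i32) -> String {
    format!("req-{n}")
}

fn main() {
    let svc = MapRequest {
        inner: "inner-service".to_string(),
        f: event_to_request as EventToRequest,
    };
    let cloned = svc.clone(); // fn pointers are Clone, so the whole stack is
    assert_eq!((cloned.f)(7), "req-7");
    assert_eq!(cloned.inner, "inner-service");
}
```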
@@ -128,11 +163,23 @@
}
}

fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
let LambdaEvent { payload, context } = req;
let mut event: Request = payload.into();
update_xray_trace_id_header_from_context(event.headers_mut(), &context);
event.with_lambda_context(context)
}

/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses.
///
/// See the [AWS docs for response streaming].
///
/// # Managed concurrency
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
/// it does not enable concurrent polling. Use [`run_with_streaming_response_concurrent`]
/// instead.
///
/// [AWS docs for response streaming]:
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
@@ -147,6 +194,24 @@
lambda_runtime::run(into_stream_service(handler)).await
}

/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses, in a mode that is compatible with Lambda Managed Instances.
///
/// This uses a cloneable, boxed service internally so it can be driven by the
/// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`,
/// it falls back to the same sequential behavior as [`run_with_streaming_response`].
pub async fn run_with_streaming_response_concurrent<S, B, E>(handler: S) -> Result<(), Error>
Collaborator: Same question around naming (run_with_streaming_response_extended / run_with_streaming_response_maybe_concurrent / etc.), and nudging people to prefer this over run_with_streaming_response, plus a warning that it doesn't respect the concurrency env.

Contributor Author: Same as in the non-streaming case: I'm keeping the _concurrent naming for now, but happy to rename it. I also added a doc warning, and the runtime now fails fast if AWS_LAMBDA_MAX_CONCURRENCY is set and the non-concurrent entrypoint is used.

where
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
E: Debug + Into<Diagnostic> + Send + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
lambda_runtime::run_concurrent(into_stream_service_cloneable(handler)).await
}

pin_project_lite::pin_project! {
#[non_exhaustive]
pub struct BodyStream<B> {