-
Notifications
You must be signed in to change notification settings - Fork 380
feat: support Lambda Managed Instances (draft) #1067
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
32251ed
7bbab4b
d05a914
1c0b120
c2a1168
2ac1cb6
dd08c2f
2653859
f4a7612
cc4a440
949c002
3ddc948
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| [package] | ||
| name = "basic-lambda-concurrent" | ||
| version = "0.1.0" | ||
| edition = "2021" | ||
|
|
||
| [dependencies] | ||
| lambda_runtime = { path = "../../lambda-runtime" } | ||
| serde = "1.0.219" | ||
| tokio = { version = "1", features = ["macros"] } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| // This example requires the following input to succeed: | ||
| // { "command": "do something" } | ||
|
|
||
| use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; | ||
| use serde::{Deserialize, Serialize}; | ||
|
|
||
| /// This is also a made-up example. Requests come into the runtime as unicode | ||
| /// strings in json format, which can map to any structure that implements `serde::Deserialize` | ||
| /// The runtime pays no attention to the contents of the request payload. | ||
| #[derive(Deserialize)] | ||
| struct Request { | ||
| command: String, | ||
| } | ||
|
|
||
| /// This is a made-up example of what a response structure may look like. | ||
| /// There is no restriction on what it can be. The runtime requires responses | ||
| /// to be serialized into json. The runtime pays no attention | ||
| /// to the contents of the response payload. | ||
| #[derive(Serialize)] | ||
| struct Response { | ||
| req_id: String, | ||
| msg: String, | ||
| } | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> Result<(), Error> { | ||
| // required to enable CloudWatch error logging by the runtime | ||
| tracing::init_default_subscriber(); | ||
|
|
||
| let func = service_fn(my_handler); | ||
| if let Err(err) = lambda_runtime::run_concurrent(func).await { | ||
| tracing::error!(error = %err, "run error"); | ||
| return Err(err); | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| pub(crate) async fn my_handler(event: LambdaEvent<Request>) -> Result<Response, Error> { | ||
| // extract some useful info from the request | ||
| let command = event.payload.command; | ||
|
|
||
| // prepare the response | ||
| let resp = Response { | ||
| req_id: event.context.request_id, | ||
| msg: format!("Command {command} executed."), | ||
| }; | ||
|
|
||
| // return `Response` (it will be serialized to JSON automatically by the runtime) | ||
| Ok(resp) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use crate::{my_handler, Request}; | ||
| use lambda_runtime::{Context, LambdaEvent}; | ||
|
|
||
| #[tokio::test] | ||
| async fn response_is_good_for_simple_input() { | ||
| let id = "ID"; | ||
|
|
||
| let mut context = Context::default(); | ||
| context.request_id = id.to_string(); | ||
|
|
||
| let payload = Request { | ||
| command: "X".to_string(), | ||
| }; | ||
| let event = LambdaEvent { payload, context }; | ||
|
|
||
| let result = my_handler(event).await.unwrap(); | ||
|
|
||
| assert_eq!(result.msg, "Command X executed."); | ||
| assert_eq!(result.req_id, id.to_string()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,7 +102,7 @@ use std::{ | |
| }; | ||
|
|
||
| mod streaming; | ||
| pub use streaming::{run_with_streaming_response, StreamAdapter}; | ||
| pub use streaming::{run_with_streaming_response, run_with_streaming_response_concurrent, StreamAdapter}; | ||
|
|
||
| /// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type | ||
| pub type Request = http::Request<Body>; | ||
|
|
@@ -151,6 +151,18 @@ pub struct Adapter<'a, R, S> { | |
| _phantom_data: PhantomData<&'a R>, | ||
| } | ||
|
|
||
| impl<'a, R, S> Clone for Adapter<'a, R, S> | ||
| where | ||
| S: Clone, | ||
| { | ||
| fn clone(&self) -> Self { | ||
| Self { | ||
| service: self.service.clone(), | ||
| _phantom_data: PhantomData, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<'a, R, S, E> From<S> for Adapter<'a, R, S> | ||
| where | ||
| S: Service<Request, Response = R, Error = E>, | ||
|
|
@@ -180,9 +192,11 @@ where | |
| } | ||
|
|
||
| fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future { | ||
| let request_origin = req.payload.request_origin(); | ||
| let event: Request = req.payload.into(); | ||
| let fut = Box::pin(self.service.call(event.with_lambda_context(req.context))); | ||
| let LambdaEvent { payload, context } = req; | ||
| let request_origin = payload.request_origin(); | ||
| let mut event: Request = payload.into(); | ||
| update_xray_trace_id_header_from_context(event.headers_mut(), &context); | ||
| let fut = Box::pin(self.service.call(event.with_lambda_context(context))); | ||
|
|
||
| TransformResponse::Request(request_origin, fut) | ||
| } | ||
|
|
@@ -193,6 +207,12 @@ where | |
| /// | ||
| /// This takes care of transforming the LambdaEvent into a [`Request`] and then | ||
| /// converting the result into a `LambdaResponse`. | ||
| /// | ||
| /// # Managed concurrency | ||
| /// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because | ||
| /// it does not enable concurrent polling. If your handler can satisfy `Clone`, | ||
| /// prefer [`run_concurrent`], which honors managed concurrency and falls back to | ||
| /// sequential behavior when unset. | ||
| pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error> | ||
| where | ||
| S: Service<Request, Response = R, Error = E>, | ||
|
|
@@ -203,6 +223,33 @@ where | |
| lambda_runtime::run(Adapter::from(handler)).await | ||
| } | ||
|
|
||
| /// Starts the Lambda Rust runtime in a mode that is compatible with | ||
| /// Lambda Managed Instances (concurrent invocations). | ||
| /// | ||
| /// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this | ||
| /// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own | ||
| /// `/next` polling loop. When the environment variable is unset or `<= 1`, | ||
| /// it falls back to the same sequential behavior as [`run`], so the same | ||
| /// handler can run on both classic Lambda and Lambda Managed Instances. | ||
| pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error> | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Question, not sure): If the fallback case if The regular Context: My suspicion is, most consumers don't mind the I would suggest at minimum the following:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! I added a doc warning on |
||
| where | ||
| S: Service<Request, Response = R, Error = E> + Clone + Send + 'static, | ||
| S::Future: Send + 'static, | ||
| R: IntoResponse + Send + Sync + 'static, | ||
| E: std::fmt::Debug + Into<Diagnostic> + Send + 'static, | ||
| { | ||
| lambda_runtime::run_concurrent(Adapter::from(handler)).await | ||
| } | ||
|
|
||
| // Replaces update_xray_trace_id_header (env var), now set from Context | ||
| fn update_xray_trace_id_header_from_context(headers: &mut http::HeaderMap, context: &Context) { | ||
| if let Some(trace_id) = context.xray_trace_id.as_deref() { | ||
| if let Ok(header_value) = http::HeaderValue::from_str(trace_id) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: What are the cases where (I guess this is an existing behavior and this is more of a lift and shift?)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, this was the existing behavior. It could fail if the |
||
| headers.insert(http::header::HeaderName::from_static("x-amzn-trace-id"), header_value); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod test_adapter { | ||
| use std::task::{Context, Poll}; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,6 @@ | ||
| use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestExt}; | ||
| use crate::{ | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably want a This relates to: #1013
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that makes sense. I added |
||
| http::header::SET_COOKIE, request::LambdaRequest, update_xray_trace_id_header_from_context, Request, RequestExt, | ||
| }; | ||
| use bytes::Bytes; | ||
| use core::{ | ||
| fmt::Debug, | ||
|
|
@@ -27,6 +29,18 @@ pub struct StreamAdapter<'a, S, B> { | |
| _phantom_data: PhantomData<&'a B>, | ||
| } | ||
|
|
||
| impl<'a, S, B> Clone for StreamAdapter<'a, S, B> | ||
| where | ||
| S: Clone, | ||
| { | ||
| fn clone(&self) -> Self { | ||
| Self { | ||
| service: self.service.clone(), | ||
| _phantom_data: PhantomData, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B> | ||
| where | ||
| S: Service<Request, Response = Response<B>, Error = E>, | ||
|
|
@@ -60,10 +74,12 @@ where | |
| } | ||
|
|
||
| fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future { | ||
| let event: Request = req.payload.into(); | ||
| let LambdaEvent { payload, context } = req; | ||
| let mut event: Request = payload.into(); | ||
| update_xray_trace_id_header_from_context(event.headers_mut(), &context); | ||
| Box::pin( | ||
| self.service | ||
| .call(event.with_lambda_context(req.context)) | ||
| .call(event.with_lambda_context(context)) | ||
| .map_ok(into_stream_response), | ||
| ) | ||
| } | ||
|
|
@@ -93,10 +109,29 @@ where | |
| B::Error: Into<Error> + Send + Debug, | ||
| { | ||
| ServiceBuilder::new() | ||
| .map_request(|req: LambdaEvent<LambdaRequest>| { | ||
| let event: Request = req.payload.into(); | ||
| event.with_lambda_context(req.context) | ||
| }) | ||
| .map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request) | ||
| .service(handler) | ||
| .map_response(into_stream_response) | ||
| } | ||
|
|
||
| /// Builds a streaming-aware Tower service from a `Service<Request>` that can be | ||
| /// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint. | ||
| type EventToRequest = fn(LambdaEvent<LambdaRequest>) -> Request; | ||
|
|
||
| #[allow(clippy::type_complexity)] | ||
| fn into_stream_service_cloneable<S, B, E>( | ||
| handler: S, | ||
| ) -> MapResponse<MapRequest<S, EventToRequest>, impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone> | ||
| where | ||
| S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static, | ||
| S::Future: Send + 'static, | ||
| E: Debug + Into<Diagnostic> + Send + 'static, | ||
| B: Body + Unpin + Send + 'static, | ||
| B::Data: Into<Bytes> + Send, | ||
| B::Error: Into<Error> + Send + Debug, | ||
| { | ||
| ServiceBuilder::new() | ||
| .map_request(event_to_request as EventToRequest) | ||
| .service(handler) | ||
| .map_response(into_stream_response) | ||
| } | ||
|
|
@@ -128,11 +163,23 @@ where | |
| } | ||
| } | ||
|
|
||
| fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request { | ||
| let LambdaEvent { payload, context } = req; | ||
| let mut event: Request = payload.into(); | ||
| update_xray_trace_id_header_from_context(event.headers_mut(), &context); | ||
| event.with_lambda_context(context) | ||
| } | ||
|
|
||
| /// Runs the Lambda runtime with a handler that returns **streaming** HTTP | ||
| /// responses. | ||
| /// | ||
| /// See the [AWS docs for response streaming]. | ||
| /// | ||
| /// # Managed concurrency | ||
| /// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because | ||
| /// it does not enable concurrent polling. Use [`run_with_streaming_response_concurrent`] | ||
| /// instead. | ||
| /// | ||
| /// [AWS docs for response streaming]: | ||
| /// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html | ||
| pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error> | ||
|
|
@@ -147,6 +194,24 @@ where | |
| lambda_runtime::run(into_stream_service(handler)).await | ||
| } | ||
|
|
||
| /// Runs the Lambda runtime with a handler that returns **streaming** HTTP | ||
| /// responses, in a mode that is compatible with Lambda Managed Instances. | ||
| /// | ||
| /// This uses a cloneable, boxed service internally so it can be driven by the | ||
| /// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`, | ||
| /// it falls back to the same sequential behavior as [`run_with_streaming_response`]. | ||
| pub async fn run_with_streaming_response_concurrent<S, B, E>(handler: S) -> Result<(), Error> | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question around naming
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as in the non-streaming case, keeping the |
||
| where | ||
| S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static, | ||
| S::Future: Send + 'static, | ||
| E: Debug + Into<Diagnostic> + Send + 'static, | ||
| B: Body + Unpin + Send + 'static, | ||
| B::Data: Into<Bytes> + Send, | ||
| B::Error: Into<Error> + Send + Debug, | ||
| { | ||
| lambda_runtime::run_concurrent(into_stream_service_cloneable(handler)).await | ||
| } | ||
|
|
||
| pin_project_lite::pin_project! { | ||
| #[non_exhaustive] | ||
| pub struct BodyStream<B> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: is this makefile target + configurable variable worth mentioning in the README?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, thank you. I added a short note in the README under RIE testing showing `make test-rie-lmi EXAMPLE=basic-lambda-concurrent`.