Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions grpc/examples/inmemory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::any::Any;

use grpc::service::{Message, Request, Response, Service};
use grpc::service::{Message, MessageAllocator, Request, Response, Service};
use grpc::{client::ChannelOptions, inmemory};
use tokio_stream::StreamExt;
use tonic::async_trait;
Expand All @@ -10,12 +10,37 @@ struct Handler {}
#[derive(Debug)]
struct MyReqMessage(String);

impl Message for MyReqMessage {
fn encode(&self, _: &mut bytes::BytesMut) -> Result<(), String> {
Err("not implemented".to_string())
}

fn decode(&mut self, _: &bytes::Bytes) -> Result<(), String> {
Err("not implemented".to_string())
}
}

#[derive(Debug)]
struct MyResMessage(String);

impl Message for MyResMessage {
fn encode(&self, _: &mut bytes::BytesMut) -> Result<(), String> {
Err("not implemented".to_string())
}

fn decode(&mut self, _: &bytes::Bytes) -> Result<(), String> {
Err("not implemented".to_string())
}
}

#[async_trait]
impl Service for Handler {
async fn call(&self, method: String, request: Request) -> Response {
async fn call(
&self,
method: String,
request: Request,
_: Box<dyn MessageAllocator>,
) -> Response {
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(req) = stream.next().await {
Expand All @@ -30,6 +55,15 @@ impl Service for Handler {
}
}

#[derive(Debug, Default)]
struct MyResMessageAllocator {}

impl MessageAllocator for MyResMessageAllocator {
fn allocate(&self) -> Box<dyn Message> {
Box::new(MyResMessage(String::new()))
}
}

#[tokio::main]
async fn main() {
inmemory::reg();
Expand All @@ -55,7 +89,13 @@ async fn main() {
};

let req = Request::new(Box::pin(outbound));
let res = chan.call("/some/method".to_string(), req).await;
let res = chan
.call(
"/some/method".to_string(),
req,
Box::new(MyResMessageAllocator {}),
)
.await;
let mut res = res.into_inner();

while let Some(resp) = res.next().await {
Expand Down
46 changes: 43 additions & 3 deletions grpc/examples/multiaddr.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::any::Any;

use grpc::service::{Message, Request, Response, Service};
use grpc::service::{Message, MessageAllocator, Request, Response, Service};
use grpc::{client::ChannelOptions, inmemory};
use tokio_stream::StreamExt;
use tonic::async_trait;
Expand All @@ -15,9 +15,43 @@ struct MyReqMessage(String);
#[derive(Debug)]
struct MyResMessage(String);

impl Message for MyReqMessage {
fn encode(&self, _: &mut bytes::BytesMut) -> Result<(), String> {
Err("not implemented".to_string())
}

fn decode(&mut self, _: &bytes::Bytes) -> Result<(), String> {
Err("not implemented".to_string())
}
}

#[derive(Debug, Default)]
struct MyResMessageAllocator {}

impl Message for MyResMessage {
fn encode(&self, _: &mut bytes::BytesMut) -> Result<(), String> {
Err("not implemented".to_string())
}

fn decode(&mut self, _: &bytes::Bytes) -> Result<(), String> {
Err("not implemented".to_string())
}
}

impl MessageAllocator for MyResMessageAllocator {
fn allocate(&self) -> Box<dyn Message> {
Box::new(MyResMessage(String::new()))
}
}

#[async_trait]
impl Service for Handler {
async fn call(&self, method: String, request: Request) -> Response {
async fn call(
&self,
method: String,
request: Request,
_: Box<dyn MessageAllocator>,
) -> Response {
let id = self.id.clone();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
Expand Down Expand Up @@ -79,7 +113,13 @@ async fn main() {
};

let req = Request::new(Box::pin(outbound));
let res = chan.call("/some/method".to_string(), req).await;
let res = chan
.call(
"/some/method".to_string(),
req,
Box::new(MyResMessageAllocator {}),
)
.await;
let mut res = res.into_inner();

while let Some(resp) = res.next().await {
Expand Down
25 changes: 20 additions & 5 deletions grpc/src/client/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ use serde_json::json;
use tonic::async_trait;
use url::Url; // NOTE: http::Uri requires non-empty authority portion of URI

use crate::attributes::Attributes;
use crate::rt;
use crate::service::{Request, Response, Service};
use crate::{attributes::Attributes, service::MessageAllocator};
use crate::{client::ConnectivityState, rt::Runtime};
use crate::{credentials::Credentials, rt::default_runtime};

Expand Down Expand Up @@ -204,9 +204,14 @@ impl Channel {
s.clone().unwrap()
}

pub async fn call(&self, method: String, request: Request) -> Response {
pub async fn call(
&self,
method: String,
request: Request,
response_allocator: Box<dyn MessageAllocator>,
) -> Response {
let ac = self.get_or_create_active_channel();
ac.call(method, request).await
ac.call(method, request, response_allocator).await
}
}

Expand Down Expand Up @@ -302,7 +307,12 @@ impl ActiveChannel {
})
}

async fn call(&self, method: String, request: Request) -> Response {
async fn call(
&self,
method: String,
request: Request,
response_allocator: Box<dyn MessageAllocator>,
) -> Response {
// TODO: pre-pick tasks (e.g. deadlines, interceptors, retry)
let mut i = self.picker.iter();
loop {
Expand All @@ -314,7 +324,12 @@ impl ActiveChannel {
if let Some(sc) = (pr.subchannel.as_ref() as &dyn Any)
.downcast_ref::<ExternalSubchannel>()
{
return sc.isc.as_ref().unwrap().call(method, request).await;
return sc
.isc
.as_ref()
.unwrap()
.call(method, request, response_allocator)
.await;
} else {
panic!("picked subchannel is not an implementation provided by the channel");
}
Expand Down
10 changes: 10 additions & 0 deletions grpc/src/client/load_balancing/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ pub(crate) fn new_request() -> Request {
)))
}

impl Message for EmptyMessage {
fn encode(&self, buf: &mut bytes::BytesMut) -> Result<(), String> {
Ok(())
}

fn decode(&mut self, buf: &bytes::Bytes) -> Result<(), String> {
Ok(())
}
}

// A test subchannel that forwards connect calls to a channel.
// This allows tests to verify when a subchannel is asked to connect.
pub(crate) struct TestSubchannel {
Expand Down
2 changes: 1 addition & 1 deletion grpc/src/client/name_resolution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::{
};

mod backoff;
mod dns;
pub mod dns;
mod registry;
pub use registry::global_registry;
use url::Url;
Expand Down
11 changes: 8 additions & 3 deletions grpc/src/client/subchannel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
transport::{ConnectedTransport, TransportOptions},
},
rt::{BoxedTaskHandle, Runtime},
service::{Request, Response, Service},
service::{MessageAllocator, Request, Response, Service},
};
use core::panic;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -192,7 +192,12 @@ struct InnerSubchannel {

#[async_trait]
impl Service for InternalSubchannel {
async fn call(&self, method: String, request: Request) -> Response {
async fn call(
&self,
method: String,
request: Request,
response_allocator: Box<dyn MessageAllocator>,
) -> Response {
let svc = self.inner.lock().unwrap().state.connected_transport();
if svc.is_none() {
// TODO(easwars): Change the signature of this method to return a
Expand All @@ -201,7 +206,7 @@ impl Service for InternalSubchannel {
}

let svc = svc.unwrap().clone();
return svc.call(method, request).await;
return svc.call(method, request, response_allocator).await;
}
}

Expand Down
43 changes: 26 additions & 17 deletions grpc/src/client/transport/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use crate::rt::BoxedTaskHandle;
use crate::rt::Runtime;
use crate::rt::TcpOptions;
use crate::service::Message;
use crate::service::MessageAllocator;
use crate::service::Request as GrpcRequest;
use crate::service::Response as GrpcResponse;
use crate::{client::name_resolution::TCP_IP_NETWORK_TYPE, service::Service};
use bytes::Bytes;
use bytes::BytesMut;
use http::uri::PathAndQuery;
use http::Request as HttpRequest;
use http::Response as HttpResponse;
Expand Down Expand Up @@ -63,7 +65,12 @@ impl Drop for TonicTransport {

#[async_trait]
impl Service for TonicTransport {
async fn call(&self, method: String, request: GrpcRequest) -> GrpcResponse {
async fn call(
&self,
method: String,
request: GrpcRequest,
response_allocator: Box<dyn MessageAllocator>,
) -> GrpcResponse {
let Ok(path) = PathAndQuery::from_maybe_shared(method) else {
let err = Status::internal("Failed to parse path");
return create_error_response(err);
Expand All @@ -78,7 +85,7 @@ impl Service for TonicTransport {
};
let request = convert_request(request);
let response = grpc.streaming(request, path, BytesCodec {}).await;
convert_response(response)
convert_response(response, response_allocator)
}
}

Expand All @@ -88,23 +95,22 @@ fn create_error_response(status: Status) -> GrpcResponse {
TonicResponse::new(Box::pin(stream))
}

fn convert_request(req: GrpcRequest) -> TonicRequest<Pin<Box<dyn Stream<Item = Bytes> + Send>>> {
fn convert_request(req: GrpcRequest) -> TonicRequest<BoxStream<Bytes>> {
let (metadata, extensions, stream) = req.into_parts();

let bytes_stream = Box::pin(stream.filter_map(|msg| {
if let Ok(bytes) = (msg as Box<dyn Any>).downcast::<Bytes>() {
Some(*bytes)
} else {
// If it fails, log the error and return None to filter it out.
eprintln!("A message could not be downcast to Bytes and was skipped.");
None
}
let bytes_stream = Box::pin(stream.map(|msg| {
let mut buf = BytesMut::with_capacity(msg.encoded_message_size_hint().unwrap_or(0));
msg.encode(&mut buf).map_err(Status::internal)?;
Ok(buf.freeze())
}));

TonicRequest::from_parts(metadata, extensions, bytes_stream as _)
}

fn convert_response(res: Result<TonicResponse<Streaming<Bytes>>, Status>) -> GrpcResponse {
fn convert_response(
res: Result<TonicResponse<Streaming<Bytes>>, Status>,
allocator: Box<dyn MessageAllocator>,
) -> GrpcResponse {
let response = match res {
Ok(s) => s,
Err(e) => {
Expand All @@ -113,11 +119,14 @@ fn convert_response(res: Result<TonicResponse<Streaming<Bytes>>, Status>) -> Grp
}
};
let (metadata, stream, extensions) = response.into_parts();
let message_stream: BoxStream<Box<dyn Message>> = Box::pin(stream.map(|msg| {
msg.map(|b| {
let msg: Box<dyn Message> = Box::new(b);
msg
})
let allocator: Arc<dyn MessageAllocator> = Arc::from(allocator);
let allocator_copy = allocator.clone();
let message_stream: BoxStream<Box<dyn Message>> = Box::pin(stream.map(move |msg| {
let allocator = allocator_copy.clone();
let buf = msg?;
let mut msg = allocator.allocate();
msg.decode(&buf).map_err(Status::internal)?;
Ok(msg)
}));
TonicResponse::from_parts(metadata, message_stream, extensions)
}
Expand Down
Loading
Loading