-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(grpc): Add tonic transport #2339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
66e6c10
b113136
957377c
13f1628
e69cb49
b01eca5
13bb667
e893238
ae686c8
7388013
398fdcf
169bd72
e6afa5f
da64ac5
4ac449f
58478ed
8cc55c5
5a1eb9a
4c89100
61c726a
bce705f
160f27c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| /* | ||
| * | ||
| * Copyright 2018 gRPC authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| * | ||
| */ | ||
|
|
||
| syntax = "proto3"; | ||
|
|
||
| package grpc.examples.echo; | ||
|
|
||
| // EchoRequest is the request for echo. | ||
| message EchoRequest { | ||
| string message = 1; | ||
| } | ||
|
|
||
| // EchoResponse is the response for echo. | ||
| message EchoResponse { | ||
| string message = 1; | ||
| } | ||
|
|
||
| // Echo is the echo service. | ||
| service Echo { | ||
| // UnaryEcho is unary echo. | ||
| rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} | ||
| // ServerStreamingEcho is server side streaming. | ||
| rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} | ||
| // ClientStreamingEcho is client side streaming. | ||
| rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} | ||
| // BidirectionalStreamingEcho is bidi streaming. | ||
| rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,11 +2,16 @@ use super::{ | |
| channel::{InternalChannelController, WorkQueueTx}, | ||
| load_balancing::{self, ExternalSubchannel, Picker, Subchannel, SubchannelState}, | ||
| name_resolution::Address, | ||
| transport::{self, ConnectedTransport, Transport, TransportRegistry}, | ||
| transport::{self, Transport, TransportRegistry}, | ||
| ConnectivityState, | ||
| }; | ||
| use crate::{ | ||
| client::{channel::WorkQueueItem, subchannel}, | ||
| client::{ | ||
| channel::WorkQueueItem, | ||
| subchannel, | ||
| transport::{ConnectedTransport, TransportOptions}, | ||
| }, | ||
| rt::{Runtime, TaskHandle}, | ||
| service::{Request, Response, Service}, | ||
| }; | ||
| use core::panic; | ||
|
|
@@ -18,13 +23,13 @@ use std::{ | |
| sync::{Arc, Mutex, RwLock, Weak}, | ||
| }; | ||
| use tokio::{ | ||
| sync::{mpsc, watch, Notify}, | ||
| sync::{mpsc, oneshot, watch, Notify}, | ||
| task::{AbortHandle, JoinHandle}, | ||
| time::{Duration, Instant}, | ||
| }; | ||
| use tonic::async_trait; | ||
|
|
||
| type SharedService = Arc<dyn ConnectedTransport>; | ||
| type SharedService = Arc<dyn Service>; | ||
|
|
||
| pub trait Backoff: Send + Sync { | ||
| fn backoff_until(&self) -> Instant; | ||
|
|
@@ -52,11 +57,11 @@ enum InternalSubchannelState { | |
| } | ||
|
|
||
| struct InternalSubchannelConnectingState { | ||
| abort_handle: Option<AbortHandle>, | ||
| abort_handle: Option<Box<dyn TaskHandle>>, | ||
|
||
| } | ||
|
|
||
| struct InternalSubchannelReadyState { | ||
| abort_handle: Option<AbortHandle>, | ||
| abort_handle: Option<Box<dyn TaskHandle>>, | ||
| svc: SharedService, | ||
| } | ||
|
|
||
|
|
@@ -178,6 +183,7 @@ pub(crate) struct InternalSubchannel { | |
| unregister_fn: Option<Box<dyn FnOnce(SubchannelKey) + Send + Sync>>, | ||
| state_machine_event_sender: mpsc::UnboundedSender<SubchannelStateMachineEvent>, | ||
| inner: Mutex<InnerSubchannel>, | ||
| runtime: Arc<dyn Runtime>, | ||
| } | ||
|
|
||
| struct InnerSubchannel { | ||
|
|
@@ -204,7 +210,7 @@ impl Service for InternalSubchannel { | |
|
|
||
| enum SubchannelStateMachineEvent { | ||
| ConnectionRequested, | ||
| ConnectionSucceeded(SharedService), | ||
| ConnectionSucceeded(SharedService, oneshot::Receiver<Result<(), String>>), | ||
| ConnectionTimedOut, | ||
| ConnectionFailed(String), | ||
| ConnectionTerminated, | ||
|
|
@@ -214,7 +220,7 @@ impl Debug for SubchannelStateMachineEvent { | |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| match self { | ||
| Self::ConnectionRequested => write!(f, "ConnectionRequested"), | ||
| Self::ConnectionSucceeded(_) => write!(f, "ConnectionSucceeded"), | ||
| Self::ConnectionSucceeded(_, _) => write!(f, "ConnectionSucceeded"), | ||
| Self::ConnectionTimedOut => write!(f, "ConnectionTimedOut"), | ||
| Self::ConnectionFailed(_) => write!(f, "ConnectionFailed"), | ||
| Self::ConnectionTerminated => write!(f, "ConnectionTerminated"), | ||
|
|
@@ -229,6 +235,7 @@ impl InternalSubchannel { | |
| transport: Arc<dyn Transport>, | ||
| backoff: Arc<dyn Backoff>, | ||
| unregister_fn: Box<dyn FnOnce(SubchannelKey) + Send + Sync>, | ||
| runtime: Arc<dyn Runtime>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious: Given that we will have the same runtime for all the different gRPC components that require a runtime, did we consider something like a singleton that is initialized at init time, and all the components can use a getter to retrieve and use the singleton instead of the runtime being passed to every component that needs it?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Different grpc channels could theoretically use different runtimes. Maybe that isn't something we need to support, but it's pretty easily attained - it just requires passing around the runtime a bit more than if it were global.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. C++ passes the event engine through channel args. In my opinion passing the runtime through a function param allows for cleaner dependency injection. It also enforces that the runtime is set during channel creation, before RPCs are made. Having a singleton runtime will force all gRPC channels in a binary to use the same runtime. I don't know if this is a con though. We can discuss this in the team meeting.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is fine, thoug all of these |
||
| ) -> Arc<InternalSubchannel> { | ||
| println!("creating new internal subchannel for: {:?}", &key); | ||
| let (tx, mut rx) = mpsc::unbounded_channel::<SubchannelStateMachineEvent>(); | ||
|
|
@@ -244,23 +251,24 @@ impl InternalSubchannel { | |
| backoff_task: None, | ||
| disconnect_task: None, | ||
| }), | ||
| runtime: runtime.clone(), | ||
| }); | ||
|
|
||
| // This long running task implements the subchannel state machine. When | ||
| // the subchannel is dropped, the channel from which this task reads is | ||
| // closed, and therefore this task exits because rx.recv() returns None | ||
| // in that case. | ||
| let arc_to_self = Arc::clone(&isc); | ||
| tokio::task::spawn(async move { | ||
| runtime.spawn(Box::pin(async move { | ||
| println!("starting subchannel state machine for: {:?}", &key); | ||
| while let Some(m) = rx.recv().await { | ||
| println!("subchannel {:?} received event {:?}", &key, &m); | ||
| match m { | ||
| SubchannelStateMachineEvent::ConnectionRequested => { | ||
| arc_to_self.move_to_connecting(); | ||
| } | ||
| SubchannelStateMachineEvent::ConnectionSucceeded(svc) => { | ||
| arc_to_self.move_to_ready(svc); | ||
| SubchannelStateMachineEvent::ConnectionSucceeded(svc, rx) => { | ||
| arc_to_self.move_to_ready(svc, rx); | ||
| } | ||
| SubchannelStateMachineEvent::ConnectionTimedOut => { | ||
| arc_to_self.move_to_transient_failure("connect timeout expired".into()); | ||
|
|
@@ -277,7 +285,7 @@ impl InternalSubchannel { | |
| } | ||
| } | ||
| println!("exiting work queue task in subchannel"); | ||
| }); | ||
| })); | ||
| isc | ||
| } | ||
|
|
||
|
|
@@ -345,30 +353,34 @@ impl InternalSubchannel { | |
| let transport = self.transport.clone(); | ||
| let address = self.address().address; | ||
| let state_machine_tx = self.state_machine_event_sender.clone(); | ||
| let connect_task = tokio::task::spawn(async move { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have some kind of
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ultimately (pre-1.0) we want to not have any tokio runtime crates/features listed in Cargo.toml, except if you are using a tokio feature flag. That would prevent such a thing.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I spent some time looking into this. I found two approaches:
@LucioFranco would like to get your thoughts on this.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cargo features can be enabled even if a (transitive) dependency enabled the feature. I wasn't seein any compilation failures even after removing the tokio:rt feature from the cargo tree -i tokio -e features --edges=normal -p grpc --no-default-featuresBuffer has a constructor that uses tokio as the default executor. We're not using this constructor though.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added a default private feature for the tokio runtime that enables the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We discussed this yesterday, for now this is fine, we can rely on tokio initially until we make some more overall progress. |
||
| // TODO: All these options to be configured by users. | ||
| let transport_opts = TransportOptions::default(); | ||
| let runtime = self.runtime.clone(); | ||
|
|
||
| let connect_task = self.runtime.spawn(Box::pin(async move { | ||
| tokio::select! { | ||
| _ = tokio::time::sleep(min_connect_timeout) => { | ||
| let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTimedOut); | ||
| } | ||
| result = transport.connect(address.to_string().clone()) => { | ||
| result = transport.connect(address.to_string().clone(), runtime, &transport_opts) => { | ||
| match result { | ||
| Ok(s) => { | ||
| let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionSucceeded(Arc::from(s))); | ||
| let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionSucceeded(Arc::from(s.service), s.disconnection_listener)); | ||
| } | ||
| Err(e) => { | ||
| let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionFailed(e)); | ||
| } | ||
| } | ||
| }, | ||
| } | ||
| }); | ||
| })); | ||
| let mut inner = self.inner.lock().unwrap(); | ||
| inner.state = InternalSubchannelState::Connecting(InternalSubchannelConnectingState { | ||
| abort_handle: Some(connect_task.abort_handle()), | ||
| abort_handle: Some(connect_task), | ||
| }); | ||
| } | ||
|
|
||
| fn move_to_ready(&self, svc: SharedService) { | ||
| fn move_to_ready(&self, svc: SharedService, closed_rx: oneshot::Receiver<Result<(), String>>) { | ||
| let svc2 = svc.clone(); | ||
| { | ||
| let mut inner = self.inner.lock().unwrap(); | ||
|
|
@@ -383,17 +395,19 @@ impl InternalSubchannel { | |
| }); | ||
|
|
||
| let state_machine_tx = self.state_machine_event_sender.clone(); | ||
| let disconnect_task = tokio::task::spawn(async move { | ||
| let task_handle = self.runtime.spawn(Box::pin(async move { | ||
| // TODO(easwars): Does it make sense for disconnected() to return an | ||
| // error string containing information about why the connection | ||
| // terminated? But what can we do with that error other than logging | ||
| // it, which the transport can do as well? | ||
| svc.disconnected().await; | ||
| if let Err(e) = closed_rx.await { | ||
arjan-bal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| eprintln!("Transport closed with error: {}", e.to_string()) | ||
| }; | ||
| let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTerminated); | ||
| }); | ||
| })); | ||
| let mut inner = self.inner.lock().unwrap(); | ||
| inner.state = InternalSubchannelState::Ready(InternalSubchannelReadyState { | ||
| abort_handle: Some(disconnect_task.abort_handle()), | ||
| abort_handle: Some(task_handle), | ||
| svc: svc2.clone(), | ||
| }); | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.