Skip to content

Commit 8cc55c5

Browse files
committed
Fix clippy
1 parent 58478ed commit 8cc55c5

File tree

4 files changed

+28
-39
lines changed

4 files changed

+28
-39
lines changed

grpc/src/client/transport/tonic/mod.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const DEFAULT_BUFFER_SIZE: usize = 1024;
4242
pub(crate) type BoxError = Box<dyn Error + Send + Sync>;
4343

4444
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
45+
type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
4546

4647
pub(crate) fn reg() {
4748
GLOBAL_TRANSPORT_REGISTRY.add_transport(TCP_IP_NETWORK_TYPE, TransportBuilder {});
@@ -109,13 +110,12 @@ fn convert_response(res: Result<TonicResponse<Streaming<Bytes>>, Status>) -> Grp
109110
}
110111
};
111112
let (metadata, stream, extensions) = response.into_parts();
112-
let message_stream: Pin<Box<dyn Stream<Item = Result<Box<dyn Message>, Status>> + Send>> =
113-
Box::pin(stream.map(|msg| {
114-
msg.map(|b| {
115-
let msg: Box<dyn Message> = Box::new(b);
116-
msg
117-
})
118-
}));
113+
let message_stream: BoxStream<Box<dyn Message>> = Box::pin(stream.map(|msg| {
114+
msg.map(|b| {
115+
let msg: Box<dyn Message> = Box::new(b);
116+
msg
117+
})
118+
}));
119119
TonicResponse::from_parts(metadata, message_stream, extensions)
120120
}
121121

@@ -233,7 +233,6 @@ impl TowerService<HttpRequest<Body>> for SendRequestWrapper {
233233

234234
fn call(&mut self, req: http::Request<Body>) -> Self::Future {
235235
let fut = self.inner.send_request(req);
236-
237236
Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(Body::new)) })
238237
}
239238
}

grpc/src/client/transport/tonic/test.rs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,26 +75,19 @@ pub async fn tonic_transport_rpc() {
7575
let bytes = Bytes::from(request.encode_to_vec());
7676

7777
println!("Sent request: {request:?}");
78-
if tx.send(Box::new(bytes)).await.is_err() {
79-
panic!("Receiver dropped");
80-
}
78+
assert!(tx.send(Box::new(bytes)).await.is_ok(), "Receiver dropped");
8179

8280
// Wait for the reply
83-
match inbound
81+
let resp = inbound
8482
.next()
8583
.await
8684
.expect("server unexpectedly closed the stream!")
87-
{
88-
Ok(resp) => {
89-
let bytes = (resp as Box<dyn Any>).downcast::<Bytes>().unwrap();
90-
let echo_reponse = EchoResponse::decode(bytes).unwrap();
91-
println!("Got response: {echo_reponse:?}");
92-
assert_eq!(echo_reponse.message, message);
93-
}
94-
Err(status) => {
95-
panic!("Error from server: {status:?}");
96-
}
97-
}
85+
.expect("server returned error");
86+
87+
let bytes = (resp as Box<dyn Any>).downcast::<Bytes>().unwrap();
88+
let echo_response = EchoResponse::decode(bytes).unwrap();
89+
println!("Got response: {echo_response:?}");
90+
assert_eq!(echo_response.message, message);
9891
}
9992
});
10093

grpc/src/inmemory/mod.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
use std::{
2-
collections::HashMap,
3-
ops::Add,
4-
sync::{
5-
atomic::{AtomicU32, Ordering},
6-
Arc, LazyLock,
7-
},
8-
};
1+
use std::sync::atomic::{AtomicU32, Ordering};
2+
use std::sync::{Arc, LazyLock, Mutex};
3+
use std::{collections::HashMap, ops::Add};
94

105
use crate::{
116
client::{
@@ -19,15 +14,16 @@ use crate::{
1914
server,
2015
service::{Request, Response, Service},
2116
};
22-
use tokio::sync::{mpsc, oneshot, Mutex, Notify};
17+
use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex, Notify};
2318
use tonic::async_trait;
2419

2520
pub struct Listener {
2621
id: String,
2722
s: Box<mpsc::Sender<Option<server::Call>>>,
28-
r: Arc<Mutex<mpsc::Receiver<Option<server::Call>>>>,
23+
r: Arc<AsyncMutex<mpsc::Receiver<Option<server::Call>>>>,
2924
// List of notifiers to call when closed.
30-
closed_tx: Arc<std::sync::Mutex<Vec<oneshot::Sender<Result<(), String>>>>>,
25+
#[allow(clippy::type_complexity)]
26+
closed_tx: Arc<Mutex<Vec<oneshot::Sender<Result<(), String>>>>>,
3127
}
3228

3329
static ID: AtomicU32 = AtomicU32::new(0);
@@ -38,8 +34,8 @@ impl Listener {
3834
let s = Arc::new(Self {
3935
id: format!("{}", ID.fetch_add(1, Ordering::Relaxed)),
4036
s: Box::new(tx),
41-
r: Arc::new(Mutex::new(rx)),
42-
closed_tx: Arc::new(std::sync::Mutex::new(Vec::new())),
37+
r: Arc::new(AsyncMutex::new(rx)),
38+
closed_tx: Arc::new(Mutex::new(Vec::new())),
4339
});
4440
LISTENERS.lock().unwrap().insert(s.id.clone(), s.clone());
4541
s
@@ -89,8 +85,7 @@ impl crate::server::Listener for Arc<Listener> {
8985
}
9086
}
9187

92-
static LISTENERS: LazyLock<std::sync::Mutex<HashMap<String, Arc<Listener>>>> =
93-
LazyLock::new(std::sync::Mutex::default);
88+
static LISTENERS: LazyLock<Mutex<HashMap<String, Arc<Listener>>>> = LazyLock::new(Mutex::default);
9489

9590
struct ClientTransport {}
9691

grpc/src/rt/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ pub(crate) mod hyper_wrapper;
2929
#[cfg(feature = "_runtime-tokio")]
3030
pub(crate) mod tokio;
3131

32+
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
33+
3234
/// An abstraction over an asynchronous runtime.
3335
///
3436
/// The `Runtime` trait defines the core functionality required for
@@ -57,7 +59,7 @@ pub(super) trait Runtime: Send + Sync {
5759
&self,
5860
target: SocketAddr,
5961
opts: TcpOptions,
60-
) -> Pin<Box<dyn Future<Output = Result<Box<dyn TcpStream>, String>> + Send>>;
62+
) -> BoxFuture<Result<Box<dyn TcpStream>, String>>;
6163
}
6264

6365
/// A future that resolves after a specified duration.

0 commit comments

Comments
 (0)