Skip to content

Commit 957377c

Browse files
committed
Merge remote-tracking branch 'source/master' into grpc-tonic-transport-1
2 parents b113136 + d4e4a2a commit 957377c

File tree

24 files changed

+2146
-193
lines changed

24 files changed

+2146
-193
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ members = [
3333
resolver = "2"
3434

3535
[workspace.package]
36-
rust-version = "1.75"
36+
rust-version = "1.86"
3737

3838
[workspace.lints.rust]
3939
missing_debug_implementations = "warn"

grpc/Cargo.toml

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,38 @@ authors = ["gRPC Authors"]
66
license = "MIT"
77

88
[dependencies]
9-
url = "2.5.0"
10-
tokio = { version = "1.37.0", features = ["sync", "rt", "net", "time", "macros"] }
11-
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["codegen"] }
12-
futures-core = "0.3.31"
13-
serde_json = "1.0.140"
14-
serde = "1.0.219"
15-
hickory-resolver = { version = "0.25.1", optional = true }
16-
rand = "0.9"
17-
parking_lot = "0.12.4"
189
bytes = "1.10.1"
1910
futures = "0.3.31"
20-
hyper = { version = "1.6.0", features = ["client", "http2"] }
21-
pin-project-lite = "0.2.16"
22-
tokio-stream = "0.1.17"
23-
http = "1.1.0"
2411
tower = { version = "0.5.2", features = ["buffer", "limit", "util"] }
2512
tower-service = "0.3.3"
2613
socket2 = "0.5.10"
14+
futures-core = "0.3.31"
15+
futures-util = "0.3.31"
16+
hickory-resolver = { version = "0.25.1", optional = true }
17+
http = "1.1.0"
18+
http-body = "1.0.1"
19+
hyper = { version = "1.6.0", features = ["client", "http2"] }
20+
hyper-util = "0.1.14"
21+
once_cell = "1.19.0"
22+
parking_lot = "0.12.4"
23+
pin-project-lite = "0.2.16"
24+
rand = "0.9"
25+
serde = { version = "1.0.219", features = ["derive"] }
26+
serde_json = "1.0.140"
27+
tokio = { version = "1.37.0", features = ["sync", "rt", "net", "time", "macros"] }
28+
tokio-stream = "0.1.17"
29+
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["codegen", "transport"] }
30+
url = "2.5.0"
2731

2832
[dev-dependencies]
29-
hickory-server = "0.25.2"
30-
prost = "0.14"
3133
async-stream = "0.3.6"
3234
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["prost", "server", "router"] }
35+
hickory-server = "0.25.2"
36+
prost = "0.14"
3337

3438
[build-dependencies]
3539
tonic-build = { path = "../tonic-build" }
40+
prost = "0.14"
3641

3742
[features]
3843
default = ["dns"]
@@ -42,4 +47,5 @@ dns = ["dep:hickory-resolver"]
4247
allowed_external_types = [
4348
"tonic::*",
4449
"futures_core::stream::Stream",
50+
"tokio::sync::oneshot::Sender",
4551
]

grpc/examples/inmemory.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use std::any::Any;
2+
3+
use futures_util::stream::StreamExt;
4+
use grpc::service::{Message, Request, Response, Service};
5+
use grpc::{client::ChannelOptions, inmemory};
6+
use tonic::async_trait;
7+
8+
struct Handler {}
9+
10+
#[derive(Debug)]
11+
struct MyReqMessage(String);
12+
13+
#[derive(Debug)]
14+
struct MyResMessage(String);
15+
16+
#[async_trait]
17+
impl Service for Handler {
18+
async fn call(&self, method: String, request: Request) -> Response {
19+
let mut stream = request.into_inner();
20+
let output = async_stream::try_stream! {
21+
while let Some(req) = stream.next().await {
22+
yield Box::new(MyResMessage(format!(
23+
"Server: responding to: {}; msg: {}",
24+
method, (req as Box<dyn Any>).downcast_ref::<MyReqMessage>().unwrap().0,
25+
))) as Box<dyn Message>;
26+
}
27+
};
28+
29+
Response::new(Box::pin(output))
30+
}
31+
}
32+
33+
#[tokio::main]
34+
async fn main() {
35+
inmemory::reg();
36+
37+
// Spawn the server.
38+
let lis = inmemory::Listener::new();
39+
let mut srv = grpc::server::Server::new();
40+
srv.set_handler(Handler {});
41+
let lis_clone = lis.clone();
42+
tokio::task::spawn(async move {
43+
srv.serve(&lis_clone).await;
44+
println!("serve returned for listener 1!");
45+
});
46+
47+
println!("Creating channel for {}", lis.target());
48+
let chan_opts = ChannelOptions::default();
49+
let chan = grpc::client::Channel::new(lis.target().as_str(), None, chan_opts);
50+
51+
let outbound = async_stream::stream! {
52+
yield Box::new(MyReqMessage("My Request 1".to_string())) as Box<dyn Message>;
53+
yield Box::new(MyReqMessage("My Request 2".to_string()));
54+
yield Box::new(MyReqMessage("My Request 3".to_string()));
55+
};
56+
57+
let req = Request::new(Box::pin(outbound));
58+
let res = chan.call("/some/method".to_string(), req).await;
59+
let mut res = res.into_inner();
60+
61+
while let Some(resp) = res.next().await {
62+
println!(
63+
"CALL RESPONSE: {}",
64+
(resp.unwrap() as Box<dyn Any>)
65+
.downcast_ref::<MyResMessage>()
66+
.unwrap()
67+
.0,
68+
);
69+
}
70+
lis.close().await;
71+
}

grpc/examples/multiaddr.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use std::any::Any;
2+
3+
use futures_util::StreamExt;
4+
use grpc::service::{Message, Request, Response, Service};
5+
use grpc::{client::ChannelOptions, inmemory};
6+
use tonic::async_trait;
7+
8+
struct Handler {
9+
id: String,
10+
}
11+
12+
#[derive(Debug)]
13+
struct MyReqMessage(String);
14+
15+
#[derive(Debug)]
16+
struct MyResMessage(String);
17+
18+
#[async_trait]
19+
impl Service for Handler {
20+
async fn call(&self, method: String, request: Request) -> Response {
21+
let id = self.id.clone();
22+
let mut stream = request.into_inner();
23+
let output = async_stream::try_stream! {
24+
while let Some(req) = stream.next().await {
25+
yield Box::new(MyResMessage(format!(
26+
"Server {}: responding to: {}; msg: {}",
27+
id, method, (req as Box<dyn Any>).downcast_ref::<MyReqMessage>().unwrap().0,
28+
))) as Box<dyn Message>;
29+
}
30+
};
31+
32+
Response::new(Box::pin(output))
33+
}
34+
}
35+
36+
#[tokio::main]
37+
async fn main() {
38+
inmemory::reg();
39+
40+
// Spawn the first server.
41+
let lis1 = inmemory::Listener::new();
42+
let mut srv = grpc::server::Server::new();
43+
srv.set_handler(Handler { id: lis1.id() });
44+
let lis1_clone = lis1.clone();
45+
tokio::task::spawn(async move {
46+
srv.serve(&lis1_clone).await;
47+
println!("serve returned for listener 1!");
48+
});
49+
50+
// Spawn the second server.
51+
let lis2 = inmemory::Listener::new();
52+
let mut srv = grpc::server::Server::new();
53+
srv.set_handler(Handler { id: lis2.id() });
54+
let lis2_clone = lis2.clone();
55+
tokio::task::spawn(async move {
56+
srv.serve(&lis2_clone).await;
57+
println!("serve returned for listener 2!");
58+
});
59+
60+
// Spawn the third server.
61+
let lis3 = inmemory::Listener::new();
62+
let mut srv = grpc::server::Server::new();
63+
srv.set_handler(Handler { id: lis3.id() });
64+
let lis3_clone = lis3.clone();
65+
tokio::task::spawn(async move {
66+
srv.serve(&lis3_clone).await;
67+
println!("serve returned for listener 3!");
68+
});
69+
70+
let target = String::from("inmemory:///dummy");
71+
println!("Creating channel for {target}");
72+
let chan_opts = ChannelOptions::default();
73+
let chan = grpc::client::Channel::new(target.as_str(), None, chan_opts);
74+
75+
let outbound = async_stream::stream! {
76+
yield Box::new(MyReqMessage("My Request 1".to_string())) as Box<dyn Message>;
77+
yield Box::new(MyReqMessage("My Request 2".to_string()));
78+
yield Box::new(MyReqMessage("My Request 3".to_string()));
79+
};
80+
81+
let req = Request::new(Box::pin(outbound));
82+
let res = chan.call("/some/method".to_string(), req).await;
83+
let mut res = res.into_inner();
84+
85+
while let Some(resp) = res.next().await {
86+
println!(
87+
"CALL RESPONSE: {}",
88+
(resp.unwrap() as Box<dyn Any>)
89+
.downcast_ref::<MyResMessage>()
90+
.unwrap()
91+
.0,
92+
);
93+
}
94+
95+
lis1.close().await;
96+
lis2.close().await;
97+
lis3.close().await;
98+
}

grpc/src/attributes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@
2424

2525
/// A key-value store for arbitrary configuration data between multiple
2626
/// pluggable components.
27-
#[derive(Debug, Default, Clone)]
27+
#[derive(Debug, Default, Clone, PartialEq, PartialOrd, Eq, Ord)]
2828
pub struct Attributes;

0 commit comments

Comments
 (0)