Skip to content

Commit 4f39945

Browse files
authored
Update tokio-rustls & rustls-pemfile (#269)
* tokio-rustls: 0.24.1 -> 0.26.1 rustls::client::builder::ConfigBuilder accepts directly CertificateDer and PrivateKeyDer. replace with_safe_defaults * rustls-pemfile: 1.0.4 -> 2.2.0
1 parent 6bb33c9 commit 4f39945

File tree

3 files changed

+102
-65
lines changed

3 files changed

+102
-65
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ members = [
2121

2222

2323
[dependencies]
24-
tokio-rustls = {version="0.24.1", features=["dangerous_configuration"]}
25-
rustls-pemfile = "1.0.4"
24+
tokio-rustls = { version = "0.26.1" }
25+
rustls-pemfile = "2.2.0"
2626
rabbitmq-stream-protocol = { version = "0.7", path = "protocol" }
2727
tokio = { version = "1.29.1", features = ["full"] }
2828
tokio-util = { version = "0.7.3", features = ["codec"] }

src/client/mod.rs

Lines changed: 67 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ use pin_project::pin_project;
1919
use rabbitmq_stream_protocol::commands::exchange_command_versions::{
2020
ExchangeCommandVersionsRequest, ExchangeCommandVersionsResponse,
2121
};
22-
use rustls::PrivateKey;
23-
use rustls::ServerName;
2422
use std::{fs::File, io::BufReader, path::Path};
2523
use tokio::io::AsyncRead;
2624
use tokio::io::AsyncWrite;
2725
use tokio::io::ReadBuf;
2826
use tokio::sync::RwLock;
2927
use tokio::{net::TcpStream, sync::Notify};
3028
use tokio_rustls::client::TlsStream;
29+
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName};
3130
use tokio_rustls::rustls::ClientConfig;
3231
use tokio_rustls::{rustls, TlsConnector};
3332

@@ -547,7 +546,7 @@ impl Client {
547546
}
548547

549548
let connector = TlsConnector::from(Arc::new(config));
550-
let domain = ServerName::try_from(broker.host.as_str()).unwrap();
549+
let domain = ServerName::try_from(broker.host.clone()).unwrap();
551550
let conn = connector.connect(domain, stream).await?;
552551
GenericTcpStream::SecureTcp(conn)
553552
} else {
@@ -765,29 +764,32 @@ impl Client {
765764
let mut roots = rustls::RootCertStore::empty();
766765
let cert_bytes = std::fs::read(root_ca_cert.unwrap());
767766

768-
let root_cert_store = rustls_pemfile::certs(&mut cert_bytes.unwrap().as_ref()).unwrap();
767+
let root_cert_store: Result<Vec<_>, _> =
768+
rustls_pemfile::certs(&mut cert_bytes.unwrap().as_ref()).collect();
769+
let root_cert_store = root_cert_store.unwrap();
769770

770771
root_cert_store
771-
.iter()
772-
.for_each(|cert| roots.add(&rustls::Certificate(cert.to_vec())).unwrap());
772+
.into_iter()
773+
.for_each(|cert| roots.add(cert).unwrap());
773774
Ok(roots)
774775
}
775776

776777
async fn build_client_certificates(
777778
client_cert: &Path,
778-
) -> std::io::Result<Vec<rustls::Certificate>> {
779+
) -> std::io::Result<Vec<CertificateDer<'static>>> {
779780
let mut pem = BufReader::new(File::open(client_cert)?);
780-
let certs = rustls_pemfile::certs(&mut pem)?;
781-
let certs = certs.into_iter().map(rustls::Certificate);
782-
Ok(certs.collect())
781+
rustls_pemfile::certs(&mut pem)
782+
.map(|c| c.map(CertificateDer::into_owned))
783+
.collect()
783784
}
784785

785786
async fn build_client_private_keys(
786787
client_private_key: &Path,
787-
) -> std::io::Result<Vec<PrivateKey>> {
788+
) -> std::io::Result<Vec<PrivateKeyDer<'static>>> {
788789
let mut pem = BufReader::new(File::open(client_private_key)?);
789-
let keys = rustls_pemfile::pkcs8_private_keys(&mut pem)?;
790-
let keys = keys.into_iter().map(PrivateKey);
790+
let keys: Result<Vec<_>, _> = rustls_pemfile::pkcs8_private_keys(&mut pem).collect();
791+
let keys = keys?;
792+
let keys = keys.into_iter().map(PrivateKeyDer::from);
791793
Ok(keys.collect())
792794
}
793795

@@ -800,7 +802,6 @@ impl Client {
800802

801803
if client_certificate_path.is_empty() {
802804
config = ClientConfig::builder()
803-
.with_safe_defaults()
804805
.with_root_certificates(roots.clone())
805806
.with_no_client_auth();
806807
} else {
@@ -812,7 +813,6 @@ impl Client {
812813
Self::build_client_private_keys(Path::new(&broker.tls.get_client_keys_path()))
813814
.await?;
814815
config = ClientConfig::builder()
815-
.with_safe_defaults()
816816
.with_root_certificates(roots.clone())
817817
.with_client_auth_cert(client_certs, client_keys.into_iter().next().unwrap())
818818
.unwrap();
@@ -823,30 +823,70 @@ impl Client {
823823

824824
async fn build_tls_client_configuration_untrusted() -> Result<ClientConfig, ClientError> {
825825
mod danger {
826-
use std::time::SystemTime;
827-
828-
use tokio_rustls::rustls;
829-
use tokio_rustls::rustls::client::{ServerCertVerified, ServerCertVerifier};
826+
use rustls::client::danger::HandshakeSignatureValid;
827+
use rustls::client::danger::ServerCertVerified;
828+
use tokio_rustls::rustls::{
829+
self, client::danger::ServerCertVerifier, pki_types::ServerName,
830+
};
830831

832+
#[derive(Debug)]
831833
pub struct NoCertificateVerification {}
832834

833835
impl ServerCertVerifier for NoCertificateVerification {
836+
fn verify_tls12_signature(
837+
&self,
838+
_: &[u8],
839+
_: &rustls::pki_types::CertificateDer<'_>,
840+
_: &rustls::DigitallySignedStruct,
841+
) -> Result<HandshakeSignatureValid, rustls::Error> {
842+
Ok(HandshakeSignatureValid::assertion())
843+
}
844+
845+
fn verify_tls13_signature(
846+
&self,
847+
_: &[u8],
848+
_: &rustls::pki_types::CertificateDer<'_>,
849+
_: &rustls::DigitallySignedStruct,
850+
) -> Result<HandshakeSignatureValid, rustls::Error> {
851+
Ok(HandshakeSignatureValid::assertion())
852+
}
853+
854+
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
855+
use rustls::SignatureScheme;
856+
// I know know if this is correct
857+
vec![
858+
SignatureScheme::RSA_PKCS1_SHA1,
859+
SignatureScheme::ECDSA_SHA1_Legacy,
860+
SignatureScheme::RSA_PKCS1_SHA256,
861+
SignatureScheme::ECDSA_NISTP256_SHA256,
862+
SignatureScheme::RSA_PKCS1_SHA384,
863+
SignatureScheme::ECDSA_NISTP384_SHA384,
864+
SignatureScheme::RSA_PKCS1_SHA512,
865+
SignatureScheme::ECDSA_NISTP521_SHA512,
866+
SignatureScheme::RSA_PSS_SHA256,
867+
SignatureScheme::RSA_PSS_SHA384,
868+
SignatureScheme::RSA_PSS_SHA512,
869+
SignatureScheme::ED25519,
870+
SignatureScheme::ED448,
871+
]
872+
}
873+
834874
fn verify_server_cert(
835875
&self,
836-
_end_entity: &rustls::Certificate,
837-
_intermediates: &[rustls::Certificate],
838-
_server_name: &rustls::ServerName,
839-
_scts: &mut dyn Iterator<Item = &[u8]>,
840-
_ocsp_response: &[u8],
841-
_now: SystemTime,
842-
) -> Result<ServerCertVerified, rustls::Error> {
876+
_: &rustls::pki_types::CertificateDer<'_>,
877+
_: &[rustls::pki_types::CertificateDer<'_>],
878+
_: &ServerName<'_>,
879+
_: &[u8],
880+
_: rustls::pki_types::UnixTime,
881+
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error>
882+
{
843883
Ok(ServerCertVerified::assertion())
844884
}
845885
}
846886
}
847887

848888
let config = ClientConfig::builder()
849-
.with_safe_defaults()
889+
.dangerous()
850890
.with_custom_certificate_verifier(Arc::new(danger::NoCertificateVerification {}))
851891
.with_no_client_auth();
852892

tests/integration/consumer_test.rs

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@ use rabbitmq_stream_client::{
1111
types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer},
1212
Consumer, FilterConfiguration, NoDedup, Producer,
1313
};
14-
use std::collections::HashMap;
1514

1615
use crate::producer_test::routing_key_strategy_value_extractor;
17-
use rabbitmq_stream_client::types::MessageContext;
1816
use rabbitmq_stream_client::types::{
1917
HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy, RoutingStrategy,
2018
};
@@ -97,15 +95,15 @@ async fn super_stream_consumer_test() {
9795
for n in 0..message_count {
9896
let msg = Message::builder().body(format!("message{}", n)).build();
9997
let _ = super_stream_producer
100-
.send(msg, |confirmation_status| async move {})
98+
.send(msg, |_confirmation_status| async move {})
10199
.await
102100
.unwrap();
103101
}
104102

105103
let mut received_messages = 0;
106104
let handle = super_stream_consumer.handle();
107105

108-
while let _ = super_stream_consumer.next().await.unwrap() {
106+
while let Some(_) = super_stream_consumer.next().await {
109107
received_messages = received_messages + 1;
110108
if received_messages == 10 {
111109
break;
@@ -499,7 +497,6 @@ async fn consumer_test_with_filtering() {
499497
#[tokio::test(flavor = "multi_thread")]
500498
async fn super_stream_consumer_test_with_filtering() {
501499
let env = TestEnvironment::create_super_stream().await;
502-
let reference: String = Faker.fake();
503500

504501
let message_count = 10;
505502
let mut super_stream_producer = env
@@ -722,12 +719,12 @@ async fn super_stream_single_active_consumer_test() {
722719
for n in 0..message_count {
723720
let msg = Message::builder().body(format!("message{}", n)).build();
724721
let _ = super_stream_producer
725-
.send(msg, |confirmation_status| async move {})
722+
.send(msg, |_confirmation_status| async move {})
726723
.await
727724
.unwrap();
728725
}
729726

730-
let mut received_messages = Arc::new(AtomicU32::new(1));
727+
let received_messages = Arc::new(AtomicU32::new(1));
731728
let handle_consumer_1 = super_stream_consumer.handle();
732729
let handle_consumer_2 = super_stream_consumer_2.handle();
733730
let handle_consumer_3 = super_stream_consumer_3.handle();
@@ -737,7 +734,7 @@ async fn super_stream_single_active_consumer_test() {
737734
task::spawn(async move {
738735
let received_messages_int = received_message_outer.clone();
739736
let notify_received_messages_inner = notify_received_messages_outer.clone();
740-
while let _ = super_stream_consumer.next().await.unwrap() {
737+
while let Some(_) = super_stream_consumer.next().await {
741738
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
742739
if message_count == rec_msg {
743740
notify_received_messages_inner.notify_one();
@@ -751,7 +748,7 @@ async fn super_stream_single_active_consumer_test() {
751748
task::spawn(async move {
752749
let received_messages_int = received_message_outer.clone();
753750
let notify_received_messages_inner = notify_received_messages_outer.clone();
754-
while let _ = super_stream_consumer_2.next().await.unwrap() {
751+
while let Some(_) = super_stream_consumer_2.next().await {
755752
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
756753
if message_count == rec_msg {
757754
notify_received_messages_inner.notify_one();
@@ -765,7 +762,7 @@ async fn super_stream_single_active_consumer_test() {
765762
task::spawn(async move {
766763
let received_messages_int = received_message_outer.clone();
767764
let notify_received_messages_inner = notify_received_messages_outer.clone();
768-
while let _ = super_stream_consumer_3.next().await.unwrap() {
765+
while let Some(_) = super_stream_consumer_3.next().await {
769766
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
770767
if message_count == rec_msg {
771768
notify_received_messages_inner.notify_one();
@@ -804,31 +801,31 @@ async fn super_stream_single_active_consumer_test_with_callback() {
804801

805802
let notify_received_messages = Arc::new(Notify::new());
806803

807-
let mut result_stream_name_1 = Arc::new(Mutex::new(String::from("")));
808-
let mut result_stream_name_2 = Arc::new(Mutex::new(String::from("")));
809-
let mut result_stream_name_3 = Arc::new(Mutex::new(String::from("")));
804+
let result_stream_name_1 = Arc::new(Mutex::new(String::from("")));
805+
let result_stream_name_2 = Arc::new(Mutex::new(String::from("")));
806+
let result_stream_name_3 = Arc::new(Mutex::new(String::from("")));
810807

811-
let mut result_name_1 = Arc::new(Mutex::new(String::from("")));
812-
let mut result_name_2 = Arc::new(Mutex::new(String::from("")));
813-
let mut result_name_3 = Arc::new(Mutex::new(String::from("")));
808+
let result_name_1 = Arc::new(Mutex::new(String::from("")));
809+
let result_name_2 = Arc::new(Mutex::new(String::from("")));
810+
let result_name_3 = Arc::new(Mutex::new(String::from("")));
814811

815-
let mut result_stream_name_outer = result_stream_name_1.clone();
816-
let mut result_stream_name_2_outer = result_stream_name_2.clone();
817-
let mut result_stream_name_3_outer = result_stream_name_3.clone();
812+
let result_stream_name_outer = result_stream_name_1.clone();
813+
let result_stream_name_2_outer = result_stream_name_2.clone();
814+
let result_stream_name_3_outer = result_stream_name_3.clone();
818815

819-
let mut result_name_1_outer = result_name_1.clone();
820-
let mut result_name_2_outer = result_name_2.clone();
821-
let mut result_name_3_outer = result_name_3.clone();
816+
let result_name_1_outer = result_name_1.clone();
817+
let result_name_2_outer = result_name_2.clone();
818+
let result_name_3_outer = result_name_3.clone();
822819

823820
let mut super_stream_consumer: SuperStreamConsumer = env
824821
.env
825822
.super_stream_consumer()
826823
.name(super_stream_consumer_name)
827824
.enable_single_active_consumer(true)
828825
.offset(OffsetSpecification::First)
829-
.consumer_update(move |active, message_context| {
830-
let mut result_stream_name_int = result_stream_name_outer.clone();
831-
let mut result_consumer_name_int = result_name_1_outer.clone();
826+
.consumer_update(move |_active, message_context| {
827+
let result_stream_name_int = result_stream_name_outer.clone();
828+
let result_consumer_name_int = result_name_1_outer.clone();
832829
async move {
833830
*result_stream_name_int.lock().unwrap() = message_context.stream().clone();
834831
*result_consumer_name_int.lock().unwrap() = message_context.name().clone();
@@ -846,9 +843,9 @@ async fn super_stream_single_active_consumer_test_with_callback() {
846843
.name("super-stream-with-sac-enabled")
847844
.enable_single_active_consumer(true)
848845
.offset(OffsetSpecification::First)
849-
.consumer_update(move |active, message_context| {
850-
let mut result_stream_name_int = result_stream_name_2_outer.clone();
851-
let mut result_consumer_name_int = result_name_2_outer.clone();
846+
.consumer_update(move |_active, message_context| {
847+
let result_stream_name_int = result_stream_name_2_outer.clone();
848+
let result_consumer_name_int = result_name_2_outer.clone();
852849
async move {
853850
*result_stream_name_int.lock().unwrap() = message_context.stream().clone();
854851
*result_consumer_name_int.lock().unwrap() = message_context.name().clone();
@@ -865,9 +862,9 @@ async fn super_stream_single_active_consumer_test_with_callback() {
865862
.name("super-stream-with-sac-enabled")
866863
.enable_single_active_consumer(true)
867864
.offset(OffsetSpecification::First)
868-
.consumer_update(move |active, message_context| {
869-
let mut result_stream_name_int = result_stream_name_3_outer.clone();
870-
let mut result_consumer_name_int = result_name_3_outer.clone();
865+
.consumer_update(move |_active, message_context| {
866+
let result_stream_name_int = result_stream_name_3_outer.clone();
867+
let result_consumer_name_int = result_name_3_outer.clone();
871868
async move {
872869
*result_stream_name_int.lock().unwrap() = message_context.stream().clone();
873870
*result_consumer_name_int.lock().unwrap() = message_context.name().clone();
@@ -881,12 +878,12 @@ async fn super_stream_single_active_consumer_test_with_callback() {
881878
for n in 0..message_count {
882879
let msg = Message::builder().body(format!("message{}", n)).build();
883880
let _ = super_stream_producer
884-
.send(msg, |confirmation_status| async move {})
881+
.send(msg, |_confirmation_status| async move {})
885882
.await
886883
.unwrap();
887884
}
888885

889-
let mut received_messages = Arc::new(AtomicU32::new(1));
886+
let received_messages = Arc::new(AtomicU32::new(1));
890887
let handle_consumer_1 = super_stream_consumer.handle();
891888
let handle_consumer_2 = super_stream_consumer_2.handle();
892889
let handle_consumer_3 = super_stream_consumer_3.handle();
@@ -896,7 +893,7 @@ async fn super_stream_single_active_consumer_test_with_callback() {
896893
task::spawn(async move {
897894
let received_messages_int = received_message_outer.clone();
898895
let notify_received_messages_inner = notify_received_messages_outer.clone();
899-
while let _ = super_stream_consumer.next().await.unwrap() {
896+
while let Some(_) = super_stream_consumer.next().await {
900897
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
901898
if message_count == rec_msg {
902899
notify_received_messages_inner.notify_one();
@@ -910,7 +907,7 @@ async fn super_stream_single_active_consumer_test_with_callback() {
910907
task::spawn(async move {
911908
let received_messages_int = received_message_outer.clone();
912909
let notify_received_messages_inner = notify_received_messages_outer.clone();
913-
while let _ = super_stream_consumer_2.next().await.unwrap() {
910+
while let Some(_) = super_stream_consumer_2.next().await {
914911
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
915912
if message_count == rec_msg {
916913
notify_received_messages_inner.notify_one();
@@ -924,7 +921,7 @@ async fn super_stream_single_active_consumer_test_with_callback() {
924921
task::spawn(async move {
925922
let received_messages_int = received_message_outer.clone();
926923
let notify_received_messages_inner = notify_received_messages_outer.clone();
927-
while let _ = super_stream_consumer_3.next().await.unwrap() {
924+
while let Some(_) = super_stream_consumer_3.next().await {
928925
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
929926
if message_count == rec_msg {
930927
notify_received_messages_inner.notify_one();

0 commit comments

Comments
 (0)