|  | 
|  | 1 | +use crate::utils::{test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; | 
|  | 2 | +use scylla::client::session::Session; | 
|  | 3 | +use scylla::client::session_builder::SessionBuilder; | 
|  | 4 | +use scylla::frame::Compression; | 
|  | 5 | +use scylla::statement::query::Query; | 
|  | 6 | + | 
|  | 7 | +use scylla_proxy::{ | 
|  | 8 | +    Condition, ProxyError, Reaction, RequestOpcode, RequestReaction, RequestRule, ShardAwareness, | 
|  | 9 | +    WorkerError, | 
|  | 10 | +}; | 
|  | 11 | +use std::sync::Arc; | 
|  | 12 | +use tokio::sync::mpsc; | 
|  | 13 | + | 
|  | 14 | +/// Tests the compression functionality of the Scylla driver by performing a series of operations | 
|  | 15 | +/// on a 3-node cluster with optional compression and verifying the total frame size of the requests. | 
|  | 16 | +/// | 
|  | 17 | +/// # Arguments | 
|  | 18 | +/// | 
|  | 19 | +/// * `compression` - An optional `Compression` enum value specifying the type of compression to use. | 
|  | 20 | +/// * `text_size` - The size of the text to be inserted into the test table. | 
|  | 21 | +/// * `expected_frame_total_size_range` - A range specifying the expected total size of the frames. | 
|  | 22 | +/// | 
|  | 23 | +/// # Panics | 
|  | 24 | +/// | 
|  | 25 | +/// This function will panic if the total frame size does not fall within the expected range or if | 
|  | 26 | +/// any of the operations (such as creating keyspace, table, or inserting/querying data) fail. | 
|  | 27 | +async fn test_compression( | 
|  | 28 | +    compression: Option<Compression>, | 
|  | 29 | +    text_size: usize, | 
|  | 30 | +    expected_frame_total_size_range: std::ops::Range<usize>, | 
|  | 31 | +) { | 
|  | 32 | +    let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move { | 
|  | 33 | + | 
|  | 34 | +        let request_rule = |tx| { | 
|  | 35 | +            RequestRule( | 
|  | 36 | +                    Condition::or(Condition::RequestOpcode(RequestOpcode::Query), | 
|  | 37 | +                        Condition::RequestOpcode(RequestOpcode::Execute)).and( | 
|  | 38 | +                    Condition::not(Condition::ConnectionRegisteredAnyEvent)), | 
|  | 39 | +                RequestReaction::noop().with_feedback_when_performed(tx), | 
|  | 40 | +            ) | 
|  | 41 | +        }; | 
|  | 42 | + | 
|  | 43 | +        let (request_tx, mut request_rx) = mpsc::unbounded_channel(); | 
|  | 44 | +        for running_node in running_proxy.running_nodes.iter_mut() { | 
|  | 45 | +            running_node.change_request_rules(Some(vec![request_rule(request_tx.clone())])); | 
|  | 46 | +        } | 
|  | 47 | + | 
|  | 48 | +        let session: Session = SessionBuilder::new() | 
|  | 49 | +            .known_node(proxy_uris[0].as_str()) | 
|  | 50 | +            .address_translator(Arc::new(translation_map)) | 
|  | 51 | +            .compression(compression) | 
|  | 52 | +            .build() | 
|  | 53 | +            .await | 
|  | 54 | +            .unwrap(); | 
|  | 55 | + | 
|  | 56 | +        let ks = unique_keyspace_name(); | 
|  | 57 | +        session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); | 
|  | 58 | +        session.use_keyspace(ks, false).await.unwrap(); | 
|  | 59 | +        session | 
|  | 60 | +            .ddl("CREATE TABLE test (k text PRIMARY KEY, t text, i int, f float)") | 
|  | 61 | +            .await | 
|  | 62 | +            .unwrap(); | 
|  | 63 | + | 
|  | 64 | +        let q = Query::from("INSERT INTO test (k, t, i, f) VALUES (?, ?, ?, ?)"); | 
|  | 65 | +        let large_string = "a".repeat(text_size); | 
|  | 66 | +        session.query_unpaged(q.clone(), ("key", large_string.as_str(), 42_i32, 24.03_f32)).await.unwrap(); | 
|  | 67 | + | 
|  | 68 | +        let result: Vec<(String, String, i32, f32)> = session | 
|  | 69 | +            .query_unpaged("SELECT k, t, i, f FROM test WHERE k = 'key'", &[]) | 
|  | 70 | +            .await | 
|  | 71 | +            .unwrap() | 
|  | 72 | +            .into_rows_result() | 
|  | 73 | +            .unwrap() | 
|  | 74 | +            .rows::<(String, String, i32, f32)>() | 
|  | 75 | +            .unwrap() | 
|  | 76 | +            .collect::<Result<_, _>>() | 
|  | 77 | +            .unwrap(); | 
|  | 78 | + | 
|  | 79 | +        assert_eq!(result, vec![(String::from("key"), large_string, 42_i32, 24.03_f32)]); | 
|  | 80 | + | 
|  | 81 | + | 
|  | 82 | +        let mut total_frame_size = 0; | 
|  | 83 | +        while let Ok((request_frame, _shard)) = request_rx.try_recv() { | 
|  | 84 | +            total_frame_size += request_frame.body.len(); | 
|  | 85 | +        } | 
|  | 86 | +        println!("Total frame size: {}", total_frame_size); | 
|  | 87 | +        assert!(expected_frame_total_size_range.contains(&total_frame_size)); | 
|  | 88 | + | 
|  | 89 | +        running_proxy | 
|  | 90 | + | 
|  | 91 | +    }).await; | 
|  | 92 | + | 
|  | 93 | +    match res { | 
|  | 94 | +        Ok(()) => (), | 
|  | 95 | +        Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), | 
|  | 96 | +        Err(err) => panic!("{}", err), | 
|  | 97 | +    } | 
|  | 98 | +} | 
|  | 99 | + | 
|  | 100 | +#[tokio::test] | 
|  | 101 | +#[cfg(not(scylla_cloud_tests))] | 
|  | 102 | +async fn should_execute_queries_without_compression() { | 
|  | 103 | +    test_compression(None, 1_000, 1_000..3_000).await; | 
|  | 104 | +} | 
|  | 105 | + | 
|  | 106 | +#[tokio::test] | 
|  | 107 | +#[cfg(not(scylla_cloud_tests))] | 
|  | 108 | +async fn should_execute_queries_without_compression_10mb() { | 
|  | 109 | +    test_compression(None, 1_000_000, 1_000_000..1_002_000).await; | 
|  | 110 | +} | 
|  | 111 | + | 
|  | 112 | +#[tokio::test] | 
|  | 113 | +#[cfg(not(scylla_cloud_tests))] | 
|  | 114 | +async fn should_execute_queries_with_snappy_compression() { | 
|  | 115 | +    test_compression(Some(Compression::Snappy), 1_000, 1_000..2_000).await; | 
|  | 116 | +} | 
|  | 117 | + | 
|  | 118 | +#[tokio::test] | 
|  | 119 | +#[cfg(not(scylla_cloud_tests))] | 
|  | 120 | +async fn should_execute_queries_with_snappy_compression_10mb() { | 
|  | 121 | +    test_compression(Some(Compression::Snappy), 1_000_000, 45_000..50_000).await; | 
|  | 122 | +} | 
|  | 123 | + | 
|  | 124 | +#[tokio::test] | 
|  | 125 | +#[cfg(not(scylla_cloud_tests))] | 
|  | 126 | +async fn should_execute_queries_with_lz4_compression() { | 
|  | 127 | +    test_compression(Some(Compression::Lz4), 1_000, 1_000..2_000).await; | 
|  | 128 | +} | 
|  | 129 | + | 
|  | 130 | +#[tokio::test] | 
|  | 131 | +#[cfg(not(scylla_cloud_tests))] | 
|  | 132 | +async fn should_execute_queries_with_lz4_compression_10mb() { | 
|  | 133 | +    test_compression(Some(Compression::Lz4), 1_000_000, 5_000..10_000).await; | 
|  | 134 | +} | 
0 commit comments