@@ -3,7 +3,7 @@ use super::execution_profile::ExecutionProfile;
33use super :: session:: Session ;
44use super :: session_builder:: SessionBuilder ;
55use crate as scylla;
6- use crate :: batch:: { Batch , BatchStatement } ;
6+ use crate :: batch:: { Batch , BatchStatement , BatchType } ;
77use crate :: cluster:: metadata:: Strategy :: NetworkTopologyStrategy ;
88use crate :: cluster:: metadata:: { CollectionType , ColumnKind , CqlType , NativeType , UserDefinedType } ;
99use crate :: deserialize:: DeserializeOwnedValue ;
@@ -32,7 +32,7 @@ use scylla_cql::types::serialize::value::SerializeValue;
3232use std:: collections:: { BTreeMap , HashMap } ;
3333use std:: collections:: { BTreeSet , HashSet } ;
3434use std:: sync:: atomic:: { AtomicBool , Ordering } ;
35- use std:: sync:: Arc ;
35+ use std:: sync:: { Arc , Mutex } ;
3636use tokio:: net:: TcpListener ;
3737use uuid:: Uuid ;
3838
@@ -1327,6 +1327,85 @@ async fn test_timestamp() {
13271327 assert_eq ! ( results, expected_results) ;
13281328}
13291329
1330+ #[ tokio:: test]
1331+ async fn test_timestamp_generator ( ) {
1332+ use crate :: policies:: timestamp_generator:: TimestampGenerator ;
1333+ use rand:: random;
1334+
1335+ setup_tracing ( ) ;
1336+ struct LocalTimestampGenerator {
1337+ generated_timestamps : Arc < Mutex < HashSet < i64 > > > ,
1338+ }
1339+
1340+ impl TimestampGenerator for LocalTimestampGenerator {
1341+ fn next_timestamp ( & self ) -> i64 {
1342+ let timestamp = random :: < i64 > ( ) . abs ( ) ;
1343+ self . generated_timestamps . lock ( ) . unwrap ( ) . insert ( timestamp) ;
1344+ timestamp
1345+ }
1346+ }
1347+
1348+ let timestamps = Arc :: new ( Mutex :: new ( HashSet :: new ( ) ) ) ;
1349+ let generator = LocalTimestampGenerator {
1350+ generated_timestamps : timestamps. clone ( ) ,
1351+ } ;
1352+
1353+ let session = create_new_session_builder ( )
1354+ . timestamp_generator ( Arc :: new ( generator) )
1355+ . build ( )
1356+ . await
1357+ . unwrap ( ) ;
1358+ let ks = unique_keyspace_name ( ) ;
1359+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
1360+ session
1361+ . ddl ( format ! (
1362+ "CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)" ,
1363+ ks
1364+ ) )
1365+ . await
1366+ . unwrap ( ) ;
1367+ let prepared = session
1368+ . prepare ( format ! (
1369+ "INSERT INTO {}.t_generator (a, b) VALUES (1, 1)" ,
1370+ ks
1371+ ) )
1372+ . await
1373+ . unwrap ( ) ;
1374+ session. execute_unpaged ( & prepared, [ ] ) . await . unwrap ( ) ;
1375+ let unprepared = Query :: new ( format ! (
1376+ "INSERT INTO {}.t_generator (a, b) VALUES (2, 2)" ,
1377+ ks
1378+ ) ) ;
1379+ session. query_unpaged ( unprepared, [ ] ) . await . unwrap ( ) ;
1380+ let mut batch = Batch :: new ( BatchType :: Unlogged ) ;
1381+ let stmt = session
1382+ . prepare ( format ! (
1383+ "INSERT INTO {}.t_generator (a, b) VALUES (3, 3)" ,
1384+ ks
1385+ ) )
1386+ . await
1387+ . unwrap ( ) ;
1388+ batch. append_statement ( stmt) ;
1389+ session. batch ( & batch, & ( ( ) , ) ) . await . unwrap ( ) ;
1390+
1391+ let query_rows_result = session
1392+ . query_unpaged (
1393+ format ! ( "SELECT a, b, WRITETIME(b) FROM {}.t_generator" , ks) ,
1394+ & [ ] ,
1395+ )
1396+ . await
1397+ . unwrap ( )
1398+ . into_rows_result ( )
1399+ . unwrap ( ) ;
1400+
1401+ let timestamps_locked = timestamps. lock ( ) . unwrap ( ) ;
1402+ assert ! ( query_rows_result
1403+ . rows:: <( i32 , i32 , i64 ) >( )
1404+ . unwrap( )
1405+ . map( |row_result| row_result. unwrap( ) )
1406+ . all( |( _a, _b, writetime) | timestamps_locked. contains( & writetime) ) ) ;
1407+ }
1408+
13301409#[ ignore = "works on remote Scylla instances only (local ones are too fast)" ]
13311410#[ tokio:: test]
13321411async fn test_request_timeout ( ) {
0 commit comments