@@ -21,10 +21,11 @@ mod common;
2121async fn serve (
2222 request : Request < Incoming > ,
2323 compression : Compression ,
24+ use_rbwnat : bool ,
2425) -> Response < impl Body < Data = Bytes , Error = Infallible > > {
2526 common:: skip_incoming ( request) . await ;
2627
27- let write_schema = async move {
28+ let maybe_schema = if use_rbwnat {
2829 let schema = vec ! [
2930 Column :: new( "a" . to_string( ) , DataTypeNode :: UInt64 ) ,
3031 Column :: new( "b" . to_string( ) , DataTypeNode :: Int64 ) ,
@@ -42,12 +43,15 @@ async fn serve(
4243 _ => unreachable ! ( ) ,
4344 } ;
4445
45- Ok ( Frame :: data ( buffer) )
46+ Some ( buffer)
47+ } else {
48+ None
4649 } ;
4750
48- let chunk = prepare_chunk ( ) ;
49- let stream =
50- stream:: once ( write_schema) . chain ( stream:: repeat ( chunk) . map ( |chunk| Ok ( Frame :: data ( chunk) ) ) ) ;
51+ let stream = stream:: iter ( maybe_schema)
52+ . chain ( stream:: repeat ( prepare_chunk ( ) ) )
53+ . map ( |chunk| Ok ( Frame :: data ( chunk) ) ) ;
54+
5155 Response :: new ( StreamBody :: new ( stream) )
5256}
5357
@@ -75,8 +79,8 @@ fn prepare_chunk() -> Bytes {
7579const ADDR : SocketAddr = SocketAddr :: V4 ( SocketAddrV4 :: new ( Ipv4Addr :: LOCALHOST , 6523 ) ) ;
7680
7781fn select ( c : & mut Criterion ) {
78- async fn start_server ( compression : Compression ) -> common:: ServerHandle {
79- common:: start_server ( ADDR , move |req| serve ( req, compression) ) . await
82+ async fn start_server ( compression : Compression , use_rbwnat : bool ) -> common:: ServerHandle {
83+ common:: start_server ( ADDR , move |req| serve ( req, compression, use_rbwnat ) ) . await
8084 }
8185
8286 let runner = common:: start_runner ( ) ;
@@ -89,8 +93,16 @@ fn select(c: &mut Criterion) {
8993 d : u32 ,
9094 }
9195
92- async fn select_rows ( client : Client , iters : u64 , compression : Compression ) -> Result < Duration > {
93- let _server = start_server ( compression) . await ;
96+ async fn select_rows (
97+ client : Client ,
98+ iters : u64 ,
99+ compression : Compression ,
100+ validation : bool ,
101+ ) -> Result < Duration > {
102+ let client = client
103+ . with_compression ( compression)
104+ . with_validation ( validation) ;
105+ let _server = start_server ( compression, validation) . await ;
94106
95107 let mut sum = SomeRow :: default ( ) ;
96108 let start = Instant :: now ( ) ;
@@ -119,7 +131,8 @@ fn select(c: &mut Criterion) {
119131 min_size : u64 ,
120132 compression : Compression ,
121133 ) -> Result < Duration > {
122- let _server = start_server ( compression) . await ;
134+ let client = client. with_compression ( compression) ;
135+ let _server = start_server ( compression, false ) . await ;
123136
124137 let start = Instant :: now ( ) ;
125138 let mut cursor = client
@@ -137,23 +150,30 @@ fn select(c: &mut Criterion) {
137150
138151 let mut group = c. benchmark_group ( "rows" ) ;
139152 group. throughput ( Throughput :: Bytes ( size_of :: < SomeRow > ( ) as u64 ) ) ;
140- group. bench_function ( "uncompressed" , |b| {
153+ group. bench_function ( "validation=off/ uncompressed" , |b| {
141154 b. iter_custom ( |iters| {
142- let compression = Compression :: None ;
143- let client = Client :: default ( )
144- . with_url ( format ! ( "http://{ADDR}" ) )
145- . with_compression ( compression) ;
146- runner. run ( select_rows ( client, iters, compression) )
155+ let client = Client :: default ( ) . with_url ( format ! ( "http://{ADDR}" ) ) ;
156+ runner. run ( select_rows ( client, iters, Compression :: None , false ) )
147157 } )
148158 } ) ;
149159 #[ cfg( feature = "lz4" ) ]
150- group. bench_function ( "lz4" , |b| {
160+ group. bench_function ( "validation=off/lz4" , |b| {
161+ b. iter_custom ( |iters| {
162+ let client = Client :: default ( ) . with_url ( format ! ( "http://{ADDR}" ) ) ;
163+ runner. run ( select_rows ( client, iters, Compression :: Lz4 , false ) )
164+ } )
165+ } ) ;
166+ group. bench_function ( "validation=on/uncompressed" , |b| {
167+ b. iter_custom ( |iters| {
168+ let client = Client :: default ( ) . with_url ( format ! ( "http://{ADDR}" ) ) ;
169+ runner. run ( select_rows ( client, iters, Compression :: None , true ) )
170+ } )
171+ } ) ;
172+ #[ cfg( feature = "lz4" ) ]
173+ group. bench_function ( "validation=on/lz4" , |b| {
151174 b. iter_custom ( |iters| {
152- let compression = Compression :: Lz4 ;
153- let client = Client :: default ( )
154- . with_url ( format ! ( "http://{ADDR}" ) )
155- . with_compression ( compression) ;
156- runner. run ( select_rows ( client, iters, compression) )
175+ let client = Client :: default ( ) . with_url ( format ! ( "http://{ADDR}" ) ) ;
176+ runner. run ( select_rows ( client, iters, Compression :: Lz4 , true ) )
157177 } )
158178 } ) ;
159179 group. finish ( ) ;
@@ -163,21 +183,15 @@ fn select(c: &mut Criterion) {
163183 group. throughput ( Throughput :: Bytes ( MIB ) ) ;
164184 group. bench_function ( "uncompressed" , |b| {
165185 b. iter_custom ( |iters| {
166- let compression = Compression :: None ;
167- let client = Client :: default ( )
168- . with_url ( format ! ( "http://{ADDR}" ) )
169- . with_compression ( compression) ;
170- runner. run ( select_bytes ( client, iters * MIB , compression) )
186+ let client = Client :: default ( ) . with_url ( format ! ( "http://{ADDR}" ) ) ;
187+ runner. run ( select_bytes ( client, iters * MIB , Compression :: None ) )
171188 } )
172189 } ) ;
173190 #[ cfg( feature = "lz4" ) ]
174191 group. bench_function ( "lz4" , |b| {
175192 b. iter_custom ( |iters| {
176- let compression = Compression :: None ;
177- let client = Client :: default ( )
178- . with_url ( format ! ( "http://{ADDR}" ) )
179- . with_compression ( compression) ;
180- runner. run ( select_bytes ( client, iters * MIB , compression) )
193+ let client = Client :: default ( ) . with_url ( format ! ( "http://{ADDR}" ) ) ;
194+ runner. run ( select_bytes ( client, iters * MIB , Compression :: Lz4 ) )
181195 } )
182196 } ) ;
183197 group. finish ( ) ;
0 commit comments