Skip to content

Commit f4b3658

Browse files
authored
Merge pull request #1461 from Lorak-mmk/eager-metadata-deserialization
Eager metadata deserialization
2 parents ff31720 + 0d86e59 commit f4b3658

File tree

9 files changed

+275
-77
lines changed

9 files changed

+275
-77
lines changed

scylla-cql/src/frame/frame_errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ pub enum CqlResultParseError {
235235
PreparedParseError(#[from] PreparedParseError),
236236
#[error("RESULT:Rows response deserialization failed: {0}")]
237237
RawRowsParseError(#[from] RawRowsAndPagingStateResponseParseError),
238+
#[error("RESULT:Rows result metadata response deserialization failed: {0}")]
239+
ResultMetadataParseError(#[from] ResultMetadataAndRowsCountParseError),
238240
}
239241

240242
#[non_exhaustive]

scylla-cql/src/frame/response/mod.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub use error::Error;
1313
pub use supported::Supported;
1414

1515
use crate::frame::TryFromPrimitiveError;
16+
use crate::frame::frame_errors::ResultMetadataAndRowsCountParseError;
1617
use crate::frame::protocol_features::ProtocolFeatures;
1718
use crate::frame::response::result::ResultMetadata;
1819

@@ -208,6 +209,28 @@ impl Response {
208209
Ok(response)
209210
}
210211

212+
pub fn deserialize_metadata(
213+
self,
214+
) -> Result<ResponseWithDeserializedMetadata, ResultMetadataAndRowsCountParseError> {
215+
let result = match self {
216+
Self::Error(e) => ResponseWithDeserializedMetadata::Error(e),
217+
Self::Ready => ResponseWithDeserializedMetadata::Ready,
218+
Self::Result(res) => {
219+
ResponseWithDeserializedMetadata::Result(res.deserialize_metadata()?)
220+
}
221+
Self::Authenticate(auth) => ResponseWithDeserializedMetadata::Authenticate(auth),
222+
Self::AuthSuccess(auth_succ) => {
223+
ResponseWithDeserializedMetadata::AuthSuccess(auth_succ)
224+
}
225+
Self::AuthChallenge(auth_chal) => {
226+
ResponseWithDeserializedMetadata::AuthChallenge(auth_chal)
227+
}
228+
Self::Supported(sup) => ResponseWithDeserializedMetadata::Supported(sup),
229+
Self::Event(eve) => ResponseWithDeserializedMetadata::Event(eve),
230+
};
231+
Ok(result)
232+
}
233+
211234
/// Converts this response into a `NonErrorResponse`, returning an error if it is an `Error` response.
212235
pub fn into_non_error_response(self) -> Result<NonErrorResponse, error::Error> {
213236
let non_error_response = match self {
@@ -225,6 +248,68 @@ impl Response {
225248
}
226249
}
227250

251+
/// A CQL response that has been received from the server.
252+
#[derive(Debug)]
253+
pub enum ResponseWithDeserializedMetadata {
254+
/// ERROR response, returned by the server when an error occurs.
255+
Error(Error),
256+
/// READY response, indicating that the server is ready to process requests,
257+
/// typically after a connection is established.
258+
Ready,
259+
/// RESULT response, containing the result of a statement execution.
260+
Result(result::ResultWithDeserializedMetadata),
261+
/// AUTHENTICATE response, indicating that the server requires authentication.
262+
Authenticate(authenticate::Authenticate),
263+
/// AUTH_SUCCESS response, indicating that the authentication was successful.
264+
AuthSuccess(authenticate::AuthSuccess),
265+
/// AUTH_CHALLENGE response, indicating that the server requires further authentication.
266+
AuthChallenge(authenticate::AuthChallenge),
267+
/// SUPPORTED response, containing the features supported by the server.
268+
Supported(Supported),
269+
/// EVENT response, containing an event that occurred on the server.
270+
Event(event::Event),
271+
}
272+
273+
impl ResponseWithDeserializedMetadata {
274+
/// Returns the kind of this response.
275+
pub fn to_response_kind(&self) -> CqlResponseKind {
276+
match self {
277+
Self::Error(_) => CqlResponseKind::Error,
278+
Self::Ready => CqlResponseKind::Ready,
279+
Self::Result(_) => CqlResponseKind::Result,
280+
Self::Authenticate(_) => CqlResponseKind::Authenticate,
281+
Self::AuthSuccess(_) => CqlResponseKind::AuthSuccess,
282+
Self::AuthChallenge(_) => CqlResponseKind::AuthChallenge,
283+
Self::Supported(_) => CqlResponseKind::Supported,
284+
Self::Event(_) => CqlResponseKind::Event,
285+
}
286+
}
287+
288+
/// Converts this response into a `NonErrorResponse`, returning an error if it is an `Error` response.
289+
pub fn into_non_error_response(
290+
self,
291+
) -> Result<NonErrorResponseWithDeserializedMetadata, error::Error> {
292+
let non_error_response = match self {
293+
Self::Error(e) => return Err(e),
294+
Self::Ready => NonErrorResponseWithDeserializedMetadata::Ready,
295+
Self::Result(res) => NonErrorResponseWithDeserializedMetadata::Result(res),
296+
Self::Authenticate(auth) => {
297+
NonErrorResponseWithDeserializedMetadata::Authenticate(auth)
298+
}
299+
Self::AuthSuccess(auth_succ) => {
300+
NonErrorResponseWithDeserializedMetadata::AuthSuccess(auth_succ)
301+
}
302+
Self::AuthChallenge(auth_chal) => {
303+
NonErrorResponseWithDeserializedMetadata::AuthChallenge(auth_chal)
304+
}
305+
Self::Supported(sup) => NonErrorResponseWithDeserializedMetadata::Supported(sup),
306+
Self::Event(eve) => NonErrorResponseWithDeserializedMetadata::Event(eve),
307+
};
308+
309+
Ok(non_error_response)
310+
}
311+
}
312+
228313
/// A CQL response that has been received from the server, excluding error responses.
229314
/// This is used to handle responses that are not errors, allowing for easier processing
230315
/// of valid responses without need to handle error case any later.
@@ -260,3 +345,41 @@ impl NonErrorResponse {
260345
}
261346
}
262347
}
348+
349+
/// A CQL response that has been received from the server, excluding error responses.
350+
/// This is used to handle responses that are not errors, allowing for easier processing
351+
/// of valid responses without need to handle error case any later.
352+
/// The difference from [NonErrorResponse] is that Result::Rows variant holds [result::DeserializedMetadataAndRawRows]
353+
/// instead of [result::RawMetadataAndRawRows].
354+
#[derive(Debug)]
355+
pub enum NonErrorResponseWithDeserializedMetadata {
356+
/// See [`Response::Ready`].
357+
Ready,
358+
/// See [`Response::Result`].
359+
Result(result::ResultWithDeserializedMetadata),
360+
/// See [`Response::Authenticate`].
361+
Authenticate(authenticate::Authenticate),
362+
/// See [`Response::AuthSuccess`].
363+
AuthSuccess(authenticate::AuthSuccess),
364+
/// See [`Response::AuthChallenge`].
365+
AuthChallenge(authenticate::AuthChallenge),
366+
/// See [`Response::Supported`].
367+
Supported(Supported),
368+
/// See [`Response::Event`].
369+
Event(event::Event),
370+
}
371+
372+
impl NonErrorResponseWithDeserializedMetadata {
373+
/// Returns the kind of this non-error response.
374+
pub fn to_response_kind(&self) -> CqlResponseKind {
375+
match self {
376+
Self::Ready => CqlResponseKind::Ready,
377+
Self::Result(_) => CqlResponseKind::Result,
378+
Self::Authenticate(_) => CqlResponseKind::Authenticate,
379+
Self::AuthSuccess(_) => CqlResponseKind::AuthSuccess,
380+
Self::AuthChallenge(_) => CqlResponseKind::AuthChallenge,
381+
Self::Supported(_) => CqlResponseKind::Supported,
382+
Self::Event(_) => CqlResponseKind::Event,
383+
}
384+
}
385+
}

scylla-cql/src/frame/response/result.rs

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ impl<'a> ResultMetadata<'a> {
445445
/// of `ResultMetadata`:
446446
/// 1. owning it in a borrowed form, self-borrowed from the RESULT:Rows frame;
447447
/// 2. sharing ownership of metadata cached in PreparedStatement.
448-
#[derive(Debug)]
448+
#[derive(Debug, Clone)]
449449
pub enum ResultMetadataHolder {
450450
/// [ResultMetadata] that is self-borrowed from the RESULT:Rows frame.
451451
SelfBorrowed(SelfBorrowedMetadataContainer),
@@ -661,11 +661,12 @@ use super::custom_type_parser::CustomTypeParser;
661661
/// RESULT:Rows response, in partially serialized form.
662662
///
663663
/// Paging state and metadata are deserialized, rows remain serialized.
664-
#[derive(Debug)]
664+
#[derive(Debug, Clone)]
665665
pub struct DeserializedMetadataAndRawRows {
666666
metadata: ResultMetadataHolder,
667667
rows_count: usize,
668668
raw_rows: Bytes,
669+
raw_metadata_and_rows_bytes_size: usize,
669670
}
670671

671672
impl DeserializedMetadataAndRawRows {
@@ -695,6 +696,12 @@ impl DeserializedMetadataAndRawRows {
695696
self.raw_rows.len()
696697
}
697698

699+
/// Returns the serialized size of the raw metadata and raw rows.
700+
#[inline]
701+
pub fn metadata_and_rows_bytes_size(&self) -> usize {
702+
self.raw_metadata_and_rows_bytes_size
703+
}
704+
698705
/// Creates an empty [DeserializedMetadataAndRawRows].
699706
// Preferred to implementing Default, because users shouldn't be encouraged to create
700707
// empty DeserializedMetadataAndRawRows.
@@ -706,6 +713,7 @@ impl DeserializedMetadataAndRawRows {
706713
),
707714
rows_count: 0,
708715
raw_rows: Bytes::new(),
716+
raw_metadata_and_rows_bytes_size: 0,
709717
}
710718
}
711719

@@ -752,6 +760,47 @@ pub enum Result {
752760
SchemaChange(SchemaChange),
753761
}
754762

763+
impl Result {
764+
pub fn deserialize_metadata(
765+
self,
766+
) -> StdResult<ResultWithDeserializedMetadata, ResultMetadataAndRowsCountParseError> {
767+
let res = match self {
768+
Result::Void => ResultWithDeserializedMetadata::Void,
769+
Result::Rows((metadata, paging_state)) => ResultWithDeserializedMetadata::Rows((
770+
metadata.deserialize_metadata()?,
771+
paging_state,
772+
)),
773+
Result::SetKeyspace(set_keyspace) => {
774+
ResultWithDeserializedMetadata::SetKeyspace(set_keyspace)
775+
}
776+
Result::Prepared(prepared) => ResultWithDeserializedMetadata::Prepared(prepared),
777+
Result::SchemaChange(schema_change) => {
778+
ResultWithDeserializedMetadata::SchemaChange(schema_change)
779+
}
780+
};
781+
782+
Ok(res)
783+
}
784+
}
785+
786+
/// Represents the result of a CQL `RESULT` response.
787+
#[derive(Debug)]
788+
pub enum ResultWithDeserializedMetadata {
789+
/// A result with no associated data.
790+
Void,
791+
/// A result with metadata and rows.
792+
Rows((DeserializedMetadataAndRawRows, PagingStateResponse)),
793+
/// A result indicating that a keyspace was set as an effect
794+
/// of the executed request.
795+
SetKeyspace(SetKeyspace),
796+
/// A result indicating that a statement was prepared
797+
/// as an effect of the `PREPARE` request.
798+
Prepared(Prepared),
799+
/// A result indicating that a schema change occurred
800+
/// as an effect of the executed request.
801+
SchemaChange(SchemaChange),
802+
}
803+
755804
fn deser_type_generic<'frame, 'result, StrT: Into<Cow<'result, str>>>(
756805
buf: &mut &'frame [u8],
757806
read_string: fn(&mut &'frame [u8]) -> StdResult<StrT, LowLevelDeserializationError>,
@@ -1070,6 +1119,7 @@ impl RawMetadataAndRawRows {
10701119
pub fn deserialize_metadata(
10711120
self,
10721121
) -> StdResult<DeserializedMetadataAndRawRows, ResultMetadataAndRowsCountParseError> {
1122+
let raw_metadata_and_rows_bytes_size = self.metadata_and_rows_bytes_size();
10731123
let (metadata_deserialized, row_count_and_raw_rows) = match self.cached_metadata {
10741124
Some(cached) if self.no_metadata => {
10751125
// Server sent no metadata, but we have metadata cached. This means that we asked the server
@@ -1120,6 +1170,7 @@ impl RawMetadataAndRawRows {
11201170
metadata: metadata_deserialized,
11211171
rows_count,
11221172
raw_rows: frame_slice.to_bytes(),
1173+
raw_metadata_and_rows_bytes_size,
11231174
})
11241175
}
11251176
}
@@ -1450,9 +1501,9 @@ mod test_utils {
14501501
}
14511502

14521503
impl DeserializedMetadataAndRawRows {
1504+
#[doc(hidden)]
14531505
#[inline]
1454-
#[cfg(test)]
1455-
pub(crate) fn new_for_test(
1506+
pub fn new_for_test(
14561507
metadata: ResultMetadata<'static>,
14571508
rows_count: usize,
14581509
raw_rows: Bytes,
@@ -1461,6 +1512,7 @@ mod test_utils {
14611512
metadata: ResultMetadataHolder::SharedCached(Arc::new(metadata)),
14621513
rows_count,
14631514
raw_rows,
1515+
raw_metadata_and_rows_bytes_size: 0,
14641516
}
14651517
}
14661518
}

scylla/src/client/pager.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use scylla_cql::deserialize::row::{ColumnIterator, DeserializeRow};
1616
use scylla_cql::deserialize::{DeserializationError, TypeCheckError};
1717
use scylla_cql::frame::frame_errors::ResultMetadataAndRowsCountParseError;
1818
use scylla_cql::frame::request::query::PagingState;
19-
use scylla_cql::frame::response::NonErrorResponse;
20-
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
19+
use scylla_cql::frame::response::NonErrorResponseWithDeserializedMetadata;
20+
use scylla_cql::frame::response::result::DeserializedMetadataAndRawRows;
2121
use scylla_cql::frame::types::SerialConsistency;
2222
use scylla_cql::serialize::row::SerializedValues;
2323
use std::result::Result;
@@ -58,7 +58,7 @@ macro_rules! ready_some_ok {
5858
}
5959

6060
struct ReceivedPage {
61-
rows: RawMetadataAndRawRows,
61+
rows: DeserializedMetadataAndRawRows,
6262
tracing_id: Option<Uuid>,
6363
request_coordinator: Option<Coordinator>,
6464
}
@@ -75,7 +75,7 @@ pub(crate) struct PreparedPagerConfig {
7575
// A separate module is used here so that the parent module cannot construct
7676
// SendAttemptedProof directly.
7777
mod checked_channel_sender {
78-
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
78+
use scylla_cql::frame::response::result::DeserializedMetadataAndRawRows;
7979
use std::marker::PhantomData;
8080
use tokio::sync::mpsc;
8181
use uuid::Uuid;
@@ -119,7 +119,7 @@ mod checked_channel_sender {
119119
Result<(), mpsc::error::SendError<ResultPage>>,
120120
) {
121121
let empty_page = ReceivedPage {
122-
rows: RawMetadataAndRawRows::mock_empty(),
122+
rows: DeserializedMetadataAndRawRows::mock_empty(),
123123
tracing_id,
124124
request_coordinator,
125125
};
@@ -350,7 +350,9 @@ where
350350
match query_response {
351351
Ok(NonErrorQueryResponse {
352352
response:
353-
NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))),
353+
NonErrorResponseWithDeserializedMetadata::Result(
354+
result::ResultWithDeserializedMetadata::Rows((rows, paging_state_response)),
355+
),
354356
tracing_id,
355357
..
356358
}) => {
@@ -404,7 +406,7 @@ where
404406
Err(err)
405407
}
406408
Ok(NonErrorQueryResponse {
407-
response: NonErrorResponse::Result(_),
409+
response: NonErrorResponseWithDeserializedMetadata::Result(_),
408410
tracing_id,
409411
..
410412
}) => {
@@ -549,7 +551,9 @@ where
549551
let result = (self.fetcher)(paging_state).await?;
550552
let response = result.into_non_error_query_response()?;
551553
match response.response {
552-
NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => {
554+
NonErrorResponseWithDeserializedMetadata::Result(
555+
result::ResultWithDeserializedMetadata::Rows((rows, paging_state_response)),
556+
) => {
553557
let (proof, send_result) = self
554558
.sender
555559
.send(Ok(ReceivedPage {
@@ -574,7 +578,7 @@ where
574578
}
575579
}
576580
}
577-
NonErrorResponse::Result(_) => {
581+
NonErrorResponseWithDeserializedMetadata::Result(_) => {
578582
// We have most probably sent a modification statement (e.g. INSERT or UPDATE),
579583
// so let's return an empty iterator as suggested in #631.
580584

@@ -674,11 +678,7 @@ impl QueryPager {
674678

675679
let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
676680

677-
let raw_rows_with_deserialized_metadata =
678-
received_page.rows.deserialize_metadata().map_err(|err| {
679-
NextRowError::NextPageError(NextPageError::ResultMetadataParseError(err))
680-
})?;
681-
s.current_page = RawRowLendingIterator::new(raw_rows_with_deserialized_metadata);
681+
s.current_page = RawRowLendingIterator::new(received_page.rows);
682682

683683
if let Some(tracing_id) = received_page.tracing_id {
684684
s.tracing_ids.push(tracing_id);
@@ -1044,10 +1044,9 @@ If you are using this API, you are probably doing something wrong."
10441044
}
10451045
};
10461046
let page_received = page_received_res?;
1047-
let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
10481047

10491048
Ok(Self {
1050-
current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),
1049+
current_page: RawRowLendingIterator::new(page_received.rows),
10511050
page_receiver: receiver,
10521051
tracing_ids: if let Some(tracing_id) = page_received.tracing_id {
10531052
vec![tracing_id]

0 commit comments

Comments
 (0)