Skip to content

Commit 6f5a603

Browse files
committed
errors: introduce SchemaAgreementError
Few things happen here. 1. Introducing new error type SchemaAgreementTimeout This is rather a rename of `SchemaVersionFetch` error type. We extend the type by additional variants. 2. Removing ProtocolError::SchemaVersionFetch variant 3. Narrowing the error type of schema agreement methods Thanks to that, public API methods (e.g. Session::await_schema_agreement) can return a self-contained error type providing more context than previously returned ExecutionError. 4. Replacing ExecutionError::SchemaAgreementTimeout with SchemaAgreementError variant SchemaAgreementError dependency is still needed, because of auto await schema agreement configuration. By default, the driver awaits schema agreement on schema altering queries. 5. Adjusting the example Previously, the example would (unjustifiably) match against ExecutionError::RequestTimeout error. Even before this PR, it should match against ExecutionError::SchemaAgreementTimeout. Thanks to this commit, we were able to spot and fix this issue.
1 parent ba2d164 commit 6f5a603

File tree

4 files changed

+36
-28
lines changed

4 files changed

+36
-28
lines changed

examples/schema_agreement.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Result};
22
use futures::TryStreamExt as _;
33
use scylla::client::session::Session;
44
use scylla::client::session_builder::SessionBuilder;
5-
use scylla::errors::ExecutionError;
5+
use scylla::errors::SchemaAgreementError;
66
use std::env;
77
use std::time::Duration;
88

@@ -27,7 +27,9 @@ async fn main() -> Result<()> {
2727

2828
match session.await_schema_agreement().await {
2929
Ok(_schema_version) => println!("Schema is in agreement in time"),
30-
Err(ExecutionError::RequestTimeout(_)) => println!("Schema is NOT in agreement in time"),
30+
Err(SchemaAgreementError::Timeout(_)) => {
31+
println!("Schema is NOT in agreement in time")
32+
}
3133
Err(err) => bail!(err),
3234
};
3335
session

scylla/src/client/session.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
1515
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
1616
use crate::errors::{
1717
BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
18-
RequestAttemptError, RequestError, TracingError, UseKeyspaceError,
18+
RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
1919
};
2020
use crate::frame::response::result;
2121
#[cfg(feature = "ssl")]
@@ -1938,7 +1938,7 @@ impl Session {
19381938
last_error.map(Result::Err)
19391939
}
19401940

1941-
async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, ExecutionError> {
1941+
async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, SchemaAgreementError> {
19421942
loop {
19431943
tokio::time::sleep(self.schema_agreement_interval).await;
19441944
if let Some(agreed_version) = self.check_schema_agreement().await? {
@@ -1947,18 +1947,18 @@ impl Session {
19471947
}
19481948
}
19491949

1950-
pub async fn await_schema_agreement(&self) -> Result<Uuid, ExecutionError> {
1950+
pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
19511951
timeout(
19521952
self.schema_agreement_timeout,
19531953
self.await_schema_agreement_indefinitely(),
19541954
)
19551955
.await
1956-
.unwrap_or(Err(ExecutionError::SchemaAgreementTimeout(
1956+
.unwrap_or(Err(SchemaAgreementError::Timeout(
19571957
self.schema_agreement_timeout,
19581958
)))
19591959
}
19601960

1961-
pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, ExecutionError> {
1961+
pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
19621962
let cluster_state = self.get_cluster_state();
19631963
let connections_iter = cluster_state.iter_working_connections()?;
19641964

scylla/src/errors.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ pub enum ExecutionError {
8080
)]
8181
RequestTimeout(std::time::Duration),
8282

83-
/// Schema agreement timed out.
84-
#[error("Schema agreement exceeded {}ms", std::time::Duration::as_millis(.0))]
85-
SchemaAgreementTimeout(std::time::Duration),
86-
8783
/// 'USE KEYSPACE <>' request failed.
8884
#[error("'USE KEYSPACE <>' request failed: {0}")]
8985
UseKeyspaceError(#[from] UseKeyspaceError),
86+
87+
/// Failed to await automatic schema agreement.
88+
#[error("Failed to await schema agreement: {0}")]
89+
SchemaAgreementError(#[from] SchemaAgreementError),
9090
}
9191

9292
impl From<SerializationError> for ExecutionError {
@@ -162,10 +162,6 @@ pub enum NewSessionError {
162162
#[derive(Error, Debug, Clone)]
163163
#[non_exhaustive]
164164
pub enum ProtocolError {
165-
/// A protocol error appeared during schema version fetch.
166-
#[error("Schema version fetch protocol error: {0}")]
167-
SchemaVersionFetch(#[from] SchemaVersionFetchError),
168-
169165
/// Unable extract a partition key based on prepared statement's metadata.
170166
#[error("Unable extract a partition key based on prepared statement's metadata")]
171167
PartitionKeyExtraction,
@@ -198,17 +194,33 @@ pub enum UseKeyspaceError {
198194
RequestTimeout(std::time::Duration),
199195
}
200196

201-
/// A protocol error that occurred during schema version fetch.
197+
/// An error that occurred when awating schema agreement.
202198
#[derive(Error, Debug, Clone)]
203199
#[non_exhaustive]
204-
pub enum SchemaVersionFetchError {
200+
pub enum SchemaAgreementError {
201+
/// Failed to find a node with working connection pool.
202+
#[error("Failed to find a node with working connection pool: {0}")]
203+
ConnectionPoolError(#[from] ConnectionPoolError),
204+
205+
/// Failed to execute schema version query on one of the connections.
206+
///
207+
/// The driver attempts to fetch schema version on all connections in the pool (for all nodes).
208+
/// It expects all of the requests to succeed. If at least one request fails, schema version
209+
/// fetch is considered failed. This variant contains an error from one of the failing request attempts.
210+
#[error("Failed to execute schema version query: {0}")]
211+
RequestError(#[from] RequestAttemptError),
212+
205213
/// Failed to convert schema version query result into rows result.
206214
#[error("Failed to convert schema version query result into rows result: {0}")]
207215
TracesEventsIntoRowsResultError(IntoRowsResultError),
208216

209217
/// Failed to deserialize a single row from schema version query response.
210218
#[error(transparent)]
211219
SingleRowError(SingleRowError),
220+
221+
/// Schema agreement timed out.
222+
#[error("Schema agreement exceeded {}ms", std::time::Duration::as_millis(.0))]
223+
Timeout(std::time::Duration),
212224
}
213225

214226
/// An error that occurred during tracing info fetch.

scylla/src/network/connection.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use crate::cluster::NodeAddr;
1010
use crate::errors::{
1111
BadKeyspaceName, BrokenConnectionError, BrokenConnectionErrorKind, ConnectionError,
1212
ConnectionSetupRequestError, ConnectionSetupRequestErrorKind, CqlEventHandlingError, DbError,
13-
ExecutionError, InternalRequestError, ProtocolError, RequestAttemptError, ResponseParseError,
14-
SchemaVersionFetchError, TranslationError, UseKeyspaceError,
13+
InternalRequestError, RequestAttemptError, ResponseParseError, SchemaAgreementError,
14+
TranslationError, UseKeyspaceError,
1515
};
1616
use crate::frame::protocol_features::ProtocolFeatures;
1717
use crate::frame::{
@@ -1261,20 +1261,14 @@ impl Connection {
12611261
}
12621262
}
12631263

1264-
pub(crate) async fn fetch_schema_version(&self) -> Result<Uuid, ExecutionError> {
1264+
pub(crate) async fn fetch_schema_version(&self) -> Result<Uuid, SchemaAgreementError> {
12651265
let (version_id,) = self
12661266
.query_unpaged(LOCAL_VERSION)
12671267
.await?
12681268
.into_rows_result()
1269-
.map_err(|err| {
1270-
ExecutionError::ProtocolError(ProtocolError::SchemaVersionFetch(
1271-
SchemaVersionFetchError::TracesEventsIntoRowsResultError(err),
1272-
))
1273-
})?
1269+
.map_err(SchemaAgreementError::TracesEventsIntoRowsResultError)?
12741270
.single_row::<(Uuid,)>()
1275-
.map_err(|err| {
1276-
ProtocolError::SchemaVersionFetch(SchemaVersionFetchError::SingleRowError(err))
1277-
})?;
1271+
.map_err(SchemaAgreementError::SingleRowError)?;
12781272

12791273
Ok(version_id)
12801274
}

0 commit comments

Comments
 (0)