Skip to content

Commit 3715f7a

Browse files
goffrieConvex, Inc.
authored andcommitted
Do not hold connections across pages in MySqlReader::_load_documents (#41682)
GitOrigin-RevId: 7029e5bc5db89a36ed367201fec3772f5adf2908
1 parent e293196 commit 3715f7a

File tree

1 file changed

+13
-11
lines changed

1 file changed

+13
-11
lines changed

crates/mysql/src/lib.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -669,10 +669,6 @@ impl<RT: Runtime> MySqlReader<RT> {
669669
) {
670670
anyhow::ensure!(page_size > 0); // 0 size pages loop forever.
671671
let timer = metrics::load_documents_timer(self.read_pool.cluster_name());
672-
let mut client = self
673-
.read_pool
674-
.acquire("load_documents", &self.db_name)
675-
.await?;
676672
let mut num_returned = 0;
677673
let mut last_ts = match order {
678674
Order::Asc => Timestamp::MIN,
@@ -681,7 +677,12 @@ impl<RT: Runtime> MySqlReader<RT> {
681677
let mut last_tablet_id_param = Self::initial_id_param(order);
682678
let mut last_id_param = Self::initial_id_param(order);
683679
loop {
684-
let mut rows_loaded = 0;
680+
// Avoid holding connections across yield points, to limit lifetime
681+
// and improve fairness.
682+
let mut client = self
683+
.read_pool
684+
.acquire("load_documents", &self.db_name)
685+
.await?;
685686

686687
let query = match order {
687688
Order::Asc => sql::load_docs_by_ts_page_asc(
@@ -711,17 +712,19 @@ impl<RT: Runtime> MySqlReader<RT> {
711712
params.push(self.instance_name.to_string().into());
712713
}
713714
params.push((page_size as i64).into());
714-
let row_stream = client
715+
let rows: Vec<_> = client
715716
.query_stream(query, params, page_size as usize)
717+
.await?
718+
.try_collect()
716719
.await?;
720+
drop(client);
717721

718722
retention_validator
719723
.validate_document_snapshot(range.min_timestamp_inclusive())
720724
.await?;
721725

722-
futures::pin_mut!(row_stream);
723-
724-
while let Some(row) = row_stream.try_next().await? {
726+
let rows_loaded = rows.len();
727+
for row in rows {
725728
let prev_rev_value: Option<Vec<u8>> = if include_prev_rev {
726729
row.get_opt(6).context("missing column")??
727730
} else {
@@ -737,7 +740,6 @@ impl<RT: Runtime> MySqlReader<RT> {
737740
ResolvedDocument::from_database(document_id.table(), value)
738741
})
739742
.transpose()?;
740-
rows_loaded += 1;
741743
last_ts = ts;
742744
last_tablet_id_param = internal_id_param(document_id.table().0);
743745
last_id_param = internal_doc_id_param(document_id);
@@ -751,7 +753,7 @@ impl<RT: Runtime> MySqlReader<RT> {
751753
}),
752754
}
753755
}
754-
if rows_loaded < page_size {
756+
if rows_loaded < page_size as usize {
755757
break;
756758
}
757759
}

0 commit comments

Comments
 (0)