Skip to content

Commit ccabf0c

Browse files
prnvbnslvrtrnabonander
authored
feat(inserter): add callback function support (#307)
Co-authored-by: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Co-authored-by: Austin Bonander <austin.bonander@gmail.com>
1 parent d2751d3 commit ccabf0c

File tree

1 file changed

+31
-9
lines changed

1 file changed

+31
-9
lines changed

src/inserter.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ pub struct Inserter<T> {
3232
ticks: Ticks,
3333
pending: Quantities,
3434
in_transaction: bool,
35+
36+
#[allow(clippy::type_complexity)]
37+
on_commit: Option<Box<dyn FnMut(&Quantities) + Send + 'static>>,
3538
}
3639

3740
/// Statistics about pending or inserted data.
@@ -70,6 +73,7 @@ where
7073
ticks: Ticks::default(),
7174
pending: Quantities::ZERO,
7275
in_transaction: false,
76+
on_commit: None,
7377
}
7478
}
7579

@@ -199,6 +203,19 @@ where
199203
self.ticks.reschedule();
200204
}
201205

206+
/// Registers a callback that will be invoked after each successful batch commit.
207+
///
208+
/// The callback receives the committed [`Quantities`]. It is invoked only
209+
/// when a batch actually commits (i.e., non-zero transactions), and only
210+
/// after the commit completes successfully.
211+
pub fn with_commit_callback(
212+
mut self,
213+
callback: impl FnMut(&Quantities) + Send + 'static,
214+
) -> Self {
215+
self.on_commit = Some(Box::new(callback));
216+
self
217+
}
218+
202219
/// How much time we have until the next tick.
203220
///
204221
/// `None` if the period isn't configured.
@@ -258,21 +275,16 @@ where
258275

259276
/// Ends the current `INSERT` unconditionally.
260277
pub async fn force_commit(&mut self) -> Result<Quantities> {
261-
self.in_transaction = false;
262-
263-
let quantities = mem::replace(&mut self.pending, Quantities::ZERO);
264-
let result = self.insert().await;
278+
let quantities = self.insert().await?;
265279
self.ticks.reschedule();
266-
result?;
267280
Ok(quantities)
268281
}
269282

270283
/// Ends the current `INSERT` and whole `Inserter` unconditionally.
271284
///
272285
/// If it isn't called, the current `INSERT` is aborted.
273286
pub async fn end(mut self) -> Result<Quantities> {
274-
self.insert().await?;
275-
Ok(self.pending)
287+
self.insert().await
276288
}
277289

278290
fn limits_reached(&self) -> bool {
@@ -281,11 +293,21 @@ where
281293
|| self.ticks.reached()
282294
}
283295

284-
async fn insert(&mut self) -> Result<()> {
296+
async fn insert(&mut self) -> Result<Quantities> {
297+
self.in_transaction = false;
298+
let quantities = mem::replace(&mut self.pending, Quantities::ZERO);
299+
285300
if let Some(insert) = self.insert.take() {
286301
insert.end().await?;
287302
}
288-
Ok(())
303+
304+
if let Some(cb) = &mut self.on_commit
305+
&& quantities.transactions > 0
306+
{
307+
(cb)(&quantities);
308+
}
309+
310+
Ok(quantities)
289311
}
290312

291313
#[cold]

0 commit comments

Comments
 (0)