Skip to content

Commit 893e426

Browse files
committed
Implement activity tracking for personal sessions
1 parent 11a852e commit 893e426

File tree

5 files changed

+95
-3
lines changed

5 files changed

+95
-3
lines changed

crates/handlers/src/activity_tracker/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ mod worker;
1010
use std::net::IpAddr;
1111

1212
use chrono::{DateTime, Utc};
13-
use mas_data_model::{BrowserSession, Clock, CompatSession, Session};
13+
use mas_data_model::{
14+
BrowserSession, Clock, CompatSession, Session, personal::session::PersonalSession,
15+
};
1416
use mas_storage::BoxRepositoryFactory;
1517
use tokio_util::{sync::CancellationToken, task::TaskTracker};
1618
use ulid::Ulid;
@@ -115,7 +117,7 @@ impl ActivityTracker {
115117
pub async fn record_personal_access_token_session(
116118
&self,
117119
clock: &dyn Clock,
118-
session: &Session,
120+
session: &PersonalSession,
119121
ip: Option<IpAddr>,
120122
) {
121123
let res = self

crates/handlers/src/activity_tracker/worker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,9 @@ impl Worker {
257257
repo.compat_session()
258258
.record_batch_activity(compat_sessions)
259259
.await?;
260-
// TODO: personal sessions: record
260+
repo.personal_session()
261+
.record_batch_activity(personal_sessions)
262+
.await?;
261263

262264
repo.save().await?;
263265
self.pending_records.clear();

crates/storage-pg/.sqlx/query-64b6e274e2bed6814f5ae41ddf57093589f7d1b2b8458521b635546b8012041e.json

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-pg/src/personal/session.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,56 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
361361
.try_into()
362362
.map_err(DatabaseError::to_invalid_operation)
363363
}
364+
365+
#[tracing::instrument(
366+
name = "db.personal_session.record_batch_activity",
367+
skip_all,
368+
fields(
369+
db.query.text,
370+
),
371+
err,
372+
)]
373+
async fn record_batch_activity(
374+
&mut self,
375+
mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
376+
) -> Result<(), Self::Error> {
377+
// Sort the activity by ID, so that when batching the updates, Postgres
378+
// locks the rows in a stable order, preventing deadlocks
379+
activities.sort_unstable();
380+
let mut ids = Vec::with_capacity(activities.len());
381+
let mut last_activities = Vec::with_capacity(activities.len());
382+
let mut ips = Vec::with_capacity(activities.len());
383+
384+
for (id, last_activity, ip) in activities {
385+
ids.push(Uuid::from(id));
386+
last_activities.push(last_activity);
387+
ips.push(ip);
388+
}
389+
390+
let res = sqlx::query!(
391+
r#"
392+
UPDATE personal_sessions
393+
SET last_active_at = GREATEST(t.last_active_at, personal_sessions.last_active_at)
394+
, last_active_ip = COALESCE(t.last_active_ip, personal_sessions.last_active_ip)
395+
FROM (
396+
SELECT *
397+
FROM UNNEST($1::uuid[], $2::timestamptz[], $3::inet[])
398+
AS t(personal_session_id, last_active_at, last_active_ip)
399+
) AS t
400+
WHERE personal_sessions.personal_session_id = t.personal_session_id
401+
"#,
402+
&ids,
403+
&last_activities,
404+
&ips as &[Option<IpAddr>],
405+
)
406+
.traced()
407+
.execute(&mut *self.conn)
408+
.await?;
409+
410+
DatabaseError::ensure_affected_rows(&res, ids.len().try_into().unwrap_or(u64::MAX))?;
411+
412+
Ok(())
413+
}
364414
}
365415

366416
impl Filter for PersonalSessionFilter<'_> {

crates/storage/src/personal/session.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
44
// Please see LICENSE files in the repository root for full details.
55

6+
use std::net::IpAddr;
7+
68
use async_trait::async_trait;
79
use chrono::{DateTime, Utc};
810
use mas_data_model::{
@@ -109,6 +111,21 @@ pub trait PersonalSessionRepository: Send + Sync {
109111
///
110112
/// Returns [`Self::Error`] if the underlying repository fails
111113
async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result<usize, Self::Error>;
114+
115+
/// Record a batch of [`PersonalSession`] activity
116+
///
117+
/// # Parameters
118+
///
119+
/// * `activity`: A list of tuples containing the session ID, the last
120+
/// activity timestamp and the IP address of the client
121+
///
122+
/// # Errors
123+
///
124+
/// Returns [`Self::Error`] if the underlying repository fails
125+
async fn record_batch_activity(
126+
&mut self,
127+
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
128+
) -> Result<(), Self::Error>;
112129
}
113130

114131
repository_impl!(PersonalSessionRepository:
@@ -137,6 +154,11 @@ repository_impl!(PersonalSessionRepository:
137154
) -> Result<Page<PersonalSession>, Self::Error>;
138155

139156
async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result<usize, Self::Error>;
157+
158+
async fn record_batch_activity(
159+
&mut self,
160+
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
161+
) -> Result<(), Self::Error>;
140162
);
141163

142164
/// Filter parameters for listing personal sessions

0 commit comments

Comments
 (0)