Skip to content

Commit ce1d215

Browse files
committed
Refactor FeatureNotificationService
1 parent 2bec33d commit ce1d215

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,22 @@ import kotlinx.coroutines.launch
2626
import kotlin.time.Clock
2727
import kotlin.time.ExperimentalTime
2828

29-
/** Represents an event for notification service. */
30-
private sealed class Event
29+
/**
30+
* Represents an event for the notification service.
31+
*
32+
* @property timestamp A timestamp for the event.
33+
*/
34+
private sealed class NotificationEvent(open val timestamp: Long)
3135

3236
/**
3337
* Represents an event for a notification.
3438
*
35-
* @property timestamp A timestamp for the event.
3639
* @property notification The notification associated with the event.
3740
*/
38-
private class NotificationEvent(val timestamp: Long, val notification: Notification) : Event()
41+
private class SendEvent(override val timestamp: Long, val notification: Notification) : NotificationEvent(timestamp)
3942

4043
/** Represents an event marking the end of notification processing. */
41-
private class EndEvent : Event()
44+
private class EndEvent(override val timestamp: Long) : NotificationEvent(timestamp)
4245

4346
/**
4447
* Represents a job that handles session-specific notifications, processing events
@@ -48,7 +51,7 @@ private class EndEvent : Event()
4851
* based on the event type and the resource subscriptions associated with the session.
4952
* It allows subscribing to or unsubscribing from specific resource keys for granular
5053
* notification handling. The job can also be canceled to stop processing further events.
51-
* IDs less than or equal to this value will be skipped.
54+
* Notification with timestamps older than the starting timestamp are skipped.
5255
*/
5356
private class SessionNotificationJob {
5457
private val job: Job
@@ -58,14 +61,14 @@ private class SessionNotificationJob {
5861
constructor(
5962
session: ServerSession,
6063
scope: CoroutineScope,
61-
events: SharedFlow<Event>,
64+
events: SharedFlow<NotificationEvent>,
6265
fromTimestamp: Long,
6366
) {
6467
logger.info { "Starting notification job from timestamp $fromTimestamp for sessionId: ${session.sessionId} " }
6568
job = scope.launch {
6669
events.takeWhile { it !is EndEvent }.collect { event ->
6770
when (event) {
68-
is NotificationEvent -> {
71+
is SendEvent -> {
6972
if (event.timestamp >= fromTimestamp) {
7073
when (val notification = event.notification) {
7174
is PromptListChangedNotification -> {
@@ -186,18 +189,19 @@ internal class FeatureNotificationService(
186189
@OptIn(ExperimentalTime::class)
187190
private val clock: Clock = Clock.System,
188191
) {
192+
/** Coroutine scope used to handle asynchronous notifications. */
193+
private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
194+
195+
/** Flag indicating whether the service is closing. */
189196
private val closingService = atomic(false)
190197

191198
/** Shared flow used to emit events within the feature notification service. */
192-
private val notificationEvents = MutableSharedFlow<Event>(
199+
private val notificationEvents = MutableSharedFlow<NotificationEvent>(
193200
extraBufferCapacity = 100,
194201
replay = 0,
195202
onBufferOverflow = BufferOverflow.SUSPEND,
196203
)
197204

198-
/** Coroutine scope used to handle asynchronous notifications. */
199-
private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
200-
201205
/** Active emit jobs. */
202206
private val activeEmitJobs = atomic(persistentSetOf<Job>())
203207

@@ -270,6 +274,10 @@ internal class FeatureNotificationService(
270274
logger.info { "Subscribing session for notifications sessionId: ${session.sessionId}" }
271275

272276
val timestamp = getCurrentTimestamp()
277+
if (closingService.value) {
278+
logger.warn { "Skipping subscription notification as service is closing: ${session.sessionId}" }
279+
return
280+
}
273281

274282
sessionNotificationJobs.getAndUpdate {
275283
if (it.containsKey(session.sessionId)) {
@@ -350,7 +358,7 @@ internal class FeatureNotificationService(
350358
// Launching emit lazily to put it to the jobs queue before the completion
351359
val job = notificationScope.launch(start = CoroutineStart.LAZY) {
352360
logger.info { "Actually emitting notification $timestamp: $notification" }
353-
notificationEvents.emit(NotificationEvent(timestamp, notification))
361+
notificationEvents.emit(SendEvent(timestamp, notification))
354362
logger.info { "Notification emitted $timestamp: $notification" }
355363
}
356364

@@ -380,7 +388,7 @@ internal class FeatureNotificationService(
380388
// Emitting end event to complete all session notification jobs
381389
notificationScope.launch {
382390
logger.info { "Emitting end event" }
383-
notificationEvents.emit(EndEvent())
391+
notificationEvents.emit(EndEvent(getCurrentTimestamp()))
384392
logger.info { "End event emitted" }
385393
}.join()
386394

0 commit comments

Comments
 (0)