@@ -6,6 +6,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.PromptListChangedNotification
66import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification
77import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification
88import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams
9+ import io.modelcontextprotocol.kotlin.sdk.types.ServerNotification
910import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification
1011import kotlinx.atomicfu.atomic
1112import kotlinx.atomicfu.getAndUpdate
@@ -18,6 +19,7 @@ import kotlinx.coroutines.Job
1819import kotlinx.coroutines.SupervisorJob
1920import kotlinx.coroutines.cancel
2021import kotlinx.coroutines.channels.BufferOverflow
22+ import kotlinx.coroutines.channels.Channel
2123import kotlinx.coroutines.flow.MutableSharedFlow
2224import kotlinx.coroutines.flow.SharedFlow
2325import kotlinx.coroutines.flow.takeWhile
@@ -58,6 +60,17 @@ private class SessionNotificationJob {
5860 private val resourceSubscriptions = atomic(persistentMapOf<FeatureKey , Long >())
5961 private val logger = KotlinLogging .logger {}
6062
63+ /* *
64+ * Constructor for the SessionNotificationJob, responsible for processing notification events
65+ * and dispatching appropriate notifications to the provided server session. The job operates
66+ * within the given coroutine scope and begins handling events starting from the specified
67+ * timestamp.
68+ *
69+ * @param session The server session where notifications will be dispatched.
70+ * @param scope The coroutine scope in which this job operates.
71+ * @param events A shared flow of notification events that the job listens to.
72+ * @param fromTimestamp The timestamp from which the job starts processing events.
73+ */
6174 constructor (
6275 session: ServerSession ,
6376 scope: CoroutineScope ,
@@ -71,23 +84,12 @@ private class SessionNotificationJob {
7184 is SendEvent -> {
7285 if (event.timestamp >= fromTimestamp) {
7386 when (val notification = event.notification) {
74- is PromptListChangedNotification -> {
75- logger.info {
76- " Sending prompt list changed notification for sessionId: ${session.sessionId} "
77- }
78- session.notification(notification)
79- }
80-
81- is ResourceListChangedNotification -> {
87+ is PromptListChangedNotification ,
88+ is ResourceListChangedNotification ,
89+ is ToolListChangedNotification ,
90+ -> {
8291 logger.info {
83- " Sending resourse list changed notification for sessionId: ${session.sessionId} "
84- }
85- session.notification(notification)
86- }
87-
88- is ToolListChangedNotification -> {
89- logger.info {
90- " Sending tool list changed notification for sessionId: ${session.sessionId} "
92+ " Sending list changed notification for sessionId: ${session.sessionId} "
9193 }
9294 session.notification(notification)
9395 }
@@ -186,6 +188,7 @@ private class SessionNotificationJob {
186188 * - Resource updates pertaining to specific resources.
187189 */
188190internal class FeatureNotificationService (
191+ notificationBufferCapacity : Int = Channel .UNLIMITED ,
189192 @OptIn(ExperimentalTime ::class )
190193 private val clock : Clock = Clock .System ,
191194) {
@@ -197,8 +200,7 @@ internal class FeatureNotificationService(
197200
198201 /* * Shared flow used to emit events within the feature notification service. */
199202 private val notificationEvents = MutableSharedFlow <NotificationEvent >(
200- extraBufferCapacity = 100 ,
201- replay = 0 ,
203+ extraBufferCapacity = notificationBufferCapacity,
202204 onBufferOverflow = BufferOverflow .SUSPEND ,
203205 )
204206
@@ -210,57 +212,27 @@ internal class FeatureNotificationService(
210212
211213 private val logger = KotlinLogging .logger {}
212214
213- /* * Listener for tool feature events. */
214- private val toolListChangedListener: FeatureListener by lazy {
215+ private fun featureListener (notificationProvider : (FeatureKey ) -> ServerNotification ): FeatureListener =
215216 object : FeatureListener {
216217 override fun onFeatureUpdated (featureKey : FeatureKey ) {
217- logger.info { " Emitting tool list changed notification" }
218- emit(ToolListChangedNotification ())
218+ val notification = notificationProvider(featureKey)
219+ logger.info { " Emitting notification: ${notification.method.value} " }
220+ emit(notification)
219221 }
220222 }
221- }
223+
224+ /* * Listener for tool feature events. */
225+ internal val toolListChangedListener: FeatureListener = featureListener { ToolListChangedNotification () }
222226
223227 /* * Listener for prompt feature events. */
224- private val promptListChangeListener: FeatureListener by lazy {
225- object : FeatureListener {
226- override fun onFeatureUpdated (featureKey : FeatureKey ) {
227- logger.info { " Emitting prompt list changed notification" }
228- emit(PromptListChangedNotification ())
229- }
230- }
231- }
228+ internal val promptListChangedListener: FeatureListener = featureListener { PromptListChangedNotification () }
232229
233230 /* * Listener for resource feature events. */
234- private val resourceListChangedListener: FeatureListener by lazy {
235- object : FeatureListener {
236- override fun onFeatureUpdated (featureKey : FeatureKey ) {
237- logger.info { " Emitting resource list changed notification" }
238- emit(ResourceListChangedNotification ())
239- }
240- }
241- }
242-
243- /* * Listener for resource update events. */
244- private val resourceUpdatedListener: FeatureListener by lazy {
245- object : FeatureListener {
246- override fun onFeatureUpdated (featureKey : FeatureKey ) {
247- logger.info { " Emitting resource updated notification for feature key: $featureKey " }
248- emit(ResourceUpdatedNotification (ResourceUpdatedNotificationParams (uri = featureKey)))
249- }
250- }
251- }
252-
253- /* * Listener for the tool list changed events. */
254- internal fun getToolListChangedListener (): FeatureListener = toolListChangedListener
255-
256- /* * Listener for the prompt list changed events. */
257- internal fun getPromptListChangedListener (): FeatureListener = promptListChangeListener
258-
259- /* * Listener for the resource list changed events. */
260- internal fun getResourceListChangedListener (): FeatureListener = resourceListChangedListener
231+ internal val resourceListChangedListener: FeatureListener = featureListener { ResourceListChangedNotification () }
261232
262233 /* * Listener for resource update events. */
263- internal fun getResourceUpdateListener (): FeatureListener = resourceUpdatedListener
234+ internal val resourceUpdatedListener: FeatureListener =
235+ featureListener { ResourceUpdatedNotification (ResourceUpdatedNotificationParams (uri = it)) }
264236
265237 /* *
266238 * Subscribes session to list changed notifications for all features and resource update notifications.
0 commit comments