Skip to content

Commit c67e9fd

Browse files
committed
Support notifications
1 parent 29d89ab commit c67e9fd

File tree

5 files changed

+267
-15
lines changed

5 files changed

+267
-15
lines changed

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
208208
* A handler to invoke for any request types that do not have their own handler installed.
209209
*/
210210
public var fallbackRequestHandler: (
211-
suspend (request: JSONRPCRequest, extra: RequestHandlerExtra) -> RequestResult?
211+
suspend (request: JSONRPCRequest, extra: RequestHandlerExtra) -> RequestResult?
212212
)? =
213213
null
214214

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Features.kt renamed to kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Feature.kt

File renamed without changes.
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package io.modelcontextprotocol.kotlin.sdk.server
2+
3+
import io.github.oshai.kotlinlogging.KotlinLogging
4+
import io.modelcontextprotocol.kotlin.sdk.types.Notification
5+
import io.modelcontextprotocol.kotlin.sdk.types.PromptListChangedNotification
6+
import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification
7+
import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification
8+
import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams
9+
import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification
10+
import kotlinx.coroutines.CoroutineScope
11+
import kotlinx.coroutines.Dispatchers
12+
import kotlinx.coroutines.Job
13+
import kotlinx.coroutines.SupervisorJob
14+
import kotlinx.coroutines.flow.MutableSharedFlow
15+
import kotlinx.coroutines.launch
16+
17+
internal class FeatureNotificationService {
18+
private val notifications = MutableSharedFlow<Notification>()
19+
private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
20+
21+
private val notificationSessionFeatureJobs: MutableMap<ServerSessionKey, Job> = mutableMapOf()
22+
private val notificationSessionResourceJobs: MutableMap<Pair<ServerSessionKey, FeatureKey>, Job> = mutableMapOf()
23+
24+
private val logger = KotlinLogging.logger {}
25+
26+
private val toolFeatureListener: FeatureListener by lazy {
27+
object : FeatureListener {
28+
override fun onListChanged() {
29+
logger.debug { "Emitting tool list changed notification" }
30+
emit(ToolListChangedNotification())
31+
}
32+
33+
override fun onFeatureUpdated(featureKey: FeatureKey) {
34+
logger.debug { "Skipping update for tool feature key: $featureKey" }
35+
}
36+
}
37+
}
38+
39+
private val promptFeatureListener: FeatureListener by lazy {
40+
object : FeatureListener {
41+
override fun onListChanged() {
42+
logger.debug { "Emitting prompt list changed notification" }
43+
emit(PromptListChangedNotification())
44+
}
45+
46+
override fun onFeatureUpdated(featureKey: FeatureKey) {
47+
logger.debug { "Skipping update for prompt feature key: $featureKey" }
48+
}
49+
}
50+
}
51+
52+
private val resourceFeatureListener: FeatureListener by lazy {
53+
object : FeatureListener {
54+
override fun onListChanged() {
55+
logger.debug { "Emitting resource list changed notification" }
56+
emit(ResourceListChangedNotification())
57+
}
58+
59+
override fun onFeatureUpdated(featureKey: FeatureKey) {
60+
logger.debug { "Emitting resource updated notification for feature key: $featureKey" }
61+
emit(ResourceUpdatedNotification(ResourceUpdatedNotificationParams(uri = featureKey)))
62+
}
63+
}
64+
}
65+
66+
internal fun getToolFeatureListener(): FeatureListener = toolFeatureListener
67+
internal fun getPromptFeatureListener(): FeatureListener = promptFeatureListener
68+
internal fun geResourceFeatureListener(): FeatureListener = resourceFeatureListener
69+
70+
internal fun subscribeToListChangedNotification(session: ServerSession) {
71+
logger.debug { "Subscribing to list changed notifications for sessionId: ${session.sessionId}" }
72+
notificationSessionFeatureJobs[session.sessionId] = notificationScope.launch {
73+
notifications.collect { notification ->
74+
when (notification) {
75+
is PromptListChangedNotification -> session.notification(notification)
76+
is ResourceListChangedNotification -> session.notification(notification)
77+
is ToolListChangedNotification -> session.notification(notification)
78+
else -> logger.debug {
79+
"Notification not handled for sessionId ${session.sessionId}: $notification"
80+
}
81+
}
82+
}
83+
}
84+
logger.debug { "Subscribed to list changed notifications for sessionId: ${session.sessionId}" }
85+
}
86+
87+
internal fun unsubscribeFromListChangedNotification(session: ServerSession) {
88+
logger.debug { "Unsubscribing from list changed notifications for sessionId: ${session.sessionId}" }
89+
notificationSessionFeatureJobs[session.sessionId]?.cancel()
90+
notificationSessionFeatureJobs.remove(session.sessionId)
91+
logger.debug { "Unsubscribed from list changed notifications for sessionId: ${session.sessionId}" }
92+
}
93+
94+
internal fun subscribeToResourceUpdateNotifications(session: ServerSession, resourceKey: FeatureKey) {
95+
logger.debug { "Subscribing to resource update notifications for sessionId: ${session.sessionId}" }
96+
notificationSessionResourceJobs[session.sessionId to resourceKey] = notificationScope.launch {
97+
notifications.collect { notification ->
98+
when (notification) {
99+
is ResourceUpdatedNotification -> {
100+
if (notification.params.uri == resourceKey) {
101+
session.notification(notification)
102+
}
103+
}
104+
105+
else -> logger.debug {
106+
"Notification not handled for session for sessionId ${session.sessionId}: $notification"
107+
}
108+
}
109+
}
110+
}
111+
logger.debug { "Subscribed to resource update notifications for sessionId: ${session.sessionId}" }
112+
}
113+
114+
internal fun unsubscribeFromResourceUpdateNotifications(session: ServerSession, resourceKey: FeatureKey) {
115+
logger.debug { "Unsubscribing from resourcec update notifications for sessionId: ${session.sessionId}" }
116+
notificationSessionResourceJobs[session.sessionId to resourceKey]?.cancel()
117+
notificationSessionResourceJobs.remove(session.sessionId to resourceKey)
118+
logger.debug { "Unsubscribed from resourcec update notifications for sessionId: ${session.sessionId}" }
119+
}
120+
121+
122+
private fun emit(notification: Notification) {
123+
notificationScope.launch {
124+
notifications.emit(notification)
125+
}
126+
}
127+
}

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,19 @@ import kotlinx.atomicfu.atomic
55
import kotlinx.atomicfu.getAndUpdate
66
import kotlinx.atomicfu.update
77
import kotlinx.collections.immutable.minus
8+
import kotlinx.collections.immutable.persistentListOf
89
import kotlinx.collections.immutable.persistentMapOf
910
import kotlinx.collections.immutable.toPersistentSet
1011

12+
13+
/**
14+
* A listener interface for receiving notifications about feature changes in registry.
15+
*/
16+
internal interface FeatureListener {
17+
fun onListChanged()
18+
fun onFeatureUpdated(featureKey: String)
19+
}
20+
1121
/**
1222
* A generic registry for managing features of a specified type. This class provides thread-safe
1323
* operations for adding, removing, and retrieving features from the registry.
@@ -33,15 +43,29 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
3343
internal val values: Map<FeatureKey, T>
3444
get() = registry.value
3545

46+
private val listeners = atomic(persistentListOf<FeatureListener>())
47+
48+
internal fun addListener(listener: FeatureListener) {
49+
listeners.update { it.add(listener) }
50+
}
51+
52+
internal fun removeListener(listener: FeatureListener) {
53+
listeners.update { it.remove(listener) }
54+
}
55+
3656
/**
3757
* Adds the specified feature to the registry.
3858
*
3959
* @param feature The feature to be added to the registry.
4060
*/
4161
internal fun add(feature: T) {
4262
logger.info { "Adding $featureType: \"${feature.key}\"" }
43-
registry.update { current -> current.put(feature.key, feature) }
63+
val oldMap = registry.getAndUpdate { current -> current.put(feature.key, feature) }
64+
val oldFeature = oldMap[feature.key]
4465
logger.info { "Added $featureType: \"${feature.key}\"" }
66+
67+
notifyFeatureUpdated(oldFeature, feature)
68+
notifyListChanged()
4569
}
4670

4771
/**
@@ -52,8 +76,14 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
5276
*/
5377
internal fun addAll(features: List<T>) {
5478
logger.info { "Adding ${featureType}s: ${features.size}" }
55-
registry.update { current -> current.putAll(features.associateBy { it.key }) }
79+
val oldMap = registry.getAndUpdate { current -> current.putAll(features.associateBy { it.key }) }
80+
for (feature in features) {
81+
val oldFeature = oldMap[feature.key]
82+
notifyFeatureUpdated(oldFeature, feature)
83+
}
5684
logger.info { "Added ${featureType}s: ${features.size}" }
85+
86+
notifyListChanged()
5787
}
5888

5989
/**
@@ -66,15 +96,20 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
6696
logger.info { "Removing $featureType: \"$key\"" }
6797
val oldMap = registry.getAndUpdate { current -> current.remove(key) }
6898

69-
val removed = key in oldMap
99+
val removedFeature = oldMap[key]
100+
val removed = removedFeature != null
70101
logger.info {
71102
if (removed) {
72103
"Removed $featureType: \"$key\""
73104
} else {
74105
"$featureType not found: \"$key\""
75106
}
76107
}
77-
return key in oldMap
108+
109+
notifyFeatureUpdated(removedFeature, null)
110+
notifyListChanged()
111+
112+
return removed
78113
}
79114

80115
/**
@@ -87,7 +122,12 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
87122
logger.info { "Removing ${featureType}s: ${keys.size}" }
88123
val oldMap = registry.getAndUpdate { current -> current - keys.toPersistentSet() }
89124

90-
val removedCount = keys.count { it in oldMap }
125+
val removedFeatures = keys.mapNotNull { oldMap[it] }
126+
val removedCount = removedFeatures.size
127+
removedFeatures.forEach {
128+
notifyFeatureUpdated(it, null)
129+
}
130+
91131
logger.info {
92132
if (removedCount > 0) {
93133
"Removed ${featureType}s: $removedCount"
@@ -108,13 +148,24 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
108148
internal fun get(key: FeatureKey): T? {
109149
logger.info { "Getting $featureType: \"$key\"" }
110150
val feature = registry.value[key]
111-
logger.info {
112-
if (feature != null) {
113-
"Got $featureType: \"$key\""
114-
} else {
115-
"$featureType not found: \"$key\""
116-
}
151+
if (feature != null) {
152+
logger.info { "Got $featureType: \"$key\"" }
153+
} else {
154+
logger.info { "$featureType not found: \"$key\"" }
117155
}
156+
118157
return feature
119158
}
159+
160+
private fun notifyListChanged() {
161+
logger.info { "Notifying listeners of list change" }
162+
listeners.value.forEach { it.onListChanged() }
163+
}
164+
165+
private fun notifyFeatureUpdated(oldFeature: T?, newFeature: T?) {
166+
logger.info { "Notifying listeners of feature update" }
167+
val featureKey = (oldFeature?.key ?: newFeature?.key) ?: return
168+
169+
listeners.value.forEach { it.onFeatureUpdated(featureKey) }
170+
}
120171
}

0 commit comments

Comments
 (0)